初探Thrift客户端异步模式

背景

在某项目中,我们广泛使用thrift作为我们内部接口调用的RPC框架,而且基本上都是使用多线程请求等待应答的同步模式。但是在一些情况下(例如大数据量同步),如果可以使用异步模式,可以优化程序结构和提高模块性能。

分析

thrift有提供一套异步模式供我们使用,首先我们像往常一样编写thrift协议文件。

namespace cpp uctest

service Test{
string pingpong(1: string data);
}

不同的是,需要加入cpp:cob_type来生成代码。生成的代码文件外表与之前的基本相同,但在Test.h和Test.cpp中内涵就丰富了,增加了异步客户端和异步服务器使用的类。

异步客户端代码有TestCobClient以及它继承的虚拟类。

class TestCobClient : virtual public TestCobClIf {
public:
TestCobClient(boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel, ::apache::thrift::protocol::TProtocolFactory* protocolFactory) :
channel_(channel),
itrans_(new ::apache::thrift::transport::TMemoryBuffer()),
otrans_(new ::apache::thrift::transport::TMemoryBuffer()),
piprot_(protocolFactory->getProtocol(itrans_)),
poprot_(protocolFactory->getProtocol(otrans_)) {
iprot_ = piprot_.get();
oprot_ = poprot_.get();
}
boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> getChannel() {
return channel_;
}
virtual void completed__(bool /* success */) {}
void pingpong(std::tr1::function<void(TestCobClient* client)> cob, const std::string& data);
void send_pingpong(const std::string& data);
void recv_pingpong(std::string& _return);
protected:
boost::shared_ptr< ::apache::thrift::async::TAsyncChannel> channel_;
boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> itrans_;
boost::shared_ptr< ::apache::thrift::transport::TMemoryBuffer> otrans_;
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> piprot_;
boost::shared_ptr< ::apache::thrift::protocol::TProtocol> poprot_;
::apache::thrift::protocol::TProtocol* iprot_;
::apache::thrift::protocol::TProtocol* oprot_;
};

从源文件上看,异步功能的核心在于TAsyncChannel,它是用于回调函数注册和异步收发数据。send_pingpong和 recv_pingpong分别向缓冲区(TMemoryBuffer)写入和读取数据。而pingpong则通过调用TAsyncChannel的 sendAndRecvMessage接口注册回调函数。

TAsyncChannel作为接口类定义了三个接口函数。

/**
* Send a message over the channel.
*/
virtual void sendMessage(const VoidCallback& cob,
apache::thrift::transport::TMemoryBuffer* message) = 0;

/**
* Receive a message from the channel.
*/
virtual void recvMessage(const VoidCallback& cob,
apache::thrift::transport::TMemoryBuffer* message) = 0;

/**
* Send a message over the channel and receive a response.
*/
virtual void sendAndRecvMessage(const VoidCallback& cob,
apache::thrift::transport::TMemoryBuffer* sendBuf,
apache::thrift::transport::TMemoryBuffer* recvBuf);

TAsyncChannel目前为止(0.9.1版本)只有一种客户端实现类TEvhttpClientChannel,顾名思义它是基于 libevent和http协议实现的。 使用libevent的方法就不在这里累赘了,主要看下sendAndRecvMessage的实现。

void TEvhttpClientChannel::sendAndRecvMessage(
const VoidCallback& cob,
apache::thrift::transport::TMemoryBuffer* sendBuf,
apache::thrift::transport::TMemoryBuffer* recvBuf) {
cob_ = cob;
recvBuf_ = recvBuf;

struct evhttp_request* req = evhttp_request_new(response, this);

uint8_t* obuf;
uint32_t sz;
sendBuf->getBuffer(&obuf, &sz);
rv = evbuffer_add(req->output_buffer, obuf, sz);

rv = evhttp_make_request(conn_, req, EVHTTP_REQ_POST, path_.c_str());

}

通过向evhttp_request中注册相应回调函数respones和传入回调实例本身的指针,在相应时候回调函数中调用TEvhttpClientChannel实例的finish接口完成数据接收,并写入缓存中,供应用层获取使用。

实验

有文章认为:使用Thrift异步客户端需要配合使用对应异步服务器才能工作。如果这个观点成立,我们改造目前程序代码的成本就会很高,而且可能会丧失使用Thrift的便捷性。

通过上述代码的阅读,发现唯一的局限性是服务器必须使用Http的传输层,此外只需要协议层保持一致,并不需要一定使用异步服务器。

下面我们通过简单代码基于上文“uctest”的协议实现了一个异步客户端和同步服务器。

服务器代码:

class TestHandler : virtual public TestIf {
public:
TestHandler() { }

void pingpong(std::string& _return, const std::string& data) {
if(data=="ping")
printf("[%d] recv ping\n", (int)time(NULL));
_return = "pong";
printf("[%d] send pong\n", (int)time(NULL));
}
};

int main(int argc, char **argv) {
int port = 9091;

shared_ptr<TestHandler> handler(new TestHandler());
shared_ptr<TProcessor> processor(new TestProcessor(handler));
shared_ptr<TServerTransport> serverTransport(new TServerSocket(port));
shared_ptr<TTransportFactory> transportFactory(new THttpServerTransportFactory());
shared_ptr<TProtocolFactory> protocolFactory(new TBinaryProtocolFactory());

TSimpleServer server(processor, serverTransport, transportFactory, protocolFactory);
server.serve();
return 0;
}

客户端代码:

void my_ping_pong(TestCobClient* client)
{
std::string pong;
client->recv_pingpong(pong);
printf("[%d] recv:%s\n", (int)time(NULL), pong.c_str());
}

int main(int argc, char **argv) {
try{
event_base* evbase = event_base_new();
boost::shared_ptr<TAsyncChannel> channel(new TEvhttpClientChannel("127.0.0.1", "/", "127.0.0.1", 9091, evbase));
TestCobClient client(channel, new TBinaryProtocolFactory());
function<void(TestCobClient* client)> cob = bind(&my_ping_pong,_1);

client.pingpong(cob, "ping");
printf("[%d] ping\n", (int)time(NULL));

for(int i=0;i<5;i++)
{
printf("[%d] running...\n", (int)time(NULL));
sleep(1);
}

event_base_dispatch(evbase);
event_base_free(evbase);
}

catch(...)
{
printf("exception");
return 1;
}

return 0;
};

运行结果如下:

[tangzheng@dev10 server]$ ./demo.serv
[1388639886] recv ping
[1388639886] send pong

[tangzheng@dev10 client]$ ./demo.client
[1388639881] ping
[1388639881] running…
[1388639882] running…
[1388639883] running…
[1388639884] running…
[1388639885] running…
[1388639886] recv:pong

达到异步客户端预期的效果。

结果

初步掌握了thrift异步客户端的用法,我们即可在需要的时候使用,或者优化当前的程序。 由于这种提供的异步模式必须基于HTTP传输层,使用有一定的局限性。之后将会继续研究是否可以在TAsyncChannel的基础上,开发支持其他传输层的接口。

相关文章

初探Thrift客户端异步模式,首发于博客 - 伯乐在线

X