I'm trying to attach the files yet again, renaming the files to .txt.
Sorry about the confusion.
On Thu, Jul 9, 2009 at 3:43 PM, Erik Bernhardsson<[email protected]> wrote:
> It seems like most of the attachments were somehow lost, so I'll try
> sending them again. There should be a CppClient.cpp, CppServer.cpp,
> TAsync.h, TAsioAsync.h and TAsioAsync.cpp
>
> On Thu, Jul 9, 2009 at 2:41 PM, Erik Bernhardsson<[email protected]> wrote:
>> Hi everyone,
>>
>> There's been some discussions about asynchronous C++ clients and
>> servers (THRIFT-1). Inspired by the Twisted server (THRIFT-148) by
>> Esteve Fernandez, as well as his C++ ideas (THRIFT-311), we (Erik
>> Bernhardsson, Mattias de Zalenski) decided to implement our own
>> version here at Spotify.
>>
>> Based on TFramedTransport and Boost.ASIO, this version is event-driven
>> and runs fully asynchronously in a single thread. There is no overhead
>> in terms of additional threads. This code is not thread-safe yet, in
>> the sense that multiple threads cannot simultaneously invoke calls on
>> the same client, though we're definitely interested in discussing how
>> to implement this.
>>
>> We have implemented an additional C++ stub generator (invoked by --gen
>> cpp:async) which adds two extra arguments to all client and server
>> methods. These are a callback and an errback, which are both Boost
>> closures. On the client side, these are passed when making a call, and
>> will be invoked when the call returns. On the server side, the server
>> is free to invoke these at any point in the future in order to respond
>> to the client. Calls and responses can be sent in any order. See code
>> example below.
>>
>> The stub generator is independent of the ASIO code and can easily be
>> plugged into other reactor loops/frameworks. It shares most of the
>> code with the standard cpp generator (and does generate the
>> synchronous stubs as well). We have attached a diff between them.
>>
>> This is still very much an early version, so feel welcome to submit
>> your comments. There are a few design decisions we still aren't 100%
>> sure of. Additionally, we haven't worked out how to handle low-level
>> failures such as disconnects. We are also working on a stress test and
>> some unit tests.
>>
>> We have compiled everything by adding the stub generator to
>> compiler/cpp/src/ and the client/server code to lib/cpp/src/async/. In
>> order for this to work, the corresponding Makefiles have to be
>> modified and everything has to be compiled with -lboost_system.
>>
>> Code example for the server follows below. We have modified the add
>> method so that it sleeps for num1+num2 seconds before returning, so
>> that there is an easy way of generating responses in a different order
>> than requests. We only include the most relevant parts below, but have
>> attached the full source code. Client code follows after the server
>> code.
>>
>> class CalculatorAsyncHandler : public CalculatorAsyncIf {
>> public:
>> CalculatorAsyncHandler() {}
>>
>> virtual void ping(boost::function<void (void)> callback,
>> boost::function<void (Calculator_ping_result)> errback) {
>> printf("ping()\n");
>> callback();
>> }
>>
>> virtual void add(const int32_t num1, const int32_t num2,
>> boost::function<void (int32_t)> callback, boost::function<void
>> (Calculator_add_result)> errback) {
>> printf("add(%d,%d)\n", num1, num2);
>> boost::shared_ptr<boost::asio::deadline_timer> timer(new
>> boost::asio::deadline_timer(io_service,
>> boost::posix_time::seconds(num1 + num2)));
>> timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done,
>> this, num1 + num2, callback, timer));
>> }
>> virtual void wait_done(const int32_t sum, boost::function<void
>> (int32_t)> callback, boost::shared_ptr<boost::asio::deadline_timer>) {
>> callback(sum);
>> // timer will fall out of scope now and will be deleted
>> }
>>
>> virtual void calculate(const int32_t logid, const Work& w,
>> boost::function<void (int32_t)> callback, boost::function<void
>> (Calculator_calculate_result)> errback) {
>> (...)
>> case DIVIDE:
>> if (w.num2 == 0) {
>> InvalidOperation io;
>> io.what = w.op;
>> io.why = "Cannot divide by 0";
>> errback(calculate_ouch(io));
>> return;
>> }
>> val = w.num1 / w.num2;
>> break;
>> default:
>> errback(calculate_failure(std::string("Invalid Operation")));
>> return;
>> }
>> (...)
>> }
>> (... other methods, omitted for brevity)
>>
>> };
>>
>> int main(int argc, char **argv) {
>> boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
>> protocol::TBinaryProtocolFactory());
>> boost::shared_ptr<CalculatorAsyncHandler> handler(new
>> CalculatorAsyncHandler());
>> boost::shared_ptr<TProcessor> processor(new
>> CalculatorAsyncProcessor(handler));
>>
>> boost::shared_ptr<apache::thrift::async::TAsioServer> server(
>> new
>> apache::thrift::async::TAsioServer(
>>
>> io_service,
>>
>> 9090,
>>
>> protocolFactory,
>>
>> protocolFactory,
>>
>> processor));
>>
>> server->start(); // Nonblocking
>> io_service.run(); // Blocking
>>
>> return 0;
>> }
>>
>> Code for the client:
>>
>> void pingback() {
>> printf("ping()\n");
>> }
>>
>> void pingerr(tutorial::Calculator_ping_result result) {
>> printf("Exception caught\n");
>> }
>>
>> void addback(int32_t a, int32_t b, int32_t sum) {
>> printf("%d+%d=%d\n", a, b, sum);
>> }
>>
>> void adderr(tutorial::Calculator_add_result result) {
>> printf("Exception caught\n");
>> }
>>
>> void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
>> client->ping(pingback, pingerr);
>>
>> client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr); //
>> will return after 5s
>> client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr); //
>> will return after 3s
>> client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr); //
>> will return after 2s
>> }
>>
>> int main(int argc, char* argv[])
>> {
>> try
>> {
>> boost::asio::io_service io_service;
>>
>> boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
>> protocol::TBinaryProtocolFactory());
>>
>> boost::shared_ptr<async::TAsioClient> client (
>> new async::TAsioClient(
>>
>> io_service,
>>
>> protocolFactory,
>>
>> protocolFactory));
>>
>> client->connect("localhost", 9090, connected); // the type of the
>> client (tutorial::CalculatorAsyncClient) is inferred from the
>> signature of connected
>>
>> io_service.run();
>> }
>> catch (std::exception& e)
>> {
>> std::cout << "Exception: " << e.what() << "\n";
>> }
>>
>> return 0;
>> }
>>
>> Regards,
>> Erik Bernhardsson,
>> Mattias de Zalenski
>> Spotify - http://www.spotify.com/
>>
>
#include <iostream>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <async/TAsioAsync.h>
#include <protocol/TBinaryProtocol.h>
#include "Calculator.h"
using namespace apache::thrift;
void pingback() {
printf("ping()\n");
}
void pingerr(tutorial::Calculator_ping_result result) {
printf("Exception caught\n");
}
void addback(int32_t a, int32_t b, int32_t sum) {
printf("%d+%d=%d\n", a, b, sum);
}
void adderr(tutorial::Calculator_add_result result) {
printf("Exception caught\n");
}
void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
client->ping(pingback, pingerr);
client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr); // will return
after 5s
client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr); // will return
after 3s
client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr); // will return
after 2s
}
int main(int argc, char* argv[])
{
try
{
boost::asio::io_service io_service;
boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
protocol::TBinaryProtocolFactory());
boost::shared_ptr<async::TAsioClient> client (
new async::TAsioClient(
io_service,
protocolFactory,
protocolFactory));
client->connect("localhost", 9090, connected);
io_service.run();
}
catch (std::exception& e)
{
std::cout << "Exception: " << e.what() << "\n";
}
return 0;
}
#include <protocol/TBinaryProtocol.h>
#include <iostream>
#include <stdexcept>
#include <sstream>
#include <time.h>
#include "../gen-cpp/Calculator.h"
#include <async/TAsioAsync.h>
using namespace apache::thrift;
using namespace tutorial;
using namespace shared;
boost::asio::io_service io_service; // todo: use singleton
class CalculatorAsyncHandler : public CalculatorAsyncIf {
public:
CalculatorAsyncHandler() {}
virtual void ping(boost::function<void (void)> callback, boost::function<void
(Calculator_ping_result)> errback) {
printf("ping()\n");
callback();
}
virtual void add(const int32_t num1, const int32_t num2, boost::function<void
(int32_t)> callback, boost::function<void (Calculator_add_result)> errback) {
printf("add(%d,%d)\n", num1, num2);
boost::shared_ptr<boost::asio::deadline_timer> timer(new
boost::asio::deadline_timer(io_service, boost::posix_time::seconds(num1 +
num2)));
timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done, this,
num1 + num2, callback, timer));
}
virtual void wait_done(const int32_t sum, boost::function<void (int32_t)>
callback, boost::shared_ptr<boost::asio::deadline_timer>) {
callback(sum);
// timer will fall out of scope now and will be deleted
}
virtual void calculate(const int32_t logid, const Work& w,
boost::function<void (int32_t)> callback, boost::function<void
(Calculator_calculate_result)> errback) {
printf("calculate(%d,{%d,%d,%d})\n", logid, w.op, w.num1, w.num2);
int32_t val;
switch (w.op) {
case ADD:
val = w.num1 + w.num2;
break;
case SUBTRACT:
val = w.num1 - w.num2;
break;
case MULTIPLY:
val = w.num1 * w.num2;
break;
case DIVIDE:
if (w.num2 == 0) {
InvalidOperation io;
io.what = w.op;
io.why = "Cannot divide by 0";
errback(calculate_ouch(io));
return;
}
val = w.num1 / w.num2;
break;
default:
errback(calculate_failure(std::string("Invalid Operation")));
return;
}
SharedStruct ss;
ss.key = logid;
char buffer[12];
snprintf(buffer, sizeof(buffer), "%d", val);
ss.value = buffer;
log[logid] = ss;
callback(val);
}
virtual void getStruct(const int32_t key, boost::function<void
(SharedStruct)> callback, boost::function<void
(SharedService_getStruct_result)> errback) {
printf("getStruct(%d)\n", key);
callback(log[key]);
}
virtual void zip() {
printf("zip()\n");
}
protected:
std::map<int32_t, SharedStruct> log;
};
int main(int argc, char **argv) {
boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
protocol::TBinaryProtocolFactory());
boost::shared_ptr<CalculatorAsyncHandler> handler(new
CalculatorAsyncHandler());
boost::shared_ptr<TProcessor> processor(new
CalculatorAsyncProcessor(handler));
boost::shared_ptr<apache::thrift::async::TAsioServer> server(
new
apache::thrift::async::TAsioServer(
io_service,
9090,
protocolFactory,
protocolFactory,
processor));
server->start(); // Nonblocking
io_service.run(); // Blocking
return 0;
}
#include "TAsioAsync.h"
// #define DEBUG_PRINT_PACKETS
namespace apache { namespace thrift { namespace async {
// namespace { // begin anonymous namespace
struct message {
int len;
boost::shared_array<uint8_t> buf;
};
class TAsioOutputTransport : public TAsyncOutputTransport {
// This class should not be used outside TAsioAsync.cpp, consider moving it
there.
public:
TAsioOutputTransport(boost::shared_ptr<boost::asio::ip::tcp::socket> socket) :
socket_(socket),
isCurrentlyWriting_(false)
{}
void write(const uint8_t* buf, uint32_t len) { // from transport::TTransport
outData_.insert(outData_.end(), &buf[0], &buf[len]);
}
void flush() { // from transport::TTransport
uint32_t outDataSize = outData_.size();
message m;
m.len = outDataSize + 4;
m.buf = boost::shared_array<uint8_t>(new uint8_t[outDataSize + 4]);
// Put data in the outbuffer
m.buf[0] = (outDataSize >> 24) & 0xff;
m.buf[1] = (outDataSize >> 16) & 0xff;
m.buf[2] = (outDataSize >> 8) & 0xff;
m.buf[3] = (outDataSize >> 0) & 0xff;
std::copy(&outData_[0], &outData_[outDataSize], &m.buf[4]);
#ifdef DEBUG_PRINT_PACKETS
printf("output: ");
for (int i = 0; i < outDataSize; i++)
printf("%02x ", outData_[i]);
printf("\n");
#endif
// Delete internal buffer
outData_.clear();
// Queue into buffer
outQueue_.push_back(m);
// If not currently writing, start
if (!isCurrentlyWriting_) {
isCurrentlyWriting_ = true;
writeNext();
}
}
void writeNext() {
if (!outQueue_.size()) {
isCurrentlyWriting_ = false;
return;
}
// By passing m.buf as an argument, we are guaranteed that it will remain
in memory
message m = outQueue_.front();
outQueue_.pop_front();
boost::asio::async_write(*socket_, boost::asio::buffer(m.buf.get(), m.len),
boost::bind(&TAsioOutputTransport::handleWriteFinished, this,
boost::asio::placeholders::error, m));
}
void handleWriteFinished(const boost::system::error_code& error, const
message& m) {
// Temporary memory buffer will be deleted when this scope closes
if (error) {
std::cout << "An error occurred when writing message" << std::endl;
return;
}
writeNext();
}
private:
boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
std::vector<uint8_t> outData_;
bool isCurrentlyWriting_;
std::deque<message> outQueue_;
};
class TAsioInputTransport {
public:
TAsioInputTransport(boost::shared_ptr<boost::asio::ip::tcp::socket> socket) :
socket_(socket)
{}
void read(boost::function<void (const uint8_t* buf, uint32_t len)> callback) {
boost::asio::async_read(*socket_, boost::asio::buffer(length_, 4),
boost::bind(&TAsioInputTransport::handleReadLength, this/*shared_from_this()*/,
_1, callback));
}
private:
void handleReadLength(const boost::system::error_code& error,
boost::function<void (const uint8_t* buf, uint32_t len)> callback) {
if (error) {
std::cout << "An error occurred when reading length header" << std::endl;
return;
// Note on memory: TAsioServerClient is actually owned by a shared_ptr
from the callback, so it will be automatically destructed
}
uint32_t bufLen =
((length_[0] & 0xff) << 24) |
((length_[1] & 0xff) << 16) |
((length_[2] & 0xff) << 8) |
((length_[3] & 0xff) << 0);
inData_.resize(bufLen);
boost::asio::async_read(*socket_, boost::asio::buffer(inData_, bufLen),
boost::bind(&TAsioInputTransport::handleReadMessage,
this/*shared_from_this()*/, _1, callback));
}
void handleReadMessage(const boost::system::error_code& error,
boost::function<void (const uint8_t* buf, uint32_t len)> callback) {
if (error) {
std::cout << "An error occurred when reading frame" << std::endl;
return;
// See comment about memory in handleReadLength
}
callback(&inData_[0], inData_.size());
inData_.clear();
}
uint8_t length_[4];
std::vector<uint8_t> inData_;
boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
};
class TAsioServerClient : public
boost::enable_shared_from_this<TAsioServerClient> {
// This class should not be used outside TAsioAsync.cpp, consider moving it
there.
public:
TAsioServerClient(boost::shared_ptr<boost::asio::ip::tcp::socket> socket,
boost::shared_ptr<protocol::TProtocolFactory>
iProtocolFactory,
boost::shared_ptr<protocol::TProtocolFactory>
oProtocolFactory,
boost::shared_ptr<TProcessor> processor) :
iProtocolFactory_(iProtocolFactory),
oProtocolFactory_(oProtocolFactory),
processor_(processor),
iTransport_(socket),
oTransport_(new TAsioOutputTransport(socket))
{ }
void start() { read(); }
~TAsioServerClient() { std::cout << "Disconnecting client" << std::endl; }
private:
void read() {
iTransport_.read(boost::bind(&TAsioServerClient::handleMessage, /* this
*/shared_from_this(), _1, _2)); // todo: errback
}
void handleMessage(const uint8_t* buf, uint32_t len) {
// todo: add error handling
boost::shared_ptr<transport::TTransport> iTransport(new
transport::TMemoryBuffer(const_cast<uint8_t*>(buf), len)); // TMemoryBuffer
guarantees it will not touch
boost::shared_ptr<protocol::TProtocol>
iProtocol(iProtocolFactory_->getProtocol(iTransport));
boost::shared_ptr<protocol::TProtocol>
oProtocol(oProtocolFactory_->getProtocol(oTransport_));
#ifdef DEBUG_PRINT_PACKETS
printf("input: ");
for (int i = 0; i < len; i++)
printf("%02x ", buf[i]);
printf("\n");
#endif
processor_->process(iProtocol, oProtocol);
read();
}
boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory_;
boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory_;
boost::shared_ptr<TProcessor> processor_;
TAsioInputTransport iTransport_;
boost::shared_ptr<TAsioOutputTransport> oTransport_;
};
// } // end anonymous namespace
void TAsioServer::serve() {
socket_.reset(new boost::asio::ip::tcp::socket(io_service_)); // socket of
the next client
acceptor_.async_accept(*socket_, boost::bind(&TAsioServer::handleAccept,
shared_from_this(), boost::asio::placeholders::error));
}
void TAsioServer::handleAccept(const boost::system::error_code& error) {
if (error) {
std::cout << "An error occurred while accepting" << std::endl;
return;
// We will stop accepting at this point
// Call errback?
}
boost::shared_ptr<TAsioServerClient> client(new TAsioServerClient(socket_,
iProtocolFactory_, oProtocolFactory_, processor_));
client->start();
serve(); // next client plz
}
boost::shared_ptr<TAsyncOutputTransport> TAsioClient::getOutputTransport() {
return boost::shared_ptr<TAsyncOutputTransport>(new
TAsioOutputTransport(socket_));
}
void TAsioClient::startReadingInput(boost::shared_ptr<TProcessor> processor) {
boost::shared_ptr<TAsioInputTransport> iTransport(new
TAsioInputTransport(socket_));
// Start read loop. iTransport will stay in memory as long as the loop is
still running, since it is being referred
// to from the shared_ptr in the callback.
iTransport->read(boost::bind(&TAsioClient::handleMessage, shared_from_this(),
_1, _2, iTransport, processor)); // todo: errback
}
void TAsioClient::handleMessage(const uint8_t* buf, uint32_t len,
boost::shared_ptr<TAsioInputTransport> iTransport,
boost::shared_ptr<TProcessor> processor) {
// I'm binding iTransport and processor so that these will stay in scope as
long as data is read. Not too happy about
// this, but is there a better way of doing it?
boost::shared_ptr<transport::TTransport> iTransport2(new
transport::TMemoryBuffer(const_cast<uint8_t*>(buf), len));
boost::shared_ptr<protocol::TProtocol>
iProtocol(iProtocolFactory_->getProtocol(iTransport2));
processor->process(iProtocol);
// Read next message
iTransport->read(boost::bind(&TAsioClient::handleMessage, shared_from_this(),
_1, _2, iTransport, processor)); // todo: errback
}
void TAsioClient::connectSimple(const std::string& host, short port,
boost::function<void (void)> callback) {
// Start an asynchronous resolve to translate the server and service names
// into a list of endpoints.
std::stringstream ss;
ss << port;
boost::asio::ip::tcp::resolver::query query(host, ss.str());
resolver_.async_resolve(query,
boost::bind(&TAsioClient::handleResolve, this,
boost::asio::placeholders::error,
boost::asio::placeholders::iterator,
callback));
}
void TAsioClient::handleResolve(const boost::system::error_code& err,
boost::asio::ip::tcp::resolver::iterator
endpoint_iterator,
boost::function<void (void)> callback) {
if (!err) {
// Attempt a connection to the first endpoint in the list. Each endpoint
// will be tried until we successfully establish a connection.
boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
socket_->async_connect(endpoint,
boost::bind(&TAsioClient::handleConnect, this,
boost::asio::placeholders::error,
++endpoint_iterator,
callback));
} else {
std::cout << "Error: " << err.message() << std::endl;
}
}
void TAsioClient::handleConnect(const boost::system::error_code& err,
boost::asio::ip::tcp::resolver::iterator
endpoint_iterator,
boost::function<void (void)> callback) {
if (!err) {
// Everything is connected, now let's build the client and etc...
callback();
} else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
// The connection failed. Try the next endpoint in the list.
socket_->close();
boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;
socket_->async_connect(endpoint,
boost::bind(&TAsioClient::handleConnect, this,
boost::asio::placeholders::error,
++endpoint_iterator,
callback));
} else {
std::cout << "Error: " << err.message() << std::endl;
// Todo: call user-provided errback
}
}
} } }
#ifndef _TASIOASYNC_H_
#define _TASIOASYNC_H_
#include <async/TAsync.h>
#include <transport/TBufferTransports.h>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_array.hpp>
#include <vector>
#include <deque>
#include <protocol/TProtocol.h>
#include <TProcessor.h>
namespace apache { namespace thrift { namespace async {
class TAsioInputTransport; // fwd declaration
class TAsioServer : public boost::enable_shared_from_this<TAsioServer> {
// Class for a Thrift server.
// The server must be instantiated within a shared_ptr or else it will throw
tr1::weak_ptr
public:
TAsioServer(boost::asio::io_service& io_service,
short port,
boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory,
boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory,
boost::shared_ptr<TProcessor> processor) :
io_service_(io_service),
acceptor_(io_service,
boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
iProtocolFactory_(iProtocolFactory),
oProtocolFactory_(oProtocolFactory),
processor_(processor)
{}
void start() { serve(); }
private:
void serve();
void handleAccept(const boost::system::error_code& error);
boost::asio::io_service& io_service_;
boost::asio::ip::tcp::acceptor acceptor_;
boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory_;
boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory_;
boost::shared_ptr<TProcessor> processor_;
boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
};
class TAsioClient : public boost::enable_shared_from_this<TAsioClient> {
// Class for client connections
// This class must be instantiated within a boost::shared_ptr or else it will
throw a
// tr1::weak_ptr upon connect.
// Bad: TAsioClient t(...)
// Good: boost::shared_ptr<TAsioClient> t(...)
public:
TAsioClient(boost::asio::io_service& io_service,
boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory,
boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory
) :
resolver_(io_service),
socket_(new boost::asio::ip::tcp::socket(io_service)),
iProtocolFactory_(iProtocolFactory),
oProtocolFactory_(oProtocolFactory)
{}
template<typename T>
void connect(const std::string& host, short port, void
(&callback)(boost::shared_ptr<T> client)) {
// Constructor of boost::function seems to be unable to do an automatic
cast of raw function pointers, so we provide
// this convenience method wrapping connect(...).
boost::function<void (boost::shared_ptr<T> client)> callback2(callback);
connect(host, port, callback2);
}
template<typename T>
void connect(const std::string& host, short port, boost::function<void
(boost::shared_ptr<T> client)> callback) {
// T is inferred from the type of the callback. I'm not too sure if this is
apparent.
// This function untemplatizes by binding a new callback to build<T> and
passing it on to the internal connect code.
// The reason we need to use templates at all is that we have to construct
arbitrary processor objects within the
// scope of TAsioClient. We couldn't let the user provide us with an
processor object since the processor is
// dependent on internals of TAsioClient (the output transport), and
internals of TAsioClient (the input transport)
// is dependent on the processor. See how build(...) works.
boost::function<void (void)> finalCallback =
boost::bind(&TAsioClient::build<T>, this, callback);
connectSimple(host, port, finalCallback);
}
private:
template<typename T>
void build(boost::function<void (boost::shared_ptr<T> client)> callback) {
// We are connected, let's build the client.
boost::shared_ptr<T> client(new T(getOutputTransport(), oProtocolFactory_));
boost::shared_ptr<TProcessor> processor(client); // Simply reinterpreted as
a base class (T is derived from TProcessor)
startReadingInput(processor);
// Call user-provided callback to signal success.
callback(client);
}
boost::shared_ptr<TAsyncOutputTransport> getOutputTransport();
void startReadingInput(boost::shared_ptr<TProcessor> processor);
void handleMessage(const uint8_t* buf, uint32_t len,
boost::shared_ptr<TAsioInputTransport> iTransport,
boost::shared_ptr<TProcessor> processor);
void connectSimple(const std::string& host, short port, boost::function<void
(void)> callback);
void handleResolve(const boost::system::error_code& err,
boost::asio::ip::tcp::resolver::iterator endpoint_iterator,
boost::function<void (void)> callback);
void handleConnect(const boost::system::error_code& err,
boost::asio::ip::tcp::resolver::iterator endpoint_iterator,
boost::function<void (void)> callback);
boost::asio::ip::tcp::resolver resolver_;
boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory_;
boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory_;
};
} } }
#endif
#ifndef _TASYNC_H_
#define _TASYNC_H_ 1
#include <transport/TTransport.h>
namespace apache { namespace thrift { namespace async {
class TAsyncOutputTransport : public transport::TTransport {
// Asynchronous transport.
//
// The asynchronousity gives us an interesting asymmetry. We are
// free to write to an underlying socket at any time, but cannot
// read from it in any way. This class is simply meant to be a
// superclass of unidirectional nonblocking out transports,
// disallowing the read method.
public:
virtual void write(const uint8_t* /* buf */, uint32_t /* len */) = 0;
virtual void flush() = 0;
virtual uint32_t read(uint8_t* /* buf */, uint32_t /* len */) {
// This method should not be overridden!!
throw
transport::TTransportException(transport::TTransportException::NOT_OPEN,
"Asynchronous transports cannot read.");
}
virtual ~TAsyncOutputTransport() {}
};
// More stuff should go here, so far not enough generalization has been done
} } }
#endif