I'm trying to attach the files yet again, renaming the files to .txt.
Sorry about the confusion.

On Thu, Jul 9, 2009 at 3:43 PM, Erik Bernhardsson<[email protected]> wrote:
> It seems like most of the attachments were somehow lost, so I'll try
> sending them again. There should be a CppClient.cpp, CppServer.cpp,
> TAsync.h, TAsioAsync.h and TAsioAsync.cpp
>
> On Thu, Jul 9, 2009 at 2:41 PM, Erik Bernhardsson<[email protected]> wrote:
>> Hi everyone,
>>
>> There have been some discussions about asynchronous C++ clients and
>> servers (THRIFT-1). Inspired by the Twisted server (THRIFT-148) by
>> Esteve Fernandez, as well as his C++ ideas (THRIFT-311), we (Erik
>> Bernhardsson, Mattias de Zalenski) decided to implement our own
>> version here at Spotify.
>>
>> Based on TFramedTransport and Boost.ASIO, this version is event-driven
>> and runs fully asynchronously in a single thread. There is no overhead
>> in terms of additional threads. This code is not thread-safe yet, in
>> the sense that multiple threads cannot simultaneously invoke calls on
>> the same client, though we're definitely interested in discussing how
>> to implement this.
>>
>> We have implemented an additional C++ stub generator (invoked by --gen
>> cpp:async) which adds two extra arguments to all client and server
>> methods. These are a callback and an errback, which are both Boost
>> closures. On the client side, these are passed when making a call, and
>> will be invoked when the call returns. On the server side, the server
>> is free to invoke these at any point in the future in order to respond
>> to the client. Calls and responses can be sent in any order. See code
>> example below.
>>
>> The stub generator is independent of the ASIO code and can easily be
>> plugged into other reactor loops/frameworks. It shares most of the
>> code with the standard cpp generator (and does generate the
>> synchronous stubs as well). We have attached a diff between them.
>>
>> This is still very much an early version, so feel welcome to submit
>> your comments. There are a few design decisions we still aren't 100%
>> sure of. Additionally, we haven't worked out how to handle low-level
>> failures such as disconnects. We are also working on a stress test and
>> some unit tests.
>>
>> We have compiled everything by adding the stub generator to
>> compiler/cpp/src/ and the client/server code to lib/cpp/src/async/. In
>> order for this to work, the corresponding Makefiles have to be
>> modified and everything has to be compiled with -lboost_system.
>>
>> Code example for the server follows below. We have modified the add
>> method so that it sleeps for num1+num2 seconds before returning, so
>> that there is an easy way of generating responses in a different order
>> than requests. We only include the most relevant parts below, but have
>> attached the full source code. Client code follows after the server
>> code.
>>
>> class CalculatorAsyncHandler : public CalculatorAsyncIf {
>>  public:
>>  CalculatorAsyncHandler() {}
>>
>>  virtual void ping(boost::function<void (void)> callback,
>> boost::function<void (Calculator_ping_result)> errback) {
>>    printf("ping()\n");
>>    callback();
>>  }
>>
>>  virtual void add(const int32_t num1, const int32_t num2,
>> boost::function<void (int32_t)> callback, boost::function<void
>> (Calculator_add_result)> errback) {
>>    printf("add(%d,%d)\n", num1, num2);
>>    boost::shared_ptr<boost::asio::deadline_timer> timer(new
>> boost::asio::deadline_timer(io_service,
>> boost::posix_time::seconds(num1 + num2)));
>>    timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done,
>> this, num1 + num2, callback, timer));
>>  }
>>  virtual void wait_done(const int32_t sum, boost::function<void
>> (int32_t)> callback, boost::shared_ptr<boost::asio::deadline_timer>) {
>>    callback(sum);
>>    // timer will fall out of scope now and will be deleted
>>  }
>>
>>  virtual void calculate(const int32_t logid, const Work& w,
>> boost::function<void (int32_t)> callback, boost::function<void
>> (Calculator_calculate_result)> errback) {
>> (...)
>>    case DIVIDE:
>>      if (w.num2 == 0) {
>>        InvalidOperation io;
>>        io.what = w.op;
>>        io.why = "Cannot divide by 0";
>>        errback(calculate_ouch(io));
>>        return;
>>      }
>>      val = w.num1 / w.num2;
>>      break;
>>    default:
>>      errback(calculate_failure(std::string("Invalid Operation")));
>>      return;
>>    }
>>  (...)
>> }
>> (... other methods, omitted for brevity)
>>
>> };
>>
>> int main(int argc, char **argv) {
>>  boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
>> protocol::TBinaryProtocolFactory());
>>  boost::shared_ptr<CalculatorAsyncHandler> handler(new
>> CalculatorAsyncHandler());
>>  boost::shared_ptr<TProcessor> processor(new
>> CalculatorAsyncProcessor(handler));
>>
>>  boost::shared_ptr<apache::thrift::async::TAsioServer> server(
>>                                                               new
>> apache::thrift::async::TAsioServer(
>>
>>                               io_service,
>>
>>                               9090,
>>
>>                               protocolFactory,
>>
>>                               protocolFactory,
>>
>>                               processor));
>>
>>  server->start(); // Nonblocking
>>  io_service.run(); // Blocking
>>
>>  return 0;
>> }
>>
>> Code for the client:
>>
>> void pingback() {
>>  printf("ping()\n");
>> }
>>
>> void pingerr(tutorial::Calculator_ping_result result) {
>>  printf("Exception caught\n");
>> }
>>
>> void addback(int32_t a, int32_t b, int32_t sum) {
>>  printf("%d+%d=%d\n", a, b, sum);
>> }
>>
>> void adderr(tutorial::Calculator_add_result result) {
>>  printf("Exception caught\n");
>> }
>>
>> void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
>>  client->ping(pingback, pingerr);
>>
>>  client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr);  //
>> will return after 5s
>>  client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr);  //
>> will return after 3s
>>  client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr);  //
>> will return after 2s
>> }
>>
>> int main(int argc, char* argv[])
>> {
>>  try
>>  {
>>    boost::asio::io_service io_service;
>>
>>    boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new
>> protocol::TBinaryProtocolFactory());
>>
>>    boost::shared_ptr<async::TAsioClient> client (
>>                                                  new async::TAsioClient(
>>
>>  io_service,
>>
>>  protocolFactory,
>>
>>  protocolFactory));
>>
>>    client->connect("localhost", 9090, connected); // the type of the
>> client (tutorial::CalculatorAsyncClient) is inferred from the
>> signature of connected
>>
>>    io_service.run();
>>  }
>>  catch (std::exception& e)
>>  {
>>    std::cout << "Exception: " << e.what() << "\n";
>>  }
>>
>>  return 0;
>> }
>>
>> Regards,
>> Erik Bernhardsson,
>> Mattias de Zalenski
>> Spotify - http://www.spotify.com/
>>
>
#include <iostream>
#include <istream>
#include <ostream>
#include <sstream>
#include <string>
#include <boost/asio.hpp>
#include <boost/bind.hpp>

#include <async/TAsioAsync.h>
#include <protocol/TBinaryProtocol.h>

#include "Calculator.h"

using namespace apache::thrift;

// Success callback for ping(): just reports that the reply arrived.
void pingback()
{
  fputs("ping()\n", stdout);
}

// Error callback for ping(): the result object is not inspected, only the
// fact that the call failed is reported.
void pingerr(tutorial::Calculator_ping_result result)
{
  (void)result;
  fputs("Exception caught\n", stdout);
}

// Success callback for add(): a and b are the operands bound at call time,
// sum is the value delivered by the server.
void addback(int32_t a, int32_t b, int32_t sum)
{
  fprintf(stdout, "%d+%d=%d\n", a, b, sum);
}

// Error callback for add(): the result object is not inspected, only the
// fact that the call failed is reported.
void adderr(tutorial::Calculator_add_result result)
{
  (void)result;
  fputs("Exception caught\n", stdout);
}

// Connection callback passed to TAsioClient::connect() in main(): receives the
// freshly built client and fires one ping plus three add() calls without
// waiting for any of the replies.
//
// The example server answers add(a, b) after a + b seconds, so the three
// replies are expected to arrive in the reverse of the order they were sent.
void connected(boost::shared_ptr<tutorial::CalculatorAsyncClient> client) {
  client->ping(pingback, pingerr);

  // The operands are bound into addback so the reply can be printed as "a+b=sum".
  client->add(2, 3, boost::bind(&addback, 2, 3, _1), &adderr);  // will return after 5s
  client->add(1, 2, boost::bind(&addback, 1, 2, _1), &adderr);  // will return after 3s
  client->add(1, 1, boost::bind(&addback, 1, 1, _1), &adderr);  // will return after 2s
}

int main(int argc, char* argv[])
{
  try
    {
      boost::asio::io_service io_service;

      boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new 
protocol::TBinaryProtocolFactory());

      boost::shared_ptr<async::TAsioClient> client (
                                                    new async::TAsioClient(
                                                                           
io_service,
                                                                           
protocolFactory,
                                                                           
protocolFactory));
                                                                                
                                                                                
                      
      client->connect("localhost", 9090, connected);

      io_service.run();
    }
  catch (std::exception& e)
    {
      std::cout << "Exception: " << e.what() << "\n";
    }

  return 0;
}
#include <protocol/TBinaryProtocol.h>

#include <iostream>
#include <stdexcept>
#include <sstream>

#include <time.h>

#include "../gen-cpp/Calculator.h"

#include <async/TAsioAsync.h>

using namespace apache::thrift;

using namespace tutorial;
using namespace shared;

// Process-wide event loop: used by CalculatorAsyncHandler::add() for its
// timers and by main() for the server socket and the blocking run() call.
boost::asio::io_service io_service; // todo: use singleton

class CalculatorAsyncHandler : public CalculatorAsyncIf {
 public:
  CalculatorAsyncHandler() {}

  virtual void ping(boost::function<void (void)> callback, boost::function<void 
(Calculator_ping_result)> errback) {
    printf("ping()\n");
    callback();
  }

  virtual void add(const int32_t num1, const int32_t num2, boost::function<void 
(int32_t)> callback, boost::function<void (Calculator_add_result)> errback) {
    printf("add(%d,%d)\n", num1, num2);
    boost::shared_ptr<boost::asio::deadline_timer> timer(new 
boost::asio::deadline_timer(io_service, boost::posix_time::seconds(num1 + 
num2)));
    timer->async_wait(boost::bind(&CalculatorAsyncHandler::wait_done, this, 
num1 + num2, callback, timer));
  }

  virtual void wait_done(const int32_t sum, boost::function<void (int32_t)> 
callback, boost::shared_ptr<boost::asio::deadline_timer>) {
    callback(sum);
    // timer will fall out of scope now and will be deleted
  }

  virtual void calculate(const int32_t logid, const Work& w, 
boost::function<void (int32_t)> callback, boost::function<void 
(Calculator_calculate_result)> errback) {
    printf("calculate(%d,{%d,%d,%d})\n", logid, w.op, w.num1, w.num2);
    int32_t val;

    switch (w.op) {
    case ADD:
      val = w.num1 + w.num2;
      break;
    case SUBTRACT:
      val = w.num1 - w.num2;
      break;
    case MULTIPLY:
      val = w.num1 * w.num2;
      break;
    case DIVIDE:
      if (w.num2 == 0) {
        InvalidOperation io;
        io.what = w.op;
        io.why = "Cannot divide by 0";
        errback(calculate_ouch(io));
        return;
      }
      val = w.num1 / w.num2;
      break;
    default:
      errback(calculate_failure(std::string("Invalid Operation")));
      return;
    }

    SharedStruct ss;
    ss.key = logid;
    char buffer[12];
    snprintf(buffer, sizeof(buffer), "%d", val);
    ss.value = buffer;

    log[logid] = ss;

    callback(val);
  }

  virtual void getStruct(const int32_t key, boost::function<void 
(SharedStruct)> callback, boost::function<void 
(SharedService_getStruct_result)> errback) {
    printf("getStruct(%d)\n", key);
    callback(log[key]);
  }

  virtual void zip() {
    printf("zip()\n");
  }

protected:
  std::map<int32_t, SharedStruct> log;
};

int main(int argc, char **argv) {
  boost::shared_ptr<protocol::TProtocolFactory> protocolFactory(new 
protocol::TBinaryProtocolFactory());
  boost::shared_ptr<CalculatorAsyncHandler> handler(new 
CalculatorAsyncHandler());
  boost::shared_ptr<TProcessor> processor(new 
CalculatorAsyncProcessor(handler));

  boost::shared_ptr<apache::thrift::async::TAsioServer> server(
                                                               new 
apache::thrift::async::TAsioServer(
                                                                                
                      io_service,
                                                                                
                      9090,
                                                                                
                      protocolFactory,
                                                                                
                      protocolFactory,
                                                                                
                      processor));

  server->start(); // Nonblocking
  io_service.run(); // Blocking

  return 0;
}
#include "TAsioAsync.h"

// #define DEBUG_PRINT_PACKETS

namespace apache { namespace thrift { namespace async {

  // namespace { // begin anonymous namespace

// One outgoing frame queued by TAsioOutputTransport: the 4-byte length header
// followed by the payload, kept in a reference-counted array so that copies of
// the message share (and keep alive) the same bytes.
struct message {
  // Start with a defined length so that a default-constructed message is never
  // read with an indeterminate len.
  message() : len(0) {}

  int len;                           // number of bytes stored in buf
  boost::shared_array<uint8_t> buf;  // frame bytes: header + payload
};

class TAsioOutputTransport : public TAsyncOutputTransport {
  // This class should not be used outside TAsioAsync.cpp, consider moving it 
there.
 public:
  TAsioOutputTransport(boost::shared_ptr<boost::asio::ip::tcp::socket> socket) :
    socket_(socket),
    isCurrentlyWriting_(false)
    {}

  void write(const uint8_t* buf, uint32_t len) {  // from transport::TTransport
    outData_.insert(outData_.end(), &buf[0], &buf[len]);
  }
  
  void flush() {  // from transport::TTransport
    uint32_t outDataSize = outData_.size();
    message m;
    m.len = outDataSize + 4;
    m.buf = boost::shared_array<uint8_t>(new uint8_t[outDataSize + 4]);
    
    // Put data in the outbuffer
    m.buf[0] = (outDataSize >> 24) & 0xff;
    m.buf[1] = (outDataSize >> 16) & 0xff;
    m.buf[2] = (outDataSize >>  8) & 0xff;
    m.buf[3] = (outDataSize >>  0) & 0xff;
    std::copy(&outData_[0], &outData_[outDataSize], &m.buf[4]);
    
#ifdef DEBUG_PRINT_PACKETS
    printf("output: ");
    for (int i = 0; i < outDataSize; i++)
      printf("%02x ", outData_[i]);
    printf("\n");
#endif

    // Delete internal buffer
    outData_.clear();
    
    // Queue into buffer
    outQueue_.push_back(m);  
  
    // If not currently writing, start
    if (!isCurrentlyWriting_) {
      isCurrentlyWriting_ = true;
      writeNext();
    }
  }
  
  void writeNext() {
    if (!outQueue_.size()) {
      isCurrentlyWriting_ = false;
      return;
    }

    // By passing m.buf as an argument, we are guaranteed that it will remain 
in memory
    message m = outQueue_.front();
    outQueue_.pop_front();
    boost::asio::async_write(*socket_, boost::asio::buffer(m.buf.get(), m.len), 
boost::bind(&TAsioOutputTransport::handleWriteFinished, this, 
boost::asio::placeholders::error, m));
  }
  
  void handleWriteFinished(const boost::system::error_code& error, const 
message& m) {
    // Temporary memory buffer will be deleted when this scope closes
    
    if (error) {
      std::cout << "An error occurred when writing message" << std::endl;
      return;
    }

    writeNext();
  }

 private:
  boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
  std::vector<uint8_t> outData_;
  bool isCurrentlyWriting_;
  std::deque<message> outQueue_;
};

// Reads length-prefixed frames from a Boost.Asio TCP socket.
//
// Each read() call reads exactly one frame: a 4-byte big-endian length
// followed by that many payload bytes. The payload is handed to the callback,
// which is responsible for calling read() again to continue.
//
// NOTE(review): the completion handlers are bound to a raw `this`; the object
// is expected to be kept alive by whoever owns the callback - confirm.
class TAsioInputTransport {
 public:
  TAsioInputTransport(boost::shared_ptr<boost::asio::ip::tcp::socket> socket) :
    socket_(socket)
    {}

  // Starts reading one frame. callback receives a pointer to the payload and
  // its length; the pointer is only valid for the duration of the call.
  void read(boost::function<void (const uint8_t* buf, uint32_t len)> callback) {
    boost::asio::async_read(*socket_,
                            boost::asio::buffer(length_, 4),
                            boost::bind(&TAsioInputTransport::handleReadLength,
                                        this/*shared_from_this()*/,
                                        _1,
                                        callback));
  }

 private:
  // Header received: decode the payload length and read the payload.
  void handleReadLength(const boost::system::error_code& error,
                        boost::function<void (const uint8_t* buf, uint32_t len)> callback) {
    if (error) {
      // Note on memory: TAsioServerClient is actually owned by a shared_ptr
      // from the callback, so it will be automatically destructed.
      std::cout << "An error occurred when reading length header" << std::endl;
      return;
    }

    // Widen before shifting: a uint8_t promotes to int, and shifting a value
    // >= 0x80 left by 24 would move a bit into the sign bit.
    const uint32_t bufLen =
      (static_cast<uint32_t>(length_[0]) << 24) |
      (static_cast<uint32_t>(length_[1]) << 16) |
      (static_cast<uint32_t>(length_[2]) <<  8) |
      (static_cast<uint32_t>(length_[3]) <<  0);

    // NOTE(review): bufLen comes straight from the peer and is not bounded, so
    // a single header can request an allocation of up to 4 GiB.
    inData_.resize(bufLen);
    boost::asio::async_read(*socket_,
                            boost::asio::buffer(inData_, bufLen),
                            boost::bind(&TAsioInputTransport::handleReadMessage,
                                        this/*shared_from_this()*/,
                                        _1,
                                        callback));
  }

  // Payload received: deliver it and drop the local copy.
  void handleReadMessage(const boost::system::error_code& error,
                         boost::function<void (const uint8_t* buf, uint32_t len)> callback) {
    if (error) {
      // See comment about memory in handleReadLength
      std::cout << "An error occurred when reading frame" << std::endl;
      return;
    }

    // A zero-length frame leaves inData_ empty; &inData_[0] would then be
    // undefined behaviour, so hand out a null pointer with length 0 instead.
    const uint8_t* payload = inData_.empty() ? 0 : &inData_[0];
    callback(payload, inData_.size());
    inData_.clear();
  }

  uint8_t length_[4];              // raw frame header as received
  std::vector<uint8_t> inData_;    // payload of the frame being read
  boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
};


// Server-side state of one accepted connection: reads frames from the socket,
// runs each through the processor and lets the processor answer through a
// framed output transport on the same socket.
//
// start() uses shared_from_this(), so instances must be owned by a
// boost::shared_ptr. The read callback holds such a shared_ptr, which is what
// keeps the object alive while a read is pending.
//
// This class should not be used outside TAsioAsync.cpp, consider moving it there.
class TAsioServerClient : public boost::enable_shared_from_this<TAsioServerClient> {
 public:
  TAsioServerClient(boost::shared_ptr<boost::asio::ip::tcp::socket> socket,
                    boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory,
                    boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory,
                    boost::shared_ptr<TProcessor> processor) :
    socket_(socket),  // previously left null although the member is declared
    iProtocolFactory_(iProtocolFactory),
    oProtocolFactory_(oProtocolFactory),
    processor_(processor),
    iTransport_(socket),
    oTransport_(new TAsioOutputTransport(socket))
    { }

  // Begins the read loop for this connection.
  void start() { read(); }

  ~TAsioServerClient() { std::cout << "Disconnecting client" << std::endl; }

 private:
  // Requests the next frame; handleMessage() is invoked once it has arrived.
  void read() {
    iTransport_.read(boost::bind(&TAsioServerClient::handleMessage,
                                 shared_from_this(),
                                 _1,
                                 _2)); // todo: errback
  }

  // Processes one received frame and then waits for the next one.
  void handleMessage(const uint8_t* buf, uint32_t len) {
    // todo: add error handling

    // The const_cast is needed by the TMemoryBuffer constructor; according to
    // the original author TMemoryBuffer guarantees it will not touch the data.
    boost::shared_ptr<transport::TTransport> iTransport(
        new transport::TMemoryBuffer(const_cast<uint8_t*>(buf), len));
    boost::shared_ptr<protocol::TProtocol>
        iProtocol(iProtocolFactory_->getProtocol(iTransport));
    boost::shared_ptr<protocol::TProtocol>
        oProtocol(oProtocolFactory_->getProtocol(oTransport_));

#ifdef DEBUG_PRINT_PACKETS
    printf("input: ");
    for (uint32_t i = 0; i < len; i++)
      printf("%02x ", buf[i]);
    printf("\n");
#endif

    processor_->process(iProtocol, oProtocol);
    read();
  }

  boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
  boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory_;
  boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory_;
  boost::shared_ptr<TProcessor> processor_;
  TAsioInputTransport iTransport_;
  boost::shared_ptr<TAsioOutputTransport> oTransport_;
};

  // } // end anonymous namespace

void TAsioServer::serve() {
  socket_.reset(new boost::asio::ip::tcp::socket(io_service_)); // socket of 
the next client
  acceptor_.async_accept(*socket_, boost::bind(&TAsioServer::handleAccept, 
shared_from_this(), boost::asio::placeholders::error));
}

// Completion handler of async_accept. On success the accepted socket is handed
// to a new TAsioServerClient and the server goes back to accepting; on failure
// the error is reported and the server stops accepting.
void TAsioServer::handleAccept(const boost::system::error_code& error) {
  if (!error) {
    // The local shared_ptr goes away at the end of this scope; start() binds
    // shared_from_this() into the read callback of the connection.
    boost::shared_ptr<TAsioServerClient> connection(
        new TAsioServerClient(socket_, iProtocolFactory_, oProtocolFactory_, processor_));
    connection->start();

    serve(); // next client plz
    return;
  }

  // We will stop accepting at this point
  // Call errback?
  std::cout << "An error occurred while accepting" << std::endl;
}

// Creates a new framed output transport that writes to this client's socket.
// Every call returns a separate transport object.
boost::shared_ptr<TAsyncOutputTransport> TAsioClient::getOutputTransport() {
  boost::shared_ptr<TAsyncOutputTransport> outTransport(new TAsioOutputTransport(socket_));
  return outTransport;
}

// Creates the input transport for this client's socket and starts the read
// loop that feeds every received frame to `processor` via handleMessage().
void TAsioClient::startReadingInput(boost::shared_ptr<TProcessor> processor) {
  boost::shared_ptr<TAsioInputTransport> iTransport(new TAsioInputTransport(socket_));

  // Start read loop. iTransport will stay in memory as long as the loop is
  // still running, since it is being referred to from the shared_ptr in the
  // callback.
  iTransport->read(boost::bind(&TAsioClient::handleMessage,
                               shared_from_this(),
                               _1,
                               _2,
                               iTransport,
                               processor)); // todo: errback
}

void TAsioClient::handleMessage(const uint8_t* buf, uint32_t len, 
boost::shared_ptr<TAsioInputTransport> iTransport, 
boost::shared_ptr<TProcessor> processor) {
  // I'm binding iTransport and processor so that these will stay in scope as 
long as data is read. Not too happy about
  // this, but is there a better way of doing it?

  boost::shared_ptr<transport::TTransport> iTransport2(new 
transport::TMemoryBuffer(const_cast<uint8_t*>(buf), len));
  boost::shared_ptr<protocol::TProtocol> 
iProtocol(iProtocolFactory_->getProtocol(iTransport2));
  
  processor->process(iProtocol);

  // Read next message
  iTransport->read(boost::bind(&TAsioClient::handleMessage, shared_from_this(), 
_1, _2, iTransport, processor)); // todo: errback
}

void TAsioClient::connectSimple(const std::string& host, short port, 
boost::function<void (void)> callback) {
  // Start an asynchronous resolve to translate the server and service names
  // into a list of endpoints.
  std::stringstream ss;
  ss << port;
  boost::asio::ip::tcp::resolver::query query(host, ss.str());
  resolver_.async_resolve(query,
                          boost::bind(&TAsioClient::handleResolve, this,
                                      boost::asio::placeholders::error,
                                      boost::asio::placeholders::iterator,
                                      callback));
}

// Completion handler of the asynchronous resolve. On success a connection to
// the first endpoint is attempted and the remaining endpoints are passed on to
// handleConnect() as fallbacks; on failure the error is printed and `callback`
// is never invoked.
void TAsioClient::handleResolve(const boost::system::error_code& err,
                            boost::asio::ip::tcp::resolver::iterator endpoint_iterator,
                            boost::function<void (void)> callback) {
  if (!err) {
    // Attempt a connection to the first endpoint in the list. Each endpoint
    // will be tried until we successfully establish a connection.
    boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;

    // shared_from_this() (as used by the read loop) keeps the client alive
    // until the connect handler has run.
    socket_->async_connect(endpoint,
                           boost::bind(&TAsioClient::handleConnect, shared_from_this(),
                                       boost::asio::placeholders::error,
                                       ++endpoint_iterator,
                                       callback));
  } else {
    std::cout << "Error: " << err.message() << std::endl;
  }
}

// Completion handler of async_connect. On success `callback` is invoked. On
// failure the next endpoint in the list is tried; when the list is exhausted
// the last error is printed and `callback` is never invoked.
void TAsioClient::handleConnect(const boost::system::error_code& err,
                            boost::asio::ip::tcp::resolver::iterator endpoint_iterator,
                            boost::function<void (void)> callback) {
  if (!err) {
    // Everything is connected, now let's build the client and etc...
    callback();

  } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) {
    // The connection failed. Try the next endpoint in the list.
    socket_->close();
    boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator;

    // shared_from_this() (as used by the read loop) keeps the client alive
    // until the next connect attempt has completed.
    socket_->async_connect(endpoint,
                           boost::bind(&TAsioClient::handleConnect, shared_from_this(),
                                       boost::asio::placeholders::error,
                                       ++endpoint_iterator,
                                       callback));
  } else {
    std::cout << "Error: " << err.message() << std::endl;

    // Todo: call user-provided errback
  }
}

} } }
#ifndef _TASIOASYNC_H_
#define _TASIOASYNC_H_

#include <async/TAsync.h>
#include <transport/TBufferTransports.h>
#include <boost/enable_shared_from_this.hpp>
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/function.hpp>
#include <boost/shared_array.hpp>
#include <vector>
#include <deque>
#include <protocol/TProtocol.h>
#include <TProcessor.h>



namespace apache { namespace thrift { namespace async {

class TAsioInputTransport; // fwd declaration

class TAsioServer : public boost::enable_shared_from_this<TAsioServer> {
  // Class for a Thrift server.
  //
  // Listens on the given TCP port (IPv4) and serves every accepted connection
  // with the given processor and protocol factories.
  //
  // The server must be instantiated within a boost::shared_ptr: serve() calls
  // shared_from_this(), which fails (boost::bad_weak_ptr) for an object that
  // is not owned by a shared_ptr.
 public:
  // io_service:       event loop that drives the acceptor and all client sockets
  // port:             TCP port to listen on
  // iProtocolFactory: creates the protocol used to decode requests
  // oProtocolFactory: creates the protocol used to encode responses
  // processor:        handles the decoded requests
  TAsioServer(boost::asio::io_service& io_service,
              short port,
              boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory,
              boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory,
              boost::shared_ptr<TProcessor> processor) :
    io_service_(io_service),
    acceptor_(io_service,
              boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)),
    iProtocolFactory_(iProtocolFactory),
    oProtocolFactory_(oProtocolFactory),
    processor_(processor)
    {}

  // Starts accepting connections. Does not block; the work happens inside the
  // io_service event loop.
  void start() { serve(); }

 private:
  void serve();
  void handleAccept(const boost::system::error_code& error);

  boost::asio::io_service& io_service_;
  boost::asio::ip::tcp::acceptor acceptor_;
  boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory_;
  boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory_;
  boost::shared_ptr<TProcessor> processor_;
  boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;  // socket for the next client to be accepted
};

class TAsioClient : public boost::enable_shared_from_this<TAsioClient> {
  // Class for client connections.
  //
  // This class must be instantiated within a boost::shared_ptr: it calls
  // shared_from_this() while connecting, which fails (boost::bad_weak_ptr)
  // for an object that is not owned by a shared_ptr.
  // Bad:  TAsioClient t(...)
  // Good: boost::shared_ptr<TAsioClient> t(...)

 public:
  // io_service:       event loop that drives the resolver and the socket
  // iProtocolFactory: creates the protocol used to decode incoming messages
  // oProtocolFactory: handed to the generated client for outgoing messages
  TAsioClient(boost::asio::io_service& io_service,
              boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory,
              boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory
              ) :
    resolver_(io_service),
    socket_(new boost::asio::ip::tcp::socket(io_service)),
    iProtocolFactory_(iProtocolFactory),
    oProtocolFactory_(oProtocolFactory)
    {}

  // Convenience overload for plain functions, see the overload below.
  template<typename T>
    void connect(const std::string& host, short port,
                 void (&callback)(boost::shared_ptr<T> client)) {
      // Constructor of boost::function seems to be unable to do an automatic
      // cast of raw function pointers, so we provide this convenience method
      // wrapping connect(...).
      boost::function<void (boost::shared_ptr<T> client)> callback2(callback);
      connect(host, port, callback2);
    }

  // Connects to host:port and, once connected, builds a client of type T and
  // passes it to callback. Does not block; the work happens inside the
  // io_service event loop.
  template<typename T>
    void connect(const std::string& host, short port,
                 boost::function<void (boost::shared_ptr<T> client)> callback) {
    // T is inferred from the type of the callback. I'm not too sure if this is
    // apparent.
    // This function untemplatizes by binding a new callback to build<T> and
    // passing it on to the internal connect code.
    // The reason we need to use templates at all is that we have to construct
    // arbitrary processor objects within the scope of TAsioClient. We couldn't
    // let the user provide us with a processor object since the processor is
    // dependent on internals of TAsioClient (the output transport), and
    // internals of TAsioClient (the input transport) are dependent on the
    // processor. See how build(...) works.
    //
    // shared_from_this() keeps this object alive until build<T> has run.
    boost::function<void (void)> finalCallback =
      boost::bind(&TAsioClient::build<T>, shared_from_this(), callback);
    connectSimple(host, port, finalCallback);
  }

 private:
  // Invoked once the socket is connected: creates the T client on top of a new
  // output transport, starts the read loop with it as processor and reports
  // the client to the user.
  template<typename T>
    void build(boost::function<void (boost::shared_ptr<T> client)> callback) {
    // We are connected, let's build the client.
    boost::shared_ptr<T> client(new T(getOutputTransport(), oProtocolFactory_));

    // Simply reinterpreted as a base class (T is derived from TProcessor)
    boost::shared_ptr<TProcessor> processor(client);

    startReadingInput(processor);

    // Call user-provided callback to signal success.
    callback(client);
  }

  boost::shared_ptr<TAsyncOutputTransport> getOutputTransport();
  void startReadingInput(boost::shared_ptr<TProcessor> processor);

  void handleMessage(const uint8_t* buf, uint32_t len,
                     boost::shared_ptr<TAsioInputTransport> iTransport,
                     boost::shared_ptr<TProcessor> processor);
  void connectSimple(const std::string& host, short port,
                     boost::function<void (void)> callback);
  void handleResolve(const boost::system::error_code& err,
                     boost::asio::ip::tcp::resolver::iterator endpoint_iterator,
                     boost::function<void (void)> callback);
  void handleConnect(const boost::system::error_code& err,
                     boost::asio::ip::tcp::resolver::iterator endpoint_iterator,
                     boost::function<void (void)> callback);


  boost::asio::ip::tcp::resolver resolver_;
  boost::shared_ptr<boost::asio::ip::tcp::socket> socket_;
  boost::shared_ptr<protocol::TProtocolFactory> iProtocolFactory_;
  boost::shared_ptr<protocol::TProtocolFactory> oProtocolFactory_;
};

} } }

#endif
#ifndef _TASYNC_H_
#define _TASYNC_H_ 1

#include <transport/TTransport.h>

namespace apache { namespace thrift { namespace async {

class TAsyncOutputTransport : public transport::TTransport {
  // Asynchronous transport.
  //
  // The asynchronousity gives us an interesting asymmetry. We are
  // free to write to an underlying socket at any time, but cannot
  // read from it in any way. This class is simply meant to be a
  // superclass of unidirectional nonblocking out transports,
  // disallowing the read method.

 public:
  virtual void write(const uint8_t* /* buf */, uint32_t /* len */) = 0;
  virtual void flush() = 0;

  virtual uint32_t read(uint8_t* /* buf */, uint32_t /* len */) {
    // This method should not be overridden!!
    throw 
transport::TTransportException(transport::TTransportException::NOT_OPEN, 
"Asynchronous transports cannot read.");
  }

  virtual ~TAsyncOutputTransport() {}
};

// More stuff should go here, so far not enough generalization has been done

} } }

#endif

Reply via email to