Author: astitcher
Date: Thu May 28 05:18:09 2009
New Revision: 779435
URL: http://svn.apache.org/viewvc?rev=779435&view=rev
Log: QPID-1879 Don't use a thread for every new client Connection
 - By default, the maximum number of threads now used for network I/O is the number of CPUs available.
 - This can be overridden with the QPID_MAX_IOTHREADS environment variable or the max-iothreads setting in the client config file.
Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp qpid/trunk/qpid/cpp/src/qpid/client/Connector.h qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=779435&r1=779434&r2=779435&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Thu May 28 05:18:09 2009 @@ -18,7 +18,13 @@ * under the License. * */ + #include "ConnectionImpl.h" + +#ifdef HAVE_CONFIG_H +# include "config.h" +#endif + #include "Connector.h" #include "ConnectionSettings.h" #include "SessionImpl.h" @@ -28,11 +34,16 @@ #include "qpid/Url.h" #include "qpid/framing/enum.h" #include "qpid/framing/reply_exceptions.h" +#include "qpid/sys/Poller.h" +#include "qpid/sys/SystemInfo.h" +#include "qpid/Options.h" #include <boost/bind.hpp> #include <boost/format.hpp> +#include <boost/shared_ptr.hpp> #include <limits> +#include <vector> namespace qpid { namespace client { @@ -42,7 +53,10 @@ using namespace qpid::sys; using namespace qpid::framing::connection;//for connection error codes -// Get timer singleton +namespace { +// Maybe should amalgamate the singletons into a single client singleton + +// Get timer singleton Timer& theTimer() { static Mutex timerInitLock; ScopedLock<Mutex> l(timerInitLock); @@ -51,6 +65,73 @@ return t; } +struct IOThreadOptions : public qpid::Options { + int maxIOThreads; + + IOThreadOptions(int c) : + Options("IO threading options"), + maxIOThreads(c) + { + addOptions() + ("max-iothreads", optValue(maxIOThreads, "N"), "Maximum number of io threads to use"); + } +}; + +// IO threads +class IOThread { + int maxIOThreads; + int ioThreads; + int 
connections; + Mutex threadLock; + std::vector<Thread> t; + Poller::shared_ptr poller_; + +public: + void add() { + ScopedLock<Mutex> l(threadLock); + ++connections; + if (ioThreads < maxIOThreads) { + QPID_LOG(debug, "Created IO thread: " << ioThreads); + ++ioThreads; + t.push_back( Thread(poller_.get()) ); + } + } + + void sub() { + ScopedLock<Mutex> l(threadLock); + --connections; + } + + Poller::shared_ptr poller() const { + return poller_; + } + + // Here is where the maximum number of threads is set + IOThread(int c) : + ioThreads(0), + connections(0), + poller_(new Poller) + { + IOThreadOptions options(c); + options.parse(0, 0, QPIDC_CONF_FILE, true); + maxIOThreads = (options.maxIOThreads != -1) ? + options.maxIOThreads : 1; + } + + // We can't destroy threads one-by-one as the only + // control we have is to shutdown the whole lot + // and we can't do that before we're unloaded as we can't + // restart the Poller after shutting it down + ~IOThread() { + poller_->shutdown(); + for (int i=0; i<ioThreads; ++i) { + t[i].join(); + } + } +}; + +static IOThread io(SystemInfo::concurrency()); + class HeartbeatTask : public TimerTask { TimeoutHandler& timeout; @@ -67,6 +148,8 @@ {} }; +} + ConnectionImpl::ConnectionImpl(framing::ProtocolVersion v, const ConnectionSettings& settings) : Bounds(settings.maxFrameSize * settings.bounds), handler(settings, v), @@ -90,6 +173,7 @@ // is running. 
failover.reset(); if (connector) connector->close(); + io.sub(); } void ConnectionImpl::addSession(const boost::shared_ptr<SessionImpl>& session, uint16_t channel) @@ -126,7 +210,6 @@ return handler.isOpen(); } - void ConnectionImpl::open() { const std::string& protocol = handler.protocol; @@ -134,7 +217,8 @@ int port = handler.port; QPID_LOG(info, "Connecting to " << protocol << ":" << host << ":" << port); - connector.reset(Connector::create(protocol, version, handler, this)); + io.add(); + connector.reset(Connector::create(protocol, io.poller(), version, handler, this)); connector->setInputHandler(&handler); connector->setShutdownHandler(this); connector->connect(host, port); @@ -238,7 +322,7 @@ { return handler; } - + std::vector<qpid::Url> ConnectionImpl::getKnownBrokers() { return failover ? failover->getKnownBrokers() : handler.knownBrokersUrls; } Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=779435&r1=779434&r2=779435&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Thu May 28 05:18:09 2009 @@ -27,9 +27,11 @@ #include "qpid/sys/Codec.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/AsynchIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" +#include "qpid/sys/Socket.h" #include "qpid/sys/SecurityLayer.h" #include "qpid/Msg.h" @@ -51,21 +53,23 @@ // Stuff for the registry of protocol connectors (maybe should be moved to its own file) namespace { typedef std::map<std::string, Connector::Factory*> ProtocolRegistry; - + ProtocolRegistry& theProtocolRegistry() { static ProtocolRegistry protocolRegistry; - + return protocolRegistry; } } -Connector* Connector::create(const std::string& proto, 
framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) +Connector* Connector::create(const std::string& proto, + Poller::shared_ptr p, + framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { ProtocolRegistry::const_iterator i = theProtocolRegistry().find(proto); if (i==theProtocolRegistry().end()) { throw Exception(QPID_MSG("Unknown protocol: " << proto)); } - return (i->second)(v, s, c); + return (i->second)(p, v, s, c); } void Connector::registerFactory(const std::string& proto, Factory* connectorFactory) @@ -81,7 +85,7 @@ { } -class TCPConnector : public Connector, public sys::Codec, private sys::Runnable +class TCPConnector : public Connector, public sys::Codec { typedef std::deque<framing::AMQFrame> Frames; struct Buff; @@ -93,7 +97,7 @@ size_t lastEof; // Position after last EOF in frames uint64_t currentSize; Bounds* bounds; - + framing::ProtocolVersion version; bool initiated; bool closed; @@ -104,28 +108,25 @@ framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Thread receiver; - sys::Socket socket; sys::AsynchIO* aio; std::string identifier; - boost::shared_ptr<sys::Poller> poller; + Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~TCPConnector(); - void run(); void handleClosed(); bool closeInternal(); - + bool readbuff(qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*); void writebuff(qpid::sys::AsynchIO&); void writeDataBlock(const framing::AMQDataBlock& data); void eof(qpid::sys::AsynchIO&); boost::weak_ptr<ConnectionImpl> impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -142,18 +143,23 @@ size_t decode(const char* buffer, size_t size); size_t encode(const char* buffer, size_t size); bool canEncode(); - public: - TCPConnector(framing::ProtocolVersion pVersion, - const ConnectionSettings&, + TCPConnector(Poller::shared_ptr, + framing::ProtocolVersion pVersion, + const ConnectionSettings&, 
ConnectionImpl*); }; +struct TCPConnector::Buff : public AsynchIO::BufferBase { + Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new TCPConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new TCPConnector(p, v, s, c); } struct StaticInit { @@ -163,19 +169,21 @@ } init; } -TCPConnector::TCPConnector(ProtocolVersion ver, +TCPConnector::TCPConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), lastEof(0), currentSize(0), bounds(cimpl), - version(ver), + version(ver), initiated(false), closed(true), joined(true), shutdownHandler(0), aio(0), + poller(p), impl(cimpl->shared_from_this()) { QPID_LOG(debug, "TCPConnector created for " << version.toString()); @@ -197,7 +205,6 @@ } identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); - poller = Poller::shared_ptr(new Poller); aio = AsynchIO::create(socket, boost::bind(&TCPConnector::readbuff, this, _1, _2), boost::bind(&TCPConnector::eof, this, _1), @@ -214,28 +221,24 @@ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - receiver = Thread(this); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + + aio->start(poller); } bool TCPConnector::closeInternal() { - bool ret; - { Mutex::ScopedLock l(lock); - ret = !closed; + bool ret = !closed; if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; + socket.close(); } - receiver.join(); return ret; } - + void TCPConnector::close() { closeInternal(); } @@ 
-285,18 +288,13 @@ shutdownHandler->shutdown(); } -struct TCPConnector::Buff : public AsynchIO::BufferBase { - Buff(size_t size) : AsynchIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - void TCPConnector::writebuff(AsynchIO& /*aio*/) { Codec* codec = securityLayer.get() ? (Codec*) securityLayer.get() : (Codec*) this; if (codec->canEncode()) { std::auto_ptr<AsynchIO::BufferBase> buffer = std::auto_ptr<AsynchIO::BufferBase>(aio->getQueuedBuffer()); if (!buffer.get()) buffer = std::auto_ptr<AsynchIO::BufferBase>(new Buff(maxFrameSize)); - + size_t encoded = codec->encode(buffer->bytes, buffer->byteCount); buffer->dataStart = 0; @@ -382,28 +380,6 @@ handleClosed(); } -void TCPConnector::run() { - // Keep the connection impl in memory until run() completes. - boost::shared_ptr<ConnectionImpl> protect = impl.lock(); - assert(protect); - try { - Dispatcher d(poller); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); - d.run(); - } catch (const std::exception& e) { - QPID_LOG(error, QPID_MSG("FAIL " << identifier << ": " << e.what())); - handleClosed(); - } - try { - socket.close(); - } catch (const std::exception&) {} -} - void TCPConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; Modified: qpid/trunk/qpid/cpp/src/qpid/client/Connector.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/Connector.h?rev=779435&r1=779434&r2=779435&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/Connector.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/Connector.h Thu May 28 05:18:09 2009 @@ -22,27 +22,24 @@ #define _Connector_ -#include "qpid/framing/InputHandler.h" #include "qpid/framing/OutputHandler.h" -#include "qpid/framing/InitiationHandler.h" -#include "qpid/framing/ProtocolInitiation.h" #include "qpid/framing/ProtocolVersion.h" -#include 
"qpid/sys/ShutdownHandler.h" -#include "qpid/sys/TimeoutHandler.h" -#include "qpid/sys/Thread.h" -#include "qpid/sys/Runnable.h" -#include "qpid/sys/Mutex.h" -#include "qpid/sys/Socket.h" -#include "qpid/sys/Time.h" -#include <queue> -#include <boost/weak_ptr.hpp> #include <boost/shared_ptr.hpp> +#include <string> + namespace qpid { namespace sys { +class ShutdownHandler; class SecurityLayer; +class Poller; +} + +namespace framing { +class InputHandler; +class AMQFrame; } namespace client { @@ -52,11 +49,14 @@ ///@internal class Connector : public framing::OutputHandler -{ +{ public: // Protocol connector factory related stuff (it might be better to separate this code from the TCP Connector in the future) - typedef Connector* Factory(framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); - static Connector* create(const std::string& proto, framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + typedef Connector* Factory(boost::shared_ptr<qpid::sys::Poller>, + framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); + static Connector* create(const std::string& proto, + boost::shared_ptr<qpid::sys::Poller>, + framing::ProtocolVersion, const ConnectionSettings&, ConnectionImpl*); static void registerFactory(const std::string& proto, Factory* connectorFactory); virtual ~Connector() {}; @@ -73,7 +73,6 @@ virtual const std::string& getIdentifier() const = 0; virtual void activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer>); - }; }} Modified: qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp?rev=779435&r1=779434&r2=779435&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/RdmaConnector.cpp Thu May 28 05:18:09 2009 @@ -26,6 +26,7 @@ #include "qpid/log/Statement.h" #include 
"qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/rdma/RdmaIO.h" #include "qpid/sys/Dispatcher.h" #include "qpid/sys/Poller.h" @@ -48,7 +49,7 @@ using boost::format; using boost::str; - class RdmaConnector : public Connector, public sys::Codec, private sys::Runnable + class RdmaConnector : public Connector, public sys::Codec { struct Buff; @@ -60,13 +61,12 @@ Frames frames; size_t lastEof; // Position after last EOF in frames uint64_t currentSize; - Bounds* bounds; - - + Bounds* bounds; + framing::ProtocolVersion version; bool initiated; - sys::Mutex pollingLock; + sys::Mutex pollingLock; bool polling; bool joined; @@ -75,15 +75,12 @@ framing::InitiationHandler* initialiser; framing::OutputHandler* output; - sys::Thread receiver; - Rdma::AsynchIO* aio; sys::Poller::shared_ptr poller; std::auto_ptr<qpid::sys::SecurityLayer> securityLayer; ~RdmaConnector(); - void run(); void handleClosed(); bool closeInternal(); @@ -101,7 +98,7 @@ std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void close(); void send(framing::AMQFrame& frame); @@ -119,15 +116,16 @@ bool canEncode(); public: - RdmaConnector(framing::ProtocolVersion pVersion, + RdmaConnector(Poller::shared_ptr, + framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new RdmaConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new RdmaConnector(p, v, s, c); } struct StaticInit { @@ -139,7 +137,8 @@ } -RdmaConnector::RdmaConnector(ProtocolVersion ver, +RdmaConnector::RdmaConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : 
maxFrameSize(settings.maxFrameSize), @@ -152,6 +151,7 @@ joined(true), shutdownHandler(0), aio(0), + poller(p), impl(cimpl) { QPID_LOG(debug, "RdmaConnector created for " << version); @@ -165,7 +165,6 @@ Mutex::ScopedLock l(pollingLock); assert(!polling); assert(joined); - poller = Poller::shared_ptr(new Poller); // This stuff needs to abstracted out of here to a platform specific file ::addrinfo *res; @@ -190,7 +189,6 @@ polling = true; joined = false; - receiver = Thread(this); } // The following only gets run when connected @@ -226,23 +224,14 @@ bool RdmaConnector::closeInternal() { bool ret; - { Mutex::ScopedLock l(pollingLock); ret = polling; if (polling) { polling = false; - poller->shutdown(); } - if (joined || receiver.id() == Thread::current().id()) { - return ret; - } - joined = true; - } - - receiver.join(); return ret; } - + void RdmaConnector::close() { closeInternal(); } @@ -366,28 +355,6 @@ handleClosed(); } -void RdmaConnector::run(){ - // Keep the connection impl in memory until run() completes. 
- //GRS: currently the ConnectionImpls destructor is where the Io thread is joined - //boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - //assert(protect); - try { - Dispatcher d(poller); - - //aio->start(poller); - d.run(); - //aio->queueForDeletion(); - } catch (const std::exception& e) { - { - // We're no longer polling - Mutex::ScopedLock l(pollingLock); - polling = false; - } - QPID_LOG(error, e.what()); - handleClosed(); - } -} - void RdmaConnector::activateSecurityLayer(std::auto_ptr<qpid::sys::SecurityLayer> sl) { securityLayer = sl; Modified: qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp?rev=779435&r1=779434&r2=779435&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/client/SslConnector.cpp Thu May 28 05:18:09 2009 @@ -28,6 +28,7 @@ #include "qpid/log/Statement.h" #include "qpid/sys/Time.h" #include "qpid/framing/AMQFrame.h" +#include "qpid/framing/InitiationHandler.h" #include "qpid/sys/ssl/util.h" #include "qpid/sys/ssl/SslIo.h" #include "qpid/sys/ssl/SslSocket.h" @@ -50,7 +51,7 @@ using boost::str; -class SslConnector : public Connector, private sys::Runnable +class SslConnector : public Connector { struct Buff; @@ -68,25 +69,25 @@ framing::Buffer encode; size_t framesEncoded; std::string identifier; - Bounds* bounds; - + Bounds* bounds; + void writeOne(); void newBuffer(); public: - + Writer(uint16_t maxFrameSize, Bounds*); ~Writer(); void init(std::string id, sys::ssl::SslIO*); void handle(framing::AMQFrame&); void write(sys::ssl::SslIO&); }; - + const uint16_t maxFrameSize; framing::ProtocolVersion version; bool initiated; - sys::Mutex closedLock; + sys::Mutex closedLock; bool closed; bool joined; @@ -96,20 +97,17 @@ framing::OutputHandler* output; Writer writer; - - sys::Thread receiver; 
sys::ssl::SslSocket socket; sys::ssl::SslIO* aio; - boost::shared_ptr<sys::Poller> poller; + Poller::shared_ptr poller; ~SslConnector(); - void run(); void handleClosed(); bool closeInternal(); - + void readbuff(qpid::sys::ssl::SslIO&, qpid::sys::ssl::SslIOBufferBase*); void writebuff(qpid::sys::ssl::SslIO&); void writeDataBlock(const framing::AMQDataBlock& data); @@ -118,7 +116,7 @@ std::string identifier; ConnectionImpl* impl; - + void connect(const std::string& host, int port); void init(); void close(); @@ -132,15 +130,20 @@ const std::string& getIdentifier() const; public: - SslConnector(framing::ProtocolVersion pVersion, + SslConnector(Poller::shared_ptr p, framing::ProtocolVersion pVersion, const ConnectionSettings&, ConnectionImpl*); }; +struct SslConnector::Buff : public SslIO::BufferBase { + Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} + ~Buff() { delete [] bytes;} +}; + // Static constructor which registers connector here namespace { - Connector* create(framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { - return new SslConnector(v, s, c); + Connector* create(Poller::shared_ptr p, framing::ProtocolVersion v, const ConnectionSettings& s, ConnectionImpl* c) { + return new SslConnector(p, v, s, c); } struct StaticInit { @@ -149,9 +152,9 @@ SslOptions options; options.parse (0, 0, QPIDC_CONF_FILE, true); if (options.certDbPath.empty()) { - QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); + QPID_LOG(info, "SSL connector not enabled, you must set QPID_SSL_CERT_DB to enable it."); } else { - initNSS(options); + initNSS(options); Connector::registerFactory("ssl", &create); } } catch (const std::exception& e) { @@ -163,7 +166,8 @@ } init; } -SslConnector::SslConnector(ProtocolVersion ver, +SslConnector::SslConnector(Poller::shared_ptr p, + ProtocolVersion ver, const ConnectionSettings& settings, ConnectionImpl* cimpl) : maxFrameSize(settings.maxFrameSize), @@ -174,6 +178,7 @@ 
shutdownHandler(0), writer(maxFrameSize, cimpl), aio(0), + poller(p), impl(cimpl) { QPID_LOG(debug, "SslConnector created for " << version.toString()); @@ -197,7 +202,6 @@ identifier = str(format("[%1% %2%]") % socket.getLocalPort() % socket.getPeerAddress()); closed = false; - poller = Poller::shared_ptr(new Poller); aio = new SslIO(socket, boost::bind(&SslConnector::readbuff, this, _1, _2), boost::bind(&SslConnector::eof, this, _1), @@ -214,7 +218,10 @@ ProtocolInitiation init(version); writeDataBlock(init); joined = false; - receiver = Thread(this); + for (int i = 0; i < 32; i++) { + aio->queueReadBuffer(new Buff(maxFrameSize)); + } + aio->start(poller); } bool SslConnector::closeInternal() { @@ -223,16 +230,11 @@ if (!closed) { closed = true; aio->queueForDeletion(); - poller->shutdown(); - } - if (!joined && receiver.id() != Thread::current().id()) { - joined = true; - Mutex::ScopedUnlock u(closedLock); - receiver.join(); + socket.close(); } return ret; } - + void SslConnector::close() { closeInternal(); } @@ -266,11 +268,6 @@ shutdownHandler->shutdown(); } -struct SslConnector::Buff : public SslIO::BufferBase { - Buff(size_t size) : SslIO::BufferBase(new char[size], size) {} - ~Buff() { delete [] bytes;} -}; - SslConnector::Writer::Writer(uint16_t s, Bounds* b) : maxFrameSize(s), aio(0), buffer(0), lastEof(0), bounds(b) { } @@ -375,25 +372,4 @@ handleClosed(); } -void SslConnector::run(){ - // Keep the connection impl in memory until run() completes. 
- boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this(); - assert(protect); - try { - Dispatcher d(poller); - - for (int i = 0; i < 32; i++) { - aio->queueReadBuffer(new Buff(maxFrameSize)); - } - - aio->start(poller); - d.run(); - socket.close(); - } catch (const std::exception& e) { - QPID_LOG(error, e.what()); - handleClosed(); - } -} - - }} // namespace qpid::client --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org