Author: gsim Date: Thu Oct 8 08:55:44 2009 New Revision: 823094 URL: http://svn.apache.org/viewvc?rev=823094&view=rev Log: QPID-2132: Applied patch from Ken Giusti
Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp?rev=823094&r1=823093&r2=823094&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.cpp Thu Oct 8 08:55:44 2009 @@ -78,7 +78,7 @@ const string ManagementAgentImpl::storeMagicNumber("MA02"); ManagementAgentImpl::ManagementAgentImpl() : - interval(10), extThread(false), + interval(10), extThread(false), pipeHandle(0), initialized(false), connected(false), lastFailure("never connected"), clientWasAdded(true), requestedBrokerBank(0), requestedAgentBank(0), assignedBrokerBank(0), assignedAgentBank(0), bootSequence(0), @@ -89,13 +89,11 @@ ManagementAgentImpl::~ManagementAgentImpl() { + // shutdown the connection thread connThreadBody.close(); + connThread.join(); - // If the thread is doing work on the connection, we must wait for it to - // complete before shutting down. - if (!connThreadBody.isSleeping()) { - connThread.join(); - } + // @todo need to shutdown pubThread? // Release the memory associated with stored management objects. 
{ @@ -777,6 +775,7 @@ static const int delayFactor(2); int delay(delayMin); string dest("qmfagent"); + ConnectionThread::shared_ptr tmp; sessionId.generate(); queueName << "qmfagent-" << sessionId; @@ -787,7 +786,7 @@ QPID_LOG(debug, "QMF Agent attempting to connect to the broker..."); connection.open(agent.connectionSettings); session = connection.newSession(queueName.str()); - subscriptions = new client::SubscriptionManager(session); + subscriptions.reset(new client::SubscriptionManager(session)); session.queueDeclare(arg::queue=queueName.str(), arg::autoDelete=true, arg::exclusive=true); @@ -811,11 +810,12 @@ operational = false; agent.connected = false; + tmp = subscriptions; + subscriptions.reset(); } + tmp.reset(); // frees the subscription outside the lock delay = delayMin; connection.close(); - delete subscriptions; - subscriptions = 0; } } catch (exception &e) { if (delay < delayMax) @@ -824,14 +824,19 @@ } { + // sleep for "delay" seconds, but periodically check if the + // agent is shutting down so we don't hang for up to delayMax + // seconds during agent shutdown Mutex::ScopedLock _lock(connLock); if (shutdown) return; sleeping = true; - { - Mutex::ScopedUnlock _unlock(connLock); - ::sleep(delay); - } + int totalSleep = 0; + do { + Mutex::ScopedUnlock _unlock(connLock); + ::sleep(delayMin); + totalSleep += delayMin; + } while (totalSleep < delay && !shutdown); sleeping = false; if (shutdown) return; @@ -848,10 +853,12 @@ const string& exchange, const string& routingKey) { + ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); if (!operational) return; + s = subscriptions; } Message msg; @@ -866,8 +873,8 @@ } catch(exception& e) { QPID_LOG(error, "Exception caught in sendBuffer: " << e.what()); // Bounce the connection - if (subscriptions) - subscriptions->stop(); + if (s) + s->stop(); } } @@ -881,12 +888,14 @@ void ManagementAgentImpl::ConnectionThread::close() { + ConnectionThread::shared_ptr s; { Mutex::ScopedLock _lock(connLock); 
shutdown = true; + s = subscriptions; } - if (subscriptions) - subscriptions->stop(); + if (s) + s->stop(); } bool ManagementAgentImpl::ConnectionThread::isSleeping() const Modified: qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h?rev=823094&r1=823093&r2=823094&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/agent/ManagementAgentImpl.h Thu Oct 8 08:55:44 2009 @@ -163,12 +163,14 @@ friend class ConnectionThread; class ConnectionThread : public sys::Runnable { + typedef boost::shared_ptr<client::SubscriptionManager> shared_ptr; + bool operational; ManagementAgentImpl& agent; framing::Uuid sessionId; client::Connection connection; client::Session session; - client::SubscriptionManager* subscriptions; + ConnectionThread::shared_ptr subscriptions; std::stringstream queueName; mutable sys::Mutex connLock; bool shutdown; @@ -176,7 +178,7 @@ void run(); public: ConnectionThread(ManagementAgentImpl& _agent) : - operational(false), agent(_agent), subscriptions(0), + operational(false), agent(_agent), shutdown(false), sleeping(false) {} ~ConnectionThread(); void sendBuffer(qpid::framing::Buffer& buf, --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org