Author: aconway Date: Thu Dec 22 20:36:10 2011 New Revision: 1222432 URL: http://svn.apache.org/viewvc?rev=1222432&view=rev Log: QPID-3603: Lifecycle and locking fixes for QueueReplicator
Separate bridge de-activation from destruction in QueueReplicator: Only deactivate if destroyed by the WiringReplicator because of a queue delete. If destroyed for any other reason (e.g. broker destruction) don't de-activate the bridge as required resources may not exist. Added missing locks in QueueReplicator functions. Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1222432&r1=1222431&r2=1222432&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Dec 22 20:36:10 2011 @@ -82,14 +82,18 @@ void QueueReplicator::activate() { ); } -QueueReplicator::~QueueReplicator() { +QueueReplicator::~QueueReplicator() {} + +void QueueReplicator::deactivate() { + sys::Mutex::ScopedLock l(lock); queue->getBroker()->getLinks().destroy( link->getHost(), link->getPort(), queue->getName(), getName(), string()); } // Called in a broker connection thread when the bridge is created. -// shared_ptr to self is just to ensure we are still in memory. void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) { + sys::Mutex::ScopedLock l(lock); + framing::AMQP_ServerProxy peer(sessionHandler.out); const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs()); framing::FieldTable settings; Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1222432&r1=1222431&r2=1222432&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/QueueReplicator.h Thu Dec 22 20:36:10 2011 @@ -46,7 +46,7 @@ namespace ha { * Creates a ReplicatingSubscription on the primary by passing special * arguments to the consume command. * - * THREAD UNSAFE: Only called in the connection thread of the source queue. + * THREAD SAFE: Called in different connection threads. */ class QueueReplicator : public broker::Exchange, public boost::enable_shared_from_this<QueueReplicator> @@ -59,10 +59,12 @@ class QueueReplicator : public broker::E QueueReplicator(boost::shared_ptr<broker::Queue> q, boost::shared_ptr<broker::Link> l); ~QueueReplicator(); - void activate(); + void activate(); // Call after ctor + void deactivate(); // Call before dtor std::string getType() const; - bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); + bool bind(boost::shared_ptr<broker::Queue + >, const std::string&, const framing::FieldTable*); bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*); void route(broker::Deliverable&, const std::string&, const framing::FieldTable*); bool isBound(boost::shared_ptr<broker::Queue>, const std::string* const, const framing::FieldTable* const); Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp?rev=1222432&r1=1222431&r2=1222432&view=diff ============================================================================== --- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp (original) +++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/WiringReplicator.cpp Thu Dec 22 20:36:10 2011 @@ -296,6 +296,11 @@ void WiringReplicator::doEventQueueDelet values[USER].asString(), values[RHOST].asString()); // Delete the QueueReplicator exchange for this queue. + boost::shared_ptr<broker::Exchange> ex = + broker.getExchanges().find(QueueReplicator::replicatorName(name)); + boost::shared_ptr<QueueReplicator> qr = + boost::dynamic_pointer_cast<QueueReplicator>(ex); + if (qr) qr->deactivate(); broker.getExchanges().destroy(QueueReplicator::replicatorName(name)); } } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org