Author: gsim Date: Tue Jun 23 10:46:15 2009 New Revision: 787625 URL: http://svn.apache.org/viewvc?rev=787625&view=rev Log: QPID-1936: Fix potential deadlock for durable ring queue
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=787625&r1=787624&r2=787625&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Tue Jun 23 10:46:15 2009 @@ -551,11 +551,16 @@ } void Queue::push(boost::intrusive_ptr<Message>& msg, bool isRecovery){ + Messages dequeues; QueueListeners::NotificationSet copy; { Mutex::ScopedLock locker(messageLock); QueuedMessage qm(this, msg, ++sequence); - if (policy.get()) policy->tryEnqueue(qm); + if (policy.get()) { + policy->tryEnqueue(qm); + //depending on policy, may have some dequeues + if (!isRecovery) pendingDequeues.swap(dequeues); + } if (insertSeqNo) msg->getOrInsertHeaders().setInt64(seqNoKey, sequence); LVQ::iterator i; @@ -591,6 +596,10 @@ } } copy.notify(); + if (!dequeues.empty()) { + //depending on policy, may have some dequeues + for_each(dequeues.begin(), dequeues.end(), boost::bind(&Queue::dequeue, this, (TransactionContext*) 0, _1)); + } } QueuedMessage Queue::getFront() @@ -1026,4 +1035,10 @@ return !policy.get() || policy->isEnqueued(msg); } +void Queue::addPendingDequeue(const QueuedMessage& msg) +{ + //assumes lock is held - true at present but rather nasty as this is a public method + pendingDequeues.push_back(msg); +} + QueueListeners& Queue::getListeners() { return listeners; } Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=787625&r1=787624&r2=787625&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Tue Jun 23 10:46:15 2009 @@ -326,6 +326,18 @@ */ void recoveryComplete(); + /** + * This is a hack to avoid deadlocks in durable ring + * queues. It is used for dequeueing messages in response + * to an enqueue while avoid holding lock over call to + * store. + * + * Assumes messageLock is held - true for curent use case + * (QueuePolicy::tryEnqueue()) but rather nasty as this is a public + * method + **/ + void addPendingDequeue(const QueuedMessage &msg); + // For cluster update QueueListeners& getListeners(); }; Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp?rev=787625&r1=787624&r2=787625&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.cpp Tue Jun 23 10:46:15 2009 @@ -207,13 +207,9 @@ { qpid::sys::Mutex::ScopedLock l(lock); //find and remove m from queue - for (Messages::iterator i = queue.begin(); i != queue.end(); i++) { - if (i->payload == m.payload) { - queue.erase(i); - //now update count and size - QueuePolicy::dequeued(m); - break; - } + if (find(m, pendingDequeues, true) || find(m, queue, true)) { + //now update count and size + QueuePolicy::dequeued(m); } } @@ -223,12 +219,7 @@ //for non-strict ring policy, a message can be replaced (and //therefore dequeued) before it is accepted or released by //subscriber; need to detect this - for (Messages::const_iterator i = queue.begin(); i != queue.end(); i++) { - if (i->payload == m.payload) { - return true; - } - } - return false; + return find(m, pendingDequeues, false) || find(m, queue, false); } bool RingQueuePolicy::checkLimit(const QueuedMessage& m) @@ -248,7 +239,18 @@ oldest = queue.front(); } if (oldest.queue->acquire(oldest) || !strict) { - oldest.queue->dequeue(0, oldest); + { + //TODO: fix this! In the current code, this method is + //only ever called with the Queue lock already taken. This + //should not be relied upon going forward however and + //clearly the locking in this class is insufficient as + //there is no guarantee that the message previously atthe + //front is still there. + qpid::sys::Mutex::ScopedLock l(lock); + queue.pop_front(); + pendingDequeues.push_back(oldest); + } + oldest.queue->addPendingDequeue(oldest); QPID_LOG(debug, "Ring policy triggered in queue " << (m.queue ? m.queue->getName() : std::string("unknown queue")) << ": removed message " << oldest.position << " to make way for " << m.position); @@ -264,6 +266,17 @@ } } +bool RingQueuePolicy::find(const QueuedMessage& m, Messages& q, bool remove) +{ + for (Messages::iterator i = q.begin(); i != q.end(); i++) { + if (i->payload == m.payload) { + if (remove) q.erase(i); + return true; + } + } + return false; +} + std::auto_ptr<QueuePolicy> QueuePolicy::createQueuePolicy(const qpid::framing::FieldTable& settings) { uint32_t maxCount = getInt(settings, maxCountKey, 0); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h?rev=787625&r1=787624&r2=787625&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/QueuePolicy.h Tue Jun 23 10:46:15 2009 @@ -101,8 +101,11 @@ private: typedef std::deque<QueuedMessage> Messages; qpid::sys::Mutex lock; + Messages pendingDequeues; Messages queue; const bool strict; + + bool find(const QueuedMessage&, Messages&, bool remove); }; }} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org