Author: ritchiem Date: Wed Oct 28 15:31:04 2009 New Revision: 830591 URL: http://svn.apache.org/viewvc?rev=830591&view=rev Log: r817742 (the fix for QPID-2102) did not cover the case for 2pc transactions recovered in the prepared state; this fixes that case.
Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp?rev=830591&r1=830590&r2=830591&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.cpp Wed Oct 28 15:31:04 2009 @@ -179,6 +179,10 @@ } } +void Queue::recoverPrepared(boost::intrusive_ptr<Message>& msg) +{ + if (policy.get()) policy->recoverEnqueued(msg); +} void Queue::recover(boost::intrusive_ptr<Message>& msg){ if (policy.get()) policy->recoverEnqueued(msg); Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h?rev=830591&r1=830590&r2=830591&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/Queue.h Wed Oct 28 15:31:04 2009 @@ -336,6 +336,12 @@ // For cluster update QueueListeners& getListeners(); + + /** + * Reserve space in policy for an enqueued message that + * has been recovered in the prepared state (dtx only) + */ + void recoverPrepared(boost::intrusive_ptr<Message>& msg); }; } } Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp?rev=830591&r1=830590&r2=830591&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredDequeue.cpp Wed Oct 28 15:31:04 2009 @@ -23,17 +23,24 @@ using boost::intrusive_ptr; using namespace qpid::broker; -RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {} +RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) +{ + queue->recoverPrepared(msg); +} -bool RecoveredDequeue::prepare(TransactionContext*) throw(){ +bool RecoveredDequeue::prepare(TransactionContext*) throw() +{ //should never be called; transaction has already prepared if an enqueue is recovered return false; } -void RecoveredDequeue::commit() throw(){ +void RecoveredDequeue::commit() throw() +{ + queue->enqueueAborted(msg); } -void RecoveredDequeue::rollback() throw(){ +void RecoveredDequeue::rollback() throw() +{ msg->enqueueComplete(); queue->process(msg); } Modified: qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp?rev=830591&r1=830590&r2=830591&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp (original) +++ qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/broker/RecoveredEnqueue.cpp Wed Oct 28 15:31:04 2009 @@ -23,7 +23,10 @@ using boost::intrusive_ptr; using namespace qpid::broker; -RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {} +RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) +{ + queue->recoverPrepared(msg); +} bool RecoveredEnqueue::prepare(TransactionContext*) throw(){ //should never be called; transaction has already prepared if an enqueue is recovered @@ -36,5 +39,6 @@ } void RecoveredEnqueue::rollback() throw(){ + queue->enqueueAborted(msg); } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org