Author: cctrieloff Date: Wed Jul 8 16:10:29 2009 New Revision: 792208 URL: http://svn.apache.org/viewvc?rev=792208&view=rev Log: More tests and complete fix for svn791672 commit -- correct requeue
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp?rev=792208&r1=792207&r2=792208&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.cpp Wed Jul 8 16:10:29 2009 @@ -90,6 +90,16 @@ } } +bool PersistableMessage::isStoredOnQueue(PersistableQueue::shared_ptr queue){ + if (store && (queue->getPersistenceId()!=0)) { + for (syncList::iterator i = synclist.begin(); i != synclist.end(); ++i) { + PersistableQueue::shared_ptr q(i->lock()); + if (q && q->getPersistenceId() == queue->getPersistenceId()) return true; + } + } + return false; +} + void PersistableMessage::enqueueAsync(PersistableQueue::shared_ptr queue, MessageStore* _store) { if (_store){ sys::ScopedLock<sys::Mutex> l(storeLock); Modified: qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h?rev=792208&r1=792207&r2=792208&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/PersistableMessage.h Wed Jul 8 16:10:29 2009 @@ -111,6 +111,8 @@ MessageStore* _store); QPID_BROKER_EXTERN void dequeueAsync(); + + bool isStoredOnQueue(PersistableQueue::shared_ptr queue); }; }} Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp?rev=792208&r1=792207&r2=792208&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.cpp Wed Jul 8 16:10:29 2009 @@ -99,8 +99,7 @@ eventMode(0), eventMgr(0), insertSeqNo(0), - broker(b), - lastForcedPosition(0) + broker(b) { if (parent != 0 && broker != 0) { @@ -211,6 +210,14 @@ msg.payload->enqueueComplete(); // mark the message as enqueued messages.push_front(msg); listeners.populate(copy); + + // for persistLastNode - don't force a message twice to disk, but force it if no force before + if(inLastNodeFailure && persistLastNode && !msg.payload->isStoredOnQueue(shared_from_this())) { + msg.payload->forcePersistent(); + if (msg.payload->isForcedPersistent() ){ + enqueue(0, msg.payload); + } + } } copy.notify(); } @@ -660,7 +667,6 @@ void Queue::clearLastNodeFailure() { inLastNodeFailure = false; - lastForcedPosition = sequence; } void Queue::setLastNodeFailure() @@ -669,19 +675,19 @@ Mutex::ScopedLock locker(messageLock); for ( Messages::iterator i = messages.begin(); i != messages.end(); ++i ) { // don't force a message twice to disk. - if(i->position > lastForcedPosition) { + if(!i->payload->isStoredOnQueue(shared_from_this())) { if (lastValueQueue) checkLvqReplace(*i); i->payload->forcePersistent(); if (i->payload->isForcedPersistent() ){ enqueue(0, i->payload); } - lastForcedPosition = i->position; } } inLastNodeFailure = true; } } + // return true if store exists, bool Queue::enqueue(TransactionContext* ctxt, boost::intrusive_ptr<Message> msg) { Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h?rev=792208&r1=792207&r2=792208&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Queue.h Wed Jul 8 16:10:29 2009 @@ -106,7 +106,6 @@ bool insertSeqNo; std::string seqNoKey; Broker* broker; - framing::SequenceNumber lastForcedPosition; void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false); void setPolicy(std::auto_ptr<QueuePolicy> policy); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org