Author: rajith Date: Wed Jan 11 02:12:38 2012 New Revision: 1229857 URL: http://svn.apache.org/viewvc?rev=1229857&view=rev Log: QPID-3604 The code now drains individual consumer queues as well as the dispatch queue (via syncDipatchQueue method) and releases both unacked and prefetched messages, while only the former being marked redelivered. Also all of these transfers are being marked as completed to ensure credits don't dry up.
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1229857&r1=1229856&r2=1229857&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 11 02:12:38 2012 @@ -371,7 +371,7 @@ public abstract class AMQSession<C exten * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - private volatile boolean _usingDispatcherForCleanup; + protected volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. */ private boolean _connectionStopped; Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1229857&r1=1229856&r2=1229857&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jan 11 02:12:38 2012 @@ -795,11 +795,43 @@ public class AMQSession_0_10 extends AMQ { if (suspend) { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - } + synchronized (getMessageDeliveryLock()) + { + for (BasicMessageConsumer consumer : _consumers.values()) + { + getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + sync(); + List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); + _prefetchedMessageTags.addAll(tags); + } + } + + _usingDispatcherForCleanup = true; + syncDispatchQueue(); + _usingDispatcherForCleanup = false; + + RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); + RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); + RangeSet all = RangeSetFactory.createRangeSet(delivered.size() + + prefetched.size()); + + for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) + { + Range range = deliveredIter.next(); + all.add(range); + } + + for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) + { + Range range = prefetchedIter.next(); + all.add(range); + } + + flushProcessed(all, false); + getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); + getQpidSession().messageRelease(prefetched); + sync(); } else { Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1229857&r1=1229856&r2=1229857&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Wed Jan 11 02:12:38 2012 @@ -5,12 +5,14 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; +import javax.jms.TextMessage; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -133,4 +135,41 @@ public class PrefetchBehaviourTest exten assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); } + /** + * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. + * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. + * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. + * Try to receive all 10 messages. + */ + public void testConnectionStop() throws Exception + { + setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); + Connection con = getConnection(); + con.start(); + Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); + + MessageProducer prod = ssn.createProducer(queue); + for (int i=0; i<10;i++) + { + prod.send(ssn.createTextMessage("Msg" + i)); + } + + MessageConsumer consumer = ssn.createConsumer(queue); + // This is to ensure we get the first client to prefetch. + Message msg = consumer.receive(1000); + assertNotNull("The first consumer should get one message",msg); + con.stop(); + + Connection con2 = getConnection(); + con2.start(); + Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = ssn2.createConsumer(queue); + for (int i=0; i<9;i++) + { + TextMessage m = (TextMessage)consumer2.receive(1000); + assertNotNull("The second consumer should get 9 messages, but received only " + i,m); + } + } + } --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org