Author: rajith
Date: Wed Jan 11 02:12:38 2012
New Revision: 1229857

URL: http://svn.apache.org/viewvc?rev=1229857&view=rev
Log:
QPID-3604 The code now drains individual consumer queues as well as the
dispatch queue (via syncDipatchQueue method) and releases both unacked
and prefetched messages, while only the former being marked redelivered.
Also all of these transfers are being marked as completed to ensure
credits don't dry up.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1229857&r1=1229856&r2=1229857&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Jan 11 02:12:38 2012
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
      * Set when the dispatcher should direct incoming messages straight into 
the UnackedMessage list instead of
      * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. 
in Session.recover().
      */
-    private volatile boolean _usingDispatcherForCleanup;
+    protected volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, 
has been stopped. */
     private boolean _connectionStopped;

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1229857&r1=1229856&r2=1229857&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Jan 11 02:12:38 2012
@@ -795,11 +795,43 @@ public class AMQSession_0_10 extends AMQ
     {
         if (suspend)
         {
-            for (BasicMessageConsumer consumer : _consumers.values())
-            {
-                
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
-                                             Option.UNRELIABLE);
-            }
+                synchronized (getMessageDeliveryLock())
+                {
+                    for (BasicMessageConsumer consumer : _consumers.values())
+                   {
+                       
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+                                                    Option.UNRELIABLE);
+                       sync();
+                       List<Long> tags = 
consumer.drainReceiverQueueAndRetrieveDeliveryTags();
+                       _prefetchedMessageTags.addAll(tags);
+                   }
+                }
+
+                _usingDispatcherForCleanup = true;
+                syncDispatchQueue();
+                _usingDispatcherForCleanup = false;
+
+                RangeSet delivered = 
gatherRangeSet(_unacknowledgedMessageTags);
+               RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
+               RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
+                                       + prefetched.size());
+
+               for (Iterator<Range> deliveredIter = delivered.iterator(); 
deliveredIter.hasNext();)
+               {
+                       Range range = deliveredIter.next();
+                       all.add(range);
+               }
+
+               for (Iterator<Range> prefetchedIter = prefetched.iterator(); 
prefetchedIter.hasNext();)
+               {
+                       Range range = prefetchedIter.next();
+                       all.add(range);
+               }
+
+               flushProcessed(all, false);
+               
getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
+               getQpidSession().messageRelease(prefetched);
+               sync();
         }
         else
         {

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1229857&r1=1229856&r2=1229857&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
 Wed Jan 11 02:12:38 2012
@@ -5,12 +5,14 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -133,4 +135,41 @@ public class PrefetchBehaviourTest exten
         assertFalse("Unexpecte exception during async message 
processing",_exceptionCaught.get());
     }
 
+    /**
+     * Test Goal: Verify if connection stop releases all messages in it's 
prefetch buffer.
+     * Test Strategy: Send 10 messages to a queue. Create a consumer with 
maxprefetch of 5, but never consume them.
+     *                Stop the connection. Create a new connection and a 
consumer with maxprefetch 10 on the same queue.
+     *                Try to receive all 10 messages.
+     */
+    public void testConnectionStop() throws Exception
+    {
+        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, 
"10");
+        Connection con = getConnection();
+        con.start();
+        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
+
+        MessageProducer prod = ssn.createProducer(queue);
+        for (int i=0; i<10;i++)
+        {
+           prod.send(ssn.createTextMessage("Msg" + i));
+        }
+
+        MessageConsumer consumer = ssn.createConsumer(queue);
+        // This is to ensure we get the first client to prefetch.
+        Message msg = consumer.receive(1000);
+        assertNotNull("The first consumer should get one message",msg);
+        con.stop();
+
+        Connection con2 = getConnection();
+        con2.start();
+        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer2 = ssn2.createConsumer(queue);
+        for (int i=0; i<9;i++)
+        {
+           TextMessage m = (TextMessage)consumer2.receive(1000);
+           assertNotNull("The second consumer should get 9 messages, but 
received only " + i,m);
+        }
+    }
+
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to