Author: rajith
Date: Wed Jan 11 15:24:44 2012
New Revision: 1230088

URL: http://svn.apache.org/viewvc?rev=1230088&view=rev
Log:
QPID-3604 Reverting the changes, as they release messages every time the
channel is suspended. This results in several test failures.

Modified:
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1230088&r1=1230087&r2=1230088&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
 Wed Jan 11 15:24:44 2012
@@ -371,7 +371,7 @@ public abstract class AMQSession<C exten
      * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of
      * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover().
      */
-    protected volatile boolean _usingDispatcherForCleanup;
+    private volatile boolean _usingDispatcherForCleanup;
 
     /** Used to indicates that the connection to which this session belongs, has been stopped. */
     private boolean _connectionStopped;
@@ -3570,3 +3570,4 @@ public abstract class AMQSession<C exten
     }
 
 }
+

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1230088&r1=1230087&r2=1230088&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Jan 11 15:24:44 2012
@@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQ
     {
         if (suspend)
         {
-                synchronized (getMessageDeliveryLock())
-                {
-                    for (BasicMessageConsumer consumer : _consumers.values())
-                   {
-                       
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
-                                                    Option.UNRELIABLE);
-                       sync();
-                       List<Long> tags = 
consumer.drainReceiverQueueAndRetrieveDeliveryTags();
-                       _prefetchedMessageTags.addAll(tags);
-                   }
-                }
-
-                _usingDispatcherForCleanup = true;
-                syncDispatchQueue();
-                _usingDispatcherForCleanup = false;
-
-                RangeSet delivered = 
gatherRangeSet(_unacknowledgedMessageTags);
-               RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags);
-               RangeSet all = RangeSetFactory.createRangeSet(delivered.size()
-                                       + prefetched.size());
-
-               for (Iterator<Range> deliveredIter = delivered.iterator(); 
deliveredIter.hasNext();)
-               {
-                       Range range = deliveredIter.next();
-                       all.add(range);
-               }
-
-               for (Iterator<Range> prefetchedIter = prefetched.iterator(); 
prefetchedIter.hasNext();)
-               {
-                       Range range = prefetchedIter.next();
-                       all.add(range);
-               }
-
-               flushProcessed(all, false);
-               
getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED);
-               getQpidSession().messageRelease(prefetched);
-               sync();
+            for (BasicMessageConsumer consumer : _consumers.values())
+            {
+                
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()),
+                                             Option.UNRELIABLE);
+            }
         }
         else
         {
@@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQ
         getQpidSession().sync();
     }
 }
+

Modified: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1230088&r1=1230087&r2=1230088&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
 (original)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java
 Wed Jan 11 15:24:44 2012
@@ -5,14 +5,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.Connection;
-import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import javax.jms.TextMessage;
 
 import org.apache.qpid.configuration.ClientProperties;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
@@ -135,41 +133,5 @@ public class PrefetchBehaviourTest exten
         assertFalse("Unexpecte exception during async message 
processing",_exceptionCaught.get());
     }
 
-    /**
-     * Test Goal: Verify if connection stop releases all messages in it's 
prefetch buffer.
-     * Test Strategy: Send 10 messages to a queue. Create a consumer with 
maxprefetch of 5, but never consume them.
-     *                Stop the connection. Create a new connection and a 
consumer with maxprefetch 10 on the same queue.
-     *                Try to receive all 10 messages.
-     */
-    public void testConnectionStop() throws Exception
-    {
-        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, 
"10");
-        Connection con = getConnection();
-        con.start();
-        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");
-
-        MessageProducer prod = ssn.createProducer(queue);
-        for (int i=0; i<10;i++)
-        {
-           prod.send(ssn.createTextMessage("Msg" + i));
-        }
-
-        MessageConsumer consumer = ssn.createConsumer(queue);
-        // This is to ensure we get the first client to prefetch.
-        Message msg = consumer.receive(1000);
-        assertNotNull("The first consumer should get one message",msg);
-        con.stop();
-
-        Connection con2 = getConnection();
-        con2.start();
-        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        MessageConsumer consumer2 = ssn2.createConsumer(queue);
-        for (int i=0; i<9;i++)
-        {
-           TextMessage m = (TextMessage)consumer2.receive(1000);
-           assertNotNull("The second consumer should get 9 messages, but 
received only " + i,m);
-        }
-    }
-
 }
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to