Author: rajith Date: Wed Jan 11 15:24:44 2012 New Revision: 1230088 URL: http://svn.apache.org/viewvc?rev=1230088&view=rev Log: QPID-3604 Reverting the changes as they release messages every time the channel is suspended. This results in several test failures.
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1230088&r1=1230087&r2=1230088&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Wed Jan 11 15:24:44 2012 @@ -371,7 +371,7 @@ public abstract class AMQSession<C exten * Set when the dispatcher should direct incoming messages straight into the UnackedMessage list instead of * to the syncRecieveQueue or MessageListener. Used during cleanup, e.g. in Session.recover(). */ - protected volatile boolean _usingDispatcherForCleanup; + private volatile boolean _usingDispatcherForCleanup; /** Used to indicates that the connection to which this session belongs, has been stopped. 
*/ private boolean _connectionStopped; @@ -3570,3 +3570,4 @@ public abstract class AMQSession<C exten } } + Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1230088&r1=1230087&r2=1230088&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jan 11 15:24:44 2012 @@ -795,43 +795,11 @@ public class AMQSession_0_10 extends AMQ { if (suspend) { - synchronized (getMessageDeliveryLock()) - { - for (BasicMessageConsumer consumer : _consumers.values()) - { - getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), - Option.UNRELIABLE); - sync(); - List<Long> tags = consumer.drainReceiverQueueAndRetrieveDeliveryTags(); - _prefetchedMessageTags.addAll(tags); - } - } - - _usingDispatcherForCleanup = true; - syncDispatchQueue(); - _usingDispatcherForCleanup = false; - - RangeSet delivered = gatherRangeSet(_unacknowledgedMessageTags); - RangeSet prefetched = gatherRangeSet(_prefetchedMessageTags); - RangeSet all = RangeSetFactory.createRangeSet(delivered.size() - + prefetched.size()); - - for (Iterator<Range> deliveredIter = delivered.iterator(); deliveredIter.hasNext();) - { - Range range = deliveredIter.next(); - all.add(range); - } - - for (Iterator<Range> prefetchedIter = prefetched.iterator(); prefetchedIter.hasNext();) - { - Range range = prefetchedIter.next(); - all.add(range); - } - - flushProcessed(all, false); - getQpidSession().messageRelease(delivered,Option.SET_REDELIVERED); - getQpidSession().messageRelease(prefetched); - sync(); + for (BasicMessageConsumer consumer : _consumers.values()) + { + 
getQpidSession().messageStop(String.valueOf(consumer.getConsumerTag()), + Option.UNRELIABLE); + } } else { @@ -1387,3 +1355,4 @@ public class AMQSession_0_10 extends AMQ getQpidSession().sync(); } } + Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java?rev=1230088&r1=1230087&r2=1230088&view=diff ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java (original) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/prefetch/PrefetchBehaviourTest.java Wed Jan 11 15:24:44 2012 @@ -5,14 +5,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; -import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; -import javax.jms.TextMessage; import org.apache.qpid.configuration.ClientProperties; import org.apache.qpid.test.utils.QpidBrokerTestCase; @@ -135,41 +133,5 @@ public class PrefetchBehaviourTest exten assertFalse("Unexpecte exception during async message processing",_exceptionCaught.get()); } - /** - * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer. - * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them. - * Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue. - * Try to receive all 10 messages. 
- */ - public void testConnectionStop() throws Exception - { - setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10"); - Connection con = getConnection(); - con.start(); - Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE); - Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}"); - - MessageProducer prod = ssn.createProducer(queue); - for (int i=0; i<10;i++) - { - prod.send(ssn.createTextMessage("Msg" + i)); - } - - MessageConsumer consumer = ssn.createConsumer(queue); - // This is to ensure we get the first client to prefetch. - Message msg = consumer.receive(1000); - assertNotNull("The first consumer should get one message",msg); - con.stop(); - - Connection con2 = getConnection(); - con2.start(); - Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer2 = ssn2.createConsumer(queue); - for (int i=0; i<9;i++) - { - TextMessage m = (TextMessage)consumer2.receive(1000); - assertNotNull("The second consumer should get 9 messages, but received only " + i,m); - } - } - } + --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org