Author: kwall
Date: Wed Nov 30 13:58:14 2011
New Revision: 1208435

URL: http://svn.apache.org/viewvc?rev=1208435&view=rev
Log:
QPID-3642: Fix for redelivery regression found by Python tests

Applied patch from Andrew MacBean <andymacb...@gmail.com> and Oleksandr
Rudyy <oru...@gmail.com>

Added:
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/
    
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java
Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
    
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1208435&r1=1208434&r2=1208435&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
 Wed Nov 30 13:58:14 2011
@@ -643,27 +643,26 @@ public class Subscription_0_10 implement
                             });
     }
 
-    void reject(QueueEntry entry)
+    void reject(final QueueEntry entry)
     {
         entry.setRedelivered();
         entry.routeToAlternate();
 
     }
 
-    void release(QueueEntry entry, boolean setRedelivered)
+    void release(final QueueEntry entry, final boolean setRedelivered)
     {
-        boolean maxDeliveryLimitExceeded = false;
         if (setRedelivered)
         {
             entry.setRedelivered();
-            maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry);
         }
-        else
+
+        if (getSession().isClosing() || !setRedelivered)
         {
             entry.decrementDeliveryCount();
         }
 
-        if (maxDeliveryLimitExceeded)
+        if (isMaxDeliveryLimitReached(entry))
         {
             sendToDLQOrDiscard(entry);
         }
@@ -708,7 +707,7 @@ public class Subscription_0_10 implement
         }
     }
 
-    private boolean isMaxDeliveryLimitExceeded(QueueEntry entry)
+    private boolean isMaxDeliveryLimitReached(QueueEntry entry)
     {
         final int maxDeliveryLimit = 
entry.getQueue().getMaximumDeliveryCount();
         return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= 
maxDeliveryLimit);

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1208435&r1=1208434&r2=1208435&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 Wed Nov 30 13:58:14 2011
@@ -350,7 +350,7 @@ public class ServerSession extends Sessi
         _transaction.rollback();
         for(MessageDispositionChangeListener listener : 
_messageDispositionListenerMap.values())
         {
-            listener.onRelease(false);
+            listener.onRelease(true);
         }
         _messageDispositionListenerMap.clear();
 

Modified: 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1208435&r1=1208434&r2=1208435&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 (original)
+++ 
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
 Wed Nov 30 13:58:14 2011
@@ -405,10 +405,6 @@ public class AMQSession_0_10 extends AMQ
      */
     public void sendClose(long timeout) throws AMQException, FailoverException
     {
-        if (getTransacted())
-        {
-            releaseForRollback();
-        }
         if (flushTask != null)
         {
             flushTask.cancel();

Added: 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java?rev=1208435&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java
 (added)
+++ 
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java
 Wed Nov 30 13:58:14 2011
@@ -0,0 +1,50 @@
+package org.apache.qpid.client.redelivered;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+
+public class RedeliveredMessageTest extends QpidBrokerTestCase
+{
+    private Connection _connection;
+
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _connection = getConnection();
+    }
+
+    public void testRedeliveredFlagOnSessionClose() throws Exception
+    {
+        Session session = _connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+        Destination destination = session.createQueue(getTestQueueName());
+        MessageConsumer consumer = session.createConsumer(destination);
+
+        final int numberOfMessages = 3;
+        sendMessage(session, destination, numberOfMessages);
+
+        _connection.start();
+
+        for(int i = 0; i < numberOfMessages; i++)
+        {
+            final Message m = consumer.receive(1000l);
+            assertNotNull("Message is not recieved at " + i, m);
+            assertFalse("Redelivered should be not set", 
m.getJMSRedelivered());
+        }
+
+        session.close();
+        session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        destination = session.createQueue(getTestQueueName());
+        consumer = session.createConsumer(destination);
+
+        for(int i = 0; i < numberOfMessages; i++)
+        {
+            final Message m = consumer.receive(1000l);
+            assertNotNull("Message is not recieved at " + i, m);
+            assertTrue("Redelivered should be set", m.getJMSRedelivered());
+        }
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to