Author: kwall Date: Wed Nov 30 13:58:14 2011 New Revision: 1208435 URL: http://svn.apache.org/viewvc?rev=1208435&view=rev Log: QPID-3642: Fix for redelivery regression found by Python tests
Applied patch from Andrew MacBean <andymacb...@gmail.com> and Oleksandr Rudyy<oru...@gmail.com> Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1208435&r1=1208434&r2=1208435&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Wed Nov 30 13:58:14 2011 @@ -643,27 +643,26 @@ public class Subscription_0_10 implement }); } - void reject(QueueEntry entry) + void reject(final QueueEntry entry) { entry.setRedelivered(); entry.routeToAlternate(); } - void release(QueueEntry entry, boolean setRedelivered) + void release(final QueueEntry entry, final boolean setRedelivered) { - boolean maxDeliveryLimitExceeded = false; if (setRedelivered) { entry.setRedelivered(); - maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry); } - else + + if (getSession().isClosing() || !setRedelivered) { entry.decrementDeliveryCount(); } - if (maxDeliveryLimitExceeded) + if (isMaxDeliveryLimitReached(entry)) { sendToDLQOrDiscard(entry); } @@ -708,7 +707,7 @@ public class Subscription_0_10 implement } } - private boolean isMaxDeliveryLimitExceeded(QueueEntry entry) 
+ private boolean isMaxDeliveryLimitReached(QueueEntry entry) { final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1208435&r1=1208434&r2=1208435&view=diff ============================================================================== --- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original) +++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Wed Nov 30 13:58:14 2011 @@ -350,7 +350,7 @@ public class ServerSession extends Sessi _transaction.rollback(); for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { - listener.onRelease(false); + listener.onRelease(true); } _messageDispositionListenerMap.clear(); Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=1208435&r1=1208434&r2=1208435&view=diff ============================================================================== --- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original) +++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Nov 30 13:58:14 2011 @@ -405,10 +405,6 @@ public class AMQSession_0_10 extends AMQ */ public void sendClose(long timeout) throws AMQException, FailoverException { - if (getTransacted()) - { - releaseForRollback(); - } if (flushTask != null) { flushTask.cancel(); Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java?rev=1208435&view=auto ============================================================================== --- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java (added) +++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/redelivered/RedeliveredMessageTest.java Wed Nov 30 13:58:14 2011 @@ -0,0 +1,50 @@ +package org.apache.qpid.client.redelivered; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import org.apache.qpid.test.utils.QpidBrokerTestCase; + +public class RedeliveredMessageTest extends QpidBrokerTestCase +{ + private Connection _connection; + + public void setUp() throws Exception + { + super.setUp(); + _connection = getConnection(); + } + + public void testRedeliveredFlagOnSessionClose() throws Exception + { + Session session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(getTestQueueName()); + MessageConsumer consumer = session.createConsumer(destination); + + final int numberOfMessages = 3; + sendMessage(session, destination, numberOfMessages); + + _connection.start(); + + for(int i = 0; i < numberOfMessages; i++) + { + final Message m = consumer.receive(1000l); + assertNotNull("Message is not recieved at " + i, m); + assertFalse("Redelivered should be not set", m.getJMSRedelivered()); + } + + session.close(); + session = _connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = session.createQueue(getTestQueueName()); + consumer = session.createConsumer(destination); + + for(int i = 0; i < numberOfMessages; i++) + { + final Message m = consumer.receive(1000l); + assertNotNull("Message is not recieved at " + i, m); + assertTrue("Redelivered should be set", 
m.getJMSRedelivered()); + } + } +} --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org