Author: ritchiem Date: Tue May 11 19:51:32 2010 New Revision: 943249 URL: http://svn.apache.org/viewvc?rev=943249&view=rev Log: QPID-2596 : Updated QEI to restoreCredit when aquired messages are released. Updated CommitRollbackTest.
Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Modified: qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=943249&r1=943248&r2=943249&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Tue May 11 19:51:32 2010 @@ -178,6 +178,11 @@ public class QueueEntryImpl implements Q public void release() { + Subscription subscription = getDeliveredSubscription(); + if (subscription != null) + { + subscription.restoreCredit(this); + } _stateUpdater.set(this,AVAILABLE_STATE); } Modified: qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java URL: http://svn.apache.org/viewvc/qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?rev=943249&r1=943248&r2=943249&view=diff ============================================================================== --- qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original) +++ qpid/branches/0.5.x-dev/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue May 11 19:51:32 2010 @@ -20,6 +20,7 @@ */ package org.apache.qpid.test.unit.transacted; +import org.apache.qpid.client.AMQSession; import org.apache.qpid.test.utils.QpidTestCase; import org.apache.qpid.client.AMQConnection; import org.slf4j.Logger; @@ -63,12 +64,12 @@ public class CommitRollbackTest extends { conn = (AMQConnection) getConnection("guest", "guest"); - _session = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _session = conn.createSession(true, Session.SESSION_TRANSACTED); _jmsQueue = _session.createQueue(queue); _consumer = _session.createConsumer(_jmsQueue); - _pubSession = conn.createSession(true, Session.CLIENT_ACKNOWLEDGE); + _pubSession = conn.createSession(true, Session.SESSION_TRANSACTED); _publisher = _pubSession.createProducer(_pubSession.createQueue(queue)); @@ -500,6 +501,86 @@ public class CommitRollbackTest extends } /** + * QPID-2596 + * Test that rollback works. + * + * Goal is to ensure that message credit is correctly restored. + * Previously rollback would result in message release() on the Java broker + * which would leak credit. + * + * Here we set a small pre-fetch and so small credit window and then consume + * a large number of messages. + * + * By filling the pre-fetch before rolling back we ensure that all the + * credit is used and so if we do not get any back the test will fail + * consistently. + * + * Using a large pre-fetch we can guarantee that we have filled the pre-fetch + * before performing rollback. + * + * Test outline. + * + * - Connect two transacted sessions with a small pre-fetch. + * - Send a large amount of messages on one session + * - Use second session to receive pre-fetch worth of messages + * - Rollback receiver session + * - Continue to consume all the messages on the receiver session, + * committing every batch size. + * - Fail if we can't get the message. + * - End by checking all msgs are consumed + * + * @throws Exception uf some thing unexpected occured + */ + public void testReceiveThenRollbackConsumerThenReceive() throws Exception + { + // Close connection so we can reset the pre-fetch + conn.close(); + + int MAX_PREFETCH=5; + int BACK_LOG_FACTOR=200; + + setSystemProperty("max_prefetch", String.valueOf(MAX_PREFETCH)); + + // Reconnect + newConnection(); + + assertEquals("Prefetch not reset", + MAX_PREFETCH,((AMQSession)_session).getDefaultPrefetch()); + + assertTrue("session is not transacted", _session.getTransacted()); + assertTrue("session is not transacted", _pubSession.getTransacted()); + + _logger.info("Sending (" + MAX_PREFETCH * BACK_LOG_FACTOR + ")messages"); + sendMessage(_pubSession, _publisher.getDestination(), MAX_PREFETCH * BACK_LOG_FACTOR); + _pubSession.commit(); + + for (int i=0 ;i< MAX_PREFETCH; i++) + { + assertNotNull("Received:" + i, _consumer.receive(1000)); + _logger.info("Received:"+i); + } + + + _logger.info("Rolling back"); + _session.rollback(); + + _logger.info("Receiving messages"); + + for (int b = 0; b < BACK_LOG_FACTOR; b++) + { + for (int a = 0; a < MAX_PREFETCH; a++) + { + assertNotNull("Received (" + b + ")after rollback:" + a, _consumer.receive(1000)); + } + _session.commit(); + } + + Message result = _consumer.receive(500); + assertNull("test message was put and rolled back, but is still present", result); + } + + + /** * Qpid-1163 * Check that when commt is called inside onMessage then * the last message is nor redelivered. --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org