Repository: qpid-jms
Updated Branches: refs/heads/master b7e7ecdd5 -> b7f2b9ec1
add test to expose broker issue when committing transacted message acks out of sequence with their original dispatch Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/b7f2b9ec Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/b7f2b9ec Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/b7f2b9ec Branch: refs/heads/master Commit: b7f2b9ec1dcf92225e70ca0f958e457c55e0ae90 Parents: b7e7ecd Author: Robert Gemmell <rob...@apache.org> Authored: Mon Dec 1 16:48:53 2014 +0000 Committer: Robert Gemmell <rob...@apache.org> Committed: Mon Dec 1 16:48:53 2014 +0000 ---------------------------------------------------------------------- .../transactions/JmsTransactedConsumerTest.java | 67 ++++++++++++++++++++ 1 file changed, 67 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/b7f2b9ec/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index 468997d..4d06a87 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTrue; import java.util.ArrayList; import java.util.List; +import javax.jms.DeliveryMode; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -41,6 +42,8 @@ import org.junit.Test; */ 
public class JmsTransactedConsumerTest extends AmqpTestSupport { + private static final String MSG_NUM = "MSG_NUM"; + @Test(timeout = 60000) public void testCreateConsumerFromTxSession() throws Exception { connection = createAmqpConnection(); @@ -283,4 +286,68 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { assertEquals(3, jmsxDeliveryCount); session.commit(); } + + @Ignore //TODO: enable after fixing rollback issue on broker + @Test(timeout=30000) + public void testSessionTransactedCommitWithPriorityReordering() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(getDestinationName()); + + connection.start(); + + MessageProducer pr = session.createProducer(queue); + for (int i = 1; i <= 2; i++) { + Message m = session.createTextMessage("TestMessage" + i); + m.setIntProperty(MSG_NUM, i); + pr.send(m, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE); + } + session.commit(); + + // Receive first message. + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(5000); + assertNotNull(msg); + assertEquals(1, msg.getIntProperty(MSG_NUM)); + assertEquals(Message.DEFAULT_PRIORITY, msg.getJMSPriority()); + + // Send a couple higher priority, expect them to 'overtake' upon arrival at the consumer. + for (int i = 3; i <= 4; i++) { + Message m = session.createTextMessage("TestMessage" + i); + m.setIntProperty(MSG_NUM, i); + pr.send(m, DeliveryMode.NON_PERSISTENT, 5 , Message.DEFAULT_TIME_TO_LIVE); + } + session.commit(); + + // Wait for them to arrive at the consumer + Thread.sleep(3000); + + // Receive the other messages. Expect higher priority messages first. 
+ msg = consumer.receive(50); + assertNotNull(msg); + assertEquals(3, msg.getIntProperty(MSG_NUM)); + assertEquals(5, msg.getJMSPriority()); + + msg = consumer.receive(50); + assertNotNull(msg); + assertEquals(4, msg.getIntProperty(MSG_NUM)); + assertEquals(5, msg.getJMSPriority()); + + msg = consumer.receive(50); + assertNotNull(msg); + assertEquals(2, msg.getIntProperty(MSG_NUM)); + assertEquals(Message.DEFAULT_PRIORITY, msg.getJMSPriority()); + + session.commit(); + + // Send a couple messages to check the session still works. + for (int i = 5; i <= 6; i++) { + Message m = session.createTextMessage("TestMessage" + i); + m.setIntProperty(MSG_NUM, i); + pr.send(m, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY , Message.DEFAULT_TIME_TO_LIVE); + } + session.commit(); + + session.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org