Repository: qpid-jms Updated Branches: refs/heads/master d35be9992 -> 2ffee1fd3
Add some larger TX messaging scenarios to the tests. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/2ffee1fd Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/2ffee1fd Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/2ffee1fd Branch: refs/heads/master Commit: 2ffee1fd3964ba5f4662d17b3629f268fd9a4fb9 Parents: d35be99 Author: Timothy Bish <[email protected]> Authored: Tue May 5 13:07:11 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 5 13:07:11 2015 -0400 ---------------------------------------------------------------------- .../transactions/JmsTransactedConsumerTest.java | 82 +++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/2ffee1fd/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java index e01e4c1..8846566 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/transactions/JmsTransactedConsumerTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.List; import javax.jms.DeliveryMode; +import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -35,6 +36,7 @@ import org.apache.activemq.broker.jmx.QueueViewMBean; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.support.AmqpTestSupport; import org.apache.qpid.jms.support.QpidJmsTestSupport; +import org.junit.Ignore; import org.junit.Test; /** @@ -42,7 +44,8 @@ import org.junit.Test; */ public class JmsTransactedConsumerTest extends AmqpTestSupport { - private static final String MSG_NUM = "MSG_NUM"; + private final String MSG_NUM = "MSG_NUM"; + private final int MSG_COUNT = 1000; @Test(timeout = 60000) public void testCreateConsumerFromTxSession() throws Exception { @@ -381,4 +384,81 @@ public class JmsTransactedConsumerTest extends AmqpTestSupport { session.close(); } + + @Ignore("Fails at 500 messages consumed.") + @Test(timeout = 60000) + public void testSingleConsumedMessagePerTxCase() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(getTestName()); + MessageProducer messageProducer = session.createProducer(queue); + for (int i = 0; i < MSG_COUNT; i++) { + TextMessage message = session.createTextMessage(); + message.setText("test" + i); + messageProducer.send(message, DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, javax.jms.Message.DEFAULT_TIME_TO_LIVE); + } + + session.close(); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + int counter = 0; + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = session.createConsumer(queue); + do { + TextMessage message = (TextMessage) messageConsumer.receive(1000); + if (message != null) { + counter++; + LOG.info("Message n. {} with content '{}' has been recieved.", counter,message.getText()); + session.commit(); + LOG.info("Transaction has been committed."); + assertEquals(MSG_COUNT - counter, queueView.getQueueSize()); + } + } while (counter < MSG_COUNT); + + assertEquals(0, queueView.getQueueSize()); + + session.close(); + } + + @Test(timeout = 60000) + public void testConsumeAllMessagesInSingleTxCase() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination queue = session.createQueue(getTestName()); + MessageProducer messageProducer = session.createProducer(queue); + for (int i = 0; i < MSG_COUNT; i++) { + TextMessage message = session.createTextMessage(); + message.setText("test" + i); + messageProducer.send(message, DeliveryMode.PERSISTENT, javax.jms.Message.DEFAULT_PRIORITY, javax.jms.Message.DEFAULT_TIME_TO_LIVE); + } + + session.close(); + + QueueViewMBean queueView = getProxyToQueue(getTestName()); + assertEquals(MSG_COUNT, queueView.getQueueSize()); + + int counter = 0; + session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer messageConsumer = session.createConsumer(queue); + do { + TextMessage message = (TextMessage) messageConsumer.receive(1000); + if (message != null) { + counter++; + LOG.info("Message n. {} with content '{}' has been recieved.", counter,message.getText()); + } + } while (counter < MSG_COUNT); + + LOG.info("Transaction has been committed."); + session.commit(); + + assertEquals(0, queueView.getQueueSize()); + + session.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
