Repository: activemq Updated Branches: refs/heads/master 9becfc0be -> 5ee9a3426
https://issues.apache.org/jira/browse/AMQ-5791 - apply patch from Vladimír Čaniga with thanks Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5ee9a342 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5ee9a342 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5ee9a342 Branch: refs/heads/master Commit: 5ee9a3426f2837d88ed5b315c4543da25fd1c9db Parents: 9becfc0 Author: gtully <gary.tu...@gmail.com> Authored: Thu May 21 15:53:40 2015 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Thu May 21 15:53:40 2015 +0100 ---------------------------------------------------------------------- ...VirtualTopicSelectorAwareForwardingTest.java | 85 ++++++++++++++------ 1 file changed, 61 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/5ee9a342/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java index 63fdd5a..d1be900 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java @@ -107,7 +107,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } - public void testMessageLeaks() throws Exception{ + public void testMessageLeaks() throws Exception { clearSelectorCacheFiles(); startAllBrokers(); @@ -207,7 +207,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } - 
private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception{ + private ProducerThreadTester createProducerTester(String brokerName, javax.jms.Destination destination) throws Exception { BrokerItem brokerItem = brokers.get(brokerName); Connection conn = brokerItem.createConnection(); @@ -218,7 +218,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends return rc; } - public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception{ + public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws Exception { clearSelectorCacheFiles(); startAllBrokers(); @@ -251,6 +251,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends assertEquals(1, selectingConsumerMessages.getMessageCount()); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 2, 1, 5000); assertEquals(1, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getConsumers().size()); assertEquals(2, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -262,7 +263,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } - private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception{ + private MessageConsumer establishConsumer(String broker, ActiveMQDestination consumerQueue) throws Exception { BrokerItem item = brokers.get(broker); Connection c = item.createConnection(); c.start(); @@ -270,7 +271,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends return s.createConsumer(consumerQueue); } - public void testSelectorsAndNonSelectors() throws Exception{ + public void testSelectorsAndNonSelectors() throws Exception { clearSelectorCacheFiles(); // borkerA is local and brokerB is remote bridgeAndConfigureBrokers("BrokerA", "BrokerB"); @@ -320,6 +321,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends assertEquals(15, nonSelectingConsumerMessages.getMessageCount()); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -328,6 +330,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -357,9 +360,10 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends selectingConsumerMessages = getConsumerMessages("BrokerB", selectingConsumer); selectingConsumerMessages.waitForMessagesToArrive(1, 1000L); - assertEquals(0, selectingConsumerMessages.getMessageCount()) ; + assertEquals(0, selectingConsumerMessages.getMessageCount()); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -368,6 +372,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 
5000); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -394,6 +399,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -402,6 +408,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(20, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -425,23 +432,16 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends }, 500); // assert broker A stats + waitForMessagesToBeConsumed(brokerA, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000); assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); - - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) - .getDestinationStatistics().getEnqueues().getCount() == 30; - } - }, 5000); - assertEquals(30, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) 
.getDestinationStatistics().getDequeues().getCount()); assertEquals(0, brokerA.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getMessages().getCount()); // assert broker B stats + waitForMessagesToBeConsumed(brokerB, "Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000); assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) .getDestinationStatistics().getEnqueues().getCount()); assertEquals(30, brokerB.getDestination(new ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic")) @@ -456,7 +456,7 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends throws MalformedObjectNameException { ObjectName objectName = BrokerMBeanSupport .createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), "plugin", "virtualDestinationCache"); - return (VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext() + return (VirtualDestinationSelectorCacheViewMBean) broker.getManagementContext() .newProxyInstance(objectName, VirtualDestinationSelectorCacheViewMBean.class, true); } @@ -517,8 +517,6 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends remoteConsumer.close(); - - // now let's shut down broker A and clear its persistent selector cache brokerA.stop(); brokerA.waitUntilStopped(); @@ -581,13 +579,12 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } private HashMap<String, Object> asMap(String key, Object value) { - HashMap<String, Object> rc = new HashMap<String,Object>(1); + HashMap<String, Object> rc = new HashMap<String, Object>(1); rc.put(key, value); return rc; } - private void bridgeAndConfigureBrokers(String local, String remote) throws Exception { NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, false); @@ -628,9 +625,8 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends VirtualTopic virtualTopic = new VirtualTopic(); virtualTopic.setSelectorAware(true); 
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); - interceptor - .setVirtualDestinations(new VirtualDestination[] { virtualTopic }); - broker.setDestinationInterceptors(new DestinationInterceptor[] { interceptor }); + interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); configurePersistenceAdapter(broker); SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new SubQueueSelectorCacheBrokerPlugin(); @@ -650,6 +646,47 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends broker.setPersistenceAdapter(kaha); } + /** + * Typically used before asserts to give producers and consumers some time to finish their tasks + * before the final state is tested. + * + * @param broker BrokerService on which the destinations are looked up + * @param destinationName + * @param topic true if the destination is a Topic, false if it is a Queue + * @param numEnqueueMsgs expected number of enqueued messages in the destination + * @param numDequeueMsgs expected number of dequeued messages in the destination + * @param waitTime number of milliseconds to wait for completion + * @throws Exception + */ + private void waitForMessagesToBeConsumed(final BrokerService broker, final String destinationName, + final boolean topic, final int numEnqueueMsgs, final int numDequeueMsgs, int waitTime) throws Exception { + final ActiveMQDestination destination; + if (topic) { + destination = new ActiveMQTopic(destinationName); + } else { + destination = new ActiveMQQueue(destinationName); + } + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + return broker.getDestination(destination) + .getDestinationStatistics().getEnqueues().getCount() == numEnqueueMsgs; + } + }, waitTime); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + + return 
broker.getDestination(destination) + .getDestinationStatistics().getDequeues().getCount() == numDequeueMsgs; + } + }, waitTime); + } + + class ProducerThreadTester extends ProducerThread { private Set<String> selectors = new LinkedHashSet<String>(); @@ -699,4 +736,4 @@ public class TwoBrokerVirtualTopicSelectorAwareForwardingTest extends } } -} \ No newline at end of file +}