Repository: activemq
Updated Branches:
  refs/heads/master 9becfc0be -> 5ee9a3426


https://issues.apache.org/jira/browse/AMQ-5791 - apply patch from Vladimír 
Čaniga with thanks


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5ee9a342
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5ee9a342
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5ee9a342

Branch: refs/heads/master
Commit: 5ee9a3426f2837d88ed5b315c4543da25fd1c9db
Parents: 9becfc0
Author: gtully <gary.tu...@gmail.com>
Authored: Thu May 21 15:53:40 2015 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Thu May 21 15:53:40 2015 +0100

----------------------------------------------------------------------
 ...VirtualTopicSelectorAwareForwardingTest.java | 85 ++++++++++++++------
 1 file changed, 61 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5ee9a342/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
index 63fdd5a..d1be900 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
@@ -107,7 +107,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
     }
 
-    public void testMessageLeaks() throws Exception{
+    public void testMessageLeaks() throws Exception {
         clearSelectorCacheFiles();
         startAllBrokers();
 
@@ -207,7 +207,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
     }
 
-    private ProducerThreadTester createProducerTester(String brokerName, 
javax.jms.Destination destination) throws Exception{
+    private ProducerThreadTester createProducerTester(String brokerName, 
javax.jms.Destination destination) throws Exception {
         BrokerItem brokerItem = brokers.get(brokerName);
 
         Connection conn = brokerItem.createConnection();
@@ -218,7 +218,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         return rc;
     }
 
-    public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws 
Exception{
+    public void testSelectorConsumptionWithNoMatchAtHeadOfQueue() throws 
Exception {
         clearSelectorCacheFiles();
         startAllBrokers();
 
@@ -251,6 +251,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         assertEquals(1, selectingConsumerMessages.getMessageCount());
 
         // assert broker A stats
+        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 2, 1, 5000);
         assertEquals(1, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getConsumers().size());
         assertEquals(2, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -262,7 +263,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
     }
 
-    private MessageConsumer establishConsumer(String broker, 
ActiveMQDestination consumerQueue) throws Exception{
+    private MessageConsumer establishConsumer(String broker, 
ActiveMQDestination consumerQueue) throws Exception {
         BrokerItem item = brokers.get(broker);
         Connection c = item.createConnection();
         c.start();
@@ -270,7 +271,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         return s.createConsumer(consumerQueue);
     }
 
-    public void testSelectorsAndNonSelectors() throws Exception{
+    public void testSelectorsAndNonSelectors() throws Exception {
         clearSelectorCacheFiles();
         // borkerA is local and brokerB is remote
         bridgeAndConfigureBrokers("BrokerA", "BrokerB");
@@ -320,6 +321,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         assertEquals(15, nonSelectingConsumerMessages.getMessageCount());
 
         // assert broker A stats
+        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
         assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -328,6 +330,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
                 .getDestinationStatistics().getMessages().getCount());
 
         // assert broker B stats
+        waitForMessagesToBeConsumed(brokerB, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
         assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -357,9 +360,10 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
         selectingConsumerMessages = getConsumerMessages("BrokerB", 
selectingConsumer);
         selectingConsumerMessages.waitForMessagesToArrive(1, 1000L);
-        assertEquals(0, selectingConsumerMessages.getMessageCount()) ;
+        assertEquals(0, selectingConsumerMessages.getMessageCount());
 
         // assert broker A stats
+        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
         assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -368,6 +372,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
                 .getDestinationStatistics().getMessages().getCount());
 
         // assert broker B stats
+        waitForMessagesToBeConsumed(brokerB, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
         assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -394,6 +399,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
 
         // assert broker A stats
+        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
         assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -402,6 +408,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
                 .getDestinationStatistics().getMessages().getCount());
 
         // assert broker B stats
+        waitForMessagesToBeConsumed(brokerB, 
"Consumer.B.VirtualTopic.tempTopic", false, 20, 20, 5000);
         assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(20, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -425,23 +432,16 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         }, 500);
 
         // assert broker A stats
+        waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
         assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
-
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
-                        .getDestinationStatistics().getEnqueues().getCount() 
== 30;
-            }
-        }, 5000);
-
         assertEquals(30, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getDequeues().getCount());
         assertEquals(0, brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getMessages().getCount());
 
         // assert broker B stats
+        waitForMessagesToBeConsumed(brokerB, 
"Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
         assertEquals(30, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
                 .getDestinationStatistics().getEnqueues().getCount());
         assertEquals(30, brokerB.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
@@ -456,7 +456,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
             throws MalformedObjectNameException {
         ObjectName objectName = BrokerMBeanSupport
                 
.createVirtualDestinationSelectorCacheName(broker.getBrokerObjectName(), 
"plugin", "virtualDestinationCache");
-        return 
(VirtualDestinationSelectorCacheViewMBean)broker.getManagementContext()
+        return (VirtualDestinationSelectorCacheViewMBean) 
broker.getManagementContext()
                 .newProxyInstance(objectName, 
VirtualDestinationSelectorCacheViewMBean.class, true);
     }
 
@@ -517,8 +517,6 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         remoteConsumer.close();
 
 
-
-
         // now let's shut down broker A and clear its persistent selector cache
         brokerA.stop();
         brokerA.waitUntilStopped();
@@ -581,13 +579,12 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
     }
 
     private HashMap<String, Object> asMap(String key, Object value) {
-        HashMap<String, Object> rc = new HashMap<String,Object>(1);
+        HashMap<String, Object> rc = new HashMap<String, Object>(1);
         rc.put(key, value);
         return rc;
     }
 
 
-
     private void bridgeAndConfigureBrokers(String local, String remote)
             throws Exception {
         NetworkConnector bridge = bridgeBrokers(local, remote, false, 1, 
false);
@@ -628,9 +625,8 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         VirtualTopic virtualTopic = new VirtualTopic();
         virtualTopic.setSelectorAware(true);
         VirtualDestinationInterceptor interceptor = new 
VirtualDestinationInterceptor();
-        interceptor
-                .setVirtualDestinations(new VirtualDestination[] { 
virtualTopic });
-        broker.setDestinationInterceptors(new DestinationInterceptor[] { 
interceptor });
+        interceptor.setVirtualDestinations(new 
VirtualDestination[]{virtualTopic});
+        broker.setDestinationInterceptors(new 
DestinationInterceptor[]{interceptor});
         configurePersistenceAdapter(broker);
 
         SubQueueSelectorCacheBrokerPlugin selectorCacheBrokerPlugin = new 
SubQueueSelectorCacheBrokerPlugin();
@@ -650,6 +646,47 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         broker.setPersistenceAdapter(kaha);
     }
 
+    /**
+     * Typically used before asserts to give producers and consumers some time 
to finish their tasks
+     * before the final state is tested.
+     *
+     * @param broker BrokerService on which the destinations are looked up
+     * @param destinationName
+     * @param topic true if the destination is a Topic, false if it is a Queue
+     * @param numEnqueueMsgs expected number of enqueued messages in the 
destination
+     * @param numDequeueMsgs expected number of dequeued messages in the 
destination
+     * @param waitTime number of milliseconds to wait for completion
+     * @throws Exception
+     */
+    private void waitForMessagesToBeConsumed(final BrokerService broker, final 
String destinationName,
+                                             final boolean topic, final int 
numEnqueueMsgs, final int numDequeueMsgs, int waitTime) throws Exception {
+        final ActiveMQDestination destination;
+        if (topic) {
+            destination = new ActiveMQTopic(destinationName);
+        } else {
+            destination = new ActiveMQQueue(destinationName);
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+
+                return broker.getDestination(destination)
+                        .getDestinationStatistics().getEnqueues().getCount() 
== numEnqueueMsgs;
+            }
+        }, waitTime);
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+
+                return broker.getDestination(destination)
+                        .getDestinationStatistics().getDequeues().getCount() 
== numDequeueMsgs;
+            }
+        }, waitTime);
+    }
+
+
     class ProducerThreadTester extends ProducerThread {
 
         private Set<String> selectors = new LinkedHashSet<String>();
@@ -699,4 +736,4 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         }
 
     }
-}
\ No newline at end of file
+}

Reply via email to