This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new aac622a39e fix(test): resolve flaky tests (#1774)
aac622a39e is described below

commit aac622a39ebffb076e8b2f4c9a5da0ad2452be06
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Sun Mar 15 08:31:34 2026 +0100

    fix(test): resolve flaky tests (#1774)
    
    * fix(test): resolve flaky tests
    
    * fix(test): try to make it more stable
---
 .../broker/advisory/AdvisoryBrokerTest.java        |  23 +++--
 .../broker/scheduler/JmsCronSchedulerTest.java     |  33 +++---
 .../DurableFiveBrokerNetworkBridgeTest.java        |   8 +-
 .../org/apache/activemq/usecases/AMQ6366Test.java  |  14 +++
 .../usecases/DurableSubscriptionOffline2Test.java  |  23 ++---
 ...kerVirtualTopicSelectorAwareForwardingTest.java | 111 ++++++++-------------
 6 files changed, 100 insertions(+), 112 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index 4f9e6dd3e4..e4ff2ae27b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -285,34 +285,41 @@ public class AdvisoryBrokerTest extends BrokerTestSupport 
{
 
         ActiveMQDestination queue = new ActiveMQQueue("test");
         ActiveMQDestination destination = 
AdvisorySupport.getProducerAdvisoryTopic(queue);
-        
+
         // Setup a first connection
         StubConnection connection1 = createConnection();
         ConnectionInfo connectionInfo1 = createConnectionInfo();
         SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
         connection1.send(connectionInfo1);
         connection1.send(sessionInfo1);
-        // Create the first consumer..         
+        // Create the first consumer..
+        // Use request() to ensure the advisory consumer is fully registered 
before
+        // the producer is created. This prevents a race where addConsumer and 
addProducer
+        // execute concurrently on different transport threads, which could 
cause the
+        // advisory consumer to receive both a broadcast AND a replay of the 
same producer.
         ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1, 
destination);
         consumerInfo1.setPrefetchSize(100);
-        connection1.send(consumerInfo1);
+        connection1.request(consumerInfo1);
 
         // Setup a producer.
         StubConnection connection2 = createConnection();
         ConnectionInfo connectionInfo2 = createConnectionInfo();
         SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
         ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
-        producerInfo2.setDestination(queue);        
+        producerInfo2.setDestination(queue);
         connection2.send(connectionInfo2);
         connection2.send(sessionInfo2);
-        connection2.send(producerInfo2);
-        
+        // Use request() to ensure producer is fully registered and its "new 
producer"
+        // advisory is fired before we create the advisory consumer. This 
prevents a race
+        // where the advisory consumer could receive both a replay AND the 
broadcast advisory.
+        connection2.request(producerInfo2);
+
         Message m1 = receiveMessage(connection1);
         assertNotNull(m1);
         assertNotNull(m1.getDataStructure());
         assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(), 
producerInfo2.getProducerId());
-        
-        // Create the 2nd consumer..         
+
+        // Create the 2nd consumer..
         ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, 
destination);
         consumerInfo2.setPrefetchSize(100);
         connection2.send(consumerInfo2);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
index 28c3c00d6b..c734266f8b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
@@ -30,9 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import jakarta.jms.Connection;
 import jakarta.jms.JMSException;
-import jakarta.jms.Message;
 import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
 import jakarta.jms.MessageProducer;
 import jakarta.jms.Session;
 import jakarta.jms.TextMessage;
@@ -57,19 +55,16 @@ public class JmsCronSchedulerTest extends 
JobSchedulerTestSupport {
         MessageConsumer consumer = session.createConsumer(destination);
 
         final CountDownLatch latch = new CountDownLatch(COUNT);
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                count.incrementAndGet();
-                latch.countDown();
-                assertTrue(message instanceof TextMessage);
-                TextMessage tm = (TextMessage) message;
-                try {
-                    LOG.info("Received [{}] count: {} ", tm.getText(), 
count.get());
-                } catch (JMSException e) {
-                    LOG.error("Unexpected exception in onMessage", e);
-                    fail("Unexpected exception in onMessage: " + 
e.getMessage());
-                }
+        consumer.setMessageListener(message -> {
+            count.incrementAndGet();
+            latch.countDown();
+            assertTrue(message instanceof TextMessage);
+            final TextMessage tm = (TextMessage) message;
+            try {
+                LOG.info("Received [{}] count: {} ", tm.getText(), 
count.get());
+            } catch (JMSException e) {
+                LOG.error("Unexpected exception in onMessage", e);
+                fail("Unexpected exception in onMessage: " + e.getMessage());
             }
         });
 
@@ -88,9 +83,11 @@ public class JmsCronSchedulerTest extends 
JobSchedulerTestSupport {
         JobScheduler js = sb.getJobScheduler();
         List<Job> list = js.getAllJobs();
         assertEquals(COUNT, list.size());
-        latch.await(2, TimeUnit.MINUTES);
-        // All should messages should have been received by now
-        assertEquals(COUNT, count.get());
+        assertTrue("all scheduled messages should fire", latch.await(2, 
TimeUnit.MINUTES));
+        // Cron "* * * * *" fires every minute, so count may exceed COUNT
+        // if a second minute boundary is crossed during the wait
+        assertTrue("at least " + COUNT + " messages received, got " + 
count.get(),
+                count.get() >= COUNT);
 
         connection.close();
     }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 0265229fb4..6f355ae53c 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -803,11 +803,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService, 
final ActiveMQTopic dest,
             final int count) throws Exception {
+        final boolean result = Wait.waitFor(() -> count == 
getNCDurableSubs(brokerService, dest).size(),
+                TimeUnit.SECONDS.toMillis(30), 500);
         assertTrue("Expected " + count + " NC durable sub(s) on " + 
brokerService.getBrokerName()
                 + " for " + dest.getTopicName() + ", but got "
-                + getNCDurableSubs(brokerService, dest).size(),
-            Wait.waitFor(() -> count == getNCDurableSubs(brokerService, 
dest).size(),
-                TimeUnit.SECONDS.toMillis(30), 500));
+                + getNCDurableSubs(brokerService, dest).size(), result);
     }
 
     protected List<DurableTopicSubscription> getNCDurableSubs(final 
BrokerService brokerService,
@@ -824,7 +824,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSu
         for (final SubscriptionKey key : 
destination.getDurableTopicSubs().keySet()) {
             if 
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
 {
                 final DurableTopicSubscription sub = 
destination.getDurableTopicSubs().get(key);
-                if (sub != null && sub.isActive()) {
+                if (sub != null) {
                     subs.add(sub);
                 }
             }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
index afe382934c..37d9dcb02e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
@@ -97,6 +97,20 @@ public class AMQ6366Test extends 
JmsMultipleBrokersTestSupport {
         networkConnector.start();
         waitForBridgeFormation();
 
+        // Wait for the network bridge to re-establish its demand subscription 
on the
+        // publishing broker. waitForBridgeFormation() only verifies the 
bridge is connected,
+        // but setupStaticDestinations() (which creates the durable demand 
subscription)
+        // runs asynchronously after bridge connection. Without this wait, 
sendMessages()
+        // can fire before the demand subscription is set up, causing the 
message to be
+        // published with no subscriber to forward it to the consumer broker.
+        // We check for an active durable subscription because the durable sub 
may already
+        // exist (inactive) from the previous bridge; we need it to be 
reactivated.
+        final Topic pubBrokerDest = (Topic) 
brokers.get(pubBroker).broker.getDestination(dest);
+        assertTrue("Network durable subscription should be active on " + 
pubBroker,
+                Wait.waitFor(() -> 
pubBrokerDest.getDurableTopicSubs().values().stream()
+                                .anyMatch(DurableTopicSubscription::isActive),
+                        10000, 100));
+
         // Send messages
         sendMessages(pubBroker, dest, 1);
 
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
index 8e27acb477..74d1682afb 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -97,7 +97,7 @@ public class DurableSubscriptionOffline2Test extends 
DurableSubscriptionOfflineT
         MessageConsumer consumer = session.createDurableSubscriber(topic, 
"SubsId", null, true);
 
         for (int i=0; i<sent/2; i++) {
-            Message m =  consumer.receive(4000);
+            final Message m =  consumer.receive(30000);
             assertNotNull("got message: " + i, m);
             LOG.info("Got :" + i + ", " + m);
         }
@@ -110,12 +110,8 @@ public class DurableSubscriptionOffline2Test extends 
DurableSubscriptionOfflineT
 
         assertTrue("is active", durableSubscriptionView.isActive());
         assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, 
durableSubscriptionView.getEnqueueCounter());
-        assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 5 == 
durableSubscriptionView.getMessageCountAwaitingAcknowledge();
-            }
-        }));
+        assertTrue("correct waiting acks", Wait.waitFor(
+                () -> 5 == 
durableSubscriptionView.getMessageCountAwaitingAcknowledge()));
         assertEquals("correct dequeue", 5, 
durableSubscriptionView.getDequeueCounter());
 
 
@@ -150,7 +146,7 @@ public class DurableSubscriptionOffline2Test extends 
DurableSubscriptionOfflineT
         consumer = session.createDurableSubscriber(topic, "SubsId", null, 
true);
 
         for (int i=0; i<sent/2;i++) {
-            Message m =  consumer.receive(30000);
+            final Message m =  consumer.receive(30000);
             assertNotNull("got message: " + i, m);
             LOG.info("Got :" + i + ", " + m);
         }
@@ -162,13 +158,10 @@ public class DurableSubscriptionOffline2Test extends 
DurableSubscriptionOfflineT
 
         assertTrue("is active", durableSubscriptionView2.isActive());
         assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0, 
durableSubscriptionView2.getEnqueueCounter());
-        assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                long val = durableSubscriptionView2.getDequeueCounter();
-                LOG.info("dequeue count:" + val);
-                return 10 == val;
-            }
+        assertTrue("correct dequeue", Wait.waitFor(() -> {
+            final long val = durableSubscriptionView2.getDequeueCounter();
+            LOG.info("dequeue count:" + val);
+            return 10 == val;
         }));
     }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
index 905ce34294..83ed1ed8af 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
@@ -72,14 +72,14 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
         final BrokerService brokerA = brokers.get("BrokerA").broker;
 
-        String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic";
-        VirtualDestinationSelectorCacheViewMBean cache = 
getVirtualDestinationSelectorCacheMBean(brokerA);
+        final String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic";
+        final VirtualDestinationSelectorCacheViewMBean cache = 
getVirtualDestinationSelectorCacheMBean(brokerA);
         Set<String> selectors = cache.selectorsForDestination(testQueue);
 
         assertEquals(1, selectors.size());
         assertTrue(selectors.contains("foo = 'bar'"));
 
-        boolean removed = cache.deleteSelectorForDestination(testQueue, "foo = 
'bar'");
+        final boolean removed = cache.deleteSelectorForDestination(testQueue, 
"foo = 'bar'");
         assertTrue(removed);
 
         selectors = cache.selectorsForDestination(testQueue);
@@ -88,10 +88,12 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         createConsumer("BrokerB", 
createDestination("Consumer.B.VirtualTopic.tempTopic", false),
                 "ceposta = 'redhat'");
 
-
-        final Destination destA0 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        final ActiveMQQueue consumerQueue = new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
         assertTrue("advisories should propagate: 2 consumers on BrokerA",
-                Wait.waitFor(() -> destA0.getConsumers().size() == 2, 5000, 
100));
+                Wait.waitFor(() -> {
+                    final Destination d = 
brokerA.getBroker().getDestinationMap().get(consumerQueue);
+                    return d != null && d.getConsumers().size() == 2;
+                }, 15000, 100));
 
         selectors = cache.selectorsForDestination(testQueue);
         assertEquals(1, selectors.size());
@@ -135,13 +137,8 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         consumer1 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'");
 
         // wait till new consumer is on board
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
-                        .getConsumers().size() == 2;
-            }
-        });
+        final ActiveMQQueue leakTestQueue = new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
+        Wait.waitFor(() -> 
brokerA.getDestination(leakTestQueue).getConsumers().size() == 2);
 
         currentCount = producerTester.getSentCount();
         LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + 
producerTester.getCountForProperty("AAPL") + ", VIX=" + 
producerTester.getCountForProperty("VIX"));
@@ -154,13 +151,7 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         consumer2 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'");
 
         // wait till new consumer is on board
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
-                        .getConsumers().size() == 2;
-            }
-        });
+        Wait.waitFor(() -> 
brokerA.getDestination(leakTestQueue).getConsumers().size() == 2);
 
         currentCount = producerTester.getSentCount();
         LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" + 
producerTester.getCountForProperty("AAPL") + ", VIX=" + 
producerTester.getCountForProperty("VIX"));
@@ -174,29 +165,23 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
         // make sure if there are messages that are orphaned in the queue that 
this number doesn't
         // grow...
-        final long currentDepth = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+        final long currentDepth = brokerA.getDestination(leakTestQueue)
                 .getDestinationStatistics().getMessages().getCount();
 
         LOG.info(">>>>> Orphaned messages? " + currentDepth);
 
         // wait 5s to see if we can get a growth in the depth of the queue
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
-                        .getDestinationStatistics().getMessages().getCount() > 
currentDepth;
-            }
-        }, 5000);
+        Wait.waitFor(() -> brokerA.getDestination(leakTestQueue)
+                .getDestinationStatistics().getMessages().getCount() > 
currentDepth, 5000);
 
         // stop producers
         producerTester.setRunning(false);
         producerTester.join();
 
-        // pause to let consumers catch up
-        Thread.sleep(1000);
-
-        assertTrue(brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
-                .getDestinationStatistics().getMessages().getCount() <= 
currentDepth);
+        // wait for consumers to catch up and drain remaining messages
+        assertTrue("queue depth should not grow beyond snapshot",
+                Wait.waitFor(() -> brokerA.getDestination(leakTestQueue)
+                        .getDestinationStatistics().getMessages().getCount() 
<= currentDepth, 5000, 100));
 
 
     }
@@ -283,12 +268,15 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         MessageConsumer nonSelectingConsumer = createConsumer("BrokerB", 
consumerBQueue);
 
         // let advisories propogate
-        final Destination destA1 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+        final ActiveMQQueue consumerBQueueInternal = new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
         assertTrue("advisories should propagate: 2 consumers on BrokerA",
-                Wait.waitFor(() -> destA1.getConsumers().size() == 2, 5000, 
100));
+                Wait.waitFor(() -> {
+                    final Destination d = 
brokerA.getBroker().getDestinationMap().get(consumerBQueueInternal);
+                    return d != null && d.getConsumers().size() == 2;
+                }, 15000, 100));
 
 
-        Destination destination = getDestination(brokerB, consumerBQueue);
+        final Destination destination = getDestination(brokerB, 
consumerBQueue);
         assertEquals(2, destination.getConsumers().size());
 
         // publisher publishes to this
@@ -333,9 +321,8 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
 
         // let advisories propogate
-        final Destination destA2 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
         assertTrue("advisories should propagate: 1 consumer on BrokerA",
-                Wait.waitFor(() -> destA2.getConsumers().size() == 1, 5000, 
100));
+                Wait.waitFor(() -> 
brokerA.getDestination(consumerBQueueInternal).getConsumers().size() == 1, 
15000, 100));
 
         // and let's send messages with a selector that doesnt' match
         selectingConsumerMessages.flushMessages();
@@ -368,9 +355,8 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         selectingConsumer.close();
 
         // let advisories propogate
-        final Destination destA3 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
         assertTrue("advisories should propagate: 0 consumers on BrokerA",
-                Wait.waitFor(() -> destA3.getConsumers().isEmpty(), 5000, 
100));
+                Wait.waitFor(() -> 
brokerA.getDestination(consumerBQueueInternal).getConsumers().isEmpty(), 15000, 
100));
 
         selectingConsumerMessages.flushMessages();
 
@@ -401,9 +387,8 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
         assertEquals(10, selectingConsumerMessages.getMessageCount());
 
         // let advisories propogate
-        final Destination destA4 = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
         assertTrue("advisories should propagate: 1 consumer on BrokerA",
-                Wait.waitFor(() -> destA4.getConsumers().size() == 1, 5000, 
100));
+                Wait.waitFor(() -> 
brokerA.getDestination(consumerBQueueInternal).getConsumers().size() == 1, 
15000, 100));
 
         // assert broker A stats
         waitForMessagesToBeConsumed(brokerA, 
"Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
@@ -451,20 +436,17 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
 
         // let advisories propogate
-        Wait.waitFor(new Wait.Condition() {
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 1;
-            }
-        }, 500);
-
-        ActiveMQQueue queueB = new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
-        Destination destination = 
getDestination(brokers.get("BrokerB").broker, queueB);
+        final ActiveMQQueue selectorAwareQueue = new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
+        assertTrue("advisory should propagate: 1 consumer on BrokerA",
+                Wait.waitFor(() -> {
+                    final Destination d = 
brokerA.getBroker().getDestinationMap().get(selectorAwareQueue);
+                    return d != null && d.getConsumers().size() == 1;
+                }, 15000, 100));
+
+        final Destination destination = 
getDestination(brokers.get("BrokerB").broker, selectorAwareQueue);
         assertEquals(1, destination.getConsumers().size());
 
-        ActiveMQTopic virtualTopic = new 
ActiveMQTopic("VirtualTopic.tempTopic");
+        final ActiveMQTopic virtualTopic = new 
ActiveMQTopic("VirtualTopic.tempTopic");
         assertNull(getDestination(brokers.get("BrokerA").broker, 
virtualTopic));
         assertNull(getDestination(brokers.get("BrokerB").broker, 
virtualTopic));
 
@@ -512,16 +494,12 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
 
         System.out.println(brokerA.getNetworkConnectors());
 
-        // give a sec to let advisories propogate
         // let advisories propogate
-        Wait.waitFor(new Wait.Condition() {
-            Destination dest = brokerA.getDestination(new 
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return dest.getConsumers().size() == 1;
-            }
-        }, 500);
+        assertTrue("advisory should propagate after restart: 1 consumer on 
BrokerA",
+                Wait.waitFor(() -> {
+                    final Destination d = 
brokerA.getBroker().getDestinationMap().get(selectorAwareQueue);
+                    return d != null && d.getConsumers().size() == 1;
+                }, 15000, 100));
 
 
         // send two types of messages, one unwanted and the other wanted
@@ -586,12 +564,11 @@ public class 
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
     public void setUp() throws Exception {
         super.setAutoFail(true);
         super.setUp();
-        String options = new String(
-                "?useJmx=false&deleteAllMessagesOnStartup=true");
+        final String options = "?useJmx=false&deleteAllMessagesOnStartup=true";
         createAndConfigureBroker(new URI(
-                "broker:(tcp://localhost:61616)/BrokerA" + options));
+                "broker:(tcp://localhost:0)/BrokerA" + options));
         createAndConfigureBroker(new URI(
-                "broker:(tcp://localhost:61617)/BrokerB" + options));
+                "broker:(tcp://localhost:0)/BrokerB" + options));
     }
 
     private void clearSelectorCacheFiles() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to