This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new aac622a39e fix(test): resolve flaky tests (#1774)
aac622a39e is described below
commit aac622a39ebffb076e8b2f4c9a5da0ad2452be06
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Sun Mar 15 08:31:34 2026 +0100
fix(test): resolve flaky tests (#1774)
* fix(test): resolve flaky tests
* fix(test): try to make it more stable
---
.../broker/advisory/AdvisoryBrokerTest.java | 23 +++--
.../broker/scheduler/JmsCronSchedulerTest.java | 33 +++---
.../DurableFiveBrokerNetworkBridgeTest.java | 8 +-
.../org/apache/activemq/usecases/AMQ6366Test.java | 14 +++
.../usecases/DurableSubscriptionOffline2Test.java | 23 ++---
...kerVirtualTopicSelectorAwareForwardingTest.java | 111 ++++++++-------------
6 files changed, 100 insertions(+), 112 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
index 4f9e6dd3e4..e4ff2ae27b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/advisory/AdvisoryBrokerTest.java
@@ -285,34 +285,41 @@ public class AdvisoryBrokerTest extends BrokerTestSupport
{
ActiveMQDestination queue = new ActiveMQQueue("test");
ActiveMQDestination destination =
AdvisorySupport.getProducerAdvisoryTopic(queue);
-
+
// Setup a first connection
StubConnection connection1 = createConnection();
ConnectionInfo connectionInfo1 = createConnectionInfo();
SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
connection1.send(connectionInfo1);
connection1.send(sessionInfo1);
- // Create the first consumer..
+ // Create the first consumer..
+ // Use request() to ensure the advisory consumer is fully registered
before
+ // the producer is created. This prevents a race where addConsumer and
addProducer
+ // execute concurrently on different transport threads, which could
cause the
+ // advisory consumer to receive both a broadcast AND a replay of the
same producer.
ConsumerInfo consumerInfo1 = createConsumerInfo(sessionInfo1,
destination);
consumerInfo1.setPrefetchSize(100);
- connection1.send(consumerInfo1);
+ connection1.request(consumerInfo1);
// Setup a producer.
StubConnection connection2 = createConnection();
ConnectionInfo connectionInfo2 = createConnectionInfo();
SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
ProducerInfo producerInfo2 = createProducerInfo(sessionInfo2);
- producerInfo2.setDestination(queue);
+ producerInfo2.setDestination(queue);
connection2.send(connectionInfo2);
connection2.send(sessionInfo2);
- connection2.send(producerInfo2);
-
+ // Use request() to ensure producer is fully registered and its "new
producer"
+ // advisory is fired before we create the advisory consumer. This
prevents a race
+ // where the advisory consumer could receive both a replay AND the
broadcast advisory.
+ connection2.request(producerInfo2);
+
Message m1 = receiveMessage(connection1);
assertNotNull(m1);
assertNotNull(m1.getDataStructure());
assertEquals(((ProducerInfo)m1.getDataStructure()).getProducerId(),
producerInfo2.getProducerId());
-
- // Create the 2nd consumer..
+
+ // Create the 2nd consumer..
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2,
destination);
consumerInfo2.setPrefetchSize(100);
connection2.send(consumerInfo2);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
index 28c3c00d6b..c734266f8b 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/JmsCronSchedulerTest.java
@@ -30,9 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
-import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
@@ -57,19 +55,16 @@ public class JmsCronSchedulerTest extends
JobSchedulerTestSupport {
MessageConsumer consumer = session.createConsumer(destination);
final CountDownLatch latch = new CountDownLatch(COUNT);
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- count.incrementAndGet();
- latch.countDown();
- assertTrue(message instanceof TextMessage);
- TextMessage tm = (TextMessage) message;
- try {
- LOG.info("Received [{}] count: {} ", tm.getText(),
count.get());
- } catch (JMSException e) {
- LOG.error("Unexpected exception in onMessage", e);
- fail("Unexpected exception in onMessage: " +
e.getMessage());
- }
+ consumer.setMessageListener(message -> {
+ count.incrementAndGet();
+ latch.countDown();
+ assertTrue(message instanceof TextMessage);
+ final TextMessage tm = (TextMessage) message;
+ try {
+ LOG.info("Received [{}] count: {} ", tm.getText(),
count.get());
+ } catch (JMSException e) {
+ LOG.error("Unexpected exception in onMessage", e);
+ fail("Unexpected exception in onMessage: " + e.getMessage());
}
});
@@ -88,9 +83,11 @@ public class JmsCronSchedulerTest extends
JobSchedulerTestSupport {
JobScheduler js = sb.getJobScheduler();
List<Job> list = js.getAllJobs();
assertEquals(COUNT, list.size());
- latch.await(2, TimeUnit.MINUTES);
- // All should messages should have been received by now
- assertEquals(COUNT, count.get());
+ assertTrue("all scheduled messages should fire", latch.await(2,
TimeUnit.MINUTES));
+ // Cron "* * * * *" fires every minute, so count may exceed COUNT
+ // if a second minute boundary is crossed during the wait
+ assertTrue("at least " + COUNT + " messages received, got " +
count.get(),
+ count.get() >= COUNT);
connection.close();
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
index 0265229fb4..6f355ae53c 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -803,11 +803,11 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest,
final int count) throws Exception {
+ final boolean result = Wait.waitFor(() -> count ==
getNCDurableSubs(brokerService, dest).size(),
+ TimeUnit.SECONDS.toMillis(30), 500);
assertTrue("Expected " + count + " NC durable sub(s) on " +
brokerService.getBrokerName()
+ " for " + dest.getTopicName() + ", but got "
- + getNCDurableSubs(brokerService, dest).size(),
- Wait.waitFor(() -> count == getNCDurableSubs(brokerService,
dest).size(),
- TimeUnit.SECONDS.toMillis(30), 500));
+ + getNCDurableSubs(brokerService, dest).size(), result);
}
protected List<DurableTopicSubscription> getNCDurableSubs(final
BrokerService brokerService,
@@ -824,7 +824,7 @@ public class DurableFiveBrokerNetworkBridgeTest extends
JmsMultipleBrokersTestSu
for (final SubscriptionKey key :
destination.getDurableTopicSubs().keySet()) {
if
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
{
final DurableTopicSubscription sub =
destination.getDurableTopicSubs().get(key);
- if (sub != null && sub.isActive()) {
+ if (sub != null) {
subs.add(sub);
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
index afe382934c..37d9dcb02e 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/AMQ6366Test.java
@@ -97,6 +97,20 @@ public class AMQ6366Test extends
JmsMultipleBrokersTestSupport {
networkConnector.start();
waitForBridgeFormation();
+ // Wait for the network bridge to re-establish its demand subscription
on the
+ // publishing broker. waitForBridgeFormation() only verifies the
bridge is connected,
+ // but setupStaticDestinations() (which creates the durable demand
subscription)
+ // runs asynchronously after bridge connection. Without this wait,
sendMessages()
+ // can fire before the demand subscription is set up, causing the
message to be
+ // published with no subscriber to forward it to the consumer broker.
+ // We check for an active durable subscription because the durable sub
may already
+ // exist (inactive) from the previous bridge; we need it to be
reactivated.
+ final Topic pubBrokerDest = (Topic)
brokers.get(pubBroker).broker.getDestination(dest);
+ assertTrue("Network durable subscription should be active on " +
pubBroker,
+ Wait.waitFor(() ->
pubBrokerDest.getDurableTopicSubs().values().stream()
+ .anyMatch(DurableTopicSubscription::isActive),
+ 10000, 100));
+
// Send messages
sendMessages(pubBroker, dest, 1);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
index 8e27acb477..74d1682afb 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -97,7 +97,7 @@ public class DurableSubscriptionOffline2Test extends
DurableSubscriptionOfflineT
MessageConsumer consumer = session.createDurableSubscriber(topic,
"SubsId", null, true);
for (int i=0; i<sent/2; i++) {
- Message m = consumer.receive(4000);
+ final Message m = consumer.receive(30000);
assertNotNull("got message: " + i, m);
LOG.info("Got :" + i + ", " + m);
}
@@ -110,12 +110,8 @@ public class DurableSubscriptionOffline2Test extends
DurableSubscriptionOfflineT
assertTrue("is active", durableSubscriptionView.isActive());
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0,
durableSubscriptionView.getEnqueueCounter());
- assertTrue("correct waiting acks", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return 5 ==
durableSubscriptionView.getMessageCountAwaitingAcknowledge();
- }
- }));
+ assertTrue("correct waiting acks", Wait.waitFor(
+ () -> 5 ==
durableSubscriptionView.getMessageCountAwaitingAcknowledge()));
assertEquals("correct dequeue", 5,
durableSubscriptionView.getDequeueCounter());
@@ -150,7 +146,7 @@ public class DurableSubscriptionOffline2Test extends
DurableSubscriptionOfflineT
consumer = session.createDurableSubscriber(topic, "SubsId", null,
true);
for (int i=0; i<sent/2;i++) {
- Message m = consumer.receive(30000);
+ final Message m = consumer.receive(30000);
assertNotNull("got message: " + i, m);
LOG.info("Got :" + i + ", " + m);
}
@@ -162,13 +158,10 @@ public class DurableSubscriptionOffline2Test extends
DurableSubscriptionOfflineT
assertTrue("is active", durableSubscriptionView2.isActive());
assertEquals("all enqueued", keepDurableSubsActive ? 10 : 0,
durableSubscriptionView2.getEnqueueCounter());
- assertTrue("correct dequeue", Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- long val = durableSubscriptionView2.getDequeueCounter();
- LOG.info("dequeue count:" + val);
- return 10 == val;
- }
+ assertTrue("correct dequeue", Wait.waitFor(() -> {
+ final long val = durableSubscriptionView2.getDequeueCounter();
+ LOG.info("dequeue count:" + val);
+ return 10 == val;
}));
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
index 905ce34294..83ed1ed8af 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/TwoBrokerVirtualTopicSelectorAwareForwardingTest.java
@@ -72,14 +72,14 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
final BrokerService brokerA = brokers.get("BrokerA").broker;
- String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic";
- VirtualDestinationSelectorCacheViewMBean cache =
getVirtualDestinationSelectorCacheMBean(brokerA);
+ final String testQueue = "queue://Consumer.B.VirtualTopic.tempTopic";
+ final VirtualDestinationSelectorCacheViewMBean cache =
getVirtualDestinationSelectorCacheMBean(brokerA);
Set<String> selectors = cache.selectorsForDestination(testQueue);
assertEquals(1, selectors.size());
assertTrue(selectors.contains("foo = 'bar'"));
- boolean removed = cache.deleteSelectorForDestination(testQueue, "foo =
'bar'");
+ final boolean removed = cache.deleteSelectorForDestination(testQueue,
"foo = 'bar'");
assertTrue(removed);
selectors = cache.selectorsForDestination(testQueue);
@@ -88,10 +88,12 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
createConsumer("BrokerB",
createDestination("Consumer.B.VirtualTopic.tempTopic", false),
"ceposta = 'redhat'");
-
- final Destination destA0 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ final ActiveMQQueue consumerQueue = new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
assertTrue("advisories should propagate: 2 consumers on BrokerA",
- Wait.waitFor(() -> destA0.getConsumers().size() == 2, 5000,
100));
+ Wait.waitFor(() -> {
+ final Destination d =
brokerA.getBroker().getDestinationMap().get(consumerQueue);
+ return d != null && d.getConsumers().size() == 2;
+ }, 15000, 100));
selectors = cache.selectorsForDestination(testQueue);
assertEquals(1, selectors.size());
@@ -135,13 +137,8 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
consumer1 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'");
// wait till new consumer is on board
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
- .getConsumers().size() == 2;
- }
- });
+ final ActiveMQQueue leakTestQueue = new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
+ Wait.waitFor(() ->
brokerA.getDestination(leakTestQueue).getConsumers().size() == 2);
currentCount = producerTester.getSentCount();
LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" +
producerTester.getCountForProperty("AAPL") + ", VIX=" +
producerTester.getCountForProperty("VIX"));
@@ -154,13 +151,7 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
consumer2 = createConsumer("BrokerA", consumerQueue, "SYMBOL = 'VIX'");
// wait till new consumer is on board
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
- .getConsumers().size() == 2;
- }
- });
+ Wait.waitFor(() ->
brokerA.getDestination(leakTestQueue).getConsumers().size() == 2);
currentCount = producerTester.getSentCount();
LOG.info(">>>> currently sent: total=" + currentCount + ", AAPL=" +
producerTester.getCountForProperty("AAPL") + ", VIX=" +
producerTester.getCountForProperty("VIX"));
@@ -174,29 +165,23 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
// make sure if there are messages that are orphaned in the queue that
this number doesn't
// grow...
- final long currentDepth = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
+ final long currentDepth = brokerA.getDestination(leakTestQueue)
.getDestinationStatistics().getMessages().getCount();
LOG.info(">>>>> Orphaned messages? " + currentDepth);
// wait 5s to see if we can get a growth in the depth of the queue
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
- .getDestinationStatistics().getMessages().getCount() >
currentDepth;
- }
- }, 5000);
+ Wait.waitFor(() -> brokerA.getDestination(leakTestQueue)
+ .getDestinationStatistics().getMessages().getCount() >
currentDepth, 5000);
// stop producers
producerTester.setRunning(false);
producerTester.join();
- // pause to let consumers catch up
- Thread.sleep(1000);
-
- assertTrue(brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"))
- .getDestinationStatistics().getMessages().getCount() <=
currentDepth);
+ // wait for consumers to catch up and drain remaining messages
+ assertTrue("queue depth should not grow beyond snapshot",
+ Wait.waitFor(() -> brokerA.getDestination(leakTestQueue)
+ .getDestinationStatistics().getMessages().getCount()
<= currentDepth, 5000, 100));
}
@@ -283,12 +268,15 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
MessageConsumer nonSelectingConsumer = createConsumer("BrokerB",
consumerBQueue);
// let advisories propogate
- final Destination destA1 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
+ final ActiveMQQueue consumerBQueueInternal = new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
assertTrue("advisories should propagate: 2 consumers on BrokerA",
- Wait.waitFor(() -> destA1.getConsumers().size() == 2, 5000,
100));
+ Wait.waitFor(() -> {
+ final Destination d =
brokerA.getBroker().getDestinationMap().get(consumerBQueueInternal);
+ return d != null && d.getConsumers().size() == 2;
+ }, 15000, 100));
- Destination destination = getDestination(brokerB, consumerBQueue);
+ final Destination destination = getDestination(brokerB,
consumerBQueue);
assertEquals(2, destination.getConsumers().size());
// publisher publishes to this
@@ -333,9 +321,8 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
// let advisories propogate
- final Destination destA2 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
assertTrue("advisories should propagate: 1 consumer on BrokerA",
- Wait.waitFor(() -> destA2.getConsumers().size() == 1, 5000,
100));
+ Wait.waitFor(() ->
brokerA.getDestination(consumerBQueueInternal).getConsumers().size() == 1,
15000, 100));
// and let's send messages with a selector that doesnt' match
selectingConsumerMessages.flushMessages();
@@ -368,9 +355,8 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
selectingConsumer.close();
// let advisories propogate
- final Destination destA3 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
assertTrue("advisories should propagate: 0 consumers on BrokerA",
- Wait.waitFor(() -> destA3.getConsumers().isEmpty(), 5000,
100));
+ Wait.waitFor(() ->
brokerA.getDestination(consumerBQueueInternal).getConsumers().isEmpty(), 15000,
100));
selectingConsumerMessages.flushMessages();
@@ -401,9 +387,8 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
assertEquals(10, selectingConsumerMessages.getMessageCount());
// let advisories propogate
- final Destination destA4 = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
assertTrue("advisories should propagate: 1 consumer on BrokerA",
- Wait.waitFor(() -> destA4.getConsumers().size() == 1, 5000,
100));
+ Wait.waitFor(() ->
brokerA.getDestination(consumerBQueueInternal).getConsumers().size() == 1,
15000, 100));
// assert broker A stats
waitForMessagesToBeConsumed(brokerA,
"Consumer.B.VirtualTopic.tempTopic", false, 30, 30, 5000);
@@ -451,20 +436,17 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
// let advisories propogate
- Wait.waitFor(new Wait.Condition() {
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 1;
- }
- }, 500);
-
- ActiveMQQueue queueB = new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
- Destination destination =
getDestination(brokers.get("BrokerB").broker, queueB);
+ final ActiveMQQueue selectorAwareQueue = new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic");
+ assertTrue("advisory should propagate: 1 consumer on BrokerA",
+ Wait.waitFor(() -> {
+ final Destination d =
brokerA.getBroker().getDestinationMap().get(selectorAwareQueue);
+ return d != null && d.getConsumers().size() == 1;
+ }, 15000, 100));
+
+ final Destination destination =
getDestination(brokers.get("BrokerB").broker, selectorAwareQueue);
assertEquals(1, destination.getConsumers().size());
- ActiveMQTopic virtualTopic = new
ActiveMQTopic("VirtualTopic.tempTopic");
+ final ActiveMQTopic virtualTopic = new
ActiveMQTopic("VirtualTopic.tempTopic");
assertNull(getDestination(brokers.get("BrokerA").broker,
virtualTopic));
assertNull(getDestination(brokers.get("BrokerB").broker,
virtualTopic));
@@ -512,16 +494,12 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
System.out.println(brokerA.getNetworkConnectors());
- // give a sec to let advisories propogate
// let advisories propogate
- Wait.waitFor(new Wait.Condition() {
- Destination dest = brokerA.getDestination(new
ActiveMQQueue("Consumer.B.VirtualTopic.tempTopic"));
-
- @Override
- public boolean isSatisified() throws Exception {
- return dest.getConsumers().size() == 1;
- }
- }, 500);
+ assertTrue("advisory should propagate after restart: 1 consumer on
BrokerA",
+ Wait.waitFor(() -> {
+ final Destination d =
brokerA.getBroker().getDestinationMap().get(selectorAwareQueue);
+ return d != null && d.getConsumers().size() == 1;
+ }, 15000, 100));
// send two types of messages, one unwanted and the other wanted
@@ -586,12 +564,11 @@ public class
TwoBrokerVirtualTopicSelectorAwareForwardingTest extends
public void setUp() throws Exception {
super.setAutoFail(true);
super.setUp();
- String options = new String(
- "?useJmx=false&deleteAllMessagesOnStartup=true");
+ final String options = "?useJmx=false&deleteAllMessagesOnStartup=true";
createAndConfigureBroker(new URI(
- "broker:(tcp://localhost:61616)/BrokerA" + options));
+ "broker:(tcp://localhost:0)/BrokerA" + options));
createAndConfigureBroker(new URI(
- "broker:(tcp://localhost:61617)/BrokerB" + options));
+ "broker:(tcp://localhost:0)/BrokerB" + options));
}
private void clearSelectorCacheFiles() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact