This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 8eae494904 fix(test): fix CompositeConsumerNetworkBridgeTest flakiness
(#1742)
8eae494904 is described below
commit 8eae494904bce6022b4ad5cf0ea62579e3ecbb36
Author: JB Onofré <[email protected]>
AuthorDate: Mon Mar 9 14:23:45 2026 +0100
fix(test): fix CompositeConsumerNetworkBridgeTest flakiness (#1742)
Replace Thread.sleep with proper synchronization to eliminate race
conditions: use assertBridgeStarted() instead of sleeping for advisory
propagation, reorder assertions to confirm individual topic subs before
checking composite has none, and retry removeSubscription via
Wait.waitFor instead of a fixed sleep after closing durable subscribers.
---
.../CompositeConsumerNetworkBridgeTest.java | 41 +++++++++++++---------
1 file changed, 25 insertions(+), 16 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
index 17b0447dd4..77ac2a5f83 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
@@ -101,17 +101,18 @@ public class CompositeConsumerNetworkBridgeTest extends
DynamicNetworkTestSuppor
// The remote broker should create two durable subs instead of 1
// Should be 1 durable on each of the topics that are part of the
composite
- assertConsumersCount(broker2, compositeTopic, 0);
- assertNCDurableSubsCount(broker2, compositeTopic, 0);
+ // First confirm individual topic subs are created (wait for bridge to
finish processing)
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
}
+ // Then verify no subs were created on the composite topic itself
+ assertConsumersCount(broker2, compositeTopic, 0);
+ assertNCDurableSubsCount(broker2, compositeTopic, 0);
assertCompositeMapCounts(1, 1);
durSub.close();
- Thread.sleep(1000);
- removeSubscription(broker1, subName);
+ waitAndRemoveSubscription(broker1, subName);
//Verify cleanup
for (ActiveMQTopic topic : topics) {
@@ -148,11 +149,10 @@ public class CompositeConsumerNetworkBridgeTest extends
DynamicNetworkTestSuppor
assertNotNull(durSub2.receive(1000));
durSub1.close();
- durSub2.close();;
+ durSub2.close();
- Thread.sleep(1000);
- removeSubscription(broker1, subName + "1");
- removeSubscription(broker1, subName + "2");
+ waitAndRemoveSubscription(broker1, subName + "1");
+ waitAndRemoveSubscription(broker1, subName + "2");
assertCompositeMapCounts(0, 0);
}
@@ -268,17 +268,16 @@ public class CompositeConsumerNetworkBridgeTest extends
DynamicNetworkTestSuppor
TopicSubscriber durSub2 =
session1.createDurableSubscriber(compositeTopic, subName + "2");
assertConsumersCount(broker1, compositeTopic, 2);
- assertConsumersCount(broker2, compositeTopic, 0);
- assertNCDurableSubsCount(broker2, compositeTopic, 0);
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, topic, 1);
}
+ assertConsumersCount(broker2, compositeTopic, 0);
+ assertNCDurableSubsCount(broker2, compositeTopic, 0);
assertCompositeMapCounts(2, 2);
durSub1.close();
- Thread.sleep(1000);
- removeSubscription(broker1, subName + "1");
+ waitAndRemoveSubscription(broker1, subName + "1");
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 1);
@@ -286,8 +285,7 @@ public class CompositeConsumerNetworkBridgeTest extends
DynamicNetworkTestSuppor
}
durSub2.close();
- Thread.sleep(1000);
- removeSubscription(broker1, subName + "2");
+ waitAndRemoveSubscription(broker1, subName + "2");
for (ActiveMQTopic topic : topics) {
assertConsumersCount(broker2, topic, 0);
@@ -305,8 +303,7 @@ public class CompositeConsumerNetworkBridgeTest extends
DynamicNetworkTestSuppor
protected void doSetUp(File localDataDir, File remoteDataDir) throws
Exception {
doSetUpRemoteBroker(remoteDataDir);
doSetUpLocalBroker(localDataDir);
- //Give time for advisories to propagate
- Thread.sleep(1000);
+ assertBridgeStarted();
}
protected void doSetUpLocalBroker(File dataDir) throws Exception {
@@ -414,6 +411,18 @@ public class CompositeConsumerNetworkBridgeTest extends
DynamicNetworkTestSuppor
assertTrue( Wait.waitFor(() -> compositeSubSize ==
bridge.compositeSubscriptions.size(), 5000, 500));
}
+ private void waitAndRemoveSubscription(BrokerService broker, String
subName) throws Exception {
+ assertTrue("Subscription " + subName + " should be removable",
+ Wait.waitFor(() -> {
+ try {
+ removeSubscription(broker, subName);
+ return true;
+ } catch (Exception e) {
+ return false;
+ }
+ }, 10000, 500));
+ }
+
protected DurableConduitBridge findBridge() throws Exception {
if (flow.equals(FLOW.FORWARD)) {
return findBridge(remoteBroker);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact