This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a88c10ed1 Relax timeouts and macOS exclusions + small fixes (#1673)
3a88c10ed1 is described below

commit 3a88c10ed18d55d8c61ebc2fd8f54a7d462bf75b
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Feb 24 16:40:18 2026 +0100

    Relax timeouts and macOS exclusions + small fixes (#1673)
---
 activemq-unit-tests/pom.xml                        |  24 +-
 .../activemq/bugs/DuplicateFromStoreTest.java      |   6 +-
 .../network/DurableSyncNetworkBridgeTest.java      |  86 +++---
 .../network/DynamicNetworkTestSupport.java         |  36 +--
 ...callyIncludedDestinationsDuplexNetworkTest.java |  27 +-
 .../network/MQTTNetworkOfBrokersFailoverTest.java  |  37 ++-
 .../apache/activemq/network/SimpleNetworkTest.java | 291 +++++++++++----------
 .../RestrictedThreadPoolInactivityTimeoutTest.java |   2 +-
 .../QueueZeroPrefetchLazyDispatchPriorityTest.java |   4 +-
 9 files changed, 283 insertions(+), 230 deletions(-)

diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 68955145d1..7e1d3c73a5 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -953,7 +953,6 @@
                 
<exclude>org/apache/activemq/transport/SoWriteTimeoutTest.*</exclude>
                 
<exclude>org/apache/activemq/transport/TopicClusterTest.*</exclude>
                 
<exclude>org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.*</exclude>
-                
<exclude>org/apache/activemq/transport/discovery/DiscoveryTransportBrokerTest.*</exclude>
                 
<exclude>org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.*</exclude>
                 
<exclude>org/apache/activemq/transport/discovery/DiscoveryUriTest.*</exclude>
                 
<exclude>org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.*</exclude>
@@ -1144,6 +1143,29 @@
               </plugins>
           </build>
       </profile>
+      <profile>
+        <id>activemq.tests.mac.excludes</id>
+        <activation>
+          <os>
+            <family>mac</family>
+          </os>
+        </activation>
+        <build>
+          <plugins>
+            <plugin>
+              <artifactId>maven-surefire-plugin</artifactId>
+              <configuration>
+                <excludes combine.children="append">
+                  <!-- Multicast and UDP based tests are unreliable on macOS 
CI (no proper multicast interfaces) -->
+                  
<exclude>org/apache/activemq/transport/peer/PeerTransportTest.*</exclude>
+                  
<exclude>org/apache/activemq/transport/multicast/MulticastTransportTest.*</exclude>
+                  
<exclude>org/apache/activemq/network/MulticastNetworkTest.*</exclude>
+                </excludes>
+              </configuration>
+            </plugin>
+          </plugins>
+        </build>
+      </profile>
       <profile>
           <id>activemq.tests.aix.excludes</id>
           <activation>
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
index 753e41aa2c..04f633893d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
@@ -123,7 +123,7 @@ public class DuplicateFromStoreTest {
         }
     }
 
-    @Test
+    @Test(timeout = 120_000)
     public void testDuplicateMessage() throws Exception {
         LOG.info("Testing for duplicate messages.");
 
@@ -134,10 +134,10 @@ public class DuplicateFromStoreTest {
         createOpenwireClients(producers, consumers);
 
         LOG.info("All producers and consumers got started. Awaiting their 
termination");
-        producersFinished.await(100, TimeUnit.MINUTES);
+        producersFinished.await(2, TimeUnit.MINUTES);
         LOG.info("All producers have terminated. remaining to send: " + 
totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get());
 
-        consumersFinished.await(100, TimeUnit.MINUTES);
+        consumersFinished.await(2, TimeUnit.MINUTES);
         LOG.info("All consumers have terminated.");
 
         producers.shutdownNow();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 9727dbc41a..1791ec242f 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -49,7 +49,6 @@ import 
org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
 import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import 
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
 import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -83,7 +82,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
     private final FLOW flow;
 
     @Rule
-    public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+    public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
 
     @Parameters
     public static Collection<Object[]> data() {
@@ -531,7 +530,6 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         session1.createDurableSubscriber(topic, "sub3");
         session1.createDurableSubscriber(excludeTopic, "sub-exclude");
 
-        Thread.sleep(1000);
         assertNCDurableSubsCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, excludeTopic, 0);
 
@@ -570,13 +568,10 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         secondConnector.start();
 
         //Make sure both bridges are connected
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
-                        
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
-            }
-        }, 10000, 500));
+        assertTrue(Wait.waitFor(() ->
+                
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
+                
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1,
+            TimeUnit.SECONDS.toMillis(10), 500));
 
         //Make sure NC durables exist for both bridges
         assertNCDurableSubsCount(broker2, topic2, 1);
@@ -637,13 +632,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         final DestinationStatistics remoteDestStatistics2 = 
remoteBroker.getDestination(
                 new 
ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
 
-        assertTrue(Wait.waitFor(new Condition() {
-
-            @Override
-            public boolean isSatisified() throws Exception {
-                return remoteDestStatistics2.getMessages().getCount() == 501;
-            }
-        }));
+        assertTrue(Wait.waitFor(() -> 
remoteDestStatistics2.getMessages().getCount() == 501));
 
     }
 
@@ -723,8 +712,36 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         included = new ActiveMQTopic(testTopicName);
         doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
         doSetUpLocalBroker(deleteAllMessages, startNetworkConnector, 
localDataDir);
-        //Give time for advisories to propagate
-        Thread.sleep(1000);
+        //Wait for the bridge to be fully started (advisory consumers 
registered).
+        //Note: activeBridges().size() == 1 is NOT sufficient because bridges 
are added
+        //to the map before start() completes asynchronously. We must wait for 
the
+        //startedLatch which counts down after advisory consumers are 
registered.
+        if (startNetworkConnector) {
+            waitForBridgeFullyStarted();
+        }
+    }
+
+    private void waitForBridgeFullyStarted() throws Exception {
+        // Wait for the local bridge to be fully started (advisory consumers 
registered)
+        assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
+            if 
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+                return false;
+            }
+            final NetworkBridge bridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                return ((DemandForwardingBridgeSupport) 
bridge).startedLatch.getCount() == 0;
+            }
+            return true;
+        }, TimeUnit.SECONDS.toMillis(10), 100));
+
+        // Also wait for the duplex bridge on the remote broker to be fully 
started.
+        // The duplex connector creates a separate DemandForwardingBridge on 
the remote side
+        // that also needs its advisory consumers registered before it can 
process events.
+        assertTrue("Duplex bridge should be fully started", Wait.waitFor(() -> 
{
+            final DemandForwardingBridge duplexBridge = findDuplexBridge(
+                remoteBroker.getTransportConnectors().get(0));
+            return duplexBridge != null && 
duplexBridge.startedLatch.getCount() == 0;
+        }, TimeUnit.SECONDS.toMillis(10), 100));
     }
 
     protected void restartLocalBroker(boolean startNetworkConnector) throws 
Exception {
@@ -757,12 +774,12 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         localConnection.start();
 
         if (startNetworkConnector) {
-            Wait.waitFor(new Condition() {
-                @Override
-                public boolean isSatisified() throws Exception {
-                    return 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
-                }
-            }, 5000, 500);
+            // Best-effort wait for the bridge to appear. Do NOT use 
assertTrue here
+            // because some tests restart localBroker before remoteBroker is 
running,
+            // relying on the bridge connecting later when remoteBroker 
restarts.
+            // Tests that need the bridge to be fully started call 
assertBridgeStarted() explicitly.
+            Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+                         TimeUnit.SECONDS.toMillis(10), 500);
         }
         localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
@@ -881,19 +898,16 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
     protected void waitForSubscriptionInactive(final BrokerService 
brokerService,
                                                final ActiveMQTopic topic,
                                                final String subName) throws 
Exception {
-        assertTrue("Subscription should become inactive", Wait.waitFor(new 
Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = 
getSubscriptions(brokerService, topic);
-                for 
(org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
-                    if 
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
-                        return !sub.isActive();
-                    }
+        assertTrue("Subscription should become inactive", Wait.waitFor(() -> {
+            final 
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs = 
getSubscriptions(brokerService, topic);
+            for (final 
org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
+                if 
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
+                    return !sub.isActive();
                 }
-                // If subscription doesn't exist, it's considered inactive
-                return true;
             }
-        }, 5000, 100));
+            // If subscription doesn't exist, it's considered inactive
+            return true;
+        }, TimeUnit.SECONDS.toMillis(10), 100));
     }
 
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 87ee09040e..a347ffe3b5 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -24,6 +24,7 @@ import java.io.File;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import jakarta.jms.Connection;
 import jakarta.jms.JMSException;
@@ -45,7 +46,6 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.util.SubscriptionKey;
 import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
 import org.junit.Rule;
 import org.junit.rules.TemporaryFolder;
 
@@ -95,9 +95,16 @@ public abstract class DynamicNetworkTestSupport {
     }
 
     protected void assertBridgeStarted() throws Exception {
-        assertTrue(Wait.waitFor(
-            () -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
-            10000, 500));
+        assertTrue("Bridge should be fully started", Wait.waitFor(() -> {
+            if 
(localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) {
+                return false;
+            }
+            final NetworkBridge bridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+            if (bridge instanceof DemandForwardingBridgeSupport) {
+                return ((DemandForwardingBridgeSupport) 
bridge).startedLatch.getCount() == 0;
+            }
+            return true;
+        }, TimeUnit.SECONDS.toMillis(10), 500));
     }
 
     protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final 
ConnectionContext context,
@@ -135,17 +142,19 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, 
dest).size(),
-            10000, 500));
+        assertTrue("Expected " + count + " NC durable subs on " + dest,
+            Wait.waitFor(() -> count == getNCDurableSubs(brokerService, 
dest).size(),
+                         TimeUnit.SECONDS.toMillis(30), 500));
     }
 
     protected void assertConsumersCount(final BrokerService brokerService,
             final ActiveMQDestination dest, final int count) throws Exception {
         assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, 
dest).size(),
             10000, 500));
-        Thread.sleep(1000);
-        // Check one more time after a short pause to make sure the count 
didn't increase past what we wanted
-        assertEquals(count, getConsumers(brokerService, dest).size());
+        // Wait a bit longer and verify the count is stable (didn't increase 
past what we wanted)
+        assertTrue("Consumer count should remain stable at " + count,
+            Wait.waitFor(() -> count == getConsumers(brokerService, 
dest).size(),
+                         TimeUnit.SECONDS.toMillis(5), 500));
     }
 
     protected List<Subscription> getConsumers(final BrokerService 
brokerService,
@@ -208,12 +217,9 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void assertSubscriptionsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getSubscriptions(brokerService, dest).size();
-            }
-        }, 10000, 500));
+        assertTrue("Expected " + count + " subscriptions on " + dest,
+            Wait.waitFor(() -> count == getSubscriptions(brokerService, 
dest).size(),
+                         TimeUnit.SECONDS.toMillis(30), 500));
     }
 
     protected void assertSubscriptionMapCounts(NetworkBridge networkBridge, 
final int count) {
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index d52836e4f6..229c522559 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -80,7 +80,7 @@ public class DynamicallyIncludedDestinationsDuplexNetworkTest 
extends SimpleNetw
         assertEquals("Destination not deleted", 0, 
remoteBroker.getAdminView().getTemporaryQueues().length);
     }
 
-    @Test
+    @Test(timeout = 60 * 1000)
     public void testDynamicallyIncludedDestinationsForDuplex()  throws 
Exception{
         // Once the bridge is set up, we should see the filter used for the 
duplex end of the bridge
         // only subscribe to the specific destinations included in the 
<dynamicallyIncludedDestinations> list
@@ -88,11 +88,11 @@ public class 
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
         // is correct
 
         // the bridge on the remote broker has the correct filter
-        TransportConnection bridgeConnection = 
getDuplexBridgeConnectionFromRemote();
+        final TransportConnection bridgeConnection = 
getDuplexBridgeConnectionFromRemote();
         assertNotNull(bridgeConnection);
-        DemandForwardingBridge duplexBridge = 
getDuplexBridgeFromConnection(bridgeConnection);
+        final DemandForwardingBridge duplexBridge = 
waitForDuplexBridge(bridgeConnection);
         assertNotNull(duplexBridge);
-        NetworkBridgeConfiguration configuration = 
getConfigurationFromNetworkBridge(duplexBridge);
+        final NetworkBridgeConfiguration configuration = 
getConfigurationFromNetworkBridge(duplexBridge);
         assertNotNull(configuration);
         assertFalse("This destinationFilter does not include ONLY the 
destinations specified in dynamicallyIncludedDestinations",
                 
configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
 + ">"));
@@ -115,17 +115,20 @@ public class 
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
         return bridge;
     }
 
+    private DemandForwardingBridge waitForDuplexBridge(final 
TransportConnection bridgeConnection) throws Exception {
+        assertTrue("Timed out waiting for duplex bridge to be fully started",
+                Wait.waitFor(() -> {
+                    final DemandForwardingBridge bridge = 
getDuplexBridgeFromConnection(bridgeConnection);
+                    return bridge != null && bridge.getRemoteBrokerName() != 
null;
+                }));
+        return getDuplexBridgeFromConnection(bridgeConnection);
+    }
+
     public TransportConnection getDuplexBridgeConnectionFromRemote() throws 
Exception {
         final TransportConnector transportConnector = 
remoteBroker.getTransportConnectorByScheme("tcp");
         assertTrue("Timed out waiting for duplex bridge connection",
-                Wait.waitFor(new Wait.Condition() {
-                    @Override
-                    public boolean isSatisified() {
-                        return !transportConnector.getConnections().isEmpty();
-                    }
-                }));
-        CopyOnWriteArrayList<TransportConnection> transportConnections = 
transportConnector.getConnections();
-        return transportConnections.get(0);
+                Wait.waitFor(() -> 
!transportConnector.getConnections().isEmpty()));
+        return transportConnector.getConnections().get(0);
     }
 
     @Override
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
index ad52fecf70..bc6c985e24 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
@@ -23,9 +23,7 @@ import java.util.concurrent.TimeUnit;
 import jakarta.jms.Connection;
 import jakarta.jms.Destination;
 import jakarta.jms.JMSException;
-import jakarta.jms.Message;
 import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
 import jakarta.jms.Session;
 import javax.management.ObjectName;
 
@@ -35,6 +33,7 @@ import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
 import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.Wait;
 import org.apache.commons.lang.ArrayUtils;
 import org.fusesource.hawtdispatch.Dispatch;
 import org.fusesource.mqtt.client.BlockingConnection;
@@ -69,6 +68,10 @@ public class MQTTNetworkOfBrokersFailoverTest extends 
NetworkTestSupport {
         remoteBroker.addNetworkConnector(nc);
         nc.start();
 
+        // Wait for the network bridge to be established before proceeding
+        assertTrue("Network bridge should be established",
+            Wait.waitFor(() -> nc.activeBridges().size() == 1, 
TimeUnit.SECONDS.toMillis(10), 500));
+
         // mqtt port should have been assigned by now
         assertFalse(localBrokerMQTTPort == -1);
         assertFalse(remoteBrokerMQTTPort == -1);
@@ -104,7 +107,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends 
NetworkTestSupport {
         remoteConn.connect();
         remoteConn.subscribe(new Topic[]{new Topic("foo/bar", 
QoS.AT_LEAST_ONCE)});
 
-        assertTrue("No destination detected!", consumerNetworked.await(1, 
TimeUnit.SECONDS));
+        assertTrue("No destination detected!", consumerNetworked.await(5, 
TimeUnit.SECONDS));
         assertQueueExistsOn(remoteBroker, 
"Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
         assertQueueExistsOn(broker, 
"Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
         remoteConn.disconnect();
@@ -149,23 +152,17 @@ public class MQTTNetworkOfBrokersFailoverTest extends 
NetworkTestSupport {
         final Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         Destination dest = 
session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
         MessageConsumer consumer = session.createConsumer(dest);
-        consumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message message) {
-                latch.countDown();
-                // shutdown this connection
-                Dispatch.getGlobalQueue().execute(new Runnable() {
-                    @Override
-                    public void run() {
-                        try {
-                            session.close();
-                            connection.close();
-                        } catch (JMSException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
-            }
+        consumer.setMessageListener(message -> {
+            latch.countDown();
+            // shutdown this connection
+            Dispatch.getGlobalQueue().execute(() -> {
+                try {
+                    session.close();
+                    connection.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            });
         });
 
         return latch;
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index 6ccf0137b6..08ec477e8d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -16,20 +16,15 @@
  */
 package org.apache.activemq.network;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-
 import jakarta.jms.DeliveryMode;
 import jakarta.jms.Destination;
 import jakarta.jms.JMSException;
 import jakarta.jms.Message;
 import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
 import jakarta.jms.MessageProducer;
 import jakarta.jms.TextMessage;
 import jakarta.jms.TopicRequestor;
@@ -41,13 +36,13 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
 import org.junit.Ignore;
 import org.junit.Test;
 import org.springframework.context.support.AbstractApplicationContext;
 
+import java.util.concurrent.TimeUnit;
+
 public class SimpleNetworkTest extends BaseNetworkTest {
 
     protected static final int MESSAGE_COUNT = 10;
@@ -70,21 +65,21 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     @Test(timeout = 60 * 1000)
     public void testMessageCompression() throws Exception {
 
-        ActiveMQConnection localAmqConnection = (ActiveMQConnection) 
localConnection;
+        final ActiveMQConnection localAmqConnection = (ActiveMQConnection) 
localConnection;
         localAmqConnection.setUseCompression(true);
 
-        MessageConsumer consumer1 = remoteSession.createConsumer(included);
-        MessageProducer producer = localSession.createProducer(included);
+        final MessageConsumer consumer1 = 
remoteSession.createConsumer(included);
+        final MessageProducer producer = localSession.createProducer(included);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
         waitForConsumerRegistration(localBroker, 1, included);
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
-            Message msg = consumer1.receive(3000);
+            final Message msg = consumer1.receive(3000);
             assertNotNull("not null? message: " + i, msg);
-            ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
+            final ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
             assertTrue(amqMessage.isCompressed());
         }
         // ensure no more messages received
@@ -96,30 +91,27 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     @Test(timeout = 60 * 1000)
     public void testRequestReply() throws Exception {
         final MessageProducer remoteProducer = 
remoteSession.createProducer(null);
-        MessageConsumer remoteConsumer = 
remoteSession.createConsumer(included);
-        remoteConsumer.setMessageListener(new MessageListener() {
-            @Override
-            public void onMessage(Message msg) {
-                try {
-                    TextMessage textMsg = (TextMessage)msg;
-                    String payload = "REPLY: " + textMsg.getText();
-                    Destination replyTo;
-                    replyTo = msg.getJMSReplyTo();
-                    textMsg.clearBody();
-                    textMsg.setText(payload);
-                    remoteProducer.send(replyTo, textMsg);
-                } catch (JMSException e) {
-                    e.printStackTrace();
-                }
+        final MessageConsumer remoteConsumer = 
remoteSession.createConsumer(included);
+        remoteConsumer.setMessageListener(msg -> {
+            try {
+                final TextMessage textMsg = (TextMessage) msg;
+                final String payload = "REPLY: " + textMsg.getText();
+                final Destination replyTo = msg.getJMSReplyTo();
+                textMsg.clearBody();
+                textMsg.setText(payload);
+                remoteProducer.send(replyTo, textMsg);
+            } catch (JMSException e) {
+                e.printStackTrace();
             }
         });
 
-        TopicRequestor requestor = new 
TopicRequestor((TopicSession)localSession, included);
-        // allow for consumer infos to perculate arround
-        Thread.sleep(5000);
+        final TopicRequestor requestor = new TopicRequestor((TopicSession) 
localSession, included);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, included);
+
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            TextMessage msg = localSession.createTextMessage("test msg: " + i);
-            TextMessage result = (TextMessage)requestor.request(msg);
+            final TextMessage msg = localSession.createTextMessage("test msg: 
" + i);
+            final TextMessage result = (TextMessage) requestor.request(msg);
             assertNotNull(result);
             LOG.info(result.getText());
         }
@@ -129,13 +121,14 @@ public class SimpleNetworkTest extends BaseNetworkTest {
 
     @Test(timeout = 60 * 1000)
     public void testFiltering() throws Exception {
-        MessageConsumer includedConsumer = 
remoteSession.createConsumer(included);
-        MessageConsumer excludedConsumer = 
remoteSession.createConsumer(excluded);
-        MessageProducer includedProducer = 
localSession.createProducer(included);
-        MessageProducer excludedProducer = 
localSession.createProducer(excluded);
-        // allow for consumer infos to perculate around
-        Thread.sleep(2000);
-        Message test = localSession.createTextMessage("test");
+        final MessageConsumer includedConsumer = 
remoteSession.createConsumer(included);
+        final MessageConsumer excludedConsumer = 
remoteSession.createConsumer(excluded);
+        final MessageProducer includedProducer = 
localSession.createProducer(included);
+        final MessageProducer excludedProducer = 
localSession.createProducer(excluded);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, included);
+
+        final Message test = localSession.createTextMessage("test");
         includedProducer.send(test);
         excludedProducer.send(test);
         assertNull(excludedConsumer.receive(1000));
@@ -146,15 +139,15 @@ public class SimpleNetworkTest extends BaseNetworkTest {
 
     @Test(timeout = 60 * 1000)
     public void testConduitBridge() throws Exception {
-        MessageConsumer consumer1 = remoteSession.createConsumer(included);
-        MessageConsumer consumer2 = remoteSession.createConsumer(included);
-        MessageProducer producer = localSession.createProducer(included);
+        final MessageConsumer consumer1 = 
remoteSession.createConsumer(included);
+        final MessageConsumer consumer2 = 
remoteSession.createConsumer(included);
+        final MessageProducer producer = localSession.createProducer(included);
         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
 
         waitForConsumerRegistration(localBroker, 2, included);
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
             assertNotNull(consumer1.receive(1000));
             assertNotNull(consumer2.receive(1000));
@@ -170,27 +163,31 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     }
 
     private void waitForConsumerRegistration(final BrokerService 
brokerService, final int min, final ActiveMQDestination destination) throws 
Exception {
-        assertTrue("Internal bridge consumers registered in time", 
Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                Object[] bridges = 
brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
-                if (bridges.length > 0) {
-                    LOG.info(brokerService + " bridges "  + 
Arrays.toString(bridges));
-                    DemandForwardingBridgeSupport 
demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
-                    ConcurrentMap<ConsumerId, DemandSubscription> 
forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
-                    LOG.info(brokerService + " bridge "  + 
demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
-                    if (!forwardingBridges.isEmpty()) {
-                        for (DemandSubscription demandSubscription : 
forwardingBridges.values()) {
-                            if 
(demandSubscription.getLocalInfo().getDestination().equals(destination)) {
-                                LOG.info(brokerService + " DemandSubscription 
"  + demandSubscription + ", size: " + demandSubscription.size());
-                                return demandSubscription.size() >= min;
-                            }
+        // Wait for the bridge demand subscriptions to register the expected 
number of
+        // remote consumers. With conduit subscriptions, multiple remote 
consumers map
+        // to a single local subscription, so we check 
DemandSubscription.size().
+        assertTrue("Bridge demand subscription registered for " + destination, 
Wait.waitFor(() -> {
+            if (brokerService.getNetworkConnectors().isEmpty()) {
+                return false;
+            }
+            for (final NetworkBridge bridge : 
brokerService.getNetworkConnectors().get(0).activeBridges()) {
+                if (bridge instanceof DemandForwardingBridgeSupport) {
+                    final DemandForwardingBridgeSupport dfBridge = 
(DemandForwardingBridgeSupport) bridge;
+                    for (final DemandSubscription ds : 
dfBridge.getLocalSubscriptionMap().values()) {
+                        if 
(ds.getLocalInfo().getDestination().equals(destination)) {
+                            return ds.size() >= min;
                         }
                     }
                 }
-                return false;
             }
-        }));
+            return false;
+        }, TimeUnit.SECONDS.toMillis(30), 100));
+
+        // Also verify the consumer is actually dispatching on the broker's 
destination.
+        // The DemandSubscription may exist before the local consumer is fully 
registered.
+        assertTrue("Consumer dispatching on " + destination, Wait.waitFor(
+            () -> 
brokerService.getDestination(destination).getDestinationStatistics().getConsumers().getCount()
 >= 1,
+            TimeUnit.SECONDS.toMillis(10), 100));
     }
 
     //Added for AMQ-6465 to make sure memory usage decreased back to 0 after 
messages are forwarded
@@ -198,27 +195,28 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     @Test(timeout = 60 * 1000)
     public void testDurableTopicSubForwardMemoryUsage() throws Exception {
         // create a remote durable consumer to create demand
-        MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
-        Thread.sleep(1000);
+        final MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, included);
 
-        MessageProducer producer = localSession.createProducer(included);
+        final MessageProducer producer = localSession.createProducer(included);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
         }
-        Thread.sleep(1000);
-
-        //Make sure stats are set
-        assertEquals(MESSAGE_COUNT,
-                
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
-
-        assertTrue(Wait.waitFor(new Condition() {
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
-            }
-        }, 10000, 500));
+        //Make sure stats are set - wait for forwards to complete
+        assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+                () -> {
+                    final long count = 
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount();
+                    LOG.info("testDurableTopicSubForwardMemoryUsage: forwards 
count = " + count + " (expected " + MESSAGE_COUNT + ")");
+                    return count == MESSAGE_COUNT;
+                },
+                TimeUnit.SECONDS.toMillis(30), 100));
+
+        assertTrue("Memory usage did not return to 0", Wait.waitFor(
+                () -> localBroker.getSystemUsage().getMemoryUsage().getUsage() 
== 0,
+                TimeUnit.SECONDS.toMillis(10), 500));
         remoteConsumer.close();
     }
 
@@ -226,28 +224,25 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     //to the other broker
     @Test(timeout = 60 * 1000)
     public void testTopicSubForwardMemoryUsage() throws Exception {
-        // create a remote durable consumer to create demand
-        MessageConsumer remoteConsumer = 
remoteSession.createConsumer(included);
-        Thread.sleep(1000);
+        // create a remote consumer to create demand
+        final MessageConsumer remoteConsumer = 
remoteSession.createConsumer(included);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, included);
 
-        MessageProducer producer = localSession.createProducer(included);
+        final MessageProducer producer = localSession.createProducer(included);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
         }
-        Thread.sleep(1000);
-
-        //Make sure stats are set
-        assertEquals(MESSAGE_COUNT,
-                
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
 
-        assertTrue(Wait.waitFor(new Condition() {
+        //Make sure stats are set - wait for forwards to complete
+        assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+                () -> 
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()
 == MESSAGE_COUNT,
+                TimeUnit.SECONDS.toMillis(30), 100));
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
-            }
-        }, 10000, 500));
+        assertTrue("Memory usage did not return to 0", Wait.waitFor(
+                () -> localBroker.getSystemUsage().getMemoryUsage().getUsage() 
== 0,
+                TimeUnit.SECONDS.toMillis(10), 500));
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             assertNotNull("message count: " + i, remoteConsumer.receive(2500));
@@ -259,28 +254,25 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     //to the other broker
     @Test(timeout = 60 * 1000)
     public void testQueueSubForwardMemoryUsage() throws Exception {
-        ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
-        MessageConsumer remoteConsumer = remoteSession.createConsumer(queue);
-        Thread.sleep(1000);
+        final ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
+        final MessageConsumer remoteConsumer = 
remoteSession.createConsumer(queue);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, queue);
 
-        MessageProducer producer = localSession.createProducer(queue);
+        final MessageProducer producer = localSession.createProducer(queue);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
         }
-        Thread.sleep(1000);
-
-        //Make sure stats are set
-        assertEquals(MESSAGE_COUNT,
-                
localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount());
 
-        assertTrue(Wait.waitFor(new Condition() {
+        //Make sure stats are set - wait for forwards to complete
+        assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+                () -> 
localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount()
 == MESSAGE_COUNT,
+                TimeUnit.SECONDS.toMillis(30), 100));
 
-            @Override
-            public boolean isSatisified() throws Exception {
-                return 
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
-            }
-        }, 10000, 500));
+        assertTrue("Memory usage did not return to 0", Wait.waitFor(
+                () -> localBroker.getSystemUsage().getMemoryUsage().getUsage() 
== 0,
+                TimeUnit.SECONDS.toMillis(10), 500));
 
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             assertNotNull("message count: " + i, remoteConsumer.receive(2500));
@@ -291,28 +283,30 @@ public class SimpleNetworkTest extends BaseNetworkTest {
     @Test(timeout = 60 * 1000)
     public void testDurableStoreAndForward() throws Exception {
         // create a remote durable consumer
-        MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
-        Thread.sleep(1000);
+        final MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, included);
+
         // now close everything down and restart
         doTearDown();
         doSetUp(false);
-        MessageProducer producer = localSession.createProducer(included);
+        final MessageProducer producer = localSession.createProducer(included);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
         }
-        Thread.sleep(1000);
 
-        //Make sure stats are set
-        assertEquals(MESSAGE_COUNT,
-                
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
+        //Make sure stats are set - wait for forwards to complete
+        assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+                () -> 
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()
 == MESSAGE_COUNT,
+                TimeUnit.SECONDS.toMillis(30), 100));
 
         // close everything down and restart
         doTearDown();
         doSetUp(false);
-        remoteConsumer = remoteSession.createDurableSubscriber(included, 
consumerName);
+        final MessageConsumer remoteConsumer2 = 
remoteSession.createDurableSubscriber(included, consumerName);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            assertNotNull("message count: " + i, remoteConsumer.receive(2500));
+            assertNotNull("message count: " + i, 
remoteConsumer2.receive(2500));
         }
     }
 
@@ -320,35 +314,55 @@ public class SimpleNetworkTest extends BaseNetworkTest {
             "it requires a connection per durable to match that connectionId")
     public void testDurableStoreAndForwardReconnect() throws Exception {
         // create a local durable consumer
-        MessageConsumer localConsumer = 
localSession.createDurableSubscriber(included, consumerName);
-        Thread.sleep(5000);
+        final MessageConsumer localConsumer = 
localSession.createDurableSubscriber(included, consumerName);
+        // Wait for consumer demand to propagate across the network bridge
+        waitForConsumerRegistration(localBroker, 1, included);
+
         // now close everything down and restart
         doTearDown();
         doSetUp(false);
         // send messages
-        MessageProducer producer = localSession.createProducer(included);
+        final MessageProducer producer = localSession.createProducer(included);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
-            Message test = localSession.createTextMessage("test-" + i);
+            final Message test = localSession.createTextMessage("test-" + i);
             producer.send(test);
         }
-        Thread.sleep(5000);
+
+        //Make sure stats are set - wait for forwards to complete
+        assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+                () -> 
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()
 == MESSAGE_COUNT,
+                TimeUnit.SECONDS.toMillis(30), 100));
+
         // consume some messages locally
-        localConsumer = localSession.createDurableSubscriber(included, 
consumerName);
-        LOG.info("Consume from local consumer: " + localConsumer);
+        final MessageConsumer localConsumer2 = 
localSession.createDurableSubscriber(included, consumerName);
+        LOG.info("Consume from local consumer: " + localConsumer2);
         for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
-            assertNotNull("message count: " + i, localConsumer.receive(2500));
+            assertNotNull("message count: " + i, localConsumer2.receive(2500));
         }
-        Thread.sleep(5000);
+
+        // Wait for local messages to be consumed
+        assertTrue("Local messages consumed", Wait.waitFor(
+                () -> 
localBroker.getDestination(included).getDestinationStatistics().getDequeues().getCount()
 >= MESSAGE_COUNT / 2,
+                TimeUnit.SECONDS.toMillis(10), 100));
+
         // close everything down and restart
         doTearDown();
         doSetUp(false);
-        Thread.sleep(5000);
+
+        // Wait for network bridge to re-form
+        assertTrue("Network bridge re-formed", Wait.waitFor(
+                () -> !localBroker.getNetworkConnectors().isEmpty()
+                    && 
!localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(),
+                TimeUnit.SECONDS.toMillis(10), 100));
 
         LOG.info("Consume from remote");
         // consume the rest remotely
-        MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
+        final MessageConsumer remoteConsumer = 
remoteSession.createDurableSubscriber(included, consumerName);
         LOG.info("Remote consumer: " + remoteConsumer);
-        Thread.sleep(5000);
+
+        // Wait for consumer demand to propagate
+        waitForConsumerRegistration(localBroker, 1, included);
+
         for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
             assertNotNull("message count: " + i, 
remoteConsumer.receive(10000));
         }
@@ -359,14 +373,11 @@ public class SimpleNetworkTest extends BaseNetworkTest {
         final NetworkBridge localBridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
         final NetworkBridge remoteBridge = 
remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
 
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return expectedLocalSent == 
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
-                       0 == 
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
-                       expectedRemoteSent == 
remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
-                       0 == 
remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
-            }
-        }));
+        assertTrue(Wait.waitFor(() ->
+                expectedLocalSent == 
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                0 == 
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
+                expectedRemoteSent == 
remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                0 == 
remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount()
+        , TimeUnit.SECONDS.toMillis(30), 100));
     }
 }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
index d5e2a1a842..7eaefd23bd 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
@@ -124,7 +124,7 @@ public class RestrictedThreadPoolInactivityTimeoutTest 
extends JmsTestSupport {
 
         assertTrue("Should be at most inactivity monitor pool size * 2. Diff = 
" + diff, diff <= 2*poolSize);
 
-        assertTrue("all work complete", doneConsumers.await(10, 
TimeUnit.SECONDS));
+        assertTrue("all work complete", doneConsumers.await(30, 
TimeUnit.SECONDS));
     }
 
     @Override
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index dd4dd8e356..4c5b193741 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -251,7 +251,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
                 return queue != null
                     && 
queue.getDestinationStatistics().getMessages().getCount() == numToSend
                     && 
queue.getDestinationStatistics().getInflight().getCount() == 0;
-            }, 5000, 100));
+            }, TimeUnit.SECONDS.toMillis(20), 100));
 
             // Use the retry-based consume to handle zero-prefetch dispatch 
timing:
             // with prioritized messages + lazy dispatch + redelivered 
messages in the
@@ -297,7 +297,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
             boolean finished = false;
 
             while (!finished) {
-                Message message = consumer.receive(returnedMessages.isEmpty() 
? 5000 : 1000);
+                Message message = consumer.receive(returnedMessages.isEmpty() 
? TimeUnit.SECONDS.toMillis(20) : 1000);
                 if (message == null) {
                     finished = true;
                 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to