This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 3a88c10ed1 Relax timeouts and macOS exclusions + small fixes (#1673)
3a88c10ed1 is described below
commit 3a88c10ed18d55d8c61ebc2fd8f54a7d462bf75b
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Tue Feb 24 16:40:18 2026 +0100
Relax timeouts and macOS exclusions + small fixes (#1673)
---
activemq-unit-tests/pom.xml | 24 +-
.../activemq/bugs/DuplicateFromStoreTest.java | 6 +-
.../network/DurableSyncNetworkBridgeTest.java | 86 +++---
.../network/DynamicNetworkTestSupport.java | 36 +--
...callyIncludedDestinationsDuplexNetworkTest.java | 27 +-
.../network/MQTTNetworkOfBrokersFailoverTest.java | 37 ++-
.../apache/activemq/network/SimpleNetworkTest.java | 291 +++++++++++----------
.../RestrictedThreadPoolInactivityTimeoutTest.java | 2 +-
.../QueueZeroPrefetchLazyDispatchPriorityTest.java | 4 +-
9 files changed, 283 insertions(+), 230 deletions(-)
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 68955145d1..7e1d3c73a5 100644
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -953,7 +953,6 @@
<exclude>org/apache/activemq/transport/SoWriteTimeoutTest.*</exclude>
<exclude>org/apache/activemq/transport/TopicClusterTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryNetworkReconnectTest.*</exclude>
-
<exclude>org/apache/activemq/transport/discovery/DiscoveryTransportBrokerTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryTransportNoBrokerTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/DiscoveryUriTest.*</exclude>
<exclude>org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.*</exclude>
@@ -1144,6 +1143,29 @@
</plugins>
</build>
</profile>
+ <profile>
+ <id>activemq.tests.mac.excludes</id>
+ <activation>
+ <os>
+ <family>mac</family>
+ </os>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <excludes combine.children="append">
+ <!-- Multicast and UDP based tests are unreliable on macOS
CI (no proper multicast interfaces) -->
+
<exclude>org/apache/activemq/transport/peer/PeerTransportTest.*</exclude>
+
<exclude>org/apache/activemq/transport/multicast/MulticastTransportTest.*</exclude>
+
<exclude>org/apache/activemq/network/MulticastNetworkTest.*</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
<profile>
<id>activemq.tests.aix.excludes</id>
<activation>
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
index 753e41aa2c..04f633893d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/DuplicateFromStoreTest.java
@@ -123,7 +123,7 @@ public class DuplicateFromStoreTest {
}
}
- @Test
+ @Test(timeout = 120_000)
public void testDuplicateMessage() throws Exception {
LOG.info("Testing for duplicate messages.");
@@ -134,10 +134,10 @@ public class DuplicateFromStoreTest {
createOpenwireClients(producers, consumers);
LOG.info("All producers and consumers got started. Awaiting their
termination");
- producersFinished.await(100, TimeUnit.MINUTES);
+ producersFinished.await(2, TimeUnit.MINUTES);
LOG.info("All producers have terminated. remaining to send: " +
totalMessagesToSend.get() + ", sent:" + totalMessagesSent.get());
- consumersFinished.await(100, TimeUnit.MINUTES);
+ consumersFinished.await(2, TimeUnit.MINUTES);
LOG.info("All consumers have terminated.");
producers.shutdownNow();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 9727dbc41a..1791ec242f 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -49,7 +49,6 @@ import
org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Assume;
import org.junit.Before;
@@ -83,7 +82,7 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
private final FLOW flow;
@Rule
- public Timeout globalTimeout = new Timeout(30, TimeUnit.SECONDS);
+ public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);
@Parameters
public static Collection<Object[]> data() {
@@ -531,7 +530,6 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
session1.createDurableSubscriber(topic, "sub3");
session1.createDurableSubscriber(excludeTopic, "sub-exclude");
- Thread.sleep(1000);
assertNCDurableSubsCount(broker2, topic, 1);
assertNCDurableSubsCount(broker2, excludeTopic, 0);
@@ -570,13 +568,10 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
secondConnector.start();
//Make sure both bridges are connected
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
-
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1;
- }
- }, 10000, 500));
+ assertTrue(Wait.waitFor(() ->
+
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 &&
+
localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(10), 500));
//Make sure NC durables exist for both bridges
assertNCDurableSubsCount(broker2, topic2, 1);
@@ -637,13 +632,7 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
final DestinationStatistics remoteDestStatistics2 =
remoteBroker.getDestination(
new
ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
- assertTrue(Wait.waitFor(new Condition() {
-
- @Override
- public boolean isSatisified() throws Exception {
- return remoteDestStatistics2.getMessages().getCount() == 501;
- }
- }));
+ assertTrue(Wait.waitFor(() ->
remoteDestStatistics2.getMessages().getCount() == 501));
}
@@ -723,8 +712,36 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
included = new ActiveMQTopic(testTopicName);
doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);
doSetUpLocalBroker(deleteAllMessages, startNetworkConnector,
localDataDir);
- //Give time for advisories to propagate
- Thread.sleep(1000);
+ //Wait for the bridge to be fully started (advisory consumers
registered).
+ //Note: activeBridges().size() == 1 is NOT sufficient because bridges
are added
+ //to the map before start() completes asynchronously. We must wait for
the
+ //startedLatch which counts down after advisory consumers are
registered.
+ if (startNetworkConnector) {
+ waitForBridgeFullyStarted();
+ }
+ }
+
+ private void waitForBridgeFullyStarted() throws Exception {
+ // Wait for the local bridge to be fully started (advisory consumers
registered)
+ assertTrue("Local bridge should be fully started", Wait.waitFor(() -> {
+ if
(localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+ return false;
+ }
+ final NetworkBridge bridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(10), 100));
+
+ // Also wait for the duplex bridge on the remote broker to be fully
started.
+ // The duplex connector creates a separate DemandForwardingBridge on
the remote side
+ // that also needs its advisory consumers registered before it can
process events.
+ assertTrue("Duplex bridge should be fully started", Wait.waitFor(() ->
{
+ final DemandForwardingBridge duplexBridge = findDuplexBridge(
+ remoteBroker.getTransportConnectors().get(0));
+ return duplexBridge != null &&
duplexBridge.startedLatch.getCount() == 0;
+ }, TimeUnit.SECONDS.toMillis(10), 100));
}
protected void restartLocalBroker(boolean startNetworkConnector) throws
Exception {
@@ -757,12 +774,12 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
localConnection.start();
if (startNetworkConnector) {
- Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
- }
- }, 5000, 500);
+ // Best-effort wait for the bridge to appear. Do NOT use
assertTrue here
+ // because some tests restart localBroker before remoteBroker is
running,
+ // relying on the bridge connecting later when remoteBroker
restarts.
+ // Tests that need the bridge to be fully started call
assertBridgeStarted() explicitly.
+ Wait.waitFor(() ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+ TimeUnit.SECONDS.toMillis(10), 500);
}
localSession = localConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
@@ -881,19 +898,16 @@ public class DurableSyncNetworkBridgeTest extends
DynamicNetworkTestSupport {
protected void waitForSubscriptionInactive(final BrokerService
brokerService,
final ActiveMQTopic topic,
final String subName) throws
Exception {
- assertTrue("Subscription should become inactive", Wait.waitFor(new
Condition() {
- @Override
- public boolean isSatisified() throws Exception {
-
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs =
getSubscriptions(brokerService, topic);
- for
(org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
- if
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
- return !sub.isActive();
- }
+ assertTrue("Subscription should become inactive", Wait.waitFor(() -> {
+ final
List<org.apache.activemq.broker.region.DurableTopicSubscription> subs =
getSubscriptions(brokerService, topic);
+ for (final
org.apache.activemq.broker.region.DurableTopicSubscription sub : subs) {
+ if
(sub.getSubscriptionKey().getSubscriptionName().equals(subName)) {
+ return !sub.isActive();
}
- // If subscription doesn't exist, it's considered inactive
- return true;
}
- }, 5000, 100));
+ // If subscription doesn't exist, it's considered inactive
+ return true;
+ }, TimeUnit.SECONDS.toMillis(10), 100));
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 87ee09040e..a347ffe3b5 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -24,6 +24,7 @@ import java.io.File;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
@@ -45,7 +46,6 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
@@ -95,9 +95,16 @@ public abstract class DynamicNetworkTestSupport {
}
protected void assertBridgeStarted() throws Exception {
- assertTrue(Wait.waitFor(
- () ->
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
- 10000, 500));
+ assertTrue("Bridge should be fully started", Wait.waitFor(() -> {
+ if
(localBroker.getNetworkConnectors().get(0).activeBridges().size() != 1) {
+ return false;
+ }
+ final NetworkBridge bridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ return ((DemandForwardingBridgeSupport)
bridge).startedLatch.getCount() == 0;
+ }
+ return true;
+ }, TimeUnit.SECONDS.toMillis(10), 500));
}
protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final
ConnectionContext context,
@@ -135,17 +142,19 @@ public abstract class DynamicNetworkTestSupport {
protected void assertNCDurableSubsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
- assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService,
dest).size(),
- 10000, 500));
+ assertTrue("Expected " + count + " NC durable subs on " + dest,
+ Wait.waitFor(() -> count == getNCDurableSubs(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(30), 500));
}
protected void assertConsumersCount(final BrokerService brokerService,
final ActiveMQDestination dest, final int count) throws Exception {
assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService,
dest).size(),
10000, 500));
- Thread.sleep(1000);
- // Check one more time after a short pause to make sure the count
didn't increase past what we wanted
- assertEquals(count, getConsumers(brokerService, dest).size());
+ // Wait a bit longer and verify the count is stable (didn't increase
past what we wanted)
+ assertTrue("Consumer count should remain stable at " + count,
+ Wait.waitFor(() -> count == getConsumers(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(5), 500));
}
protected List<Subscription> getConsumers(final BrokerService
brokerService,
@@ -208,12 +217,9 @@ public abstract class DynamicNetworkTestSupport {
protected void assertSubscriptionsCount(final BrokerService brokerService,
final ActiveMQTopic dest, final int count) throws Exception {
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return count == getSubscriptions(brokerService, dest).size();
- }
- }, 10000, 500));
+ assertTrue("Expected " + count + " subscriptions on " + dest,
+ Wait.waitFor(() -> count == getSubscriptions(brokerService,
dest).size(),
+ TimeUnit.SECONDS.toMillis(30), 500));
}
protected void assertSubscriptionMapCounts(NetworkBridge networkBridge,
final int count) {
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index d52836e4f6..229c522559 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -80,7 +80,7 @@ public class DynamicallyIncludedDestinationsDuplexNetworkTest
extends SimpleNetw
assertEquals("Destination not deleted", 0,
remoteBroker.getAdminView().getTemporaryQueues().length);
}
- @Test
+ @Test(timeout = 60 * 1000)
public void testDynamicallyIncludedDestinationsForDuplex() throws
Exception{
// Once the bridge is set up, we should see the filter used for the
duplex end of the bridge
// only subscribe to the specific destinations included in the
<dynamicallyIncludedDestinations> list
@@ -88,11 +88,11 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
// is correct
// the bridge on the remote broker has the correct filter
- TransportConnection bridgeConnection =
getDuplexBridgeConnectionFromRemote();
+ final TransportConnection bridgeConnection =
getDuplexBridgeConnectionFromRemote();
assertNotNull(bridgeConnection);
- DemandForwardingBridge duplexBridge =
getDuplexBridgeFromConnection(bridgeConnection);
+ final DemandForwardingBridge duplexBridge =
waitForDuplexBridge(bridgeConnection);
assertNotNull(duplexBridge);
- NetworkBridgeConfiguration configuration =
getConfigurationFromNetworkBridge(duplexBridge);
+ final NetworkBridgeConfiguration configuration =
getConfigurationFromNetworkBridge(duplexBridge);
assertNotNull(configuration);
assertFalse("This destinationFilter does not include ONLY the
destinations specified in dynamicallyIncludedDestinations",
configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX
+ ">"));
@@ -115,17 +115,20 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
return bridge;
}
+ private DemandForwardingBridge waitForDuplexBridge(final
TransportConnection bridgeConnection) throws Exception {
+ assertTrue("Timed out waiting for duplex bridge to be fully started",
+ Wait.waitFor(() -> {
+ final DemandForwardingBridge bridge =
getDuplexBridgeFromConnection(bridgeConnection);
+ return bridge != null && bridge.getRemoteBrokerName() !=
null;
+ }));
+ return getDuplexBridgeFromConnection(bridgeConnection);
+ }
+
public TransportConnection getDuplexBridgeConnectionFromRemote() throws
Exception {
final TransportConnector transportConnector =
remoteBroker.getTransportConnectorByScheme("tcp");
assertTrue("Timed out waiting for duplex bridge connection",
- Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() {
- return !transportConnector.getConnections().isEmpty();
- }
- }));
- CopyOnWriteArrayList<TransportConnection> transportConnections =
transportConnector.getConnections();
- return transportConnections.get(0);
+ Wait.waitFor(() ->
!transportConnector.getConnections().isEmpty()));
+ return transportConnector.getConnections().get(0);
}
@Override
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
index ad52fecf70..bc6c985e24 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
@@ -23,9 +23,7 @@ import java.util.concurrent.TimeUnit;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
-import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
import jakarta.jms.Session;
import javax.management.ObjectName;
@@ -35,6 +33,7 @@ import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.util.Wait;
import org.apache.commons.lang.ArrayUtils;
import org.fusesource.hawtdispatch.Dispatch;
import org.fusesource.mqtt.client.BlockingConnection;
@@ -69,6 +68,10 @@ public class MQTTNetworkOfBrokersFailoverTest extends
NetworkTestSupport {
remoteBroker.addNetworkConnector(nc);
nc.start();
+ // Wait for the network bridge to be established before proceeding
+ assertTrue("Network bridge should be established",
+ Wait.waitFor(() -> nc.activeBridges().size() == 1,
TimeUnit.SECONDS.toMillis(10), 500));
+
// mqtt port should have been assigned by now
assertFalse(localBrokerMQTTPort == -1);
assertFalse(remoteBrokerMQTTPort == -1);
@@ -104,7 +107,7 @@ public class MQTTNetworkOfBrokersFailoverTest extends
NetworkTestSupport {
remoteConn.connect();
remoteConn.subscribe(new Topic[]{new Topic("foo/bar",
QoS.AT_LEAST_ONCE)});
- assertTrue("No destination detected!", consumerNetworked.await(1,
TimeUnit.SECONDS));
+ assertTrue("No destination detected!", consumerNetworked.await(5,
TimeUnit.SECONDS));
assertQueueExistsOn(remoteBroker,
"Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
assertQueueExistsOn(broker,
"Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
remoteConn.disconnect();
@@ -149,23 +152,17 @@ public class MQTTNetworkOfBrokersFailoverTest extends
NetworkTestSupport {
final Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination dest =
session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
MessageConsumer consumer = session.createConsumer(dest);
- consumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message message) {
- latch.countDown();
- // shutdown this connection
- Dispatch.getGlobalQueue().execute(new Runnable() {
- @Override
- public void run() {
- try {
- session.close();
- connection.close();
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- });
- }
+ consumer.setMessageListener(message -> {
+ latch.countDown();
+ // shutdown this connection
+ Dispatch.getGlobalQueue().execute(() -> {
+ try {
+ session.close();
+ connection.close();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
+ });
});
return latch;
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index 6ccf0137b6..08ec477e8d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -16,20 +16,15 @@
*/
package org.apache.activemq.network;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.util.Arrays;
-import java.util.concurrent.ConcurrentMap;
-
import jakarta.jms.DeliveryMode;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
-import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import jakarta.jms.TextMessage;
import jakarta.jms.TopicRequestor;
@@ -41,13 +36,13 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.context.support.AbstractApplicationContext;
+import java.util.concurrent.TimeUnit;
+
public class SimpleNetworkTest extends BaseNetworkTest {
protected static final int MESSAGE_COUNT = 10;
@@ -70,21 +65,21 @@ public class SimpleNetworkTest extends BaseNetworkTest {
@Test(timeout = 60 * 1000)
public void testMessageCompression() throws Exception {
- ActiveMQConnection localAmqConnection = (ActiveMQConnection)
localConnection;
+ final ActiveMQConnection localAmqConnection = (ActiveMQConnection)
localConnection;
localAmqConnection.setUseCompression(true);
- MessageConsumer consumer1 = remoteSession.createConsumer(included);
- MessageProducer producer = localSession.createProducer(included);
+ final MessageConsumer consumer1 =
remoteSession.createConsumer(included);
+ final MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 1, included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
- Message msg = consumer1.receive(3000);
+ final Message msg = consumer1.receive(3000);
assertNotNull("not null? message: " + i, msg);
- ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
+ final ActiveMQMessage amqMessage = (ActiveMQMessage) msg;
assertTrue(amqMessage.isCompressed());
}
// ensure no more messages received
@@ -96,30 +91,27 @@ public class SimpleNetworkTest extends BaseNetworkTest {
@Test(timeout = 60 * 1000)
public void testRequestReply() throws Exception {
final MessageProducer remoteProducer =
remoteSession.createProducer(null);
- MessageConsumer remoteConsumer =
remoteSession.createConsumer(included);
- remoteConsumer.setMessageListener(new MessageListener() {
- @Override
- public void onMessage(Message msg) {
- try {
- TextMessage textMsg = (TextMessage)msg;
- String payload = "REPLY: " + textMsg.getText();
- Destination replyTo;
- replyTo = msg.getJMSReplyTo();
- textMsg.clearBody();
- textMsg.setText(payload);
- remoteProducer.send(replyTo, textMsg);
- } catch (JMSException e) {
- e.printStackTrace();
- }
+ final MessageConsumer remoteConsumer =
remoteSession.createConsumer(included);
+ remoteConsumer.setMessageListener(msg -> {
+ try {
+ final TextMessage textMsg = (TextMessage) msg;
+ final String payload = "REPLY: " + textMsg.getText();
+ final Destination replyTo = msg.getJMSReplyTo();
+ textMsg.clearBody();
+ textMsg.setText(payload);
+ remoteProducer.send(replyTo, textMsg);
+ } catch (JMSException e) {
+ e.printStackTrace();
}
});
- TopicRequestor requestor = new
TopicRequestor((TopicSession)localSession, included);
- // allow for consumer infos to perculate arround
- Thread.sleep(5000);
+ final TopicRequestor requestor = new TopicRequestor((TopicSession)
localSession, included);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, included);
+
for (int i = 0; i < MESSAGE_COUNT; i++) {
- TextMessage msg = localSession.createTextMessage("test msg: " + i);
- TextMessage result = (TextMessage)requestor.request(msg);
+ final TextMessage msg = localSession.createTextMessage("test msg:
" + i);
+ final TextMessage result = (TextMessage) requestor.request(msg);
assertNotNull(result);
LOG.info(result.getText());
}
@@ -129,13 +121,14 @@ public class SimpleNetworkTest extends BaseNetworkTest {
@Test(timeout = 60 * 1000)
public void testFiltering() throws Exception {
- MessageConsumer includedConsumer =
remoteSession.createConsumer(included);
- MessageConsumer excludedConsumer =
remoteSession.createConsumer(excluded);
- MessageProducer includedProducer =
localSession.createProducer(included);
- MessageProducer excludedProducer =
localSession.createProducer(excluded);
- // allow for consumer infos to perculate around
- Thread.sleep(2000);
- Message test = localSession.createTextMessage("test");
+ final MessageConsumer includedConsumer =
remoteSession.createConsumer(included);
+ final MessageConsumer excludedConsumer =
remoteSession.createConsumer(excluded);
+ final MessageProducer includedProducer =
localSession.createProducer(included);
+ final MessageProducer excludedProducer =
localSession.createProducer(excluded);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, included);
+
+ final Message test = localSession.createTextMessage("test");
includedProducer.send(test);
excludedProducer.send(test);
assertNull(excludedConsumer.receive(1000));
@@ -146,15 +139,15 @@ public class SimpleNetworkTest extends BaseNetworkTest {
@Test(timeout = 60 * 1000)
public void testConduitBridge() throws Exception {
- MessageConsumer consumer1 = remoteSession.createConsumer(included);
- MessageConsumer consumer2 = remoteSession.createConsumer(included);
- MessageProducer producer = localSession.createProducer(included);
+ final MessageConsumer consumer1 =
remoteSession.createConsumer(included);
+ final MessageConsumer consumer2 =
remoteSession.createConsumer(included);
+ final MessageProducer producer = localSession.createProducer(included);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
waitForConsumerRegistration(localBroker, 2, included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
assertNotNull(consumer1.receive(1000));
assertNotNull(consumer2.receive(1000));
@@ -170,27 +163,31 @@ public class SimpleNetworkTest extends BaseNetworkTest {
}
private void waitForConsumerRegistration(final BrokerService
brokerService, final int min, final ActiveMQDestination destination) throws
Exception {
- assertTrue("Internal bridge consumers registered in time",
Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- Object[] bridges =
brokerService.getNetworkConnectors().get(0).bridges.values().toArray();
- if (bridges.length > 0) {
- LOG.info(brokerService + " bridges " +
Arrays.toString(bridges));
- DemandForwardingBridgeSupport
demandForwardingBridgeSupport = (DemandForwardingBridgeSupport) bridges[0];
- ConcurrentMap<ConsumerId, DemandSubscription>
forwardingBridges = demandForwardingBridgeSupport.getLocalSubscriptionMap();
- LOG.info(brokerService + " bridge " +
demandForwardingBridgeSupport + ", localSubs: " + forwardingBridges);
- if (!forwardingBridges.isEmpty()) {
- for (DemandSubscription demandSubscription :
forwardingBridges.values()) {
- if
(demandSubscription.getLocalInfo().getDestination().equals(destination)) {
- LOG.info(brokerService + " DemandSubscription
" + demandSubscription + ", size: " + demandSubscription.size());
- return demandSubscription.size() >= min;
- }
+ // Wait for the bridge demand subscriptions to register the expected
number of
+ // remote consumers. With conduit subscriptions, multiple remote
consumers map
+ // to a single local subscription, so we check
DemandSubscription.size().
+ assertTrue("Bridge demand subscription registered for " + destination,
Wait.waitFor(() -> {
+ if (brokerService.getNetworkConnectors().isEmpty()) {
+ return false;
+ }
+ for (final NetworkBridge bridge :
brokerService.getNetworkConnectors().get(0).activeBridges()) {
+ if (bridge instanceof DemandForwardingBridgeSupport) {
+ final DemandForwardingBridgeSupport dfBridge =
(DemandForwardingBridgeSupport) bridge;
+ for (final DemandSubscription ds :
dfBridge.getLocalSubscriptionMap().values()) {
+ if
(ds.getLocalInfo().getDestination().equals(destination)) {
+ return ds.size() >= min;
}
}
}
- return false;
}
- }));
+ return false;
+ }, TimeUnit.SECONDS.toMillis(30), 100));
+
+ // Also verify the consumer is actually dispatching on the broker's
destination.
+ // The DemandSubscription may exist before the local consumer is fully
registered.
+ assertTrue("Consumer dispatching on " + destination, Wait.waitFor(
+ () ->
brokerService.getDestination(destination).getDestinationStatistics().getConsumers().getCount()
>= 1,
+ TimeUnit.SECONDS.toMillis(10), 100));
}
//Added for AMQ-6465 to make sure memory usage decreased back to 0 after
messages are forwarded
@@ -198,27 +195,28 @@ public class SimpleNetworkTest extends BaseNetworkTest {
@Test(timeout = 60 * 1000)
public void testDurableTopicSubForwardMemoryUsage() throws Exception {
// create a remote durable consumer to create demand
- MessageConsumer remoteConsumer =
remoteSession.createDurableSubscriber(included, consumerName);
- Thread.sleep(1000);
+ final MessageConsumer remoteConsumer =
remoteSession.createDurableSubscriber(included, consumerName);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, included);
- MessageProducer producer = localSession.createProducer(included);
+ final MessageProducer producer = localSession.createProducer(included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
- Thread.sleep(1000);
-
- //Make sure stats are set
- assertEquals(MESSAGE_COUNT,
-
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
-
- assertTrue(Wait.waitFor(new Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
- }
- }, 10000, 500));
+ //Make sure stats are set - wait for forwards to complete
+ assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+ () -> {
+ final long count =
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount();
+ LOG.info("testDurableTopicSubForwardMemoryUsage: forwards
count = " + count + " (expected " + MESSAGE_COUNT + ")");
+ return count == MESSAGE_COUNT;
+ },
+ TimeUnit.SECONDS.toMillis(30), 100));
+
+ assertTrue("Memory usage did not return to 0", Wait.waitFor(
+ () -> localBroker.getSystemUsage().getMemoryUsage().getUsage()
== 0,
+ TimeUnit.SECONDS.toMillis(10), 500));
remoteConsumer.close();
}
@@ -226,28 +224,25 @@ public class SimpleNetworkTest extends BaseNetworkTest {
//to the other broker
@Test(timeout = 60 * 1000)
public void testTopicSubForwardMemoryUsage() throws Exception {
- // create a remote durable consumer to create demand
- MessageConsumer remoteConsumer =
remoteSession.createConsumer(included);
- Thread.sleep(1000);
+ // create a remote consumer to create demand
+ final MessageConsumer remoteConsumer =
remoteSession.createConsumer(included);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, included);
- MessageProducer producer = localSession.createProducer(included);
+ final MessageProducer producer = localSession.createProducer(included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
- Thread.sleep(1000);
-
- //Make sure stats are set
- assertEquals(MESSAGE_COUNT,
-
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
- assertTrue(Wait.waitFor(new Condition() {
+ //Make sure stats are set - wait for forwards to complete
+ assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+ () ->
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()
== MESSAGE_COUNT,
+ TimeUnit.SECONDS.toMillis(30), 100));
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
- }
- }, 10000, 500));
+ assertTrue("Memory usage did not return to 0", Wait.waitFor(
+ () -> localBroker.getSystemUsage().getMemoryUsage().getUsage()
== 0,
+ TimeUnit.SECONDS.toMillis(10), 500));
for (int i = 0; i < MESSAGE_COUNT; i++) {
assertNotNull("message count: " + i, remoteConsumer.receive(2500));
@@ -259,28 +254,25 @@ public class SimpleNetworkTest extends BaseNetworkTest {
//to the other broker
@Test(timeout = 60 * 1000)
public void testQueueSubForwardMemoryUsage() throws Exception {
- ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
- MessageConsumer remoteConsumer = remoteSession.createConsumer(queue);
- Thread.sleep(1000);
+ final ActiveMQQueue queue = new ActiveMQQueue("include.test.foo");
+ final MessageConsumer remoteConsumer =
remoteSession.createConsumer(queue);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, queue);
- MessageProducer producer = localSession.createProducer(queue);
+ final MessageProducer producer = localSession.createProducer(queue);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
- Thread.sleep(1000);
-
- //Make sure stats are set
- assertEquals(MESSAGE_COUNT,
-
localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount());
- assertTrue(Wait.waitFor(new Condition() {
+ //Make sure stats are set - wait for forwards to complete
+ assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+ () ->
localBroker.getDestination(queue).getDestinationStatistics().getForwards().getCount()
== MESSAGE_COUNT,
+ TimeUnit.SECONDS.toMillis(30), 100));
- @Override
- public boolean isSatisified() throws Exception {
- return
localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
- }
- }, 10000, 500));
+ assertTrue("Memory usage did not return to 0", Wait.waitFor(
+ () -> localBroker.getSystemUsage().getMemoryUsage().getUsage()
== 0,
+ TimeUnit.SECONDS.toMillis(10), 500));
for (int i = 0; i < MESSAGE_COUNT; i++) {
assertNotNull("message count: " + i, remoteConsumer.receive(2500));
@@ -291,28 +283,30 @@ public class SimpleNetworkTest extends BaseNetworkTest {
@Test(timeout = 60 * 1000)
public void testDurableStoreAndForward() throws Exception {
// create a remote durable consumer
- MessageConsumer remoteConsumer =
remoteSession.createDurableSubscriber(included, consumerName);
- Thread.sleep(1000);
+ final MessageConsumer remoteConsumer =
remoteSession.createDurableSubscriber(included, consumerName);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, included);
+
// now close everything down and restart
doTearDown();
doSetUp(false);
- MessageProducer producer = localSession.createProducer(included);
+ final MessageProducer producer = localSession.createProducer(included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
- Thread.sleep(1000);
- //Make sure stats are set
- assertEquals(MESSAGE_COUNT,
-
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount());
+ //Make sure stats are set - wait for forwards to complete
+ assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+ () ->
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()
== MESSAGE_COUNT,
+ TimeUnit.SECONDS.toMillis(30), 100));
// close everything down and restart
doTearDown();
doSetUp(false);
- remoteConsumer = remoteSession.createDurableSubscriber(included,
consumerName);
+ final MessageConsumer remoteConsumer2 =
remoteSession.createDurableSubscriber(included, consumerName);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- assertNotNull("message count: " + i, remoteConsumer.receive(2500));
+ assertNotNull("message count: " + i,
remoteConsumer2.receive(2500));
}
}
@@ -320,35 +314,55 @@ public class SimpleNetworkTest extends BaseNetworkTest {
"it requires a connection per durable to match that connectionId")
public void testDurableStoreAndForwardReconnect() throws Exception {
// create a local durable consumer
- MessageConsumer localConsumer =
localSession.createDurableSubscriber(included, consumerName);
- Thread.sleep(5000);
+ final MessageConsumer localConsumer =
localSession.createDurableSubscriber(included, consumerName);
+ // Wait for consumer demand to propagate across the network bridge
+ waitForConsumerRegistration(localBroker, 1, included);
+
// now close everything down and restart
doTearDown();
doSetUp(false);
// send messages
- MessageProducer producer = localSession.createProducer(included);
+ final MessageProducer producer = localSession.createProducer(included);
for (int i = 0; i < MESSAGE_COUNT; i++) {
- Message test = localSession.createTextMessage("test-" + i);
+ final Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
- Thread.sleep(5000);
+
+ //Make sure stats are set - wait for forwards to complete
+ assertTrue("Forwards count did not reach expected value", Wait.waitFor(
+ () ->
localBroker.getDestination(included).getDestinationStatistics().getForwards().getCount()
== MESSAGE_COUNT,
+ TimeUnit.SECONDS.toMillis(30), 100));
+
// consume some messages locally
- localConsumer = localSession.createDurableSubscriber(included,
consumerName);
- LOG.info("Consume from local consumer: " + localConsumer);
+ final MessageConsumer localConsumer2 =
localSession.createDurableSubscriber(included, consumerName);
+ LOG.info("Consume from local consumer: " + localConsumer2);
for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
- assertNotNull("message count: " + i, localConsumer.receive(2500));
+ assertNotNull("message count: " + i, localConsumer2.receive(2500));
}
- Thread.sleep(5000);
+
+ // Wait for local messages to be consumed
+ assertTrue("Local messages consumed", Wait.waitFor(
+ () ->
localBroker.getDestination(included).getDestinationStatistics().getDequeues().getCount()
>= MESSAGE_COUNT / 2,
+ TimeUnit.SECONDS.toMillis(10), 100));
+
// close everything down and restart
doTearDown();
doSetUp(false);
- Thread.sleep(5000);
+
+ // Wait for network bridge to re-form
+ assertTrue("Network bridge re-formed", Wait.waitFor(
+ () -> !localBroker.getNetworkConnectors().isEmpty()
+ &&
!localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(),
+ TimeUnit.SECONDS.toMillis(10), 100));
LOG.info("Consume from remote");
// consume the rest remotely
- MessageConsumer remoteConsumer =
remoteSession.createDurableSubscriber(included, consumerName);
+ final MessageConsumer remoteConsumer =
remoteSession.createDurableSubscriber(included, consumerName);
LOG.info("Remote consumer: " + remoteConsumer);
- Thread.sleep(5000);
+
+ // Wait for consumer demand to propagate
+ waitForConsumerRegistration(localBroker, 1, included);
+
for (int i = 0; i < MESSAGE_COUNT / 2; i++) {
assertNotNull("message count: " + i,
remoteConsumer.receive(10000));
}
@@ -359,14 +373,11 @@ public class SimpleNetworkTest extends BaseNetworkTest {
final NetworkBridge localBridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
final NetworkBridge remoteBridge =
remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return expectedLocalSent ==
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
- 0 ==
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
- expectedRemoteSent ==
remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
- 0 ==
remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
- }
- }));
+ assertTrue(Wait.waitFor(() ->
+ expectedLocalSent ==
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+ 0 ==
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
+ expectedRemoteSent ==
remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+ 0 ==
remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount()
+ , TimeUnit.SECONDS.toMillis(30), 100));
}
}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
index d5e2a1a842..7eaefd23bd 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/RestrictedThreadPoolInactivityTimeoutTest.java
@@ -124,7 +124,7 @@ public class RestrictedThreadPoolInactivityTimeoutTest
extends JmsTestSupport {
assertTrue("Should be at most inactivity monitor pool size * 2. Diff =
" + diff, diff <= 2*poolSize);
- assertTrue("all work complete", doneConsumers.await(10,
TimeUnit.SECONDS));
+ assertTrue("all work complete", doneConsumers.await(30,
TimeUnit.SECONDS));
}
@Override
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
index dd4dd8e356..4c5b193741 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/QueueZeroPrefetchLazyDispatchPriorityTest.java
@@ -251,7 +251,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
return queue != null
&&
queue.getDestinationStatistics().getMessages().getCount() == numToSend
&&
queue.getDestinationStatistics().getInflight().getCount() == 0;
- }, 5000, 100));
+ }, TimeUnit.SECONDS.toMillis(20), 100));
// Use the retry-based consume to handle zero-prefetch dispatch
timing:
// with prioritized messages + lazy dispatch + redelivered
messages in the
@@ -297,7 +297,7 @@ public class QueueZeroPrefetchLazyDispatchPriorityTest {
boolean finished = false;
while (!finished) {
- Message message = consumer.receive(returnedMessages.isEmpty()
? 5000 : 1000);
+ Message message = consumer.receive(returnedMessages.isEmpty()
? TimeUnit.SECONDS.toMillis(20) : 1000);
if (message == null) {
finished = true;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact