This is an automated email from the ASF dual-hosted git repository.
jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/main by this push:
new 92c8b58355 Improve duplex network wait conditions and ports (#1669)
92c8b58355 is described below
commit 92c8b58355d33b6e7dec8ac546c8020a6abce1a8
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Feb 25 13:32:44 2026 +0100
Improve duplex network wait conditions and ports (#1669)
* Improve duplex network wait conditions and ports
* Add CopyOnWriteArrayList import to support thread-safe list operations
* Override addNetworkConnectors method in DuplexNetworkTest and
MulticastNetworkTest to use what's in the XML for now
* Remove overridden addNetworkConnectors method in DuplexNetworkTest as
it's no longer needed
---
.../MessageDestinationVirtualTopicTest.java | 93 +++++++++++++++-------
.../apache/activemq/network/BaseNetworkTest.java | 16 ++--
.../network/DynamicNetworkTestSupport.java | 5 +-
...callyIncludedDestinationsDuplexNetworkTest.java | 70 ++++++++++------
.../duplexDynamicIncludedDestLocalBroker.xml | 26 +-----
5 files changed, 126 insertions(+), 84 deletions(-)
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
index cf0ae79cf3..5ac3495e2d 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
@@ -18,6 +18,9 @@ package org.apache.activemq.broker.virtual;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
@@ -27,9 +30,12 @@ import
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import jakarta.annotation.Resource;
import jakarta.jms.*;
+import java.net.URI;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({ "virtual-topic-network-test.xml" })
@@ -53,45 +59,64 @@ public class MessageDestinationVirtualTopicTest {
private Session session1;
- public void init() throws JMSException {
+ public void init() throws Exception {
+ // Get actual assigned ephemeral ports
+ final String broker1URL =
broker1.getTransportConnectors().get(0).getConnectUri().toString();
+ final String broker2URL =
broker2.getTransportConnectors().get(0).getConnectUri().toString();
+ LOG.info("Broker1 URL: {}", broker1URL);
+ LOG.info("Broker2 URL: {}", broker2URL);
+
+ // Add network connector from broker2 to broker1 programmatically using actual port
+ final DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(
+ new URI("static://(" + broker1URL + ")"));
+ nc.setName("linkToBrokerB1");
+ nc.setNetworkTTL(1);
+ nc.setDuplex(true);
+ broker2.addNetworkConnector(nc);
+ nc.start();
+
+ // Wait for bridge to be established
+ assertTrue("Network bridge should be established",
+ Wait.waitFor(() -> nc.activeBridges().size() == 1, 10_000, 500));
+
// Create connection on Broker B2
- ConnectionFactory broker2ConnectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:62616");
- Connection connection2 = broker2ConnectionFactory.createConnection();
+ final ConnectionFactory broker2ConnectionFactory = new
ActiveMQConnectionFactory(broker2URL);
+ final Connection connection2 =
broker2ConnectionFactory.createConnection();
connection2.start();
- Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
- Queue consumerDQueue =
session2.createQueue("Consumer.D.VirtualTopic.T1");
+ final Session session2 = connection2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ final Queue consumerDQueue =
session2.createQueue("Consumer.D.VirtualTopic.T1");
// Bind listener on queue for consumer D
- MessageConsumer consumer = session2.createConsumer(consumerDQueue);
+ final MessageConsumer consumer =
session2.createConsumer(consumerDQueue);
listener2 = new SimpleMessageListener();
consumer.setMessageListener(listener2);
// Create connection on Broker B1
- ConnectionFactory broker1ConnectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
- Connection connection1 = broker1ConnectionFactory.createConnection();
+ final ConnectionFactory broker1ConnectionFactory = new
ActiveMQConnectionFactory(broker1URL);
+ final Connection connection1 =
broker1ConnectionFactory.createConnection();
connection1.start();
session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue consumerCQueue =
session1.createQueue("Consumer.C.VirtualTopic.T1");
+ final Queue consumerCQueue =
session1.createQueue("Consumer.C.VirtualTopic.T1");
- // Bind listener on queue for consumer D
- MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
+ // Bind listener on queue for consumer C
+ final MessageConsumer consumer1 =
session1.createConsumer(consumerCQueue);
listener1 = new SimpleMessageListener();
consumer1.setMessageListener(listener1);
- // Create listener on Broker B1 for VT T2 witout setOriginalDest
- Queue consumer3Queue =
session1.createQueue("Consumer.A.VirtualTopic.T2");
+ // Create listener on Broker B1 for VT T2 without setOriginalDest
+ final Queue consumer3Queue =
session1.createQueue("Consumer.A.VirtualTopic.T2");
- // Bind listener on queue for consumer D
- MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
+ // Bind listener on queue for consumer A
+ final MessageConsumer consumerD =
session1.createConsumer(consumer3Queue);
listener3 = new SimpleMessageListener();
consumerD.setMessageListener(listener3);
// Create producer for topic, on B1
- Topic virtualTopicT1 =
session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
+ final Topic virtualTopicT1 =
session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
producer = session1.createProducer(virtualTopicT1);
}
- @Test
+ @Test(timeout = 30_000)
public void testDestinationNames() throws Exception {
LOG.info("Started waiting for broker 1 and 2");
@@ -102,27 +127,41 @@ public class MessageDestinationVirtualTopicTest {
init();
// Create a monitor
- CountDownLatch monitor = new CountDownLatch(3);
+ final CountDownLatch monitor = new CountDownLatch(3);
listener1.setCountDown(monitor);
listener2.setCountDown(monitor);
listener3.setCountDown(monitor);
+ // Wait for the consumer on broker2 to be visible on broker1 via the network bridge.
+ // The virtual topic Consumer.D.VirtualTopic.T1 on broker2 must be forwarded to broker1
+ // before sending, otherwise the message won't reach listener2.
+ assertTrue("Consumer.D queue should exist on broker1 via network bridge",
+ Wait.waitFor(() -> {
+ try {
+ final org.apache.activemq.broker.region.Destination dest =
+ broker1.getDestination(new
ActiveMQQueue("Consumer.D.VirtualTopic.T1"));
+ return dest != null && dest.getConsumers().size() >= 1;
+ } catch (final Exception e) {
+ return false;
+ }
+ }, 10_000, 200));
+
LOG.info("Sending message");
// Send a message on the topic
- TextMessage message = session1.createTextMessage("Hello World !");
+ final TextMessage message = session1.createTextMessage("Hello World !");
producer.send(message);
LOG.info("Waiting for message reception");
// Wait the two messages in the related queues
- monitor.await();
+ assertTrue("All 3 listeners should receive messages",
monitor.await(15, TimeUnit.SECONDS));
// Get the message destinations
- String lastJMSDestination2 = listener2.getLastJMSDestination();
- System.err.println(lastJMSDestination2);
- String lastJMSDestination1 = listener1.getLastJMSDestination();
- System.err.println(lastJMSDestination1);
+ final String lastJMSDestination2 = listener2.getLastJMSDestination();
+ LOG.info("Listener2 destination: {}", lastJMSDestination2);
+ final String lastJMSDestination1 = listener1.getLastJMSDestination();
+ LOG.info("Listener1 destination: {}", lastJMSDestination1);
- String lastJMSDestination3 = listener3.getLastJMSDestination();
- System.err.println(lastJMSDestination3);
+ final String lastJMSDestination3 = listener3.getLastJMSDestination();
+ LOG.info("Listener3 destination: {}", lastJMSDestination3);
// The destination names
assertEquals("queue://Consumer.D.VirtualTopic.T1",
lastJMSDestination2);
@@ -130,4 +169,4 @@ public class MessageDestinationVirtualTopicTest {
assertEquals("topic://VirtualTopic.T2", lastJMSDestination3);
}
-}
\ No newline at end of file
+}
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
index d72dc28177..1faac2e206 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
@@ -93,11 +93,15 @@ public class BaseNetworkTest {
// Use startNetworkConnector() instead of connector.start() to ensure
proper JMX MBean registration.
addNetworkConnectors();
- // Wait for both network bridges to be FULLY started (advisory
consumers registered).
+ // Wait for network bridges to be FULLY started (advisory consumers
registered).
// activeBridges().isEmpty() is NOT sufficient because bridges are
added to the map
// before start() completes asynchronously. We must wait for the
startedLatch.
waitForBridgeFullyStarted(localBroker, "Local");
- waitForBridgeFullyStarted(remoteBroker, "Remote");
+ // Only wait for remote bridge if the remote broker has its own
network connector
+ // (duplex bridges don't add a separate connector on the remote side)
+ if (!remoteBroker.getNetworkConnectors().isEmpty()) {
+ waitForBridgeFullyStarted(remoteBroker, "Remote");
+ }
final URI localURI = localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac = new
ActiveMQConnectionFactory(localURI);
@@ -155,13 +159,9 @@ public class BaseNetworkTest {
}
protected void waitForBridgeFullyStarted(final BrokerService broker, final
String label) throws Exception {
- // Skip if broker has no network connectors (e.g., duplex target
broker receives
- // bridge connections but doesn't initiate them)
- if (broker.getNetworkConnectors().isEmpty()) {
- return;
- }
assertTrue(label + " broker bridge should be fully started",
Wait.waitFor(() -> {
- if
(broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+ if (broker.getNetworkConnectors().isEmpty()
+ ||
broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
return false;
}
final NetworkBridge bridge =
broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index a347ffe3b5..3bafc89386 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -232,8 +232,9 @@ public abstract class DynamicNetworkTestSupport {
protected DemandForwardingBridge findDuplexBridge(final TransportConnector
connector) throws Exception {
assertNotNull(connector);
- for (TransportConnection tc : connector.getConnections()) {
- if (tc.getConnectionId().startsWith("networkConnector_")) {
+ for (final TransportConnection tc : connector.getConnections()) {
+ final String connectionId = tc.getConnectionId();
+ if (connectionId != null &&
connectionId.startsWith("networkConnector_")) {
final Field bridgeField =
TransportConnection.class.getDeclaredField("duplexBridge");
bridgeField.setAccessible(true);
return (DemandForwardingBridge) bridgeField.get(tc);
diff --git
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index 229c522559..c17a953ae8 100644
---
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -22,6 +22,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import jakarta.jms.MessageProducer;
@@ -31,6 +34,9 @@ import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnection;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.junit.Test;
@@ -39,8 +45,6 @@ import org.junit.Test;
*/
public class DynamicallyIncludedDestinationsDuplexNetworkTest extends
SimpleNetworkTest {
- private static final int REMOTE_BROKER_TCP_PORT = 61617;
-
@Override
protected String getLocalBrokerURI() {
return
"org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml";
@@ -50,13 +54,35 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
protected BrokerService createRemoteBroker() throws Exception {
final BrokerService broker = new BrokerService();
broker.setBrokerName("remoteBroker");
- broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
+ broker.addConnector("tcp://localhost:0");
return broker;
}
@Override
protected void addNetworkConnectors() throws Exception {
- // No-op: duplex network connector is already defined in
duplexDynamicIncludedDestLocalBroker.xml
+ // For duplex test: only one connector from local to remote, with
duplex=true
+ final URI remoteConnectURI =
remoteBroker.getTransportConnectors().get(0).getConnectUri();
+
+ final DiscoveryNetworkConnector localToRemote = new
DiscoveryNetworkConnector(
+ new URI("static:(" + remoteConnectURI + ")"));
+ localToRemote.setName("networkConnector");
+ localToRemote.setDuplex(true);
+ localToRemote.setDynamicOnly(false);
+ localToRemote.setConduitSubscriptions(true);
+ localToRemote.setDecreaseNetworkConsumerPriority(false);
+
+ final List<ActiveMQDestination> dynamicallyIncluded = new
ArrayList<>();
+ dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
+ dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
+ localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);
+
+ final List<ActiveMQDestination> excluded = new ArrayList<>();
+ excluded.add(new ActiveMQQueue("exclude.test.foo"));
+ excluded.add(new ActiveMQTopic("exclude.test.bar"));
+ localToRemote.setExcludedDestinations(excluded);
+
+ localBroker.addNetworkConnector(localToRemote);
+ localBroker.startNetworkConnector(localToRemote, null);
}
// we have to override this, because with dynamicallyIncludedDestinations
working properly
@@ -70,14 +96,14 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
@Test
public void testTempQueues() throws Exception {
- TemporaryQueue temp = localSession.createTemporaryQueue();
- MessageProducer producer = localSession.createProducer(temp);
+ final TemporaryQueue temp = localSession.createTemporaryQueue();
+ final MessageProducer producer = localSession.createProducer(temp);
producer.send(localSession.createTextMessage("test"));
- Thread.sleep(100);
- assertEquals("Destination not created", 1,
remoteBroker.getAdminView().getTemporaryQueues().length);
+ assertTrue("Destination created on remote",
+ Wait.waitFor(() ->
remoteBroker.getAdminView().getTemporaryQueues().length == 1, 5000, 100));
temp.delete();
- Thread.sleep(100);
- assertEquals("Destination not deleted", 0,
remoteBroker.getAdminView().getTemporaryQueues().length);
+ assertTrue("Destination deleted on remote",
+ Wait.waitFor(() ->
remoteBroker.getAdminView().getTemporaryQueues().length == 0, 5000, 100));
}
@Test(timeout = 60 * 1000)
@@ -101,18 +127,16 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
configuration.getDestinationFilter());
}
- private NetworkBridgeConfiguration
getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge)
throws NoSuchFieldException, IllegalAccessException {
- Field f =
DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
+ private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(final
DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException,
IllegalAccessException {
+ final Field f =
DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
f.setAccessible(true);
- NetworkBridgeConfiguration configuration =
(NetworkBridgeConfiguration) f.get(duplexBridge);
- return configuration;
+ return (NetworkBridgeConfiguration) f.get(duplexBridge);
}
- private DemandForwardingBridge
getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws
NoSuchFieldException, IllegalAccessException {
- Field f = TransportConnection.class.getDeclaredField("duplexBridge");
+ private DemandForwardingBridge getDuplexBridgeFromConnection(final
TransportConnection bridgeConnection) throws NoSuchFieldException,
IllegalAccessException {
+ final Field f =
TransportConnection.class.getDeclaredField("duplexBridge");
f.setAccessible(true);
- DemandForwardingBridge bridge = (DemandForwardingBridge)
f.get(bridgeConnection);
- return bridge;
+ return (DemandForwardingBridge) f.get(bridgeConnection);
}
private DemandForwardingBridge waitForDuplexBridge(final
TransportConnection bridgeConnection) throws Exception {
@@ -136,13 +160,9 @@ public class
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
final NetworkBridge localBridge =
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return expectedLocalSent ==
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
- expectedRemoteSent ==
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
- }
- }));
+ assertTrue(Wait.waitFor(() ->
+ expectedLocalSent ==
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+ expectedRemoteSent ==
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount()));
}
}
diff --git
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
index 70f6da8215..4346bcfae4 100644
---
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
+++
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
@@ -6,9 +6,9 @@
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
+
http://www.apache.org/licenses/LICENSE-2.0
-
+
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,28 +25,10 @@
<bean
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="localBroker" start="false" persistent="true"
useShutdownHook="false" xmlns="http://activemq.apache.org/schema/core">
- <networkConnectors>
- <networkConnector uri="static:(tcp://localhost:61617)"
- duplex="true"
- dynamicOnly = "false"
- conduitSubscriptions = "true"
- decreaseNetworkConsumerPriority = "false">
-
- <dynamicallyIncludedDestinations>
- <queue physicalName="include.test.foo"/>
- <topic physicalName="include.test.bar"/>
- </dynamicallyIncludedDestinations>
-
- <excludedDestinations>
- <queue physicalName="exclude.test.foo"/>
- <topic physicalName="exclude.test.bar"/>
- </excludedDestinations>
-
- </networkConnector>
- </networkConnectors>
+ <!-- Network connector is added programmatically in the test to use
ephemeral ports -->
<transportConnectors>
- <transportConnector uri="tcp://localhost:61616"/>
+ <transportConnector uri="tcp://localhost:0"/>
</transportConnectors>
</broker>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact