This is an automated email from the ASF dual-hosted git repository.

jlmonteiro pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 92c8b58355 Improve duplex network wait conditions and ports (#1669)
92c8b58355 is described below

commit 92c8b58355d33b6e7dec8ac546c8020a6abce1a8
Author: Jean-Louis Monteiro <[email protected]>
AuthorDate: Wed Feb 25 13:32:44 2026 +0100

    Improve duplex network wait conditions and ports (#1669)
    
    * Improve duplex network wait conditions and ports
    
    * Add CopyOnWriteArrayList import to support thread-safe list operations
    
    * Override addNetworkConnectors method in DuplexNetworkTest and MulticastNetworkTest to use what's in the XML for now
    
    * Remove overridden addNetworkConnectors method in DuplexNetworkTest as it's no longer needed
---
 .../MessageDestinationVirtualTopicTest.java        | 93 +++++++++++++++-------
 .../apache/activemq/network/BaseNetworkTest.java   | 16 ++--
 .../network/DynamicNetworkTestSupport.java         |  5 +-
 ...callyIncludedDestinationsDuplexNetworkTest.java | 70 ++++++++++------
 .../duplexDynamicIncludedDestLocalBroker.xml       | 26 +-----
 5 files changed, 126 insertions(+), 84 deletions(-)

diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
index cf0ae79cf3..5ac3495e2d 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java
@@ -18,6 +18,9 @@ package org.apache.activemq.broker.virtual;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.util.Wait;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.slf4j.Logger;
@@ -27,9 +30,12 @@ import 
org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
 import jakarta.annotation.Resource;
 import jakarta.jms.*;
+import java.net.URI;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration({ "virtual-topic-network-test.xml" })
@@ -53,45 +59,64 @@ public class MessageDestinationVirtualTopicTest {
 
     private Session session1;
 
-    public void init() throws JMSException {
+    public void init() throws Exception {
+        // Get actual assigned ephemeral ports
+        final String broker1URL = 
broker1.getTransportConnectors().get(0).getConnectUri().toString();
+        final String broker2URL = 
broker2.getTransportConnectors().get(0).getConnectUri().toString();
+        LOG.info("Broker1 URL: {}", broker1URL);
+        LOG.info("Broker2 URL: {}", broker2URL);
+
+        // Add network connector from broker2 to broker1 programmatically 
using actual port
+        final DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(
+                new URI("static://(" + broker1URL + ")"));
+        nc.setName("linkToBrokerB1");
+        nc.setNetworkTTL(1);
+        nc.setDuplex(true);
+        broker2.addNetworkConnector(nc);
+        nc.start();
+
+        // Wait for bridge to be established
+        assertTrue("Network bridge should be established",
+            Wait.waitFor(() -> nc.activeBridges().size() == 1, 10_000, 500));
+
         // Create connection on Broker B2
-        ConnectionFactory broker2ConnectionFactory = new 
ActiveMQConnectionFactory("tcp://localhost:62616");
-        Connection connection2 = broker2ConnectionFactory.createConnection();
+        final ConnectionFactory broker2ConnectionFactory = new 
ActiveMQConnectionFactory(broker2URL);
+        final Connection connection2 = 
broker2ConnectionFactory.createConnection();
         connection2.start();
-        Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
-        Queue consumerDQueue = 
session2.createQueue("Consumer.D.VirtualTopic.T1");
+        final Session session2 = connection2.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        final Queue consumerDQueue = 
session2.createQueue("Consumer.D.VirtualTopic.T1");
 
         // Bind listener on queue for consumer D
-        MessageConsumer consumer = session2.createConsumer(consumerDQueue);
+        final MessageConsumer consumer = 
session2.createConsumer(consumerDQueue);
         listener2 = new SimpleMessageListener();
         consumer.setMessageListener(listener2);
 
         // Create connection on Broker B1
-        ConnectionFactory broker1ConnectionFactory = new 
ActiveMQConnectionFactory("tcp://localhost:61616");
-        Connection connection1 = broker1ConnectionFactory.createConnection();
+        final ConnectionFactory broker1ConnectionFactory = new 
ActiveMQConnectionFactory(broker1URL);
+        final Connection connection1 = 
broker1ConnectionFactory.createConnection();
         connection1.start();
         session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue consumerCQueue = 
session1.createQueue("Consumer.C.VirtualTopic.T1");
+        final Queue consumerCQueue = 
session1.createQueue("Consumer.C.VirtualTopic.T1");
 
-        // Bind listener on queue for consumer D
-        MessageConsumer consumer1 = session1.createConsumer(consumerCQueue);
+        // Bind listener on queue for consumer C
+        final MessageConsumer consumer1 = 
session1.createConsumer(consumerCQueue);
         listener1 = new SimpleMessageListener();
         consumer1.setMessageListener(listener1);
 
-        // Create listener on Broker B1 for VT T2 witout setOriginalDest
-        Queue consumer3Queue = 
session1.createQueue("Consumer.A.VirtualTopic.T2");
+        // Create listener on Broker B1 for VT T2 without setOriginalDest
+        final Queue consumer3Queue = 
session1.createQueue("Consumer.A.VirtualTopic.T2");
 
-        // Bind listener on queue for consumer D
-        MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
+        // Bind listener on queue for consumer A
+        final MessageConsumer consumerD = 
session1.createConsumer(consumer3Queue);
         listener3 = new SimpleMessageListener();
         consumerD.setMessageListener(listener3);
 
         // Create producer for topic, on B1
-        Topic virtualTopicT1 = 
session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
+        final Topic virtualTopicT1 = 
session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
         producer = session1.createProducer(virtualTopicT1);
     }
 
-    @Test
+    @Test(timeout = 30_000)
     public void testDestinationNames() throws Exception {
 
         LOG.info("Started waiting for broker 1 and 2");
@@ -102,27 +127,41 @@ public class MessageDestinationVirtualTopicTest {
         init();
 
         // Create a monitor
-        CountDownLatch monitor = new CountDownLatch(3);
+        final CountDownLatch monitor = new CountDownLatch(3);
         listener1.setCountDown(monitor);
         listener2.setCountDown(monitor);
         listener3.setCountDown(monitor);
 
+        // Wait for the consumer on broker2 to be visible on broker1 via the 
network bridge.
+        // The virtual topic Consumer.D.VirtualTopic.T1 on broker2 must be 
forwarded to broker1
+        // before sending, otherwise the message won't reach listener2.
+        assertTrue("Consumer.D queue should exist on broker1 via network 
bridge",
+            Wait.waitFor(() -> {
+                try {
+                    final org.apache.activemq.broker.region.Destination dest =
+                        broker1.getDestination(new 
ActiveMQQueue("Consumer.D.VirtualTopic.T1"));
+                    return dest != null && dest.getConsumers().size() >= 1;
+                } catch (final Exception e) {
+                    return false;
+                }
+            }, 10_000, 200));
+
         LOG.info("Sending message");
         // Send a message on the topic
-        TextMessage message = session1.createTextMessage("Hello World !");
+        final TextMessage message = session1.createTextMessage("Hello World 
!");
         producer.send(message);
         LOG.info("Waiting for message reception");
         // Wait the two messages in the related queues
-        monitor.await();
+        assertTrue("All 3 listeners should receive messages", 
monitor.await(15, TimeUnit.SECONDS));
 
         // Get the message destinations
-        String lastJMSDestination2 = listener2.getLastJMSDestination();
-        System.err.println(lastJMSDestination2);
-        String lastJMSDestination1 = listener1.getLastJMSDestination();
-        System.err.println(lastJMSDestination1);
+        final String lastJMSDestination2 = listener2.getLastJMSDestination();
+        LOG.info("Listener2 destination: {}", lastJMSDestination2);
+        final String lastJMSDestination1 = listener1.getLastJMSDestination();
+        LOG.info("Listener1 destination: {}", lastJMSDestination1);
 
-        String lastJMSDestination3 = listener3.getLastJMSDestination();
-        System.err.println(lastJMSDestination3);
+        final String lastJMSDestination3 = listener3.getLastJMSDestination();
+        LOG.info("Listener3 destination: {}", lastJMSDestination3);
 
         // The destination names
         assertEquals("queue://Consumer.D.VirtualTopic.T1", 
lastJMSDestination2);
@@ -130,4 +169,4 @@ public class MessageDestinationVirtualTopicTest {
         assertEquals("topic://VirtualTopic.T2", lastJMSDestination3);
 
     }
-}
\ No newline at end of file
+}
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
index d72dc28177..1faac2e206 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
@@ -93,11 +93,15 @@ public class BaseNetworkTest {
         // Use startNetworkConnector() instead of connector.start() to ensure 
proper JMX MBean registration.
         addNetworkConnectors();
 
-        // Wait for both network bridges to be FULLY started (advisory 
consumers registered).
+        // Wait for network bridges to be FULLY started (advisory consumers 
registered).
         // activeBridges().isEmpty() is NOT sufficient because bridges are 
added to the map
         // before start() completes asynchronously. We must wait for the 
startedLatch.
         waitForBridgeFullyStarted(localBroker, "Local");
-        waitForBridgeFullyStarted(remoteBroker, "Remote");
+        // Only wait for remote bridge if the remote broker has its own 
network connector
+        // (duplex bridges don't add a separate connector on the remote side)
+        if (!remoteBroker.getNetworkConnectors().isEmpty()) {
+            waitForBridgeFullyStarted(remoteBroker, "Remote");
+        }
 
         final URI localURI = localBroker.getVmConnectorURI();
         ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(localURI);
@@ -155,13 +159,9 @@ public class BaseNetworkTest {
     }
 
     protected void waitForBridgeFullyStarted(final BrokerService broker, final 
String label) throws Exception {
-        // Skip if broker has no network connectors (e.g., duplex target 
broker receives
-        // bridge connections but doesn't initiate them)
-        if (broker.getNetworkConnectors().isEmpty()) {
-            return;
-        }
         assertTrue(label + " broker bridge should be fully started", 
Wait.waitFor(() -> {
-            if 
(broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
+            if (broker.getNetworkConnectors().isEmpty()
+                    || 
broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
                 return false;
             }
             final NetworkBridge bridge = 
broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index a347ffe3b5..3bafc89386 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -232,8 +232,9 @@ public abstract class DynamicNetworkTestSupport {
     protected DemandForwardingBridge findDuplexBridge(final TransportConnector 
connector) throws Exception {
         assertNotNull(connector);
 
-        for (TransportConnection tc : connector.getConnections()) {
-            if (tc.getConnectionId().startsWith("networkConnector_")) {
+        for (final TransportConnection tc : connector.getConnections()) {
+            final String connectionId = tc.getConnectionId();
+            if (connectionId != null && 
connectionId.startsWith("networkConnector_")) {
                 final Field bridgeField = 
TransportConnection.class.getDeclaredField("duplexBridge");
                 bridgeField.setAccessible(true);
                 return (DemandForwardingBridge) bridgeField.get(tc);
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
index 229c522559..c17a953ae8 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -22,6 +22,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 
 import jakarta.jms.MessageProducer;
@@ -31,6 +34,9 @@ import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnection;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.Wait;
 import org.junit.Test;
 
@@ -39,8 +45,6 @@ import org.junit.Test;
  */
 public class DynamicallyIncludedDestinationsDuplexNetworkTest extends 
SimpleNetworkTest {
 
-    private static final int REMOTE_BROKER_TCP_PORT = 61617;
-
     @Override
     protected String getLocalBrokerURI() {
         return 
"org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml";
@@ -50,13 +54,35 @@ public class 
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
     protected BrokerService createRemoteBroker() throws Exception {
         final BrokerService broker = new BrokerService();
         broker.setBrokerName("remoteBroker");
-        broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
+        broker.addConnector("tcp://localhost:0");
         return broker;
     }
 
     @Override
     protected void addNetworkConnectors() throws Exception {
-        // No-op: duplex network connector is already defined in 
duplexDynamicIncludedDestLocalBroker.xml
+        // For duplex test: only one connector from local to remote, with 
duplex=true
+        final URI remoteConnectURI = 
remoteBroker.getTransportConnectors().get(0).getConnectUri();
+
+        final DiscoveryNetworkConnector localToRemote = new 
DiscoveryNetworkConnector(
+                new URI("static:(" + remoteConnectURI + ")"));
+        localToRemote.setName("networkConnector");
+        localToRemote.setDuplex(true);
+        localToRemote.setDynamicOnly(false);
+        localToRemote.setConduitSubscriptions(true);
+        localToRemote.setDecreaseNetworkConsumerPriority(false);
+
+        final List<ActiveMQDestination> dynamicallyIncluded = new 
ArrayList<>();
+        dynamicallyIncluded.add(new ActiveMQQueue("include.test.foo"));
+        dynamicallyIncluded.add(new ActiveMQTopic("include.test.bar"));
+        localToRemote.setDynamicallyIncludedDestinations(dynamicallyIncluded);
+
+        final List<ActiveMQDestination> excluded = new ArrayList<>();
+        excluded.add(new ActiveMQQueue("exclude.test.foo"));
+        excluded.add(new ActiveMQTopic("exclude.test.bar"));
+        localToRemote.setExcludedDestinations(excluded);
+
+        localBroker.addNetworkConnector(localToRemote);
+        localBroker.startNetworkConnector(localToRemote, null);
     }
 
     // we have to override this, because with dynamicallyIncludedDestinations 
working properly
@@ -70,14 +96,14 @@ public class 
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
 
     @Test
     public void testTempQueues() throws Exception {
-        TemporaryQueue temp = localSession.createTemporaryQueue();
-        MessageProducer producer = localSession.createProducer(temp);
+        final TemporaryQueue temp = localSession.createTemporaryQueue();
+        final MessageProducer producer = localSession.createProducer(temp);
         producer.send(localSession.createTextMessage("test"));
-        Thread.sleep(100);
-        assertEquals("Destination not created", 1, 
remoteBroker.getAdminView().getTemporaryQueues().length);
+        assertTrue("Destination created on remote",
+            Wait.waitFor(() -> 
remoteBroker.getAdminView().getTemporaryQueues().length == 1, 5000, 100));
         temp.delete();
-        Thread.sleep(100);
-        assertEquals("Destination not deleted", 0, 
remoteBroker.getAdminView().getTemporaryQueues().length);
+        assertTrue("Destination deleted on remote",
+            Wait.waitFor(() -> 
remoteBroker.getAdminView().getTemporaryQueues().length == 0, 5000, 100));
     }
 
     @Test(timeout = 60 * 1000)
@@ -101,18 +127,16 @@ public class 
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
                 configuration.getDestinationFilter());
     }
 
-    private NetworkBridgeConfiguration 
getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) 
throws NoSuchFieldException, IllegalAccessException {
-        Field f = 
DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
+    private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(final 
DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, 
IllegalAccessException {
+        final Field f = 
DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
         f.setAccessible(true);
-        NetworkBridgeConfiguration configuration = 
(NetworkBridgeConfiguration) f.get(duplexBridge);
-        return configuration;
+        return (NetworkBridgeConfiguration) f.get(duplexBridge);
     }
 
-    private DemandForwardingBridge 
getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws 
NoSuchFieldException, IllegalAccessException {
-        Field f = TransportConnection.class.getDeclaredField("duplexBridge");
+    private DemandForwardingBridge getDuplexBridgeFromConnection(final 
TransportConnection bridgeConnection) throws NoSuchFieldException, 
IllegalAccessException {
+        final Field f = 
TransportConnection.class.getDeclaredField("duplexBridge");
         f.setAccessible(true);
-        DemandForwardingBridge bridge = (DemandForwardingBridge) 
f.get(bridgeConnection);
-        return bridge;
+        return (DemandForwardingBridge) f.get(bridgeConnection);
     }
 
     private DemandForwardingBridge waitForDuplexBridge(final 
TransportConnection bridgeConnection) throws Exception {
@@ -136,13 +160,9 @@ public class 
DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetw
 
         final NetworkBridge localBridge = 
localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
 
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return expectedLocalSent == 
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
-                        expectedRemoteSent == 
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
-            }
-        }));
+        assertTrue(Wait.waitFor(() ->
+                expectedLocalSent == 
localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
+                expectedRemoteSent == 
localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount()));
 
     }
 }
diff --git 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
index 70f6da8215..4346bcfae4 100644
--- 
a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
+++ 
b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml
@@ -6,9 +6,9 @@
     The ASF licenses this file to You under the Apache License, Version 2.0
     (the "License"); you may not use this file except in compliance with
     the License.  You may obtain a copy of the License at
-   
+
     http://www.apache.org/licenses/LICENSE-2.0
-   
+
     Unless required by applicable law or agreed to in writing, software
     distributed under the License is distributed on an "AS IS" BASIS,
     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -25,28 +25,10 @@
     <bean 
class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
     <broker brokerName="localBroker" start="false" persistent="true" 
useShutdownHook="false" xmlns="http://activemq.apache.org/schema/core">
-        <networkConnectors>
-            <networkConnector uri="static:(tcp://localhost:61617)"
-                              duplex="true"
-                              dynamicOnly = "false"
-                              conduitSubscriptions = "true"
-                              decreaseNetworkConsumerPriority = "false">
-
-                <dynamicallyIncludedDestinations>
-                    <queue physicalName="include.test.foo"/>
-                    <topic physicalName="include.test.bar"/>
-                </dynamicallyIncludedDestinations>
-
-                <excludedDestinations>
-                    <queue physicalName="exclude.test.foo"/>
-                    <topic physicalName="exclude.test.bar"/>
-                </excludedDestinations>
-
-            </networkConnector>
-        </networkConnectors>
+        <!-- Network connector is added programmatically in the test to use 
ephemeral ports -->
 
         <transportConnectors>
-            <transportConnector uri="tcp://localhost:61616"/>
+            <transportConnector uri="tcp://localhost:0"/>
         </transportConnectors>
 
     </broker>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to