This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 230db1fec0 Refactor network connections to process after 
ConnectionInfo (#2112)
230db1fec0 is described below

commit 230db1fec0b4bfdb8dcf06cdceeb00585ed8f82c
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jun 15 12:29:14 2026 -0400

    Refactor network connections to process after ConnectionInfo (#2112)
    
    This refactors the setup for network connections to be processed
    after receiving ConnectionInfo. Network bridges are established by
    each broker sending a BrokerInfo command to the other broker to provide
    remote broker information, but this is done before ConnectionInfo.
    This commit reworks the connection to capture the BrokerInfo information,
    but delay processing until after the connection information has been
    received and processed by broker.addConnection(). The future that was
    previously added for the connection id is no longer needed and removed.
    
    This commit also simplifies durable sync for bridges and prevents a
    race condition on startup by making sure the initial bridge and the
    duplex side only send back the BrokerSubscriptionInfo command after
    fully initialized.
---
 .../activemq/broker/TransportConnection.java       | 137 ++++++++++-----------
 .../activemq/broker/region/RegionBroker.java       |  37 ++++++
 .../network/DemandForwardingBridgeSupport.java     |  14 ++-
 .../network/DurableSyncNetworkBridgeAuthTest.java  | 115 +++++++++++++++--
 4 files changed, 217 insertions(+), 86 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 9e77101744..b6fe548857 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -165,7 +165,6 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
     private final ReentrantReadWriteLock serviceLock = new 
ReentrantReadWriteLock();
     private String duplexNetworkConnectorId;
     private final long connectedTimestamp;
-    private final CompletableFuture<ConnectionId> initialConnectionId = new 
CompletableFuture<>();
 
     /**
      * @param taskRunnerFactory - can be null if you want direct dispatch to 
the transport
@@ -854,16 +853,14 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
 
         try {
             broker.addConnection(context, info);
-            // Complete the future with the connectionId if we completed
-            // the broker.addConnection() chain successfully
-            initialConnectionId.complete(info.getConnectionId());
+            // If we completed broker.addConnection() successfully we can now
+            // continue the required extra setup for any network connections
+            addNetworkConnection();
         } catch (Exception e) {
             synchronized (brokerConnectionStates) {
                 brokerConnectionStates.remove(info.getConnectionId());
             }
             unregisterConnectionState(info.getConnectionId());
-            // complete with the exception
-            initialConnectionId.completeExceptionally(e);
             LOG.warn("Failed to add Connection id={}, clientId={}, clientIP={} 
due to {}",
                     info.getConnectionId(), clientId, info.getClientIp(), 
e.getLocalizedMessage());
             //AMQ-6561 - stop for all exceptions on addConnection
@@ -1401,44 +1398,75 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
             throw new IOException("Unexpected extra broker info command 
received from: " + info.getBrokerId());
         }
         if (info.isSlaveBroker()) {
-            LOG.error(" Slave Brokers are no longer supported - slave trying 
to attach is: {}", info.getBrokerName());
-        } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
+            LOG.error("Slave Brokers are no longer supported - slave trying to 
attach is: {}", info.getBrokerName());
+            throw new IOException("Slave Brokers are no longer supported - 
slave trying to attach is: " + info.getBrokerName());
+        }
+
+        // The only thing this method now does is capture the BrokerInfo 
object and mark as a network connection.
+        // Actual processing for starting up duplex bridges and for durable 
sync has been moved until
+        // after ConnectionInfo has been received.
+
+        // If this is duplex we need to get the ID configured so we can use it
+        // to close existing connections later that match the same ID
+        // This will be done inside the RegionBroker
+        if (info.isNetworkConnection() && info.isDuplexConnection()) {
+            NetworkBridgeConfiguration config = getNetworkConfiguration(info);
+            config.setBrokerName(broker.getBrokerName());
+            String duplexNetworkConnectorId = config.getName() + "@" + 
info.getBrokerId();
+            setDuplexNetworkConnectorId(duplexNetworkConnectorId);
+        }
+
+        this.brokerInfo = info;
+        networkConnection = true;
+        List<TransportConnectionState> connectionStates = 
listConnectionStates();
+        for (TransportConnectionState cs : connectionStates) {
+            cs.getContext().setNetworkConnection(true);
+        }
+        return null;
+    }
+
+    // Process the network connection set up
+    private void addNetworkConnection() throws Exception {
+        final BrokerInfo info = this.brokerInfo;
+        if (info == null || !info.isNetworkConnection()){
+            return;
+        }
+
+        // For a one way bridge we need to respond on bridge creation by 
sending back the durable
+        // subs if durable sync is enabled via BrokerSubscriptionInfo command. 
The bridge is only
+        // initialized on one broker, so if this is the passive side we know 
it's initialized and
+        // we can respond.
+        //
+        // For a duplex bridge, we do NOT send back the durable subs. To 
simplify and ensure
+        // the bridge is fully initialized, the bridge startup will now handle 
sending
+        // BrokerSubscriptionInfo to the remote broker once fully started.
+        if (!info.isDuplexConnection()) {
             try {
-                // register durable sync to be sent after ConnectionInfo has 
been handled
-                registerDurableSync(getNetworkConfiguration(info), info);
+                NetworkBridgeConfiguration config = 
getNetworkConfiguration(info);
+                if (config.isSyncDurableSubs() && protocolVersion.get() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
+                    LOG.debug("SyncDurableSubs is enabled, Sending 
BrokerSubscriptionInfo");
+                    // Send back the durable subs as this is a one way bridge
+                    
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(this.broker.getBrokerService(),
 config));
+                }
             } catch (Exception e) {
-                LOG.error("Failed to register durable sync for network bridge 
creation from broker {}", info.getBrokerId(), e);
-                return null;
+                LOG.error("Failed to respond to network bridge creation from 
broker {}", info.getBrokerId(), e);
+                throw e;
             }
-        } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
+        } else {
+            // duplex
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
             try {
                 NetworkBridgeConfiguration config = 
getNetworkConfiguration(info);
                 config.setBrokerName(broker.getBrokerName());
 
-                // register durable sync to be sent after ConnectionInfo has 
been handled
-                registerDurableSync(config, info);
-
-                // check for existing duplex connection hanging about
-
-                // We first look if existing network connection already exists 
for the same broker Id and network connector name
-                // It's possible in case of brief network fault to have this 
transport connector side of the connection always active
-                // and the duplex network connector side wanting to open a new 
one
-                // In this case, the old connection must be broken
-                String duplexNetworkConnectorId = config.getName() + "@" + 
info.getBrokerId();
-                CopyOnWriteArrayList<TransportConnection> connections = 
this.connector.getConnections();
-                synchronized (connections) {
-                    for (TransportConnection c : connections) {
-                        if ((c != this) && 
(duplexNetworkConnectorId.equals(c.getDuplexNetworkConnectorId()))) {
-                            LOG.warn("Stopping an existing active duplex 
connection [{}] for network connector ({}).", c, duplexNetworkConnectorId);
-                            c.stopAsync();
-                            // better to wait for a bit rather than get 
connection id already in use and failure to start new bridge
-                            c.getStopped().await(1, TimeUnit.SECONDS);
-                        }
-                    }
-                    setDuplexNetworkConnectorId(duplexNetworkConnectorId);
-                }
+                // Note: Durable sync used to be here and was moved to 
DemandForwardingBridgeSupport
+                // inside doStartLocalAndRemoteBridges()
+
+                //The logic to clean up existing network connections for the 
same ID
+                // has been moved to the RegionBroker where it will check if 
the broker
+                // needs to close the connection before trying to create a 
duplicate connection
+
                 Transport localTransport = 
NetworkBridgeFactory.createLocalTransport(config, broker.getVmConnectorURI());
                 Transport remoteBridgeTransport = transport;
                 if (! (remoteBridgeTransport instanceof ResponseCorrelator)) {
@@ -1462,46 +1490,13 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 duplexBridge.setCreatedByDuplex(true);
                 duplexBridge.duplexStart(this, brokerInfo, info);
                 LOG.info("Started responder end of duplex bridge {}", 
duplexNetworkConnectorId);
-                return null;
             } catch (TransportDisposedIOException e) {
                 LOG.warn("Duplex bridge {} was stopped before it was correctly 
started.", duplexNetworkConnectorId);
-                return null;
             } catch (Exception e) {
                 LOG.error("Failed to create responder end of duplex network 
bridge {}", duplexNetworkConnectorId, e);
-                return null;
+                throw e;
             }
         }
-        this.brokerInfo = info;
-        networkConnection = true;
-        List<TransportConnectionState> connectionStates = 
listConnectionStates();
-        for (TransportConnectionState cs : connectionStates) {
-            cs.getContext().setNetworkConnection(true);
-        }
-        return null;
-    }
-
-    private void registerDurableSync(final NetworkBridgeConfiguration config, 
final BrokerInfo info) {
-        if (config.isSyncDurableSubs() && protocolVersion.get() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
-            // this will complete when the connection id has been set, or 
immediately if already set
-            initialConnectionId.whenComplete((connectionId, t) -> {
-                try {
-                    if (t != null) {
-                        LOG.warn("SyncDurableSubs will be skipped due to error 
{}",
-                                t.getMessage());
-                        return;
-                    }
-                    // check connection still registered
-                    if (lookupConnectionState(connectionId) != null) {
-                        LOG.debug("SyncDurableSubs is enabled, Sending 
BrokerSubscriptionInfo");
-                        
dispatchSync(NetworkBridgeUtils.getBrokerSubscriptionInfo(
-                                this.broker.getBrokerService(), config));
-                    }
-                } catch (Exception e) {
-                    LOG.error("Failed to respond to network bridge creation 
from broker {}",
-                            info.getBrokerId(), e);
-                }
-            });
-        }
     }
 
     @SuppressWarnings({"unchecked", "rawtypes"})
@@ -1707,7 +1702,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         this.duplexNetworkConnectorId = duplexNetworkConnectorId;
     }
 
-    protected synchronized String getDuplexNetworkConnectorId() {
+    public synchronized String getDuplexNetworkConnectorId() {
         return this.duplexNetworkConnectorId;
     }
 
@@ -1715,7 +1710,7 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
         return stopping.get();
     }
 
-    protected CountDownLatch getStopped() {
+    public CountDownLatch getStopped() {
         return stopped;
     }
 
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
index 2e6ee20497..7d5de4c184 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
@@ -29,6 +29,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -255,6 +256,10 @@ public class RegionBroker extends EmptyBroker {
             throw new InvalidClientIDException("No clientID specified for 
connection request");
         }
 
+        // Clean up existing duplex network connection if this is a reconnect 
attempt
+        // This was moved from TransportConnection
+        cleanupExistingDuplexNetworkConnection(context);
+
         ConnectionContext oldContext = null;
 
         synchronized (clientIdSet) {
@@ -289,6 +294,38 @@ public class RegionBroker extends EmptyBroker {
         connections.add(context.getConnection());
     }
 
+    // We first look if existing network connection already exists for the 
same broker Id and network connector name
+    // It's possible in case of brief network fault to have this transport 
connector side of the connection always active
+    // and the duplex network connector side wanting to open a new one
+    // In this case, the old connection must be broken
+    private void cleanupExistingDuplexNetworkConnection(ConnectionContext 
context) {
+        try {
+            if (context.isNetworkConnection()
+                    && context.getConnection() instanceof TransportConnection) 
{
+                final TransportConnection newConn = (TransportConnection) 
context.getConnection();
+                if (newConn.getDuplexNetworkConnectorId() != null) {
+                    for (Connection c : connections) {
+                        if (c instanceof TransportConnection) {
+                            final TransportConnection existingConn = 
(TransportConnection) c;
+                            if (newConn.getDuplexNetworkConnectorId()
+                                    
.equals(existingConn.getDuplexNetworkConnectorId())) {
+                                LOG.warn("Stopping an existing active duplex 
connection [{}] for network connector ({}).",
+                                        c, 
existingConn.getDuplexNetworkConnectorId());
+                                existingConn.stopAsync();
+                                // better to wait for a bit rather than get 
connection id already in use and failure to start new bridge
+                                existingConn.getStopped().await(2, 
TimeUnit.SECONDS);
+                                break;
+                            }
+                        }
+                    }
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("Error cleaning up Duplex connection: {}" , 
e.getMessage());
+            LOG.debug(e.getMessage(), e);
+        }
+    }
+
     @Override
     public void removeConnection(ConnectionContext context, ConnectionInfo 
info, Throwable error) throws Exception {
         String clientId = info.getClientId();
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 26a5769ed0..17eb9c5529 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -477,6 +477,15 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
             if (safeWaitUntilStarted()) {
                 setupStaticDestinations();
                 staticDestinationsLatch.countDown();
+
+                // Send to the remote broker the durable subs if sync is 
enabled after statup.
+                // This is done by the initiating side of a bridge as well as 
by duplex bridges to
+                // ensure everything is fully initialized before sending.
+                if (configuration.isSyncDurableSubs() &&
+                        remoteBroker.getWireFormat().getVersion() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
+                    
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
+                            configuration));
+                }
             }
         } catch (Throwable e) {
             serviceLocalException(e);
@@ -599,11 +608,6 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                     brokerInfo.setNetworkProperties(str);
                     brokerInfo.setBrokerId(this.localBrokerId);
                     remoteBroker.oneway(brokerInfo);
-                    if (configuration.isSyncDurableSubs() &&
-                            remoteBroker.getWireFormat().getVersion() >= 
CommandTypes.PROTOCOL_VERSION_DURABLE_SYNC) {
-                        
remoteBroker.oneway(NetworkBridgeUtils.getBrokerSubscriptionInfo(brokerService,
-                                configuration));
-                    }
                 }
                 if (remoteConnectionInfo != null) {
                     
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
index 19c95da29b..3a99029617 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
@@ -16,10 +16,12 @@
  */
 package org.apache.activemq.network;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,8 +41,11 @@ import 
org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
 import 
org.apache.activemq.store.kahadb.disk.journal.Journal.JournalDiskSyncStrategy;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
+import org.apache.activemq.transport.discovery.DiscoveryAgent;
+import org.apache.activemq.transport.discovery.DiscoveryListener;
 import org.apache.activemq.util.Wait;
 import org.junit.After;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -84,6 +89,7 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
     private static final String USER_PASSWORD = "password";
     private final boolean duplex;
     private final AtomicReference<BrokerSubscriptionInfo> brokerSubInfo = new 
AtomicReference<>();
+    private final AtomicReference<DiscoveryEvent> serviceFailed = new 
AtomicReference<>();
     private String ncPassword = USER_PASSWORD;
 
     public DurableSyncNetworkBridgeAuthTest(boolean duplex) {
@@ -94,6 +100,7 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
     public void setUp() throws Exception {
         this.ncPassword = USER_PASSWORD;
         this.brokerSubInfo.set(null);
+        this.serviceFailed.set(null);
     }
 
     @After
@@ -110,31 +117,77 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
         // automatically on connect so the remote broker will always receive 
it. However, the
         // remote broker should only send back its list after the connection 
is properly authenticated.
         assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+        assertNull(serviceFailed.get());
+        DemandForwardingBridge bridge = getActiveBridge();
 
         // Simulate a connection exception and reconnect, we should receive 
again
         brokerSubInfo.set(null);
         localBroker.getNetworkConnectors().get(0).activeBridges().stream()
                 .findFirst().orElseThrow().serviceRemoteException(new 
Exception());
+        // wait for failure
+        assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10));
+        assertTrue(Wait.waitFor(bridge.localBroker::isDisposed,5000,10));
+
+        // should reconnect again and get updated info
+        assertTrue(Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+                TimeUnit.SECONDS.toMillis(5), 10));
         assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
     }
 
     @Test
     public void testAuthFailure() throws Exception {
         this.ncPassword = "badpassword";
-        try {
-            // set a shorter wait time, it won't connect with bad password
-            doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
-                    TimeUnit.SECONDS.toMillis(5));
-            throw new IllegalStateException("Should have received assertion 
error with bad password");
-        } catch (AssertionError e) {
-            // expected
-        }
+        doSetUpRemoteBroker(true, tempFolder.newFolder(), 0);
+        doSetUpLocalBroker(true, true, tempFolder.newFolder());
 
+        // Wait for the failure due to authentication
+        assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10));
+        assertTrue(Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().isEmpty(),
+                TimeUnit.SECONDS.toMillis(5), 10));
         // Because the local broker was not authenticated by the remote 
broker, the local broker
         // should not have received back the BrokerSubscriptionInfo
         assertNull(brokerSubInfo.get());
     }
 
+    @Test
+    public void testDuplicateDuplexBridgeFailedAuthIgnored() throws Exception {
+        Assume.assumeTrue(duplex);
+        doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+                TimeUnit.SECONDS.toMillis(15));
+
+        // everything is good, no error and we got the sync command
+        assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+        assertNull(serviceFailed.get());
+
+        // Start a duplicate bridge with the same configuration but bad 
password
+        // so authentication fails. This should not cause a failure with the 
existing
+        // bridge because this connection won't be authenticated
+        DemandForwardingBridge bridge = getActiveBridge();
+        this.ncPassword = "badpassword";
+        NetworkConnector nc = 
localBroker.addNetworkConnector(configureLocalNetworkConnector());
+        nc.start();
+        try {
+            Thread.sleep(2000);
+            // Verify bridge is not disposed and still connected
+            assertFalse(bridge.disposed.get());
+        } finally {
+            nc.stop();
+        }
+
+        // try again, this will connect successfully and the broker will 
detect it's a duplex bridge
+        // matching the same config and close the other
+        this.ncPassword = USER_PASSWORD;
+        nc = localBroker.addNetworkConnector(configureLocalNetworkConnector());
+        nc.start();
+        try {
+            // authentication is now correct so the RegionBroker should 
terminate the other duplex
+            // bridge as it matches
+            assertTrue(Wait.waitFor(bridge.disposed::get,5000,10));
+        } finally {
+            nc.stop();
+        }
+    }
+
     @Test
     public void testRestartSync() throws Exception {
         doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
@@ -144,12 +197,17 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
         // automatically on connect so the remote broker will always receive 
it. However, the
         // remote broker should only send back its list after the connection 
is properly authenticated.
         assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+        assertNull(serviceFailed.get());
 
         // Restart, should receive again with new connection
         brokerSubInfo.set(null);
         restartRemoteBroker();
+        // should fail from restart
+        assertTrue(Wait.waitFor(() -> serviceFailed.get() != null,5000,10));
 
         // Wait for the reconnect and receive of BrokerSubInfo
+        assertTrue(Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+                TimeUnit.SECONDS.toMillis(5), 10));
         assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
     }
 
@@ -159,10 +217,10 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
         doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
                 TimeUnit.SECONDS.toMillis(15));
         assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+        assertNull(serviceFailed.get());
 
         // find the established bridge
-        DemandForwardingBridge bridge = (DemandForwardingBridge) 
localBroker.getNetworkConnectors().get(0).activeBridges().stream()
-                .findFirst().orElseThrow();
+        DemandForwardingBridge bridge = getActiveBridge();
 
         // send to one of the brokers (networked brokers will have already 
received a BrokerInfo)
         // the duplicate will trigger the bridge connection to close
@@ -232,6 +290,9 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
         URI remoteURI = transportConnectors.get(0).getConnectUri();
         String uri = "static:(" + remoteURI + ")";
         NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI(uri)) {
+            {
+                this.setDiscoveryAgent(new 
DiscoveryAgentFilter(getDiscoveryAgent()));
+            }
             @Override
             protected NetworkBridge createBridge(Transport localTransport,
                     Transport remoteTransport, DiscoveryEvent event) {
@@ -248,6 +309,7 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
                         }
                         super.onCommand(command);
                     }
+
                 };
                 return super.createBridge(localTransport, remoteFilter, event);
             }
@@ -285,4 +347,37 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
         return brokerService;
     }
 
+    private DemandForwardingBridge getActiveBridge() {
+        return(DemandForwardingBridge) 
localBroker.getNetworkConnectors().get(0).activeBridges().stream()
+                .findFirst().orElseThrow();
+    }
+
+    private class DiscoveryAgentFilter implements DiscoveryAgent {
+        private final DiscoveryAgent agent;
+
+        public DiscoveryAgentFilter(DiscoveryAgent agent) {
+            this.agent = agent;
+        }
+
+        public void setDiscoveryListener(DiscoveryListener listener) {
+            agent.setDiscoveryListener(listener);
+        }
+
+        public void start() throws Exception {
+            agent.start();
+        }
+
+        public void stop() throws Exception {
+            agent.stop();
+        }
+
+        public void registerService(String name) throws IOException {
+            agent.registerService(name);
+        }
+
+        public void serviceFailed(DiscoveryEvent event) throws IOException {
+            serviceFailed.set(event);
+            agent.serviceFailed(event);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to