This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 80562ea8d3 Ensure at most one BrokerInfo command is received (#2083)
80562ea8d3 is described below

commit 80562ea8d327b7f538de1ff7672f158fe2abc4c1
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Mon Jun 8 09:58:09 2026 -0400

    Ensure at most one BrokerInfo command is received (#2083)
    
    There should only ever be at most one BrokerInfo command received on a
    connection so we should throw an exception and close the connection if a
    second is receveived as that is a protocol error.
---
 .../apache/activemq/broker/TransportConnection.java    | 13 ++++++++-----
 .../network/DurableSyncNetworkBridgeAuthTest.java      | 18 ++++++++++++++++++
 2 files changed, 26 insertions(+), 5 deletions(-)

diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index f038f34771..9e77101744 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1392,7 +1392,14 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
     }
 
     @Override
-    public Response processBrokerInfo(BrokerInfo info) {
+    public Response processBrokerInfo(BrokerInfo info) throws IOException {
+        // We only expect to get at most one broker info command per connection
+        // Log and throw an IOException to close the connection if we receive 
more
+        // one because this is a protocol violation
+        if (this.brokerInfo != null) {
+            LOG.warn("Unexpected extra broker info command received: {}", 
info);
+            throw new IOException("Unexpected extra broker info command 
received from: " + info.getBrokerId());
+        }
         if (info.isSlaveBroker()) {
             LOG.error(" Slave Brokers are no longer supported - slave trying 
to attach is: {}", info.getBrokerName());
         } else if (info.isNetworkConnection() && !info.isDuplexConnection()) {
@@ -1464,10 +1471,6 @@ public class TransportConnection implements Connection, 
Task, CommandVisitor {
                 return null;
             }
         }
-        // We only expect to get one broker info command per connection
-        if (this.brokerInfo != null) {
-            LOG.warn("Unexpected extra broker info command received: {}", 
info);
-        }
         this.brokerInfo = info;
         networkConnection = true;
         List<TransportConnectionState> connectionStates = 
listConnectionStates();
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
index 646dd4f184..19c95da29b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeAuthTest.java
@@ -30,6 +30,7 @@ import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.BrokerSubscriptionInfo;
 import org.apache.activemq.command.DiscoveryEvent;
 import org.apache.activemq.security.AuthenticationUser;
@@ -152,6 +153,23 @@ public class DurableSyncNetworkBridgeAuthTest extends 
AbstractDurableSyncNetwork
         assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
     }
 
+    @Test
+    public void testDuplicateBrokerInfo() throws Exception {
+        // Wait for connection and auth setup
+        doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder(),
+                TimeUnit.SECONDS.toMillis(15));
+        assertTrue(Wait.waitFor(() -> brokerSubInfo.get() != null,5000,10));
+
+        // find the established bridge
+        DemandForwardingBridge bridge = (DemandForwardingBridge) 
localBroker.getNetworkConnectors().get(0).activeBridges().stream()
+                .findFirst().orElseThrow();
+
+        // send to one of the brokers (networked brokers will have already 
received a BrokerInfo)
+        // the duplicate will trigger the bridge connection to close
+        bridge.localBroker.oneway(new BrokerInfo());
+        assertTrue(Wait.waitFor(bridge.localBroker::isDisposed,5000,10));
+    }
+
     protected void doSetUp(boolean deleteAllMessages, boolean 
startNetworkConnector, File localDataDir,
             File remoteDataDir, long waitForStart) throws Exception {
         doSetUpRemoteBroker(deleteAllMessages, remoteDataDir, 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to