This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 68e400b45c ARTEMIS-4184 Bridges with concurrency not cleared properly 
on config reload
68e400b45c is described below

commit 68e400b45cf4957de3bf1333e64c01e615f33622
Author: a181321 <anton.roskv...@volvo.com>
AuthorDate: Thu Jun 15 12:18:01 2023 +0200

    ARTEMIS-4184 Bridges with concurrency not cleared properly on config reload
---
 .../artemis/core/config/BridgeConfiguration.java   |  5 ++-
 .../core/server/impl/ActiveMQServerImpl.java       | 45 ++++++++++++++--------
 .../tests/integration/jms/RedeployTest.java        |  2 +
 .../src/test/resources/reload-bridge.xml           |  1 +
 4 files changed, 37 insertions(+), 16 deletions(-)

diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index d9bc9ff182..7cfb861d70 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -275,6 +275,9 @@ public final class BridgeConfiguration implements 
Serializable {
     */
    public BridgeConfiguration setName(final String name) {
       this.name = name;
+      if (this.parentName == null) {
+         this.parentName = name;
+      }
       return this;
    }
 
@@ -769,7 +772,7 @@ public final class BridgeConfiguration implements 
Serializable {
       if (name == null) {
          if (other.name != null)
             return false;
-      } else if (!name.equals(other.name))
+      } else if (!parentName.equals(other.parentName))
          return false;
       if (password == null) {
          if (other.password != null)
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 425069f49c..3c3db20b98 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -4619,34 +4619,49 @@ public class ActiveMQServerImpl implements 
ActiveMQServer {
          
deployQueuesFromListQueueConfiguration(configuration.getQueueConfigs());
 
          ActiveMQServerLogger.LOGGER.reloadingConfiguration("bridges");
+
          for (BridgeConfiguration newBridgeConfig : 
configuration.getBridgeConfigurations()) {
-            newBridgeConfig.setParentName(newBridgeConfig.getName());
-            Bridge existingBridge = 
clusterManager.getBridges().get(newBridgeConfig.getParentName());
-            if (existingBridge != null && 
!existingBridge.getConfiguration().equals(newBridgeConfig) && 
existingBridge.getConfiguration().isConfigurationManaged()) {
+
+            String bridgeName = newBridgeConfig.getName();
+            newBridgeConfig.setParentName(bridgeName);
+
+            //Look for bridges with matching parentName. Only need first match 
in case of concurrent bridges
+            Bridge existingBridge = 
clusterManager.getBridges().values().stream()
+               .filter(bridge -> 
bridge.getConfiguration().getParentName().equals(bridgeName))
+               .findFirst()
+               .orElse(null);
+
+            if (existingBridge != null && 
existingBridge.getConfiguration().isConfigurationManaged() && 
!existingBridge.getConfiguration().equals(newBridgeConfig)) {
                // this is an existing bridge but the config changed so stop 
the current bridge and deploy the new one
-               destroyBridge(existingBridge.getName().toString());
+               destroyBridge(bridgeName);
                deployBridge(newBridgeConfig);
             } else if (existingBridge == null) {
                // this is a new bridge
                deployBridge(newBridgeConfig);
             }
+
          }
-         for (final Bridge existingBridge: 
clusterManager.getBridges().values()) {
+
+         //Look for already running bridges no longer in configuration, stop 
if found
+         for (final Bridge existingBridge : 
clusterManager.getBridges().values()) {
             BridgeConfiguration existingBridgeConfig = 
existingBridge.getConfiguration();
-            boolean destroy = true;
-            for (final BridgeConfiguration newBridgeConfig : 
configuration.getBridgeConfigurations()) {
-               if (existingBridgeConfig.isConfigurationManaged() && 
(existingBridgeConfig.getParentName().equals(newBridgeConfig.getName()) || 
existingBridgeConfig.getName().equals(newBridgeConfig.getName()) )) {
-                  destroy = false;
-                  break;
+
+            if (existingBridgeConfig.isConfigurationManaged()) {
+               String existingBridgeName = 
existingBridgeConfig.getParentName();
+
+               boolean noLongerConfigured = 
configuration.getBridgeConfigurations().stream()
+                  .noneMatch(bridge -> 
bridge.getParentName().equals(existingBridgeName));
+
+               if (noLongerConfigured) {
+                  destroyBridge(existingBridgeName);
                }
             }
-            if (destroy) {
-               // this bridge is running but it isn't in the new config which 
means it was removed so destroy it
-               
destroyBridge(existingBridge.getConfiguration().getParentName());
-            }
+
          }
-         recoverStoredConnectors();
+
          recoverStoredBridges();
+         recoverStoredConnectors();
+
       }
    }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
index 5e6cb02666..a1904dd42a 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java
@@ -518,6 +518,7 @@ public class RedeployTest extends ActiveMQTestBase {
             MessageProducer producer = session.createProducer(queue);
             producer.send(session.createMessage());
             Wait.assertEquals(1, () -> 
embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
+            Wait.assertEquals(3, () -> 
embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
          }
 
          try (ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory();
@@ -543,6 +544,7 @@ public class RedeployTest extends ActiveMQTestBase {
             producer.send(session.createMessage());
             Wait.assertEquals(1, () -> 
embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount());
             Wait.assertEquals(1, () -> 
embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount());
+            Wait.assertEquals(2, () -> 
embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount());
          }
 
          try (ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory();
diff --git a/tests/integration-tests/src/test/resources/reload-bridge.xml 
b/tests/integration-tests/src/test/resources/reload-bridge.xml
index 91c3ad8726..b904816b59 100644
--- a/tests/integration-tests/src/test/resources/reload-bridge.xml
+++ b/tests/integration-tests/src/test/resources/reload-bridge.xml
@@ -39,6 +39,7 @@ under the License.
          <bridge name="a">
             <queue-name>a-from</queue-name>
             <forwarding-address>a-to</forwarding-address>
+            <concurrency>3</concurrency>
             <static-connectors>
                <connector-ref>connector</connector-ref>
             </static-connectors>

Reply via email to