This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push: new 68e400b45c ARTEMIS-4184 Bridges with concurrency not cleared properly on config reload 68e400b45c is described below commit 68e400b45cf4957de3bf1333e64c01e615f33622 Author: a181321 <anton.roskv...@volvo.com> AuthorDate: Thu Jun 15 12:18:01 2023 +0200 ARTEMIS-4184 Bridges with concurrency not cleared properly on config reload --- .../artemis/core/config/BridgeConfiguration.java | 5 ++- .../core/server/impl/ActiveMQServerImpl.java | 45 ++++++++++++++-------- .../tests/integration/jms/RedeployTest.java | 2 + .../src/test/resources/reload-bridge.xml | 1 + 4 files changed, 37 insertions(+), 16 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java index d9bc9ff182..7cfb861d70 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java @@ -275,6 +275,9 @@ public final class BridgeConfiguration implements Serializable { */ public BridgeConfiguration setName(final String name) { this.name = name; + if (this.parentName == null) { + this.parentName = name; + } return this; } @@ -769,7 +772,7 @@ public final class BridgeConfiguration implements Serializable { if (name == null) { if (other.name != null) return false; - } else if (!name.equals(other.name)) + } else if (!parentName.equals(other.parentName)) return false; if (password == null) { if (other.password != null) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 425069f49c..3c3db20b98 100644 --- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -4619,34 +4619,49 @@ public class ActiveMQServerImpl implements ActiveMQServer { deployQueuesFromListQueueConfiguration(configuration.getQueueConfigs()); ActiveMQServerLogger.LOGGER.reloadingConfiguration("bridges"); + for (BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) { - newBridgeConfig.setParentName(newBridgeConfig.getName()); - Bridge existingBridge = clusterManager.getBridges().get(newBridgeConfig.getParentName()); - if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig) && existingBridge.getConfiguration().isConfigurationManaged()) { + + String bridgeName = newBridgeConfig.getName(); + newBridgeConfig.setParentName(bridgeName); + + //Look for bridges with matching parentName. Only need first match in case of concurrent bridges + Bridge existingBridge = clusterManager.getBridges().values().stream() + .filter(bridge -> bridge.getConfiguration().getParentName().equals(bridgeName)) + .findFirst() + .orElse(null); + + if (existingBridge != null && existingBridge.getConfiguration().isConfigurationManaged() && !existingBridge.getConfiguration().equals(newBridgeConfig)) { // this is an existing bridge but the config changed so stop the current bridge and deploy the new one - destroyBridge(existingBridge.getName().toString()); + destroyBridge(bridgeName); deployBridge(newBridgeConfig); } else if (existingBridge == null) { // this is a new bridge deployBridge(newBridgeConfig); } + } - for (final Bridge existingBridge: clusterManager.getBridges().values()) { + + //Look for already running bridges no longer in configuration, stop if found + for (final Bridge existingBridge : clusterManager.getBridges().values()) { BridgeConfiguration existingBridgeConfig = existingBridge.getConfiguration(); - boolean 
destroy = true; - for (final BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) { - if (existingBridgeConfig.isConfigurationManaged() && (existingBridgeConfig.getParentName().equals(newBridgeConfig.getName()) || existingBridgeConfig.getName().equals(newBridgeConfig.getName()) )) { - destroy = false; - break; + + if (existingBridgeConfig.isConfigurationManaged()) { + String existingBridgeName = existingBridgeConfig.getParentName(); + + boolean noLongerConfigured = configuration.getBridgeConfigurations().stream() + .noneMatch(bridge -> bridge.getParentName().equals(existingBridgeName)); + + if (noLongerConfigured) { + destroyBridge(existingBridgeName); } } - if (destroy) { - // this bridge is running but it isn't in the new config which means it was removed so destroy it - destroyBridge(existingBridge.getConfiguration().getParentName()); - } + } - recoverStoredConnectors(); + recoverStoredBridges(); + recoverStoredConnectors(); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java index 5e6cb02666..a1904dd42a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/RedeployTest.java @@ -518,6 +518,7 @@ public class RedeployTest extends ActiveMQTestBase { MessageProducer producer = session.createProducer(queue); producer.send(session.createMessage()); Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount()); + Wait.assertEquals(3, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount()); } try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); @@ -543,6 +544,7 @@ public class RedeployTest extends ActiveMQTestBase { 
producer.send(session.createMessage()); Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-new").getMessageCount()); Wait.assertEquals(1, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-to").getMessageCount()); + Wait.assertEquals(2, () -> embeddedActiveMQ.getActiveMQServer().locateQueue("a-from").getConsumerCount()); } try (ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); diff --git a/tests/integration-tests/src/test/resources/reload-bridge.xml b/tests/integration-tests/src/test/resources/reload-bridge.xml index 91c3ad8726..b904816b59 100644 --- a/tests/integration-tests/src/test/resources/reload-bridge.xml +++ b/tests/integration-tests/src/test/resources/reload-bridge.xml @@ -39,6 +39,7 @@ under the License. <bridge name="a"> <queue-name>a-from</queue-name> <forwarding-address>a-to</forwarding-address> + <concurrency>3</concurrency> <static-connectors> <connector-ref>connector</connector-ref> </static-connectors>