Repository: activemq Updated Branches: refs/heads/master 27238b2dd -> 25703fbd1
https://issues.apache.org/jira/browse/AMQ-6538 Fixing an issue with syncDurableSubs that causes a bridge failure when adding multiple bridges between the same brokers Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25703fbd Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25703fbd Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25703fbd Branch: refs/heads/master Commit: 25703fbd1f27b65a7410acd7df0bfaf7c16845d8 Parents: 27238b2 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Tue Dec 13 11:58:57 2016 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Dec 13 11:58:57 2016 -0500 ---------------------------------------------------------------------- .../activemq/network/DurableConduitBridge.java | 20 ++++--- .../network/DurableSyncNetworkBridgeTest.java | 55 +++++++++++++++++++- 2 files changed, 67 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/25703fbd/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index 50c9855..fb2b6c9 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -18,6 +18,7 @@ package org.apache.activemq.network; import java.io.IOException; +import org.apache.activemq.broker.region.DurableTopicSubscription; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; import 
org.apache.activemq.broker.region.TopicRegion; @@ -95,14 +96,19 @@ public class DurableConduitBridge extends ConduitBridge { String candidateSubName = getSubscriberName(dest); for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { String subName = subscription.getConsumerInfo().getSubscriptionName(); - if (subName != null && subName.equals(candidateSubName)) { + if (subName != null && subName.equals(candidateSubName) && + subscription instanceof DurableTopicSubscription) { try { - // remove the NC subscription as it is no longer for a permissable dest - RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); - sending.setClientId(localClientId); - sending.setSubscriptionName(subName); - sending.setConnectionId(this.localConnectionInfo.getConnectionId()); - localBroker.oneway(sending); + DurableTopicSubscription durableSub = (DurableTopicSubscription) subscription; + //check the clientId so we only remove subs for the matching bridge + if (durableSub.getSubscriptionKey().getClientId().equals(localClientId)) { + // remove the NC subscription as it is no longer for a permissible dest + RemoveSubscriptionInfo sending = new RemoveSubscriptionInfo(); + sending.setClientId(localClientId); + sending.setSubscriptionName(subName); + sending.setConnectionId(this.localConnectionInfo.getConnectionId()); + localBroker.oneway(sending); + } } catch (IOException e) { LOG.debug("Exception removing NC durable subscription: {}", subName, e); serviceRemoteException(e); http://git-wip-us.apache.org/repos/asf/activemq/blob/25703fbd/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java index 4a705f3..4e115a4 100644 --- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java @@ -76,7 +76,7 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { private boolean forceDurable = false; private boolean useVirtualDestSubs = false; private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION; - public static enum FLOW {FORWARD, REVERSE}; + public static enum FLOW {FORWARD, REVERSE} private BrokerService broker1; private BrokerService broker2; @@ -535,6 +535,59 @@ public class DurableSyncNetworkBridgeTest extends DynamicNetworkTestSupport { } + //Test that durable sync works with more than one bridge + @Test + public void testAddOnlineSubscriptionsTwoBridges() throws Exception { + + final ActiveMQTopic topic = new ActiveMQTopic(testTopicName); + final ActiveMQTopic excludeTopic = new ActiveMQTopic(excludeTopicName); + final ActiveMQTopic topic2 = new ActiveMQTopic("include.new.topic"); + + assertSubscriptionsCount(broker1, topic, 0); + assertNCDurableSubsCount(broker2, topic, 0); + + //create durable that shouldn't be propagated + session1.createDurableSubscriber(excludeTopic, "sub-exclude"); + + //Add 3 online subs + session1.createDurableSubscriber(topic, subName); + session1.createDurableSubscriber(topic, "sub2"); + session1.createDurableSubscriber(topic, "sub3"); + //Add sub on second topic/bridge + session1.createDurableSubscriber(topic2, "secondTopicSubName"); + assertSubscriptionsCount(broker1, topic, 3); + assertSubscriptionsCount(broker1, topic2, 1); + + //Add the second network connector + NetworkConnector secondConnector = configureLocalNetworkConnector(); + secondConnector.setName("networkConnector2"); + secondConnector.setDynamicallyIncludedDestinations( + Lists.<ActiveMQDestination>newArrayList( + new ActiveMQTopic("include.new.topic?forceDurable=" + forceDurable))); + 
localBroker.addNetworkConnector(secondConnector); + secondConnector.start(); + + //Make sure both bridges are connected + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1 && + localBroker.getNetworkConnectors().get(1).activeBridges().size() == 1; + } + }, 10000, 500)); + + //Make sure NC durables exist for both bridges + assertNCDurableSubsCount(broker2, topic2, 1); + assertNCDurableSubsCount(broker2, topic, 1); + assertNCDurableSubsCount(broker2, excludeTopic, 0); + + //Make sure message can reach remote broker + MessageProducer producer = session2.createProducer(topic2); + producer.send(session2.createTextMessage("test")); + waitForDispatchFromLocalBroker(broker2.getDestination(topic2).getDestinationStatistics(), 1); + assertLocalBrokerStatistics(broker2.getDestination(topic2).getDestinationStatistics(), 1); + } + @Test(timeout = 60 * 1000) public void testVirtualDestSubForceDurableSync() throws Exception { Assume.assumeTrue(flow == FLOW.FORWARD);