[4/6] activemq git commit: AMQ-6858 - handle resync of network proxy durables after restart
AMQ-6858 - handle resync of network proxy durables after restart We need to properly handle the re-addition of network proxy durables after the brokers are restarted so removal is done properly (cherry picked from commit 6013441a9a7c4a13f7412d6d72638de0f420e6a3) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/247243c9 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/247243c9 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/247243c9 Branch: refs/heads/activemq-5.15.x Commit: 247243c9c120f5d1c526330813dcc2b8079f52c3 Parents: 39d6321 Author: Christopher L. Shannon (cshannon) Authored: Wed Nov 15 08:22:47 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Thu Nov 16 07:50:40 2017 -0500 -- .../apache/activemq/network/ConduitBridge.java | 14 ++- .../network/DemandForwardingBridgeSupport.java | 89 +- .../activemq/network/DemandSubscription.java| 6 - .../DurableFiveBrokerNetworkBridgeTest.java | 117 ++- 4 files changed, 185 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index bc9d004..70f45f7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge { ds.addForcedDurableConsumer(info.getConsumerId()); } } else { - if (isProxyNSConsumer(info)) { - final BrokerId[] path = info.getBrokerPath(); - addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); - } else { +//Handle the demand generated by proxy network subscriptions +//The broker path is case is normal +if (isProxyNSConsumerBrokerPath(info)) { +final BrokerId[] path = info.getBrokerPath(); +addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName()); +//This is the durable sync case on broker restart +} else if (isProxyNSConsumerClientId(info.getClientId()) && +isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) { +addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName()); +} else { ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 75084d1..df493c3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -36,7 +36,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import javax.management.ObjectName; @@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br (info.getClientId() == null || info.getClientId().startsWith(configuration.getName())); } -private boolean isProxyBridgeSubscription(SubscriptionInfo info) { -if (info.getSubcriptionName() != null && info.getClientId() != null) { -if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) -&& !info.getClientId().startsWith(configuration.getName())) { +protected boolean isProxyBridgeSubscription(String clientId, String subName) { +if (subName != null && clientId != null) { +if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(confi
activemq git commit: AMQ-6858 - handle resync of network proxy durables after restart
Repository: activemq Updated Branches: refs/heads/master 50243106c -> 6013441a9 AMQ-6858 - handle resync of network proxy durables after restart We need to properly handle the re-addition of network proxy durables after the brokers are restarted so removal is done properly Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6013441a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6013441a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6013441a Branch: refs/heads/master Commit: 6013441a9a7c4a13f7412d6d72638de0f420e6a3 Parents: 5024310 Author: Christopher L. Shannon (cshannon) Authored: Wed Nov 15 08:22:47 2017 -0500 Committer: Christopher L. Shannon (cshannon) Committed: Wed Nov 15 08:56:43 2017 -0500 -- .../apache/activemq/network/ConduitBridge.java | 14 ++- .../network/DemandForwardingBridgeSupport.java | 89 +- .../activemq/network/DemandSubscription.java| 6 - .../DurableFiveBrokerNetworkBridgeTest.java | 117 ++- 4 files changed, 185 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/6013441a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index bc9d004..70f45f7 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge { ds.addForcedDurableConsumer(info.getConsumerId()); } } else { - if (isProxyNSConsumer(info)) { - final BrokerId[] path = info.getBrokerPath(); - addProxyNetworkSubscription(ds, path, info.getSubscriptionName()); - } else { +//Handle the demand generated by proxy network subscriptions +//The broker path is case is normal +if (isProxyNSConsumerBrokerPath(info)) { +final BrokerId[] path = info.getBrokerPath(); +addProxyNetworkSubscriptionBrokerPath(ds, path, info.getSubscriptionName()); +//This is the durable sync case on broker restart +} else if (isProxyNSConsumerClientId(info.getClientId()) && +isProxyBridgeSubscription(info.getClientId(), info.getSubscriptionName())) { +addProxyNetworkSubscriptionClientId(ds, info.getClientId(), info.getSubscriptionName()); +} else { ds.getDurableRemoteSubs().add(new SubscriptionInfo(info.getClientId(), info.getSubscriptionName())); } } http://git-wip-us.apache.org/repos/asf/activemq/blob/6013441a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 75084d1..df493c3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -36,7 +36,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; import java.util.regex.Pattern; import javax.management.ObjectName; @@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br (info.getClientId() == null || info.getClientId().startsWith(configuration.getName())); } -private boolean isProxyBridgeSubscription(SubscriptionInfo info) { -if (info.getSubcriptionName() != null && info.getClientId() != null) { -if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) -&& !info.getClientId().startsWith(configuration.getName())) { +protected boolean isProxyBridgeSubscription(String clientId, String subName) { +if (subName != null && clientId != null) { +if (subName.startsWith(DURABLE_SUB_PREFIX) && !clientId.startsWith(