[4/6] activemq git commit: AMQ-6858 - handle resync of network proxy durables after restart

2017-11-16 Thread cshannon
AMQ-6858 - handle resync of network proxy durables after restart

We need to properly handle the re-addition of network proxy durables
after the brokers are restarted so removal is done properly

(cherry picked from commit 6013441a9a7c4a13f7412d6d72638de0f420e6a3)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/247243c9
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/247243c9
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/247243c9

Branch: refs/heads/activemq-5.15.x
Commit: 247243c9c120f5d1c526330813dcc2b8079f52c3
Parents: 39d6321
Author: Christopher L. Shannon (cshannon) 
Authored: Wed Nov 15 08:22:47 2017 -0500
Committer: Christopher L. Shannon (cshannon) 
Committed: Thu Nov 16 07:50:40 2017 -0500

--
 .../apache/activemq/network/ConduitBridge.java  |  14 ++-
 .../network/DemandForwardingBridgeSupport.java  |  89 +-
 .../activemq/network/DemandSubscription.java |   6 -
 .../DurableFiveBrokerNetworkBridgeTest.java | 117 ++-
 4 files changed, 185 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index bc9d004..70f45f7 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge {
 ds.addForcedDurableConsumer(info.getConsumerId());
 }
 } else {
-   if (isProxyNSConsumer(info)) {
-   final BrokerId[] path = info.getBrokerPath();
-   addProxyNetworkSubscription(ds, path, 
info.getSubscriptionName());
-   } else {
+//Handle the demand generated by proxy network subscriptions
+//The broker path case is the normal one
+if (isProxyNSConsumerBrokerPath(info)) {
+final BrokerId[] path = info.getBrokerPath();
+addProxyNetworkSubscriptionBrokerPath(ds, path, 
info.getSubscriptionName());
+//This is the durable sync case on broker restart
+} else if (isProxyNSConsumerClientId(info.getClientId()) &&
+isProxyBridgeSubscription(info.getClientId(), 
info.getSubscriptionName())) {
+addProxyNetworkSubscriptionClientId(ds, 
info.getClientId(), info.getSubscriptionName());
+} else {
ds.getDurableRemoteSubs().add(new 
SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/247243c9/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 75084d1..df493c3 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
@@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 (info.getClientId() == null || 
info.getClientId().startsWith(configuration.getName()));
 }
 
-private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
-if (info.getSubcriptionName() != null && info.getClientId() != null) {
-if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
-&& 
!info.getClientId().startsWith(configuration.getName())) {
+protected boolean isProxyBridgeSubscription(String clientId, String 
subName) {
+if (subName != null && clientId != null) {
+if (subName.startsWith(DURABLE_SUB_PREFIX) && 
!clientId.startsWith(confi

activemq git commit: AMQ-6858 - handle resync of network proxy durables after restart

2017-11-15 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master 50243106c -> 6013441a9


AMQ-6858 - handle resync of network proxy durables after restart

We need to properly handle the re-addition of network proxy durables
after the brokers are restarted so removal is done properly


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/6013441a
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/6013441a
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/6013441a

Branch: refs/heads/master
Commit: 6013441a9a7c4a13f7412d6d72638de0f420e6a3
Parents: 5024310
Author: Christopher L. Shannon (cshannon) 
Authored: Wed Nov 15 08:22:47 2017 -0500
Committer: Christopher L. Shannon (cshannon) 
Committed: Wed Nov 15 08:56:43 2017 -0500

--
 .../apache/activemq/network/ConduitBridge.java  |  14 ++-
 .../network/DemandForwardingBridgeSupport.java  |  89 +-
 .../activemq/network/DemandSubscription.java |   6 -
 .../DurableFiveBrokerNetworkBridgeTest.java | 117 ++-
 4 files changed, 185 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/6013441a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index bc9d004..70f45f7 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -81,10 +81,16 @@ public class ConduitBridge extends DemandForwardingBridge {
 ds.addForcedDurableConsumer(info.getConsumerId());
 }
 } else {
-   if (isProxyNSConsumer(info)) {
-   final BrokerId[] path = info.getBrokerPath();
-   addProxyNetworkSubscription(ds, path, 
info.getSubscriptionName());
-   } else {
+//Handle the demand generated by proxy network subscriptions
+//The broker path case is the normal one
+if (isProxyNSConsumerBrokerPath(info)) {
+final BrokerId[] path = info.getBrokerPath();
+addProxyNetworkSubscriptionBrokerPath(ds, path, 
info.getSubscriptionName());
+//This is the durable sync case on broker restart
+} else if (isProxyNSConsumerClientId(info.getClientId()) &&
+isProxyBridgeSubscription(info.getClientId(), 
info.getSubscriptionName())) {
+addProxyNetworkSubscriptionClientId(ds, 
info.getClientId(), info.getSubscriptionName());
+} else {
ds.getDurableRemoteSubs().add(new 
SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
}
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/6013441a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 75084d1..df493c3 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,7 +36,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.management.ObjectName;
@@ -672,31 +671,53 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 (info.getClientId() == null || 
info.getClientId().startsWith(configuration.getName()));
 }
 
-private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
-if (info.getSubcriptionName() != null && info.getClientId() != null) {
-if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
-&& 
!info.getClientId().startsWith(configuration.getName())) {
+protected boolean isProxyBridgeSubscription(String clientId, String 
subName) {
+if (subName != null && clientId != null) {
+if (subName.startsWith(DURABLE_SUB_PREFIX) && 
!clientId.startsWith(