Repository: activemq
Updated Branches:
  refs/heads/master 78c959a5c -> 09054fc4a


https://issues.apache.org/jira/browse/AMQ-6050

Clearing the durable subscription from the local map in
DemandForwardingBridgeSupport to make sure that demand can be properly
recreated after the subscription has been deleted.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/09054fc4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/09054fc4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/09054fc4

Branch: refs/heads/master
Commit: 09054fc4a85bcdef9e19e589be368d0e9cf0de41
Parents: 78c959a
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed Nov 18 15:05:12 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Nov 18 15:06:33 2015 +0000

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |   3 +
 .../network/DynamicNetworkTestSupport.java      |  97 ++++++++
 .../network/NetworkDurableRecreationTest.java   | 224 +++++++++++++++++++
 .../network/VirtualConsumerDemandTest.java      |  57 +----
 4 files changed, 329 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index fac39ac..112f5ff 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -833,6 +833,9 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                         
sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName());
                         
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
                         localBroker.oneway(sending);
+
+                        //remove subscriber from map
+                        i.remove();
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
new file mode 100644
index 0000000..b2c178e
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.util.Wait;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+
+
+public abstract class DynamicNetworkTestSupport {
+
+    protected Connection localConnection;
+    protected Connection remoteConnection;
+    protected BrokerService localBroker;
+    protected BrokerService remoteBroker;
+    protected Session localSession;
+    protected Session remoteSession;
+    protected ActiveMQTopic included;
+    protected ActiveMQTopic excluded;
+    protected String testTopicName = "include.test.bar";
+    protected String excludeTopicName = "exclude.test.bar";
+    protected String clientId = "clientId";
+    protected String subName = "subId";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder(new 
File("target"));
+
+    protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final 
ConnectionContext context,
+            final BrokerService brokerService) throws Exception {
+        RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
+        info.setClientId(clientId);
+        info.setSubcriptionName(subName);
+        context.setBroker(brokerService.getBroker());
+        context.setClientId(clientId);
+        return info;
+    }
+
+    protected void waitForConsumerCount(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                //satisfied once the destination's consumer count equals the expected count
+                return count == 
destinationStatistics.getConsumers().getCount();
+            }
+        });
+    }
+
+    protected void waitForDispatchFromLocalBroker(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == destinationStatistics.getDequeues().getCount() 
&&
+                       count == 
destinationStatistics.getDispatched().getCount() &&
+                       count == destinationStatistics.getForwards().getCount();
+            }
+        });
+    }
+
+    protected void assertLocalBrokerStatistics(final DestinationStatistics 
localStatistics, final int count) {
+        assertEquals("local broker dest stat dispatched", count, 
localStatistics.getDispatched().getCount());
+        assertEquals("local broker dest stat dequeues", count, 
localStatistics.getDequeues().getCount());
+        assertEquals("local broker dest stat forwards", count, 
localStatistics.getForwards().getCount());
+    }
+
+    protected interface ConsumerCreator {
+        MessageConsumer createConsumer() throws JMSException;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
new file mode 100644
index 0000000..c5899a0
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java
@@ -0,0 +1,224 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.net.URI;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This test is to show that if a durable subscription over a network bridge 
is deleted and
+ * re-created, messages will flow properly again for dynamic subscriptions.
+ *
+ * AMQ-6050
+ */
+public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport {
+
+    /**
+     * Test publisher on localBroker and durable on remoteBroker
+     * after durable deletion, recreate durable
+     */
+    @Test(timeout = 30 * 1000)
+    public void testDurableConsumer() throws Exception {
+        testReceive(remoteBroker, remoteSession, localBroker, localSession, 
new ConsumerCreator() {
+
+            @Override
+            public MessageConsumer createConsumer() throws JMSException {
+                return remoteSession.createDurableSubscriber(included, 
subName);
+            }
+        });
+    }
+
+    /**
+     * Reverse and test publisher on remoteBroker and durable on localBroker
+     * after durable deletion, recreate durable
+     */
+    @Test(timeout = 30 * 1000)
+    public void testDurableConsumerReverse() throws Exception {
+        testReceive(localBroker, localSession, remoteBroker, remoteSession, 
new ConsumerCreator() {
+
+            @Override
+            public MessageConsumer createConsumer() throws JMSException {
+                return localSession.createDurableSubscriber(included, subName);
+            }
+        });
+    }
+
+    /**
+     * Test publisher on localBroker and durable on remoteBroker
+     * after durable deletion, recreate with a non-durable consumer
+     */
+    @Test(timeout = 30 * 1000)
+    public void testDurableAndTopicConsumer() throws Exception {
+        testReceive(remoteBroker, remoteSession, localBroker, localSession, 
new ConsumerCreator() {
+
+            @Override
+            public MessageConsumer createConsumer() throws JMSException {
+                return remoteSession.createConsumer(included);
+            }
+        });
+    }
+
+    /**
+     * Reverse and test publisher on remoteBroker and durable on localBroker
+     * after durable deletion, recreate with a non-durable consumer
+     */
+    @Test(timeout = 30 * 1000)
+    public void testDurableAndTopicConsumerReverse() throws Exception {
+        testReceive(localBroker, localSession, remoteBroker, remoteSession, 
new ConsumerCreator() {
+
+            @Override
+            public MessageConsumer createConsumer() throws JMSException {
+                return localSession.createConsumer(included);
+            }
+        });
+    }
+
+    public void testReceive(BrokerService receiveBroker, Session 
receiveSession,
+            BrokerService publishBroker, Session publishSession, 
ConsumerCreator secondConsumerCreator) throws Exception {
+
+        final DestinationStatistics destinationStatistics =
+                
publishBroker.getDestination(included).getDestinationStatistics();
+
+        MessageProducer includedProducer = 
publishSession.createProducer(included);
+        MessageConsumer bridgeConsumer = 
receiveSession.createDurableSubscriber(
+                included, subName);
+
+        waitForConsumerCount(destinationStatistics, 1);
+
+        //remove the durable
+        final ConnectionContext context = new ConnectionContext();
+        RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, 
receiveBroker);
+        bridgeConsumer.close();
+        Thread.sleep(1000);
+        receiveBroker.getBroker().removeSubscription(context, info);
+        waitForConsumerCount(destinationStatistics, 0);
+
+        //re-create consumer
+        MessageConsumer bridgeConsumer2 = 
secondConsumerCreator.createConsumer();
+        waitForConsumerCount(destinationStatistics, 1);
+
+        //make sure message received
+        includedProducer.send(publishSession.createTextMessage("test"));
+        assertNotNull(bridgeConsumer2.receive(5000));
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        doSetUp(true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    protected void doTearDown() throws Exception {
+        if (localConnection != null) {
+            localConnection.close();
+        }
+        if (remoteConnection != null) {
+            remoteConnection.close();
+        }
+        if (localBroker != null) {
+            localBroker.stop();
+        }
+        if (remoteBroker != null) {
+            remoteBroker.stop();
+        }
+    }
+
+
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        remoteBroker = createRemoteBroker();
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        localBroker = createLocalBroker();
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID(clientId);
+        localConnection.start();
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        fac = new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID(clientId);
+        remoteConnection.start();
+        included = new ActiveMQTopic(testTopicName);
+        localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        remoteSession = remoteConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+    }
+
+
+    protected NetworkConnector connector;
+    protected BrokerService createLocalBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setMonitorConnectionSplits(true);
+        brokerService.setDataDirectoryFile(tempFolder.newFolder());
+        brokerService.setBrokerName("localBroker");
+
+        connector = new DiscoveryNetworkConnector(new 
URI("static:(tcp://localhost:61617)"));
+        connector.setName("networkConnector");
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setDuplex(true);
+        
connector.setDynamicallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList(
+                new ActiveMQTopic(testTopicName)));
+        
connector.setExcludedDestinations(Lists.<ActiveMQDestination>newArrayList(
+                new ActiveMQTopic(excludeTopicName)));
+
+        brokerService.addNetworkConnector(connector);
+        brokerService.addConnector("tcp://localhost:61616");
+
+        return brokerService;
+    }
+
+    protected BrokerService createRemoteBroker() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("remoteBroker");
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectoryFile(tempFolder.newFolder());
+        brokerService.addConnector("tcp://localhost:61617");
+
+        return brokerService;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 202a441..ad22b07 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.io.File;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Arrays;
@@ -29,7 +28,6 @@ import java.util.Collection;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
-import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
@@ -58,9 +56,7 @@ import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameters;
@@ -75,7 +71,7 @@ import com.google.common.collect.Lists;
  * as demand.
  */
 @RunWith(Parameterized.class)
-public class VirtualConsumerDemandTest {
+public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport {
 
     protected static final int MESSAGE_COUNT = 10;
     private static final Logger LOG = 
LoggerFactory.getLogger(VirtualConsumerDemandTest.class);
@@ -96,25 +92,14 @@ public class VirtualConsumerDemandTest {
         });
     }
 
-    protected Connection localConnection;
-    protected Connection remoteConnection;
-    protected BrokerService localBroker;
-    protected BrokerService remoteBroker;
+
     protected JavaRuntimeConfigurationBroker runtimeBroker;
-    protected Session localSession;
-    protected Session remoteSession;
-    protected ActiveMQTopic included;
-    protected ActiveMQTopic excluded;
     protected String consumerName = "durableSubs";
-    protected String testTopicName = "include.test.bar";
     protected String testQueueName = "include.test.foo";
 
     private final boolean isDuplex;
     private final boolean isUseVirtualDestSubsOnCreation;
 
-    @Rule
-    public TemporaryFolder tempFolder = new TemporaryFolder(new 
File("target"));
-
 
     public VirtualConsumerDemandTest(boolean isDuplex, boolean 
isUseVirtualDestSubsOnCreation) {
        // Assume.assumeTrue(
@@ -790,8 +775,8 @@ public class VirtualConsumerDemandTest {
         MessageConsumer advisoryConsumer = 
getVirtualDestinationAdvisoryConsumer(testTopicName);
 
         //configure a virtual destination that forwards messages to an 
excluded destination
-        CompositeTopic compositeTopic = 
createCompositeTopic("excluded.test.bar",
-                new ActiveMQQueue("excluded.test.bar.bridge"));
+        CompositeTopic compositeTopic = 
createCompositeTopic("exclude.test.bar",
+                new ActiveMQQueue("exclude.test.bar.bridge"));
 
         runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
@@ -799,7 +784,7 @@ public class VirtualConsumerDemandTest {
         Message test = localSession.createTextMessage("test");
         Thread.sleep(1000);
 
-        MessageConsumer bridgeConsumer = remoteSession.createConsumer(new 
ActiveMQQueue("excluded.test.bar.bridge"));
+        MessageConsumer bridgeConsumer = remoteSession.createConsumer(new 
ActiveMQQueue("exclude.test.bar.bridge"));
         Thread.sleep(2000);
         includedProducer.send(test);
         assertNull(bridgeConsumer.receive(5000));
@@ -1302,13 +1287,8 @@ public class VirtualConsumerDemandTest {
         remoteAdvisoryBroker = (AdvisoryBroker)
                 brokerService.getBroker().getAdaptor(AdvisoryBroker.class);
 
-        NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI("static:(tcp://localhost:61616)"));
-        brokerService.addNetworkConnector(connector);
-
         brokerService.addConnector("tcp://localhost:61617");
 
-
-
         return brokerService;
     }
 
@@ -1330,27 +1310,6 @@ public class VirtualConsumerDemandTest {
         return compositeQueue;
     }
 
-    protected void waitForConsumerCount(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                //should only be 1 for the composite destination creation
-                return count == 
destinationStatistics.getConsumers().getCount();
-            }
-        });
-    }
-
-    protected void waitForDispatchFromLocalBroker(final DestinationStatistics 
destinationStatistics, final int count) throws Exception {
-        Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == destinationStatistics.getDequeues().getCount() 
&&
-                       count == 
destinationStatistics.getDispatched().getCount() &&
-                       count == destinationStatistics.getForwards().getCount();
-            }
-        });
-    }
-
     protected MessageConsumer getVirtualDestinationAdvisoryConsumer(String 
topic) throws JMSException {
         return 
remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(
                 new ActiveMQTopic(topic)));
@@ -1361,12 +1320,6 @@ public class VirtualConsumerDemandTest {
                 new ActiveMQQueue(queue)));
     }
 
-    protected void assertLocalBrokerStatistics(final DestinationStatistics 
localStatistics, final int count) {
-        assertEquals("local broker dest stat dispatched", count, 
localStatistics.getDispatched().getCount());
-        assertEquals("local broker dest stat dequeues", count, 
localStatistics.getDequeues().getCount());
-        assertEquals("local broker dest stat forwards", count, 
localStatistics.getForwards().getCount());
-    }
-
     protected void assertRemoteAdvisoryCount(final MessageConsumer 
advisoryConsumer, final int count) throws JMSException {
         int available = 0;
         ActiveMQMessage message = null;

Reply via email to