Repository: activemq Updated Branches: refs/heads/master 78c959a5c -> 09054fc4a
https://issues.apache.org/jira/browse/AMQ-6050 Clearing the subscription from the local map in DemandForwardingBridgeSupport to make sure that demand can be properly recreated again. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/09054fc4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/09054fc4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/09054fc4 Branch: refs/heads/master Commit: 09054fc4a85bcdef9e19e589be368d0e9cf0de41 Parents: 78c959a Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Wed Nov 18 15:05:12 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Wed Nov 18 15:06:33 2015 +0000 ---------------------------------------------------------------------- .../network/DemandForwardingBridgeSupport.java | 3 + .../network/DynamicNetworkTestSupport.java | 97 ++++++++ .../network/NetworkDurableRecreationTest.java | 224 +++++++++++++++++++ .../network/VirtualConsumerDemandTest.java | 57 +---- 4 files changed, 329 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index fac39ac..112f5ff 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -833,6 +833,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br sending.setSubscriptionName(ds.getLocalDurableSubscriber().getSubscriptionName()); sending.setConnectionId(this.localConnectionInfo.getConnectionId()); localBroker.oneway(sending); + + //remove subscriber from map + i.remove(); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java new file mode 100644 index 0000000..b2c178e --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; + +import java.io.File; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.util.Wait; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + + +public abstract class DynamicNetworkTestSupport { + + protected Connection localConnection; + protected Connection remoteConnection; + protected BrokerService localBroker; + protected BrokerService remoteBroker; + protected Session localSession; + protected Session remoteSession; + protected ActiveMQTopic included; + protected ActiveMQTopic excluded; + protected String testTopicName = "include.test.bar"; + protected String excludeTopicName = "exclude.test.bar"; + protected String clientId = "clientId"; + protected String subName = "subId"; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(new File("target")); + + protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context, + final BrokerService brokerService) throws Exception { + RemoveSubscriptionInfo info = new RemoveSubscriptionInfo(); + info.setClientId(clientId); + info.setSubcriptionName(subName); + context.setBroker(brokerService.getBroker()); + context.setClientId(clientId); + return info; + } + + protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception { + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + //should only be 1 for the composite destination creation + return count == destinationStatistics.getConsumers().getCount(); + } + }); + } + + protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return count == destinationStatistics.getDequeues().getCount() && + count == destinationStatistics.getDispatched().getCount() && + count == destinationStatistics.getForwards().getCount(); + } + }); + } + + protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { + assertEquals("local broker dest stat dispatched", count, localStatistics.getDispatched().getCount()); + assertEquals("local broker dest stat dequeues", count, localStatistics.getDequeues().getCount()); + assertEquals("local broker dest stat forwards", count, localStatistics.getForwards().getCount()); + } + + protected interface ConsumerCreator { + MessageConsumer createConsumer() throws JMSException; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java new file mode 100644 index 0000000..c5899a0 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkDurableRecreationTest.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertNotNull; + +import java.net.URI; + +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * This test is to show that if a durable subscription over a network bridge is deleted and + * re-created, messages will flow properly again for dynamic subscriptions. + * + * AMQ-6050 + */ +public class NetworkDurableRecreationTest extends DynamicNetworkTestSupport { + + /** + * Test publisher on localBroker and durable on remoteBroker + * after durable deletion, recreate durable + */ + @Test(timeout = 30 * 1000) + public void testDurableConsumer() throws Exception { + testReceive(remoteBroker, remoteSession, localBroker, localSession, new ConsumerCreator() { + + @Override + public MessageConsumer createConsumer() throws JMSException { + return remoteSession.createDurableSubscriber(included, subName); + } + }); + } + + /** + * Reverse and test publisher on remoteBroker and durable on localBroker + * after durable deletion, recreate durable + */ + @Test(timeout = 30 * 1000) + public void testDurableConsumerReverse() throws Exception { + testReceive(localBroker, localSession, remoteBroker, remoteSession, new ConsumerCreator() { + + @Override + public MessageConsumer createConsumer() throws JMSException { + return localSession.createDurableSubscriber(included, subName); + } + }); + } + + /** + * Test publisher on localBroker and durable on remoteBroker + * after durable deletion, recreate with a non-durable consumer + */ + @Test(timeout = 30 * 1000) + public void testDurableAndTopicConsumer() throws Exception { + testReceive(remoteBroker, remoteSession, localBroker, localSession, new ConsumerCreator() { + + @Override + public MessageConsumer createConsumer() throws JMSException { + return remoteSession.createConsumer(included); + } + }); + } + + /** + * Reverse and test publisher on remoteBroker and durable on localBroker + * after durable deletion, recreate with a non-durable consumer + */ + @Test(timeout = 30 * 1000) + public void testDurableAndTopicConsumerReverse() throws Exception { + testReceive(localBroker, localSession, remoteBroker, remoteSession, new ConsumerCreator() { + + @Override + public MessageConsumer createConsumer() throws JMSException { + return localSession.createConsumer(included); + } + }); + } + + public void testReceive(BrokerService receiveBroker, Session receiveSession, + BrokerService publishBroker, Session publishSession, ConsumerCreator secondConsumerCreator) throws Exception { + + final DestinationStatistics destinationStatistics = + publishBroker.getDestination(included).getDestinationStatistics(); + + MessageProducer includedProducer = publishSession.createProducer(included); + MessageConsumer bridgeConsumer = receiveSession.createDurableSubscriber( + included, subName); + + waitForConsumerCount(destinationStatistics, 1); + + //remove the durable + final ConnectionContext context = new ConnectionContext(); + RemoveSubscriptionInfo info = getRemoveSubscriptionInfo(context, receiveBroker); + bridgeConsumer.close(); + Thread.sleep(1000); + receiveBroker.getBroker().removeSubscription(context, info); + waitForConsumerCount(destinationStatistics, 0); + + //re-create consumer + MessageConsumer bridgeConsumer2 = secondConsumerCreator.createConsumer(); + waitForConsumerCount(destinationStatistics, 1); + + //make sure message received + includedProducer.send(publishSession.createTextMessage("test")); + assertNotNull(bridgeConsumer2.receive(5000)); + } + + @Before + public void setUp() throws Exception { + doSetUp(true); + } + + @After + public void tearDown() throws Exception { + doTearDown(); + } + + protected void doTearDown() throws Exception { + if (localConnection != null) { + localConnection.close(); + } + if (remoteConnection != null) { + remoteConnection.close(); + } + if (localBroker != null) { + localBroker.stop(); + } + if (remoteBroker != null) { + remoteBroker.stop(); + } + } + + + protected void doSetUp(boolean deleteAllMessages) throws Exception { + remoteBroker = createRemoteBroker(); + remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + localBroker = createLocalBroker(); + localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); + localBroker.start(); + localBroker.waitUntilStarted(); + URI localURI = localBroker.getVmConnectorURI(); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + fac.setAlwaysSyncSend(true); + fac.setDispatchAsync(false); + localConnection = fac.createConnection(); + localConnection.setClientID(clientId); + localConnection.start(); + URI remoteURI = remoteBroker.getVmConnectorURI(); + fac = new ActiveMQConnectionFactory(remoteURI); + remoteConnection = fac.createConnection(); + remoteConnection.setClientID(clientId); + remoteConnection.start(); + included = new ActiveMQTopic(testTopicName); + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + + protected NetworkConnector connector; + protected BrokerService createLocalBroker() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setMonitorConnectionSplits(true); + brokerService.setDataDirectoryFile(tempFolder.newFolder()); + brokerService.setBrokerName("localBroker"); + + connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61617)")); + connector.setName("networkConnector"); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setDuplex(true); + connector.setDynamicallyIncludedDestinations(Lists.<ActiveMQDestination>newArrayList( + new ActiveMQTopic(testTopicName))); + connector.setExcludedDestinations(Lists.<ActiveMQDestination>newArrayList( + new ActiveMQTopic(excludeTopicName))); + + brokerService.addNetworkConnector(connector); + brokerService.addConnector("tcp://localhost:61616"); + + return brokerService; + } + + protected BrokerService createRemoteBroker() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("remoteBroker"); + brokerService.setUseJmx(false); + brokerService.setDataDirectoryFile(tempFolder.newFolder()); + brokerService.addConnector("tcp://localhost:61617"); + + return brokerService; + } + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/09054fc4/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index 202a441..ad22b07 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import java.io.File; import java.lang.reflect.Field; import java.net.URI; import java.util.Arrays; @@ -29,7 +28,6 @@ import java.util.Collection; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -58,9 +56,7 @@ import org.apache.activemq.util.Wait; import org.junit.After; import org.junit.Assume; import org.junit.Before; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; @@ -75,7 +71,7 @@ import com.google.common.collect.Lists; * as demand. */ @RunWith(Parameterized.class) -public class VirtualConsumerDemandTest { +public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport { protected static final int MESSAGE_COUNT = 10; private static final Logger LOG = LoggerFactory.getLogger(VirtualConsumerDemandTest.class); @@ -96,25 +92,14 @@ public class VirtualConsumerDemandTest { }); } - protected Connection localConnection; - protected Connection remoteConnection; - protected BrokerService localBroker; - protected BrokerService remoteBroker; + protected JavaRuntimeConfigurationBroker runtimeBroker; - protected Session localSession; - protected Session remoteSession; - protected ActiveMQTopic included; - protected ActiveMQTopic excluded; protected String consumerName = "durableSubs"; - protected String testTopicName = "include.test.bar"; protected String testQueueName = "include.test.foo"; private final boolean isDuplex; private final boolean isUseVirtualDestSubsOnCreation; - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(new File("target")); - public VirtualConsumerDemandTest(boolean isDuplex, boolean isUseVirtualDestSubsOnCreation) { // Assume.assumeTrue( @@ -790,8 +775,8 @@ public class VirtualConsumerDemandTest { MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); //configure a virtual destination that forwards messages to an excluded destination - CompositeTopic compositeTopic = createCompositeTopic("excluded.test.bar", - new ActiveMQQueue("excluded.test.bar.bridge")); + CompositeTopic compositeTopic = createCompositeTopic("exclude.test.bar", + new ActiveMQQueue("exclude.test.bar.bridge")); runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); @@ -799,7 +784,7 @@ public class VirtualConsumerDemandTest { Message test = localSession.createTextMessage("test"); Thread.sleep(1000); - MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("excluded.test.bar.bridge")); + MessageConsumer bridgeConsumer = remoteSession.createConsumer(new ActiveMQQueue("exclude.test.bar.bridge")); Thread.sleep(2000); includedProducer.send(test); assertNull(bridgeConsumer.receive(5000)); @@ -1302,13 +1287,8 @@ public class VirtualConsumerDemandTest { remoteAdvisoryBroker = (AdvisoryBroker) brokerService.getBroker().getAdaptor(AdvisoryBroker.class); - NetworkConnector connector = new DiscoveryNetworkConnector(new URI("static:(tcp://localhost:61616)")); - brokerService.addNetworkConnector(connector); - brokerService.addConnector("tcp://localhost:61617"); - - return brokerService; } @@ -1330,27 +1310,6 @@ public class VirtualConsumerDemandTest { return compositeQueue; } - protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception { - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - //should only be 1 for the composite destination creation - return count == destinationStatistics.getConsumers().getCount(); - } - }); - } - - protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception { - Wait.waitFor(new Wait.Condition() { - @Override - public boolean isSatisified() throws Exception { - return count == destinationStatistics.getDequeues().getCount() && - count == destinationStatistics.getDispatched().getCount() && - count == destinationStatistics.getForwards().getCount(); - } - }); - } - protected MessageConsumer getVirtualDestinationAdvisoryConsumer(String topic) throws JMSException { return remoteSession.createConsumer(AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic( new ActiveMQTopic(topic))); @@ -1361,12 +1320,6 @@ public class VirtualConsumerDemandTest { new ActiveMQQueue(queue))); } - protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) { - assertEquals("local broker dest stat dispatched", count, localStatistics.getDispatched().getCount()); - assertEquals("local broker dest stat dequeues", count, localStatistics.getDequeues().getCount()); - assertEquals("local broker dest stat forwards", count, localStatistics.getForwards().getCount()); - } - protected void assertRemoteAdvisoryCount(final MessageConsumer advisoryConsumer, final int count) throws JMSException { int available = 0; ActiveMQMessage message = null;