activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027
Repository: activemq Updated Branches: refs/heads/master 29d943429 -> 95f58fa7c https://issues.apache.org/jira/browse/AMQ-6027 Adding back in test case now that AMQ-5898 is resolved Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/95f58fa7 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/95f58fa7 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/95f58fa7 Branch: refs/heads/master Commit: 95f58fa7c4e26b5b2d73a80bd8e1cb2bee8ebf47 Parents: 29d9434 Author: Christopher L. Shannon (cshannon) Authored: Wed Nov 25 19:13:24 2015 + Committer: Christopher L. Shannon (cshannon) Committed: Wed Nov 25 19:13:24 2015 + -- .../network/VirtualConsumerDemandTest.java | 50 1 file changed, 50 insertions(+) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/95f58fa7/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java -- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index ad22b07..bff069b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -275,6 +275,56 @@ public class VirtualConsumerDemandTest extends DynamicNetworkTestSupport { /** * Test that dynamic flow works for virtual destinations when a second composite + * topic is included that forwards to the same queue, but is excluded from + * being forwarded from the remote broker + * + * @throws Exception + */ +@Test(timeout = 60 * 1000) +public void testSecondNonIncludedCompositeTopicForwardSameQueue() throws Exception { +Assume.assumeTrue(isUseVirtualDestSubsOnCreation); + +doSetUp(true, null); + +MessageConsumer advisoryConsumer = getVirtualDestinationAdvisoryConsumer(testTopicName); + 
+//configure a composite topic that isn't included +CompositeTopic compositeTopic = createCompositeTopic("include.test.bar2", +new ActiveMQQueue("include.test.bar.bridge")); + +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); + +Thread.sleep(2000); + +//add one that is included +CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName, +new ActiveMQQueue("include.test.bar.bridge")); + +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic, compositeTopic2}, true); + +Thread.sleep(2000); +MessageProducer includedProducer = localSession.createProducer(included); +Message test = localSession.createTextMessage("test"); + +final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); +final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( +new ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics(); + +waitForConsumerCount(destinationStatistics, 1); + +includedProducer.send(test); + +waitForDispatchFromLocalBroker(destinationStatistics, 1); +assertLocalBrokerStatistics(destinationStatistics, 1); +assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); + +assertRemoteAdvisoryCount(advisoryConsumer, 1); +assertAdvisoryBrokerCounts(2,2,2); + +} + +/** + * Test that dynamic flow works for virtual destinations when a second composite * topic is included, but is excluded from * being forwarded from the remote broker *
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027
Repository: activemq Updated Branches: refs/heads/master 938aa626c -> 3ef6a9f76 https://issues.apache.org/jira/browse/AMQ-6027 Fixing unit test to pass assertion Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3ef6a9f7 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3ef6a9f7 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3ef6a9f7 Branch: refs/heads/master Commit: 3ef6a9f7693972c7001f82375b684998ae3f46b9 Parents: 938aa62 Author: Christopher L. Shannon (cshannon) Authored: Tue Nov 10 18:40:17 2015 + Committer: Christopher L. Shannon (cshannon) Committed: Tue Nov 10 18:42:03 2015 + -- .../network/VirtualConsumerDemandTest.java | 24 ++-- 1 file changed, 12 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/3ef6a9f7/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java -- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index 3cb65a9..202a441 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -290,7 +290,7 @@ public class VirtualConsumerDemandTest { /** * Test that dynamic flow works for virtual destinations when a second composite - * topic is included that forwards to the same queue, but is excluded from + * topic is included, but is excluded from * being forwarded from the remote broker * * @throws Exception @@ -305,7 +305,7 @@ public class VirtualConsumerDemandTest { //configure a composite topic that isn't included CompositeTopic compositeTopic = createCompositeTopic("include.test.bar2", -new ActiveMQQueue("include.test.bar.bridge")); +new ActiveMQQueue("include.test.bar.bridge2")); 
runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); @@ -318,7 +318,6 @@ public class VirtualConsumerDemandTest { runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic, compositeTopic2}, true); Thread.sleep(2000); - MessageProducer includedProducer = localSession.createProducer(included); Message test = localSession.createTextMessage("test"); @@ -335,7 +334,7 @@ public class VirtualConsumerDemandTest { assertEquals("remote dest messages", 1, remoteDestStatistics.getMessages().getCount()); assertRemoteAdvisoryCount(advisoryConsumer, 1); -assertAdvisoryBrokerCounts(2,2,2); +assertAdvisoryBrokerCounts(2,1,1); } @@ -471,7 +470,7 @@ public class VirtualConsumerDemandTest { MessageProducer includedProducer = localSession.createProducer(included); Message test = localSession.createTextMessage("test"); -Thread.sleep(2000); +Thread.sleep(1000); final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( @@ -531,7 +530,7 @@ public class VirtualConsumerDemandTest { MessageProducer includedProducer = localSession.createProducer(included); Message test = localSession.createTextMessage("test"); -Thread.sleep(2000); +Thread.sleep(1000); final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); final DestinationStatistics remoteDestStatistics = remoteBroker.getDestination( @@ -594,7 +593,7 @@ public class VirtualConsumerDemandTest { MessageProducer includedProducer = localSession.createProducer(included); Message test = localSession.createTextMessage("test"); -Thread.sleep(2000); +Thread.sleep(1000); final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); @@ -632,7 +631,7 @@ public class VirtualConsumerDemandTest { MessageProducer includedProducer = 
localSession.createProducer(included); Message test = localSession.createTextMessage("test"); -Thread.sleep(2000); +Thread.sleep(1000); final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); @@ -686,7 +685,7 @@ public class VirtualConsumerDemandTest { MessageProducer includedProducer = l
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027
Repository: activemq Updated Branches: refs/heads/master cc81680e1 -> 0c846cf8f https://issues.apache.org/jira/browse/AMQ-6027 Tweaking test case to apply virtual destination updates immediately Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0c846cf8 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0c846cf8 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0c846cf8 Branch: refs/heads/master Commit: 0c846cf8f6bcb999f98d499a717e922cbd04c6d2 Parents: cc81680 Author: Christopher L. Shannon (cshannon) Authored: Tue Nov 10 14:45:37 2015 + Committer: Christopher L. Shannon (cshannon) Committed: Tue Nov 10 14:45:37 2015 + -- .../network/VirtualConsumerDemandTest.java | 55 ++-- 1 file changed, 27 insertions(+), 28 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/0c846cf8/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java -- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java index 3439ccb..3cb65a9 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -266,7 +266,7 @@ public class VirtualConsumerDemandTest { CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge")); -runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); MessageProducer includedProducer = localSession.createProducer(included); Thread.sleep(2000); @@ -307,7 +307,7 @@ public class VirtualConsumerDemandTest { CompositeTopic compositeTopic = createCompositeTopic("include.test.bar2", new 
ActiveMQQueue("include.test.bar.bridge")); -runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); Thread.sleep(2000); @@ -315,10 +315,11 @@ public class VirtualConsumerDemandTest { CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge")); -runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic, compositeTopic2}); +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic, compositeTopic2}, true); -MessageProducer includedProducer = localSession.createProducer(included); Thread.sleep(2000); + +MessageProducer includedProducer = localSession.createProducer(included); Message test = localSession.createTextMessage("test"); final DestinationStatistics destinationStatistics = localBroker.getDestination(included).getDestinationStatistics(); @@ -357,7 +358,7 @@ public class VirtualConsumerDemandTest { CompositeTopic compositeTopic = createCompositeTopic(testTopicName, new ActiveMQQueue("include.test.bar.bridge")); -runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); MessageProducer includedProducer = localSession.createProducer(included); Thread.sleep(2000); @@ -399,7 +400,7 @@ public class VirtualConsumerDemandTest { new ActiveMQQueue("include.test.bar.bridge"), new ActiveMQQueue("include.test.bar.bridge2")); -runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); MessageProducer includedProducer = localSession.createProducer(included); Thread.sleep(2000); @@ -466,11 +467,11 @@ public class VirtualConsumerDemandTest { new ActiveMQQueue("include.test.bar.bridge"), new ActiveMQQueue("include.test.bar.bridge2")); 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}); +runtimeBroker.setVirtualDestinations(new VirtualDestination[] {compositeTopic}, true); MessageProducer includedProducer = localSession.createProducer(included); -Thread.sleep(2000); Message test = localSession.createTextMessage("test"); +
[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027
https://issues.apache.org/jira/browse/AMQ-6027 Adding support for consumers on virtual destinations to create network demand. This behavior is turned off by default but can be enabled. For example, if a consumer comes online for a queue that is part of a VirtualTopic, this will cause a network of brokers to forward messages because a demand subscription will be created. The same applies if a consumer comes online for a forwarded destination from a composite destination. There is also an option to enable flow based on the existence of a virtual destination if the virtual destination is forwarding to a Queue. Full configuration instructions for this feature will be on the wiki page. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc81680e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc81680e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc81680e Branch: refs/heads/master Commit: cc81680e10e5c7140ec3e28091df23e9d3c3233b Parents: 480b3e7 Author: Christopher L. 
Shannon (cshannon) Committed: Mon Nov 9 20:07:43 2015 + -- .../activemq/advisory/AdvisoryBroker.java | 267 ...tinationFilterVirtualDestinationMatcher.java | 53 + .../advisory/VirtualDestinationMatcher.java | 29 + .../java/org/apache/activemq/broker/Broker.java |5 + .../apache/activemq/broker/BrokerFilter.java| 13 + .../apache/activemq/broker/BrokerService.java | 35 + .../org/apache/activemq/broker/EmptyBroker.java | 11 + .../org/apache/activemq/broker/ErrorBroker.java | 13 + .../activemq/broker/MutableBrokerFilter.java| 13 + .../region/virtual/CompositeDestination.java| 41 + .../broker/region/virtual/CompositeQueue.java |5 + .../broker/region/virtual/CompositeTopic.java |5 + .../broker/region/virtual/VirtualTopic.java | 49 + .../network/DemandForwardingBridgeSupport.java |3 +- .../network/NetworkBridgeConfiguration.java | 28 +- .../activemq/advisory/AdvisorySupport.java | 31 + .../activemq/command/NetworkBridgeFilter.java |8 +- .../plugin/UpdateVirtualDestinationsTask.java | 48 + activemq-unit-tests/pom.xml |4 + .../network/VirtualConsumerDemandTest.java | 1418 ++ 20 files changed, 2076 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java index 36f5f0b..bc5f105 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java @@ -18,6 +18,7 @@ package org.apache.activemq.advisory; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; @@ -40,6 +41,7 @@ import org.apache.activemq.broker.region.RegionBroker; import 
org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQTopic; @@ -55,6 +57,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.RemoveSubscriptionInfo; +import org.apache.activemq.command.SessionId; import org.apache.activemq.security.SecurityContext; import org.apache.activemq.state.ProducerState; import org.apache.activemq.usage.Usage; @@ -78,6 +81,22 @@ public class AdvisoryBroker extends BrokerFilter { private final ReentrantReadWriteLock consumersLock = new ReentrantReadWriteLock(); protected final Map consumers = new LinkedHashMap(); +/** + * This is a set to track all of the virtual destinations that have been added to the broker so + * they can be easily referenced later. + */ +protected final Set virtualDestinations = Collections.newSetFromMap(new ConcurrentHashMap()); +/** + * This is a map to track all consumers that exist on the virtual destination so that we can fire + * an advisory later when they go away to remove the demand. + */ +protec
[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027
Repository: activemq Updated Branches: refs/heads/master 480b3e7c3 -> cc81680e1 http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java -- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java new file mode 100644 index 000..3439ccb --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java @@ -0,0 +1,1418 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.lang.reflect.Field; +import java.net.URI; +import java.util.Arrays; +import java.util.Collection; +import java.util.Set; +import java.util.concurrent.ConcurrentMap; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisoryBroker; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.virtual.CompositeQueue; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assume; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import 
org.junit.runners.Parameterized.Parameters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; + +/** + * This test is to show that dynamicallyIncludedDestinations will work properly + * when a network of brokers is configured to treat Virtual Destinations (Virtual topic and composite destination) + * as demand. + */ +@RunWith(Parameterized.class) +public class VirtualConsumerDemandTest { + +protected static final int MESSAGE_COUNT = 10; +private static final Logger LOG = LoggerFactory.getLogger(VirtualConsumerDemandTest.class); + + +/** + * test params + */ +@Parameters +public static Collection data() { +return Arrays.asList(new Object[][] { +//not duplex, useVirtualDestSubsOnCreation +{false, true}, +//duplex +{true, false}, +{true, true}, +{false, false} +}); +} + +protected Connection localConnection; +protected Connection remoteConnection; +protected BrokerService localBroker; +protected BrokerService remoteBroker; +protected JavaRuntimeConfigurationBroker runtimeBroker; +protected Session localSession; +protected Session remoteSession; +protected ActiveMQTopic included; +protected ActiveMQTopic excluded; +protected String consumerName = "durableSubs"; +protected String testTopicName = "include.test.bar"; +protected String testQueueNam