activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027

2015-11-25 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master 29d943429 -> 95f58fa7c


https://issues.apache.org/jira/browse/AMQ-6027

Adding back in test case now that AMQ-5898 is resolved


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/95f58fa7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/95f58fa7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/95f58fa7

Branch: refs/heads/master
Commit: 95f58fa7c4e26b5b2d73a80bd8e1cb2bee8ebf47
Parents: 29d9434
Author: Christopher L. Shannon (cshannon) 
Authored: Wed Nov 25 19:13:24 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Wed Nov 25 19:13:24 2015 +

--
 .../network/VirtualConsumerDemandTest.java  | 50 
 1 file changed, 50 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/95f58fa7/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
--
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index ad22b07..bff069b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -275,6 +275,56 @@ public class VirtualConsumerDemandTest extends 
DynamicNetworkTestSupport {
 
 /**
  * Test that dynamic flow works for virtual destinations when a second 
composite
+ * topic is included that forwards to the same queue, but is excluded from
+ * being forwarded from the remote broker
+ *
+ * @throws Exception
+ */
+@Test(timeout = 60 * 1000)
+public void testSecondNonIncludedCompositeTopicForwardSameQueue() throws 
Exception {
+Assume.assumeTrue(isUseVirtualDestSubsOnCreation);
+
+doSetUp(true, null);
+
+MessageConsumer advisoryConsumer = 
getVirtualDestinationAdvisoryConsumer(testTopicName);
+
+//configure a composite topic that isn't included
+CompositeTopic compositeTopic = 
createCompositeTopic("include.test.bar2",
+new ActiveMQQueue("include.test.bar.bridge"));
+
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
+
+Thread.sleep(2000);
+
+//add one that is included
+CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName,
+new ActiveMQQueue("include.test.bar.bridge"));
+
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic, compositeTopic2}, true);
+
+Thread.sleep(2000);
+MessageProducer includedProducer = 
localSession.createProducer(included);
+Message test = localSession.createTextMessage("test");
+
+final DestinationStatistics destinationStatistics = 
localBroker.getDestination(included).getDestinationStatistics();
+final DestinationStatistics remoteDestStatistics = 
remoteBroker.getDestination(
+new 
ActiveMQQueue("include.test.bar.bridge")).getDestinationStatistics();
+
+waitForConsumerCount(destinationStatistics, 1);
+
+includedProducer.send(test);
+
+waitForDispatchFromLocalBroker(destinationStatistics, 1);
+assertLocalBrokerStatistics(destinationStatistics, 1);
+assertEquals("remote dest messages", 1, 
remoteDestStatistics.getMessages().getCount());
+
+assertRemoteAdvisoryCount(advisoryConsumer, 1);
+assertAdvisoryBrokerCounts(2,2,2);
+
+}
+
+/**
+ * Test that dynamic flow works for virtual destinations when a second 
composite
  * topic is included, but is excluded from
  * being forwarded from the remote broker
  *



activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027

2015-11-10 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master 938aa626c -> 3ef6a9f76


https://issues.apache.org/jira/browse/AMQ-6027

Fixing unit test to pass assertion


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/3ef6a9f7
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/3ef6a9f7
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/3ef6a9f7

Branch: refs/heads/master
Commit: 3ef6a9f7693972c7001f82375b684998ae3f46b9
Parents: 938aa62
Author: Christopher L. Shannon (cshannon) 
Authored: Tue Nov 10 18:40:17 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Tue Nov 10 18:42:03 2015 +

--
 .../network/VirtualConsumerDemandTest.java  | 24 ++--
 1 file changed, 12 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/3ef6a9f7/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
--
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 3cb65a9..202a441 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -290,7 +290,7 @@ public class VirtualConsumerDemandTest {
 
 /**
  * Test that dynamic flow works for virtual destinations when a second 
composite
- * topic is included that forwards to the same queue, but is excluded from
+ * topic is included, but is excluded from
  * being forwarded from the remote broker
  *
  * @throws Exception
@@ -305,7 +305,7 @@ public class VirtualConsumerDemandTest {
 
 //configure a composite topic that isn't included
 CompositeTopic compositeTopic = 
createCompositeTopic("include.test.bar2",
-new ActiveMQQueue("include.test.bar.bridge"));
+new ActiveMQQueue("include.test.bar.bridge2"));
 
 runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
@@ -318,7 +318,6 @@ public class VirtualConsumerDemandTest {
 runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic, compositeTopic2}, true);
 
 Thread.sleep(2000);
-
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Message test = localSession.createTextMessage("test");
 
@@ -335,7 +334,7 @@ public class VirtualConsumerDemandTest {
 assertEquals("remote dest messages", 1, 
remoteDestStatistics.getMessages().getCount());
 
 assertRemoteAdvisoryCount(advisoryConsumer, 1);
-assertAdvisoryBrokerCounts(2,2,2);
+assertAdvisoryBrokerCounts(2,1,1);
 
 }
 
@@ -471,7 +470,7 @@ public class VirtualConsumerDemandTest {
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Message test = localSession.createTextMessage("test");
-Thread.sleep(2000);
+Thread.sleep(1000);
 
 final DestinationStatistics destinationStatistics = 
localBroker.getDestination(included).getDestinationStatistics();
 final DestinationStatistics remoteDestStatistics = 
remoteBroker.getDestination(
@@ -531,7 +530,7 @@ public class VirtualConsumerDemandTest {
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Message test = localSession.createTextMessage("test");
-Thread.sleep(2000);
+Thread.sleep(1000);
 
 final DestinationStatistics destinationStatistics = 
localBroker.getDestination(included).getDestinationStatistics();
 final DestinationStatistics remoteDestStatistics = 
remoteBroker.getDestination(
@@ -594,7 +593,7 @@ public class VirtualConsumerDemandTest {
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Message test = localSession.createTextMessage("test");
-Thread.sleep(2000);
+Thread.sleep(1000);
 
 final DestinationStatistics destinationStatistics = 
localBroker.getDestination(included).getDestinationStatistics();
 
@@ -632,7 +631,7 @@ public class VirtualConsumerDemandTest {
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Message test = localSession.createTextMessage("test");
-Thread.sleep(2000);
+Thread.sleep(1000);
 
 final DestinationStatistics destinationStatistics = 
localBroker.getDestination(included).getDestinationStatistics();
 
@@ -686,7 +685,7 @@ public class VirtualConsumerDemandTest {
 
 MessageProducer includedProducer = 
l

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027

2015-11-10 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master cc81680e1 -> 0c846cf8f


https://issues.apache.org/jira/browse/AMQ-6027

Tweaking test case to apply virtual destination updates immediately


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0c846cf8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0c846cf8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0c846cf8

Branch: refs/heads/master
Commit: 0c846cf8f6bcb999f98d499a717e922cbd04c6d2
Parents: cc81680
Author: Christopher L. Shannon (cshannon) 
Authored: Tue Nov 10 14:45:37 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Tue Nov 10 14:45:37 2015 +

--
 .../network/VirtualConsumerDemandTest.java  | 55 ++--
 1 file changed, 27 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/0c846cf8/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
--
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
index 3439ccb..3cb65a9 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -266,7 +266,7 @@ public class VirtualConsumerDemandTest {
 CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
 new ActiveMQQueue("include.test.bar.bridge"));
 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic});
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Thread.sleep(2000);
@@ -307,7 +307,7 @@ public class VirtualConsumerDemandTest {
 CompositeTopic compositeTopic = 
createCompositeTopic("include.test.bar2",
 new ActiveMQQueue("include.test.bar.bridge"));
 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic});
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
 Thread.sleep(2000);
 
@@ -315,10 +315,11 @@ public class VirtualConsumerDemandTest {
 CompositeTopic compositeTopic2 = createCompositeTopic(testTopicName,
 new ActiveMQQueue("include.test.bar.bridge"));
 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic, compositeTopic2});
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic, compositeTopic2}, true);
 
-MessageProducer includedProducer = 
localSession.createProducer(included);
 Thread.sleep(2000);
+
+MessageProducer includedProducer = 
localSession.createProducer(included);
 Message test = localSession.createTextMessage("test");
 
 final DestinationStatistics destinationStatistics = 
localBroker.getDestination(included).getDestinationStatistics();
@@ -357,7 +358,7 @@ public class VirtualConsumerDemandTest {
 CompositeTopic compositeTopic = createCompositeTopic(testTopicName,
 new ActiveMQQueue("include.test.bar.bridge"));
 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic});
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Thread.sleep(2000);
@@ -399,7 +400,7 @@ public class VirtualConsumerDemandTest {
 new ActiveMQQueue("include.test.bar.bridge"),
 new ActiveMQQueue("include.test.bar.bridge2"));
 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic});
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
 Thread.sleep(2000);
@@ -466,11 +467,11 @@ public class VirtualConsumerDemandTest {
 new ActiveMQQueue("include.test.bar.bridge"),
 new ActiveMQQueue("include.test.bar.bridge2"));
 
-runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic});
+runtimeBroker.setVirtualDestinations(new VirtualDestination[] 
{compositeTopic}, true);
 
 MessageProducer includedProducer = 
localSession.createProducer(included);
-Thread.sleep(2000);
 Message test = localSession.createTextMessage("test");
+   

[2/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027

2015-11-09 Thread cshannon
https://issues.apache.org/jira/browse/AMQ-6027

Adding support for consumers on virtual destinations to create network
demand. This behavior is turned off by default but can be enabled.

For example, if a consumer comes online for a queue that is part of a
VirtualTopic, this will cause a network of brokers to forward messages
because a demand subscription will be created. Same for if a consumer
comes online for a forwarded destination from a composite
destination.

There is also an option to enable flow based on the existence of a
virtual destination if the virtual destination is forwarding to a
Queue.

Full configuration instructions for this feature will be on the wiki page.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/cc81680e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/cc81680e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/cc81680e

Branch: refs/heads/master
Commit: cc81680e10e5c7140ec3e28091df23e9d3c3233b
Parents: 480b3e7
Author: Christopher L. Shannon (cshannon) 
Authored: Tue Oct 20 18:15:30 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Mon Nov 9 20:07:43 2015 +

--
 .../activemq/advisory/AdvisoryBroker.java   |  267 
 ...tinationFilterVirtualDestinationMatcher.java |   53 +
 .../advisory/VirtualDestinationMatcher.java |   29 +
 .../java/org/apache/activemq/broker/Broker.java |5 +
 .../apache/activemq/broker/BrokerFilter.java|   13 +
 .../apache/activemq/broker/BrokerService.java   |   35 +
 .../org/apache/activemq/broker/EmptyBroker.java |   11 +
 .../org/apache/activemq/broker/ErrorBroker.java |   13 +
 .../activemq/broker/MutableBrokerFilter.java|   13 +
 .../region/virtual/CompositeDestination.java|   41 +
 .../broker/region/virtual/CompositeQueue.java   |5 +
 .../broker/region/virtual/CompositeTopic.java   |5 +
 .../broker/region/virtual/VirtualTopic.java |   49 +
 .../network/DemandForwardingBridgeSupport.java  |3 +-
 .../network/NetworkBridgeConfiguration.java |   28 +-
 .../activemq/advisory/AdvisorySupport.java  |   31 +
 .../activemq/command/NetworkBridgeFilter.java   |8 +-
 .../plugin/UpdateVirtualDestinationsTask.java   |   48 +
 activemq-unit-tests/pom.xml |4 +
 .../network/VirtualConsumerDemandTest.java  | 1418 ++
 20 files changed, 2076 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 36f5f0b..bc5f105 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -18,6 +18,7 @@ package org.apache.activemq.advisory;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
@@ -40,6 +41,7 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.command.ActiveMQTopic;
@@ -55,6 +57,7 @@ import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.ProducerId;
 import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
+import org.apache.activemq.command.SessionId;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -78,6 +81,22 @@ public class AdvisoryBroker extends BrokerFilter {
 private final ReentrantReadWriteLock consumersLock = new 
ReentrantReadWriteLock();
 protected final Map consumers = new 
LinkedHashMap();
 
+/**
+ * This is a set to track all of the virtual destinations that have been 
added to the broker so
+ * they can be easily referenced later.
+ */
+protected final Set virtualDestinations = 
Collections.newSetFromMap(new ConcurrentHashMap());
+/**
+ * This is a map to track all consumers that exist on the virtual 
destination so that we can fire
+ * an advisory later when they go away to remove the demand.
+ */
+protec

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-6027

2015-11-09 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master 480b3e7c3 -> cc81680e1


http://git-wip-us.apache.org/repos/asf/activemq/blob/cc81680e/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
--
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
new file mode 100644
index 000..3439ccb
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/VirtualConsumerDemandTest.java
@@ -0,0 +1,1418 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisoryBroker;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.DestinationStatistics;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.CompositeTopic;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * This test is to show that dynamicallyIncludedDestinations will work properly
+ * when a network of brokers is configured to treat Virtual Destinations 
(Virtual topic and composite destination)
+ * as demand.
+ */
+@RunWith(Parameterized.class)
+public class VirtualConsumerDemandTest {
+
+protected static final int MESSAGE_COUNT = 10;
+private static final Logger LOG = 
LoggerFactory.getLogger(VirtualConsumerDemandTest.class);
+
+
+/**
+ * test params
+ */
+@Parameters
+public static Collection data() {
+return Arrays.asList(new Object[][] {
+//not duplex, useVirtualDestSubsOnCreation
+{false, true},
+//duplex
+{true, false},
+{true, true},
+{false, false}
+});
+}
+
+protected Connection localConnection;
+protected Connection remoteConnection;
+protected BrokerService localBroker;
+protected BrokerService remoteBroker;
+protected JavaRuntimeConfigurationBroker runtimeBroker;
+protected Session localSession;
+protected Session remoteSession;
+protected ActiveMQTopic included;
+protected ActiveMQTopic excluded;
+protected String consumerName = "durableSubs";
+protected String testTopicName = "include.test.bar";
+protected String testQueueNam