This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 1707cc194b AMQ-9262 - Fix network subscriptions for composite consumers (#1014)
1707cc194b is described below

commit 1707cc194b1c74808c67ff54db8b909f8ef4b990
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
AuthorDate: Wed Jun 7 07:18:18 2023 -0400

    AMQ-9262 - Fix network subscriptions for composite consumers (#1014)
    
    This fixes network subscriptions that are generated on demand when a
    consumer uses composite destinations. Before this fix conduit
    subscriptions didn't work correctly. This fix now splits up the
    composite dest and generates correct demand for each of the individual
    destinations.
    
    (cherry picked from commit 901956d4ddb6a0ea9fe5fedf39732117ab68f087)
---
 .../network/DemandForwardingBridgeSupport.java     |  99 ++++-
 .../CompositeConsumerNetworkBridgeTest.java        | 435 +++++++++++++++++++++
 .../network/DurableSyncNetworkBridgeTest.java      |  15 +-
 .../network/DynamicNetworkTestSupport.java         |  61 ++-
 .../network/ForceDurableNetworkBridgeTest.java     |   9 +-
 5 files changed, 567 insertions(+), 52 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 28d136fe84..57afc85d11 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -24,6 +24,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -136,7 +137,9 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
     protected ActiveMQDestination[] durableDestinations;
     protected final ConcurrentMap<ConsumerId, DemandSubscription> 
subscriptionMapByLocalId = new ConcurrentHashMap<>();
     protected final ConcurrentMap<ConsumerId, DemandSubscription> 
subscriptionMapByRemoteId = new ConcurrentHashMap<>();
-    protected final Set<ConsumerId> forcedDurableRemoteId = 
Collections.newSetFromMap(new ConcurrentHashMap<ConsumerId, Boolean>());
+    protected final Set<ConsumerId> forcedDurableRemoteId = 
Collections.newSetFromMap(new ConcurrentHashMap<>());
+    protected final ConcurrentMap<ConsumerId, Set<ConsumerId>> 
compositeConsumerIds = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<SubscriptionInfo, Set<SubscriptionInfo>> 
compositeSubscriptions = new ConcurrentHashMap<>();
     protected final BrokerId localBrokerPath[] = new BrokerId[]{null};
     protected final CountDownLatch startedLatch = new CountDownLatch(2);
     protected final CountDownLatch localStartedLatch = new CountDownLatch(1);
@@ -1015,6 +1018,18 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 
         } else if (data.getClass() == RemoveInfo.class) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) data).getObjectId();
+
+            // If we have an entry in compositeConsumerIds then this consumer 
was a
+            // composite consumer and we need to remove the entries in the set 
and
+            // not the consumer id we received here
+            final Set<ConsumerId> compositeIds = 
compositeConsumerIds.remove(id);
+            if (compositeIds != null) {
+                for (ConsumerId compositeId : compositeIds) {
+                    serviceRemoteConsumerAdvisory(new RemoveInfo(compositeId));
+                }
+                return;
+            }
+
             removeDemandSubscription(id);
 
             if (forcedDurableRemoteId.remove(id)) {
@@ -1030,6 +1045,23 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         } else if (data.getClass() == RemoveSubscriptionInfo.class) {
             final RemoveSubscriptionInfo info = ((RemoveSubscriptionInfo) 
data);
             final SubscriptionInfo subscriptionInfo = new 
SubscriptionInfo(info.getClientId(), info.getSubscriptionName());
+
+            // If we have an entry in compositeSubscriptions then this 
consumer was a
+            // composite consumer and we need to remove the entries in the set 
and not
+            // the subscription that we received here
+            final Set<SubscriptionInfo> compositeSubs =
+                this.compositeSubscriptions.remove(subscriptionInfo);
+            if (compositeSubs != null) {
+                for (SubscriptionInfo compositeSub : compositeSubs) {
+                    RemoveSubscriptionInfo remove = new 
RemoveSubscriptionInfo();
+                    remove.setClientId(compositeSub.getClientId());
+                    
remove.setSubscriptionName(compositeSub.getSubscriptionName());
+                    
remove.setConnectionId(this.localConnectionInfo.getConnectionId());
+                    serviceRemoteConsumerAdvisory(remove);
+                }
+                return;
+            }
+
             final boolean proxyBridgeSub = 
isProxyBridgeSubscription(subscriptionInfo.getClientId(),
                     subscriptionInfo.getSubscriptionName());
             for (Iterator<DemandSubscription> i = 
subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
@@ -1415,6 +1447,12 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
     }
 
     protected void addConsumerInfo(final ConsumerInfo consumerInfo) throws 
IOException {
+        // Check if this was processed and split into new consumers for 
composite dests
+        if (splitCompositeConsumer(consumerInfo)) {
+            // If true we don't want to continue processing the original 
consumer info
+            return;
+        }
+
         ConsumerInfo info = consumerInfo.copy();
         addRemoteBrokerToBrokerPath(info);
         DemandSubscription sub = createDemandSubscription(info);
@@ -1443,6 +1481,65 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
         }
     }
 
+    // Generate new consumers for each destination that is part of a composite destination list for a consumer
+    private boolean splitCompositeConsumer(final ConsumerInfo consumerInfo) 
throws IOException {
+        // If not a composite destination or if an advisory topic then return 
false
+        // So we process normally and don't split
+        if (!consumerInfo.getDestination().isComposite() ||
+            AdvisorySupport.isAdvisoryTopic(consumerInfo.getDestination())) {
+            return false;
+        }
+
+        // At this point this is a composite destination and not an advisory 
topic. The destination
+        // will be split into individual destinations to create demand so that 
conduit subscriptions
+        // and durable subscriptions work correctly
+
+        // Handle duplicates, don't need to create again if we already have an 
entry
+        // Just return true so we stop processing
+        if (!isDuplicateSuppressionOff(consumerInfo) && 
compositeConsumerIds.containsKey(
+            consumerInfo.getConsumerId())) {
+            return true;
+        }
+
+        // Get a set to store mapped consumer Ids for each individual 
destination in the composite list
+        // and (if applicable) a set for subscriptions for durables
+        final Set<ConsumerId> consumerIds = 
compositeConsumerIds.computeIfAbsent(
+            consumerInfo.getConsumerId(),
+            k -> Collections.newSetFromMap(new ConcurrentHashMap<>()));
+        final Set<SubscriptionInfo> subscriptions = Optional.ofNullable(
+            consumerInfo.getSubscriptionName()).map(
+            subName -> compositeSubscriptions.computeIfAbsent(
+                new SubscriptionInfo(consumerInfo.getClientId(),
+                    consumerInfo.getSubscriptionName()),
+                k -> Collections.newSetFromMap(new 
ConcurrentHashMap<>()))).orElse(null);
+
+        // Split and go through each destination that is part of the composite 
list and process
+        for (ActiveMQDestination individualDest : consumerInfo.getDestination()
+            .getCompositeDestinations()) {
+            // Create a new consumer info with the individual destinations and
+            // generate new consumer Ids for each and add to the consumerIds 
set
+            final ConsumerInfo info = consumerInfo.copy();
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
+                consumerIdGenerator.getNextSequenceId()));
+            info.setDestination(individualDest);
+            consumerIds.add(info.getConsumerId());
+
+            // If there is a subscription name (durable) then generate a new 
one for the dest
+            // and add to the subscriptions set
+            Optional.ofNullable(subscriptions).ifPresent(
+                subs -> {
+                    info.setSubscriptionName(
+                        consumerInfo.getSubscriptionName() + 
individualDest.getPhysicalName());
+                    subs.add(
+                        new SubscriptionInfo(info.getClientId(), 
info.getSubscriptionName()));
+                });
+
+            // Continue on and process the new consumer Info
+            addConsumerInfo(info);
+        }
+        return true;
+    }
+
     private void undoMapRegistration(DemandSubscription sub) {
         subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
         subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId());
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
new file mode 100644
index 0000000000..cfdc2fd32d
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CompositeConsumerNetworkBridgeTest.java
@@ -0,0 +1,435 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test class to verify composite consumers correctly create demand
+ * with a network of brokers, especially conduit subs
+ * See AMQ-9262
+ */
+@RunWith(Parameterized.class)
+public class CompositeConsumerNetworkBridgeTest extends 
DynamicNetworkTestSupport {
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(CompositeConsumerNetworkBridgeTest.class);
+
+    private final static String testTopic1 = "test.composite.topic.1";
+    private final static String testTopic2 = "test.composite.topic.2";
+    private final static String testQueue1 = "test.composite.queue.1";
+    private final static String testQueue2 = "test.composite.queue.2";
+    private BrokerService broker1;
+    private BrokerService broker2;
+    private Session session1;
+    private Session session2;
+    private final FLOW flow;
+    private final static List<ActiveMQTopic> topics = List.of(
+        new ActiveMQTopic(testTopic1), new ActiveMQTopic(testTopic2));
+    private final static List<ActiveMQQueue> queues = List.of(
+        new ActiveMQQueue(testQueue1), new ActiveMQQueue(testQueue2));
+
+    @Parameters
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {FLOW.FORWARD},
+                {FLOW.REVERSE}
+        });
+    }
+
+    public CompositeConsumerNetworkBridgeTest(final FLOW flow) {
+        this.flow = flow;
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    /**
+     * Test a composite durable subscription
+     */
+    @Test
+    public void testCompositeDurableSubscriber() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        // Create durable sub on composite destination
+        // Will create a composite consumer on the local broker but
+        // should create 2 consumers on the remote
+        TopicSubscriber durSub = 
session1.createDurableSubscriber(compositeTopic, subName);
+        assertConsumersCount(broker1, compositeTopic, 1);
+
+        // The remote broker should create two durable subs instead of 1
+        // Should be 1 durable on each of the topics that are part of the 
composite
+        assertConsumersCount(broker2, compositeTopic, 0);
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+        assertCompositeMapCounts(1, 1);
+
+        durSub.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName);
+
+        //Verify cleanup
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 0);
+            assertNCDurableSubsCount(broker2, topic, 0);
+        }
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test a composite durable subscription and normal subscription
+     */
+    @Test
+    public void testCompositeAndNormalDurableSub() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        // create composite sub and a sub on one of the individual topics
+        TopicSubscriber durSub1 = 
session1.createDurableSubscriber(compositeTopic, subName + "1");
+        TopicSubscriber durSub2 = 
session1.createDurableSubscriber(topics.get(0), subName + "2");
+
+        // Should split the composite and create network subs on individual 
topics
+        for (ActiveMQTopic topic : topics) {
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        // Only 1 sub is composite so should just have 1 map entry
+        assertCompositeMapCounts(1, 1);
+
+        // Verify message received
+        MessageProducer producer = session2.createProducer(topics.get(0));
+        producer.send(session2.createTextMessage("test"));
+        assertNotNull(durSub1.receive(1000));
+        assertNotNull(durSub2.receive(1000));
+
+        durSub1.close();
+        durSub2.close();;
+
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName + "1");
+        removeSubscription(broker1, subName + "2");
+        assertCompositeMapCounts(0, 0);
+    }
+
+
+    /**
+     * Test two topic subscriptions that match
+     */
+    @Test
+    public void testTopicCompositeSubs() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        // Create two identical subscriptions on a composite topic
+        MessageConsumer sub1 = session1.createConsumer(compositeTopic);
+        MessageConsumer sub2 = session1.createConsumer(compositeTopic);
+        for (ActiveMQTopic topic : topics) {
+            // Verify the local broker has two subs on each individual topic
+            assertConsumersCount(broker1, topic, 2);
+            // Verify that conduit subscription works correctly now
+            // and only 1 sub on each topic. This used to be broken before 
AMQ-9262
+            // and would create two subscriptions even though conduit was true
+            assertConsumersCount(broker2, topic, 1);
+        }
+        assertCompositeMapCounts(2, 0);
+
+        MessageProducer producer = session2.createProducer(topics.get(0));
+        producer.send(session2.createTextMessage("test"));
+
+        assertNotNull(sub1.receive(1000));
+        assertNotNull(sub2.receive(1000));
+
+        sub1.close();
+        sub2.close();
+
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test two queue composite subscriptions that match
+     */
+    @Test
+    public void testCompositeQueueSubs() throws Exception {
+        setUp();
+        final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + 
"," + testQueue2);
+
+        // Create two matching composite queue subs to test conduit subs
+        MessageConsumer sub1 = session1.createConsumer(compositeQueue);
+        MessageConsumer sub2 = session1.createConsumer(compositeQueue);
+        for (ActiveMQDestination queue : queues) {
+            assertConsumersCount(broker1, queue, 2);
+            // Verify conduit subs now work correctly, this used to be 2
+            // which was wrong as conduit is true and is fixed as of AMQ-9262
+            assertConsumersCount(broker2, queue, 1);
+        }
+        assertCompositeMapCounts(2, 0);
+
+        MessageProducer producer = session2.createProducer(queues.get(0));
+        producer.send(session2.createTextMessage("test"));
+
+        // Make sure one of the queue receivers gets the message
+        assertTrue(sub1.receive(1000) != null
+            || sub2.receive(1000) != null);
+
+        sub1.close();
+        sub2.close();
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test a composite queue and normal queue sub
+     */
+    @Test
+    public void testCompositeAndNormalQueueSubs() throws Exception {
+        setUp();
+        final ActiveMQQueue compositeQueue = new ActiveMQQueue(testQueue1 + 
"," + testQueue2);
+
+        // Create a composite queue sub and a normal sub on one of the individual queues
+        MessageConsumer sub1 = session1.createConsumer(compositeQueue);
+        MessageConsumer sub2 = session1.createConsumer(new 
ActiveMQQueue(testQueue2));
+
+        assertConsumersCount(broker1, queues.get(0), 1);
+        assertConsumersCount(broker1, queues.get(1), 2);
+        for (ActiveMQDestination queue : queues) {
+            assertConsumersCount(broker2, queue, 1);
+        }
+        // Only 1 sub is a composite sub
+        assertCompositeMapCounts(1, 0);
+
+        MessageProducer producer = session2.createProducer(queues.get(0));
+        producer.send(session2.createTextMessage("test"));
+
+        // Make sure message received by sub1
+        assertNotNull(sub1.receive(1000));
+
+        sub1.close();
+        sub2.close();
+        assertCompositeMapCounts(0, 0);
+    }
+
+    /**
+     * Test two matching durable composite subs
+     *
+     * This test used to fail with an exception because the bridge would
+     * try to create a duplicate network durable with the same client id
+     * and subscription name, which caused an error
+     */
+    @Test
+    public void testCompositeTwoDurableSubscribers() throws Exception {
+        setUp();
+        final ActiveMQTopic compositeTopic = new ActiveMQTopic(testTopic1 + 
"," + testTopic2);
+
+        TopicSubscriber durSub1 = 
session1.createDurableSubscriber(compositeTopic, subName + "1");
+        TopicSubscriber durSub2 = 
session1.createDurableSubscriber(compositeTopic, subName + "2");
+        assertConsumersCount(broker1, compositeTopic, 2);
+
+        assertConsumersCount(broker2, compositeTopic, 0);
+        assertNCDurableSubsCount(broker2, compositeTopic, 0);
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+        assertCompositeMapCounts(2, 2);
+
+        durSub1.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName + "1");
+
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 1);
+            assertNCDurableSubsCount(broker2, topic, 1);
+        }
+
+        durSub2.close();
+        Thread.sleep(1000);
+        removeSubscription(broker1, subName + "2");
+
+        for (ActiveMQTopic topic : topics) {
+            assertConsumersCount(broker2, topic, 0);
+            assertNCDurableSubsCount(broker2, topic, 0);
+        }
+
+        assertCompositeMapCounts(0, 0);
+    }
+
+
+    private void setUp() throws Exception {
+        doSetUp(tempFolder.newFolder(), tempFolder.newFolder());
+    }
+
+    protected void doSetUp(File localDataDir, File remoteDataDir) throws 
Exception {
+        doSetUpRemoteBroker(remoteDataDir);
+        doSetUpLocalBroker(localDataDir);
+        //Give time for advisories to propagate
+        Thread.sleep(1000);
+    }
+
+    protected void doSetUpLocalBroker(File dataDir) throws Exception {
+        localBroker = createLocalBroker(dataDir);
+        localBroker.setDeleteAllMessagesOnStartup(true);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("clientId");
+        localConnection.start();
+
+        Wait.waitFor(() -> 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1, 10000, 
500);
+        localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        if (flow.equals(FLOW.FORWARD)) {
+            broker1 = localBroker;
+            session1 = localSession;
+        } else {
+            broker2 = localBroker;
+            session2 = localSession;
+        }
+    }
+
+    protected void doSetUpRemoteBroker(File dataDir) throws Exception {
+        remoteBroker = createRemoteBroker(dataDir);
+        remoteBroker.setDeleteAllMessagesOnStartup(true);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new 
ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("clientId");
+        remoteConnection.start();
+        remoteSession = remoteConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        if (flow.equals(FLOW.FORWARD)) {
+            broker2 = remoteBroker;
+            session2 = remoteSession;
+        } else {
+            broker1 = remoteBroker;
+            session1 = remoteSession;
+        }
+    }
+
+    protected BrokerService createLocalBroker(File dataDir) throws Exception {
+
+        BrokerService brokerService = new BrokerService();
+        brokerService.setMonitorConnectionSplits(true);
+        brokerService.setDataDirectoryFile(dataDir);
+        brokerService.setBrokerName("localBroker");
+        brokerService.addNetworkConnector(configureLocalNetworkConnector());
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setDestinations(new ActiveMQDestination[] {
+                new ActiveMQTopic(testTopic1),
+                new ActiveMQTopic(testTopic2),
+                new ActiveMQQueue(testQueue1),
+                new ActiveMQQueue(testQueue2)});
+
+        return brokerService;
+    }
+
+    protected NetworkConnector configureLocalNetworkConnector() throws 
Exception {
+
+        List<TransportConnector> transportConnectors = 
remoteBroker.getTransportConnectors();
+        URI remoteURI = transportConnectors.get(0).getConnectUri();
+        String uri = "static:(" + remoteURI + ")";
+        NetworkConnector connector = new DiscoveryNetworkConnector(new 
URI(uri));
+        connector.setName("networkConnector");
+        connector.setDynamicOnly(false);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setDuplex(true);
+        connector.setStaticBridge(false);
+        ArrayList<ActiveMQDestination> dynamicIncludedDestinations = new 
ArrayList<>();
+        dynamicIncludedDestinations.addAll(List.of(new 
ActiveMQTopic("test.composite.topic.>"),
+            new ActiveMQQueue("test.composite.queue.>")));
+        
connector.setDynamicallyIncludedDestinations(dynamicIncludedDestinations);
+        return connector;
+    }
+
+
+    protected BrokerService createRemoteBroker(File dataDir) throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setBrokerName("remoteBroker");
+        brokerService.setUseJmx(false);
+        brokerService.setDataDirectoryFile(dataDir);
+        brokerService.addConnector("tcp://localhost:0");
+        brokerService.setDestinations(new ActiveMQDestination[] {
+                new ActiveMQTopic(testTopic1),
+                new ActiveMQTopic(testTopic2),
+                new ActiveMQQueue(testQueue1),
+                new ActiveMQQueue(testQueue2)});
+
+        return brokerService;
+    }
+
+    protected void assertCompositeMapCounts(int compositeConsumerIdsSize, int 
compositeSubSize)
+        throws Exception {
+        DurableConduitBridge bridge = findBridge();
+        assertTrue( Wait.waitFor(() -> compositeConsumerIdsSize == 
bridge.compositeConsumerIds.size(), 5000, 500));
+        assertTrue( Wait.waitFor(() -> compositeSubSize == 
bridge.compositeSubscriptions.size(), 5000, 500));
+    }
+
+    protected DurableConduitBridge findBridge() throws Exception {
+        if (flow.equals(FLOW.FORWARD)) {
+            return findBridge(remoteBroker);
+        } else {
+            return findBridge(localBroker);
+        }
+    }
+
+    protected DurableConduitBridge findBridge(BrokerService broker) throws 
Exception {
+        final NetworkBridge bridge;
+        if (broker.getNetworkConnectors().size() > 0) {
+            assertTrue(Wait.waitFor(() -> 
broker.getNetworkConnectors().get(0).activeBridges().size() == 1, 5000, 500));
+            bridge = 
broker.getNetworkConnectors().get(0).activeBridges().iterator().next();
+        } else {
+            bridge = 
findDuplexBridge(broker.getTransportConnectorByScheme("tcp"));
+        }
+        return (DurableConduitBridge)bridge;
+    }
+}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 7e56bb2112..aa36e26fba 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -75,7 +75,6 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
     private boolean forceDurable = false;
     private boolean useVirtualDestSubs = false;
     private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
-    public static enum FLOW {FORWARD, REVERSE}
 
     private BrokerService broker1;
     private BrokerService broker2;
@@ -139,7 +138,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -161,7 +160,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
 
         assertSubscriptionsCount(broker1, topic, 0);
         assertNCDurableSubsCount(broker2, topic, 0);
@@ -188,7 +187,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         }
 
         assertSubscriptionsCount(broker1, topic, 1);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertSubscriptionsCount(broker1, topic, 0);
         doTearDown();
 
@@ -217,7 +216,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         includedTopics = "different.topic";
         restartBroker(broker1, false);
         assertSubscriptionsCount(broker1, topic, 1);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertSubscriptionsCount(broker1, topic, 0);
 
         //Test that on successful reconnection of the bridge that
@@ -310,7 +309,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
         assertSubscriptionsCount(broker1, topic, 1);
         session1.createDurableSubscriber(topic2, "sub2");
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertSubscriptionsCount(broker1, topic, 0);
         assertSubscriptionsCount(broker1, topic2, 1);
 
@@ -376,7 +375,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //with bridge off, remove 100 subs
         for (int i = 0; i < 10; i++) {
             for (int j = 0; j < 10; j++) {
-                removeSubscription(broker1, new ActiveMQTopic("include.test." 
+ i), subName + i + j);
+                removeSubscription(broker1, subName + i + j);
             }
         }
 
@@ -481,7 +480,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         restartBroker(broker1, false);
 
         assertSubscriptionsCount(broker1, topic, 1);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         session1.createDurableSubscriber(topic, "sub2").close();
         assertSubscriptionsCount(broker1, topic, 1);
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index aade6d36dd..2d83fb71b7 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -40,6 +40,7 @@ import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.DurableTopicSubscription;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.Topic;
+import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.util.SubscriptionKey;
@@ -50,6 +51,7 @@ import org.junit.rules.TemporaryFolder;
 
 
 public abstract class DynamicNetworkTestSupport {
+    public enum FLOW {FORWARD, REVERSE};
 
     protected Connection localConnection;
     protected Connection remoteConnection;
@@ -92,14 +94,10 @@ public abstract class DynamicNetworkTestSupport {
         }
     }
 
-
     protected void assertBridgeStarted() throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
-            }
-        }, 10000, 500));
+        assertTrue(Wait.waitFor(
+            () -> localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1,
+            10000, 500));
     }
 
     protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final ConnectionContext context,
@@ -113,24 +111,16 @@ public abstract class DynamicNetworkTestSupport {
     }
 
     protected void waitForConsumerCount(final DestinationStatistics destinationStatistics, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                //should only be 1 for the composite destination creation
-                return count == destinationStatistics.getConsumers().getCount();
-            }
+        assertTrue(Wait.waitFor(() -> {
+            //should only be 1 for the composite destination creation
+            return count == destinationStatistics.getConsumers().getCount();
         }));
     }
 
     protected void waitForDispatchFromLocalBroker(final DestinationStatistics destinationStatistics, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == destinationStatistics.getDequeues().getCount() &&
-                       count == destinationStatistics.getDispatched().getCount() &&
-                       count == destinationStatistics.getForwards().getCount();
-            }
-        }));
+        assertTrue(Wait.waitFor(() -> count == destinationStatistics.getDequeues().getCount() &&
+               count == destinationStatistics.getDispatched().getCount() &&
+               count == destinationStatistics.getForwards().getCount()));
     }
 
     protected void assertLocalBrokerStatistics(final DestinationStatistics localStatistics, final int count) {
@@ -145,27 +135,22 @@ public abstract class DynamicNetworkTestSupport {
 
     protected void assertNCDurableSubsCount(final BrokerService brokerService,
             final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getNCDurableSubs(brokerService, dest).size();
-            }
-        }, 10000, 500));
+        assertTrue(Wait.waitFor(() -> count == getNCDurableSubs(brokerService, dest).size(),
+            10000, 500));
     }
 
     protected void assertConsumersCount(final BrokerService brokerService,
-            final ActiveMQTopic dest, final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getConsumers(brokerService, dest).size();
-            }
-        }, 10000, 500));
+            final ActiveMQDestination dest, final int count) throws Exception {
+        assertTrue(Wait.waitFor(() -> count == getConsumers(brokerService, dest).size(),
+            10000, 500));
+        Thread.sleep(1000);
+        // Check one more time after a short pause to make sure the count didn't increase past what we wanted
+        assertEquals(count, getConsumers(brokerService, dest).size());
     }
 
     protected List<Subscription> getConsumers(final BrokerService brokerService,
-            final ActiveMQTopic dest) throws Exception {
-        Topic destination = (Topic) brokerService.getDestination(dest);
+            final ActiveMQDestination dest) throws Exception {
+        Destination destination = brokerService.getDestination(dest);
         return destination.getConsumers();
     }
 
@@ -208,8 +193,8 @@ public abstract class DynamicNetworkTestSupport {
         return subs;
     }
 
-    protected void removeSubscription(final BrokerService brokerService, final ActiveMQTopic topic,
-            final String subName) throws Exception {
+    protected void removeSubscription(final BrokerService brokerService,
+        final String subName) throws Exception {
         final RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
         info.setClientId(clientId);
         info.setSubscriptionName(subName);
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
index 678935bc1a..a93420e32f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/ForceDurableNetworkBridgeTest.java
@@ -51,7 +51,6 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
     protected String testTopicName2 = "include.nonforced.bar";
     protected String staticTopic = "include.static.bar";
     protected String staticTopic2 = "include.static.nonforced.bar";
-    public static enum FLOW {FORWARD, REVERSE};
     private BrokerService broker1;
     private BrokerService broker2;
     private Session session1;
@@ -126,7 +125,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
         //Remove the sub
         durSub.close();
         Thread.sleep(1000);
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
 
         //The durable should be gone even though there is a consumer left
         //since we are not forcing durable subs
@@ -186,7 +185,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
         Thread.sleep(1000);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertNCDurableSubsCount(broker2, topic, 0);
     }
 
@@ -201,7 +200,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
         Thread.sleep(1000);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         Thread.sleep(1000);
         assertConsumersCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
@@ -225,7 +224,7 @@ public class ForceDurableNetworkBridgeTest extends DynamicNetworkTestSupport {
         assertConsumersCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
 
-        removeSubscription(broker1, topic, subName);
+        removeSubscription(broker1, subName);
         assertConsumersCount(broker2, topic, 0);
         assertNCDurableSubsCount(broker2, topic, 0);
     }


Reply via email to