This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 677d71b8e7 ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions
677d71b8e7 is described below

commit 677d71b8e7b3bb0afd2fd618af4b86657324e50b
Author: Clebert Suconic <clebertsuco...@apache.org>
AuthorDate: Fri Jul 14 16:49:31 2023 -0400

    ARTEMIS-4366 Missing Mirrored ACKs with MULTICAST and subscriptions
---
 .../artemis/cli/commands/AbstractAction.java       |  17 +-
 .../utils/collections/NodeStoreFactory.java        |  21 ++
 .../api/core/management/ManagementHelper.java      |  37 ++++
 .../amqp/broker/ProtonProtocolManager.java         |   8 +-
 .../connect/mirror/AMQPMirrorControllerSource.java |   6 +-
 .../connect/mirror/AMQPMirrorControllerTarget.java |  12 +-
 .../amqp/connect/mirror/ReferenceNodeStore.java    |  41 +---
 .../connect/mirror/ReferenceNodeStoreFactory.java  |  82 ++++++++
 .../apache/activemq/artemis/core/server/Queue.java |   3 +-
 .../artemis/core/server/impl/QueueImpl.java        |   9 +-
 .../core/server/impl/RoutingContextTest.java       |   4 +-
 .../server/impl/ScheduledDeliveryHandlerTest.java  |   4 +-
 .../integration/amqp/connect/AMQPReplicaTest.java  | 221 ++++++++++++++++++++
 tests/smoke-tests/pom.xml                          |  32 +++
 .../mirrored-subscriptions/broker1/broker.xml      | 226 +++++++++++++++++++++
 .../mirrored-subscriptions/broker2/broker.xml      | 220 ++++++++++++++++++++
 .../brokerConnection/MirroredSubscriptionTest.java | 164 +++++++++++++++
 .../tests/smoke/common/SimpleManagement.java       |  68 +++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |   4 +-
 19 files changed, 1107 insertions(+), 72 deletions(-)

diff --git 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
index 37f08c35f1..3eebaf4e2c 100644
--- 
a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
+++ 
b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/AbstractAction.java
@@ -18,7 +18,6 @@ package org.apache.activemq.artemis.cli.commands;
 
 import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
 import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientRequestor;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
 import org.apache.activemq.artemis.api.core.client.ServerLocator;
@@ -28,25 +27,13 @@ import 
org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
 
 public abstract class AbstractAction extends ConnectionAbstract {
 
+   // TODO: This call could be replaced by a direct call into ManagementHelper.doManagement and their lambdas
    public void performCoreManagement(ManagementCallback<ClientMessage> cb) 
throws Exception {
-
       try (ActiveMQConnectionFactory factory = createCoreConnectionFactory();
            ServerLocator locator = factory.getServerLocator();
            ClientSessionFactory sessionFactory = 
locator.createSessionFactory();
            ClientSession session = sessionFactory.createSession(user, 
password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
-         session.start();
-         ClientRequestor requestor = new ClientRequestor(session, 
"activemq.management");
-         ClientMessage message = session.createMessage(false);
-
-         cb.setUpInvocation(message);
-
-         ClientMessage reply = requestor.request(message);
-
-         if (ManagementHelper.hasOperationSucceeded(reply)) {
-            cb.requestSuccessful(reply);
-         } else {
-            cb.requestFailed(reply);
-         }
+         ManagementHelper.doManagement(session, cb::setUpInvocation, 
cb::requestSuccessful, cb::requestFailed);
       }
    }
 
diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java
new file mode 100644
index 0000000000..2bd6c9c292
--- /dev/null
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStoreFactory.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+public interface NodeStoreFactory<E> {
+   NodeStore<E> newNodeStore();
+}
diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index 678df28625..69a16a8f4b 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.artemis.api.core.management;
 
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientRequestor;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
 import org.apache.activemq.artemis.json.JsonArray;
 
 import org.apache.activemq.artemis.api.core.ICoreMessage;
@@ -87,6 +94,36 @@ public final class ManagementHelper {
 
    public static final SimpleString HDR_CLIENT_ID = new 
SimpleString("_AMQ_Client_ID");
 
+   // Lambda declaration for management function. Pretty much the same thing as java.util.function.Consumer, but with an exception in the declaration, which was needed.
+   public interface MessageAcceptor {
+      void accept(ClientMessage message) throws Exception;
+   }
+
+   /** Utility function to connect to a server and perform a management 
operation via core. */
+   public static void doManagement(String uri, String user, String password, 
MessageAcceptor setup, MessageAcceptor ok, MessageAcceptor failed) throws 
Exception {
+      try (ServerLocator locator = ServerLocatorImpl.newLocator(uri);
+           ClientSessionFactory sessionFactory = 
locator.createSessionFactory();
+           ClientSession session = sessionFactory.createSession(user, 
password, false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE)) {
+         doManagement(session, setup, ok, failed);
+      }
+   }
+
+   /** Utility function to reuse a ClientSession and perform a single management operation via core. */
+   public static void doManagement(ClientSession session, MessageAcceptor 
setup, MessageAcceptor ok, MessageAcceptor failed) throws Exception {
+      session.start();
+      ClientRequestor requestor = new ClientRequestor(session, 
"activemq.management");
+      ClientMessage message = session.createMessage(false);
+
+      setup.accept(message);
+
+      ClientMessage reply = requestor.request(message);
+
+      if (ManagementHelper.hasOperationSucceeded(reply)) {
+         ok.accept(reply);
+      } else {
+         failed.accept(reply);
+      }
+   }
 
    /**
     * Stores a resource attribute in a message to retrieve the value from the 
server resource.
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
index a34184ae6b..9355bbdff1 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/ProtonProtocolManager.java
@@ -33,7 +33,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.management.Notification;
 import org.apache.activemq.artemis.core.server.management.NotificationListener;
 import 
org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
-import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStore;
+import 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStoreFactory;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
 import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
@@ -75,7 +75,7 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
 
    // We must use one referenceIDSupplier per server.
    // protocol manager is the perfect aggregation for that.
-   private ReferenceNodeStore referenceIDSupplier;
+   private ReferenceNodeStoreFactory referenceIDSupplier;
 
    private final ProtonProtocolManagerFactory factory;
 
@@ -125,11 +125,11 @@ public class ProtonProtocolManager extends 
AbstractProtocolManager<AMQPMessage,
       routingHandler = new AMQPRoutingHandler(server);
    }
 
-   public synchronized ReferenceNodeStore getReferenceIDSupplier() {
+   public synchronized ReferenceNodeStoreFactory getReferenceIDSupplier() {
       if (referenceIDSupplier == null) {
          // we lazy start the instance.
          // only create it when needed
-         referenceIDSupplier = new ReferenceNodeStore(server);
+         referenceIDSupplier = new ReferenceNodeStoreFactory(server);
       }
       return referenceIDSupplier;
    }
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
index 160e44c1aa..6d943bf2d6 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerSource.java
@@ -92,7 +92,7 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
 
    final Queue snfQueue;
    final ActiveMQServer server;
-   final ReferenceNodeStore idSupplier;
+   final ReferenceNodeStoreFactory idSupplier;
    final boolean acks;
    final boolean addQueues;
    final boolean deleteQueues;
@@ -324,14 +324,14 @@ public class AMQPMirrorControllerSource extends 
BasicMirrorController<Sender> im
       }
    }
 
-   public static void validateProtocolData(ReferenceNodeStore 
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
+   public static void validateProtocolData(ReferenceNodeStoreFactory 
referenceIDSupplier, MessageReference ref, SimpleString snfAddress) {
       if (ref.getProtocolData(DeliveryAnnotations.class) == null && 
!ref.getMessage().getAddressSimpleString().equals(snfAddress)) {
          setProtocolData(referenceIDSupplier, ref);
       }
    }
 
    /** This method will return the brokerID used by the message */
-   private static String setProtocolData(ReferenceNodeStore 
referenceIDSupplier, MessageReference ref) {
+   private static String setProtocolData(ReferenceNodeStoreFactory 
referenceIDSupplier, MessageReference ref) {
       String brokerID = referenceIDSupplier.getServerID(ref);
       long id = referenceIDSupplier.getID(ref);
 
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
index 55f35bf645..fa168005a0 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/AMQPMirrorControllerTarget.java
@@ -162,7 +162,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    DuplicateIDCache lruduplicateIDCache;
    String lruDuplicateIDKey;
 
-   private final ReferenceNodeStore referenceNodeStore;
+   private final ReferenceNodeStoreFactory referenceNodeStore;
 
    OperationContext mirrorContext;
 
@@ -367,15 +367,17 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
    }
 
    private void performAck(String nodeID, long messageID, Queue targetQueue, 
ACKMessageOperation ackMessageOperation, AckReason reason, final short retry) {
-      MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceNodeStore);
 
       if (logger.isTraceEnabled()) {
-         logger.trace("performAck (nodeID={}, messageID={}), targetQueue={}). 
Ref={}", nodeID, messageID, targetQueue.getName(), reference);
+         logger.trace("performAck (nodeID={}, messageID={}), targetQueue={})", 
nodeID, messageID, targetQueue.getName());
       }
 
+      MessageReference reference = targetQueue.removeWithSuppliedID(nodeID, 
messageID, referenceNodeStore);
+
+
       if (reference == null) {
          if (logger.isDebugEnabled()) {
-            logger.debug("Retrying Reference not found on messageID={}, 
nodeID={}, currentRetry={}", messageID, nodeID, retry);
+            logger.debug("Retrying Reference not found on messageID={}, 
nodeID={}, queue={}. currentRetry={}", messageID, nodeID, targetQueue, retry);
          }
          switch (retry) {
             case 0:
@@ -404,7 +406,7 @@ public class AMQPMirrorControllerTarget extends 
ProtonAbstractReceiver implement
 
       if (reference != null) {
          if (logger.isTraceEnabled()) {
-            logger.trace("Post ack Server {} worked well for messageID={} 
nodeID={}", server, messageID, nodeID);
+            logger.trace("Post ack Server {} worked well for messageID={} 
nodeID={} queue={}, targetQueue={}", server, messageID, nodeID, 
reference.getQueue(), targetQueue);
          }
          try {
             switch (reason) {
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
index d82560cace..a9ae71a273 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java
@@ -20,20 +20,16 @@ import java.util.HashMap;
 
 import io.netty.util.collection.LongObjectHashMap;
 import org.apache.activemq.artemis.api.core.Message;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
 
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
-import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
-
 public class ReferenceNodeStore implements NodeStore<MessageReference> {
 
-   private final String serverID;
+   private final ReferenceNodeStoreFactory factory;
 
-   public ReferenceNodeStore(ActiveMQServer server) {
-      this.serverID = server.getNodeID().toString();
+   public ReferenceNodeStore(ReferenceNodeStoreFactory factory) {
+      this.factory = factory;
    }
 
    // This is where the messages are stored by server id...
@@ -43,10 +39,6 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
    LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
 
 
-   public String getDefaultNodeID() {
-      return serverID;
-   }
-
    @Override
    public void storeNode(MessageReference element, 
LinkedListImpl.Node<MessageReference> node) {
       String list = getServerID(element);
@@ -90,7 +82,7 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
    /** notice getMap should always return an instance. It should never return 
null. */
    private synchronized 
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> getMap(String 
serverID) {
       if (serverID == null) {
-         serverID = this.serverID; // returning for the localList in case it's 
null
+         serverID = factory.getDefaultNodeID();
       }
 
       if (lruListID != null && lruListID.equals(serverID)) {
@@ -113,34 +105,15 @@ public class ReferenceNodeStore implements 
NodeStore<MessageReference> {
    }
 
    public String getServerID(MessageReference element) {
-      return getServerID(element.getMessage());
+      return factory.getServerID(element);
    }
 
-
    public String getServerID(Message message) {
-      Object nodeID = 
message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
-      if (nodeID != null) {
-         return nodeID.toString();
-      } else {
-         // it is important to return null here, as the MirrorSource is 
expecting it to be null
-         // in the case the nodeID being from the originating server.
-         // don't be tempted to return this.serverID here.
-         return null;
-      }
+      return factory.getServerID(message);
    }
 
    public long getID(MessageReference element) {
-      Message message = element.getMessage();
-      Long id = getID(message);
-      if (id == null) {
-         return element.getMessageID();
-      } else {
-         return id;
-      }
-   }
-
-   private Long getID(Message message) {
-      return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+      return factory.getID(element);
    }
 
    @Override
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java
new file mode 100644
index 0000000000..2782f85175
--- /dev/null
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStoreFactory.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.protocol.amqp.connect.mirror;
+
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
+
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_BROKER_ID_EXTRA_PROPERTY;
+import static 
org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource.INTERNAL_ID_EXTRA_PROPERTY;
+
+public class ReferenceNodeStoreFactory implements 
NodeStoreFactory<MessageReference> {
+
+   final ActiveMQServer server;
+
+   private final String serverID;
+
+   public ReferenceNodeStoreFactory(ActiveMQServer server) {
+      this.server = server;
+      this.serverID = server.getNodeID().toString();
+
+   }
+
+   @Override
+   public NodeStore<MessageReference> newNodeStore() {
+      return new ReferenceNodeStore(this);
+   }
+
+   public String getDefaultNodeID() {
+      return serverID;
+   }
+
+   public String getServerID(MessageReference element) {
+      return getServerID(element.getMessage());
+   }
+
+
+   public String getServerID(Message message) {
+      Object nodeID = 
message.getBrokerProperty(INTERNAL_BROKER_ID_EXTRA_PROPERTY);
+      if (nodeID != null) {
+         return nodeID.toString();
+      } else {
+         // it is important to return null here, as the MirrorSource is 
expecting it to be null
+         // in the case the nodeID being from the originating server.
+         // don't be tempted to return this.serverID here.
+         return null;
+      }
+   }
+
+   public long getID(MessageReference element) {
+      Message message = element.getMessage();
+      Long id = getID(message);
+      if (id == null) {
+         return element.getMessageID();
+      } else {
+         return id;
+      }
+   }
+
+   private Long getID(Message message) {
+      return (Long)message.getBrokerProperty(INTERNAL_ID_EXTRA_PROPERTY);
+   }
+
+
+}
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 2c1319bcd4..e9042a9913 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -37,6 +37,7 @@ import 
org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalComponent;
 
 public interface Queue extends Bindable,CriticalComponent {
@@ -77,7 +78,7 @@ public interface Queue extends Bindable,CriticalComponent {
     *  If the idSupplier returns {@literal < 0} the ID is considered a non 
value (null) and it will be ignored.
     *
     *  @see 
org.apache.activemq.artemis.utils.collections.LinkedList#setNodeStore(NodeStore)
 */
-   MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStore<MessageReference> nodeStore);
+   MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStoreFactory<MessageReference> nodeStore);
 
    /**
     * The queue definition could be durable, but the messages could eventually 
be considered non durable.
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 2ff193595b..9f948355d9 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -116,6 +116,7 @@ import 
org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
 import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
 import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
 import org.apache.activemq.artemis.utils.collections.TypedProperties;
@@ -219,9 +220,9 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
 
    private NodeStore<MessageReference> nodeStore;
 
-   private void checkIDSupplier(NodeStore<MessageReference> nodeStore) {
-      if (this.nodeStore != nodeStore) {
-         this.nodeStore = nodeStore;
+   private void checkIDSupplier(NodeStoreFactory<MessageReference> 
nodeStoreFactory) {
+      if (this.nodeStore == null) {
+         this.nodeStore = nodeStoreFactory.newNodeStore();
          messageReferences.setNodeStore(nodeStore);
       }
    }
@@ -3457,7 +3458,7 @@ public class QueueImpl extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public synchronized MessageReference removeWithSuppliedID(String serverID, 
long id, NodeStore<MessageReference> nodeStore) {
+   public synchronized MessageReference removeWithSuppliedID(String serverID, 
long id, NodeStoreFactory<MessageReference> nodeStore) {
       checkIDSupplier(nodeStore);
       MessageReference reference = messageReferences.removeWithID(serverID, 
id);
       if (reference != null) {
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
index 90cd723c48..09c96e8745 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java
@@ -41,7 +41,7 @@ import 
org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalAnalyzer;
 import org.apache.activemq.artemis.utils.critical.CriticalCloseable;
 import org.junit.Assert;
@@ -157,7 +157,7 @@ public class RoutingContextTest {
       }
 
       @Override
-      public MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStore<MessageReference> nodeStore) {
+      public MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStoreFactory<MessageReference> nodeStore) {
          return null;
       }
 
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 5e43510f8c..e4cd2acbe6 100644
--- 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -56,8 +56,8 @@ import 
org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
 import org.apache.activemq.artemis.utils.UUID;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 import org.slf4j.Logger;
@@ -868,7 +868,7 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
-      public MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStore<MessageReference> nodeStore) {
+      public MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStoreFactory<MessageReference> nodeStore) {
          return null;
       }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
index 841c8ec1a6..66fafbc3ca 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPReplicaTest.java
@@ -24,10 +24,16 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.Topic;
 
+import java.lang.invoke.MethodHandles;
 import java.net.URI;
 import java.util.HashSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -59,9 +65,13 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AMQPReplicaTest extends AmqpClientTestSupport {
 
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
    protected static final int AMQP_PORT_2 = 5673;
    protected static final int AMQP_PORT_3 = 5674;
    public static final int TIME_BEFORE_RESTART = 1000;
@@ -834,6 +844,8 @@ public class AMQPReplicaTest extends AmqpClientTestSupport {
    }
 
    public Queue locateQueue(ActiveMQServer server, String queueName) throws 
Exception {
+      Assert.assertNotNull(queueName);
+      Assert.assertNotNull(server);
       Wait.waitFor(() -> server.locateQueue(queueName) != null);
       return server.locateQueue(queueName);
    }
@@ -1077,4 +1089,213 @@ public class AMQPReplicaTest extends 
AmqpClientTestSupport {
       conn.close();
    }
 
+   /**
+    * Connects over AMQP to the broker on {@code localhost:port}, opens the durable subscription
+    * {@code subscriptionName} on the topic {@code queueName} and consumes one message per ID in
+    * {@code [START_ID, LAST_ID]}, identified by the int property {@code "i"}.
+    * <p>
+    * Every expected ID must be received exactly once; the order does not matter. Passing
+    * {@code LAST_ID < START_ID} consumes nothing and therefore only opens the subscription.
+    *
+    * @param START_ID         first expected value of the {@code "i"} property (inclusive)
+    * @param LAST_ID          last expected value of the {@code "i"} property (inclusive)
+    * @param port             port of the broker on localhost
+    * @param clientID         JMS client ID set on the connection
+    * @param queueName        name of the topic the subscription is created on
+    * @param subscriptionName name of the durable subscription
+    * @param assertNull       when {@code true}, also asserts that no further message is immediately available
+    * @throws JMSException if any JMS operation fails
+    */
+   private void consumeSubscription(int START_ID,
+                                int LAST_ID,
+                                int port,
+                                String clientID,
+                                String queueName,
+                                String subscriptionName,
+                                boolean assertNull) throws JMSException {
+      ConnectionFactory cf = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + port);
+      // try-with-resources: the connection is closed even when an assertion fails midway
+      try (Connection conn = cf.createConnection()) {
+         conn.setClientID(clientID);
+         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         conn.start();
+
+         HashSet<Integer> idsReceived = new HashSet<>();
+
+         Topic topic = sess.createTopic(queueName);
+
+         MessageConsumer consumer = sess.createDurableConsumer(topic, subscriptionName);
+         for (int i = START_ID; i <= LAST_ID; i++) {
+            Message message = consumer.receive(3000);
+            Assert.assertNotNull(message);
+            // getIntProperty returns a primitive, so a null check on its boxed result can never
+            // fail; verify the property is actually present instead
+            Assert.assertTrue(message.propertyExists("i"));
+            // add() returns false on a duplicate, which fails the test
+            Assert.assertTrue(idsReceived.add(message.getIntProperty("i")));
+         }
+
+         if (assertNull) {
+            Assert.assertNull(consumer.receiveNoWait());
+         }
+
+         // every expected ID must have been received ...
+         for (int i = START_ID; i <= LAST_ID; i++) {
+            Assert.assertTrue(idsReceived.remove(i));
+         }
+
+         // ... and nothing besides the expected IDs
+         Assert.assertTrue(idsReceived.isEmpty());
+      }
+   }
+
+   @Test
+   public void testMulticast() throws Exception {
+      // regular messages, no paging, no broker connection restart, both subscriptions consumed concurrently
+      boolean largeMessage = false;
+      boolean pagingTarget = false;
+      boolean pagingSource = false;
+      boolean restartBrokerConnection = false;
+      boolean multiThreadConsumers = true;
+      multiCastReplicaTest(largeMessage, pagingTarget, pagingSource, restartBrokerConnection, multiThreadConsumers);
+   }
+
+
+   @Test
+   public void testMulticastSerializeConsumption() throws Exception {
+      // same as the plain multicast scenario, but the subscriptions are consumed one after the other
+      boolean largeMessage = false;
+      boolean pagingTarget = false;
+      boolean pagingSource = false;
+      boolean restartBrokerConnection = false;
+      boolean multiThreadConsumers = false;
+      multiCastReplicaTest(largeMessage, pagingTarget, pagingSource, restartBrokerConnection, multiThreadConsumers);
+   }
+
+   @Test
+   public void testMulticastTargetPaging() throws Exception {
+      // regular messages with paging started on the target side only, concurrent consumers
+      boolean largeMessage = false;
+      boolean pagingTarget = true;
+      boolean pagingSource = false;
+      boolean restartBrokerConnection = false;
+      boolean multiThreadConsumers = true;
+      multiCastReplicaTest(largeMessage, pagingTarget, pagingSource, restartBrokerConnection, multiThreadConsumers);
+   }
+
+   @Test
+   public void testMulticastTargetSourcePaging() throws Exception {
+      // regular messages with paging started on both sides, plus a broker connection restart
+      boolean largeMessage = false;
+      boolean pagingTarget = true;
+      boolean pagingSource = true;
+      boolean restartBrokerConnection = true;
+      boolean multiThreadConsumers = true;
+      multiCastReplicaTest(largeMessage, pagingTarget, pagingSource, restartBrokerConnection, multiThreadConsumers);
+   }
+
+   @Test
+   public void testMulticastTargetLargeMessage() throws Exception {
+      // large messages with paging started on both sides, plus a broker connection restart
+      boolean largeMessage = true;
+      boolean pagingTarget = true;
+      boolean pagingSource = true;
+      boolean restartBrokerConnection = true;
+      boolean multiThreadConsumers = true;
+      multiCastReplicaTest(largeMessage, pagingTarget, pagingSource, restartBrokerConnection, multiThreadConsumers);
+   }
+
+
+   /**
+    * Mirrors a MULTICAST address with two durable subscriptions from {@code server_2} to
+    * {@code server} (identity "targetServer") and verifies that the messages and, after
+    * consumption on {@code server_2}, their acknowledgements are reflected on both brokers.
+    * <p>
+    * Flow: start both brokers, create the address and the two subscriptions on {@code server_2},
+    * send the messages, wait until both brokers hold every message on both subscription queues,
+    * consume everything on {@code server_2}, then wait until all four queues are empty.
+    *
+    * @param largeMessage            passed to {@code getText} when building each message body; when
+    *                                {@code true} the large-messages directories of both brokers are validated at the end
+    * @param pagingTarget            when {@code true}, paging is started on the subscription's paging store on
+    *                                {@code server} and asserted to still be active after the messages are mirrored
+    * @param pagingSource            when {@code true}, paging is started on the subscription's paging store on {@code server_2}
+    * @param restartBrokerConnection when {@code true}, the broker connection on {@code server_2} is stopped and
+    *                                started again before the messages are consumed
+    * @param multiThreadConsumers    when {@code true}, both subscriptions are consumed concurrently; otherwise
+    *                                each consumer must finish before the next one is started
+    * @throws Exception on any broker, JMS or assertion-helper failure
+    */
+   private void multiCastReplicaTest(boolean largeMessage,
+                            boolean pagingTarget,
+                            boolean pagingSource,
+                            boolean restartBrokerConnection,
+                            boolean multiThreadConsumers) throws Exception {
+
+      String brokerConnectionName = "brokerConnectionName:" + UUIDGenerator.getInstance().generateStringUUID();
+      final ActiveMQServer server = this.server;
+      server.setIdentity("targetServer");
+
+      server_2 = createServer(AMQP_PORT_2, false);
+      server_2.setIdentity("server_2");
+      server_2.getConfiguration().setName("server_2");
+
+      AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration(brokerConnectionName, "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
+      AMQPMirrorBrokerConnectionElement replica = new AMQPMirrorBrokerConnectionElement().setMessageAcknowledgements(true).setDurable(true);
+      replica.setName("theReplica");
+      amqpConnection.addElement(replica);
+      server_2.getConfiguration().addAMQPConnection(amqpConnection);
+
+      int numberOfMessages = 200;
+
+      server_2.start();
+      server.start();
+      Wait.assertTrue(server_2::isStarted);
+      Wait.assertTrue(server::isStarted);
+
+      // We create the address to avoid auto delete on the queue
+      server_2.addAddressInfo(new AddressInfo(getTopicName()).addRoutingType(RoutingType.MULTICAST).setAutoCreated(false));
+
+      ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
+      // try-with-resources: the producer connection stays open for the whole scenario and is
+      // closed on every exit path, including assertion failures
+      try (Connection connection = factory.createConnection()) {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Topic topic = session.createTopic(getTopicName());
+         MessageProducer producer = session.createProducer(topic);
+
+         for (int i = 0; i <= 1; i++) {
+            // just creating the subscription and not consuming anything
+            consumeSubscription(0, -1, AMQP_PORT_2, "client" + i, getTopicName(), "subscription" + i, false);
+         }
+
+         // subscription queues are looked up as "<clientID>.<subscriptionName>"
+         String subs0Name = "client0.subscription0";
+         String subs1Name = "client1.subscription1";
+
+         Queue subs0Server1 = locateQueue(server, subs0Name);
+         Queue subs1Server1 = locateQueue(server, subs1Name);
+         Assert.assertNotNull(subs0Server1);
+         Assert.assertNotNull(subs1Server1);
+
+         Queue subs0Server2 = locateQueue(server_2, subs0Name);
+         Queue subs1Server2 = locateQueue(server_2, subs1Name);
+         Assert.assertNotNull(subs0Server2);
+         Assert.assertNotNull(subs1Server2);
+
+         if (pagingTarget) {
+            subs0Server1.getPagingStore().startPaging();
+         }
+
+         if (pagingSource) {
+            subs0Server2.getPagingStore().startPaging();
+         }
+
+         for (int i = 0; i < numberOfMessages; i++) {
+            Message message = session.createTextMessage(getText(largeMessage, i));
+            message.setIntProperty("i", i);
+            producer.send(message);
+         }
+
+         // NOTE(review): repeats the startPaging() call issued before the send loop; kept as in
+         // the original, confirm whether it is still required
+         if (pagingTarget) {
+            subs0Server1.getPagingStore().startPaging();
+         }
+
+         Queue snfreplica = server_2.locateQueue(replica.getMirrorSNF());
+
+         Assert.assertNotNull(snfreplica);
+
+         // wait for the mirror's queue on server_2 to drain before checking the counts
+         Wait.assertEquals(0, snfreplica::getMessageCount);
+
+         Wait.assertEquals(numberOfMessages, subs0Server1::getMessageCount, 2000);
+         Wait.assertEquals(numberOfMessages, subs1Server1::getMessageCount, 2000);
+
+         Wait.assertEquals(numberOfMessages, subs0Server2::getMessageCount, 2000);
+         Wait.assertEquals(numberOfMessages, subs1Server2::getMessageCount, 2000);
+
+         if (restartBrokerConnection) {
+            // stop and start the broker connection, making sure we wouldn't duplicate the mirror
+            server_2.stopBrokerConnection(brokerConnectionName);
+            Thread.sleep(1000);
+            server_2.startBrokerConnection(brokerConnectionName);
+         }
+
+         Assert.assertSame(snfreplica, server_2.locateQueue(replica.getMirrorSNF()));
+
+         if (pagingTarget) {
+            assertTrue(subs0Server1.getPagingStore().isPaging());
+            assertTrue(subs1Server1.getPagingStore().isPaging());
+         }
+
+         ExecutorService executorService = Executors.newFixedThreadPool(2);
+         runAfter(executorService::shutdownNow);
+
+         CountDownLatch done = new CountDownLatch(2);
+         AtomicInteger errors = new AtomicInteger(0);
+
+         for (int i = 0; i <= 1; i++) {
+            CountDownLatch threadDone = new CountDownLatch(1);
+            int subscriptionID = i;
+            executorService.execute(() -> {
+               try {
+                  consumeSubscription(0, numberOfMessages - 1, AMQP_PORT_2, "client" + subscriptionID, getTopicName(), "subscription" + subscriptionID, false);
+               } catch (Throwable e) {
+                  // assertion failures on the worker thread are counted and checked below
+                  logger.warn(e.getMessage(), e);
+                  errors.incrementAndGet();
+               } finally {
+                  done.countDown();
+                  threadDone.countDown();
+               }
+            });
+            if (!multiThreadConsumers) {
+               // serialized mode: fail instead of silently moving on when a consumer does not finish in time
+               Assert.assertTrue(threadDone.await(1, TimeUnit.MINUTES));
+            }
+         }
+
+         Assert.assertTrue(done.await(60, TimeUnit.SECONDS));
+         Assert.assertEquals(0, errors.get());
+
+         // Replica is async, so we need to wait for the acks to arrive before we finish consuming there
+         Wait.assertEquals(0, snfreplica::getMessageCount);
+         Wait.assertEquals(0L, subs0Server1::getMessageCount, 2000, 100);
+         Wait.assertEquals(0L, subs1Server1::getMessageCount, 2000, 100);
+         Wait.assertEquals(0L, subs0Server2::getMessageCount, 2000, 100);
+         Wait.assertEquals(0L, subs1Server2::getMessageCount, 2000, 100);
+
+         if (largeMessage) {
+            validateNoFilesOnLargeDir(server.getConfiguration().getLargeMessagesDirectory(), 0);
+            validateNoFilesOnLargeDir(server_2.getConfiguration().getLargeMessagesDirectory(), 0);
+         }
+      }
+   }
+
+
+
 }
diff --git a/tests/smoke-tests/pom.xml b/tests/smoke-tests/pom.xml
index 79990fa897..680fc2497a 100644
--- a/tests/smoke-tests/pom.xml
+++ b/tests/smoke-tests/pom.xml
@@ -1366,6 +1366,38 @@
                      </args>
                   </configuration>
                </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-test-Mirror1</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>admin</user>
+                     <password>admin</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>true</noWeb>
+                     
<instance>${basedir}/target/mirrored-subscriptions/broker1</instance>
+                     
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker1</configuration>
+                </configuration>
+               </execution>
+               <execution>
+                  <phase>test-compile</phase>
+                  <id>create-test-Mirror2</id>
+                  <goals>
+                     <goal>create</goal>
+                  </goals>
+                  <configuration>
+                     <role>amq</role>
+                     <user>admin</user>
+                     <password>admin</password>
+                     <allowAnonymous>true</allowAnonymous>
+                     <noWeb>true</noWeb>
+                     
<instance>${basedir}/target/mirrored-subscriptions/broker2</instance>
+                     
<configuration>${basedir}/target/classes/servers/mirrored-subscriptions/broker2</configuration>
+                  </configuration>
+               </execution>
             </executions>
             <dependencies>
                <dependency>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml
new file mode 100644
index 0000000000..763558fec9
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker1/broker.xml
@@ -0,0 +1,226 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+       <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs; any successful ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in 
memory
+
+            The system will use half of the available memory (-Xmx) by default 
for the global-max-size.
+            You may specify a different value here if you need to customize it 
to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor 
name="artemis">tcp://0.0.0.0:61616?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300</acceptor>
+      </acceptors>
+
+      <broker-connections>
+         <amqp-connection uri="tcp://localhost:61617" name="mirror" 
retry-interval="100">
+            <mirror/>
+         </amqp-connection>
+      </broker-connections>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+
+         <address-setting match="myTopic">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+
+         <address-setting match="myTopicPaging">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>100K</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <!-- this should be maxed from the default -->
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="myTopic">
+            <multicast/>
+         </address>
+         <address name="myTopicPaging">
+            <multicast/>
+         </address>
+
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml
 
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml
new file mode 100644
index 0000000000..33c8a7ca3a
--- /dev/null
+++ 
b/tests/smoke-tests/src/main/resources/servers/mirrored-subscriptions/broker2/broker.xml
@@ -0,0 +1,220 @@
+<?xml version='1.0'?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<configuration xmlns="urn:activemq"
+               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">
+
+   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="urn:activemq:core ">
+
+      <name>0.0.0.0</name>
+
+      <persistence-enabled>true</persistence-enabled>
+
+      <!-- this could be ASYNCIO or NIO
+       -->
+      <journal-type>NIO</journal-type>
+
+      <paging-directory>./data/paging</paging-directory>
+
+      <bindings-directory>./data/bindings</bindings-directory>
+
+      <journal-directory>./data/journal</journal-directory>
+
+      
<large-messages-directory>./data/large-messages</large-messages-directory>
+
+      <journal-datasync>true</journal-datasync>
+
+      <journal-min-files>2</journal-min-files>
+
+      <journal-pool-files>-1</journal-pool-files>
+
+      <message-expiry-scan-period>1000</message-expiry-scan-period>
+
+      <security-enabled>false</security-enabled>
+
+      <!--
+        You can verify the network health of a particular NIC by specifying 
the <network-check-NIC> element.
+       <network-check-NIC>theNicName</network-check-NIC>
+        -->
+
+      <!--
+        Use this to use an HTTP server to validate the network
+         
<network-check-URL-list>http://www.apache.org</network-check-URL-list> -->
+
+      <!-- <network-check-period>10000</network-check-period> -->
+      <!-- <network-check-timeout>1000</network-check-timeout> -->
+
+      <!-- this is a comma separated list, no spaces, just DNS or IPs
+           it should accept IPV6
+
+           Warning: Make sure you understand your network topology as this is 
meant to validate if your network is valid.
+                    Using IPs that could eventually disappear or be partially 
visible may defeat the purpose.
+                    You can use a list of multiple IPs; any successful ping will make the server OK to continue running -->
+      <!-- <network-check-list>10.0.0.1</network-check-list> -->
+
+      <!-- use this to customize the ping used for ipv4 addresses -->
+      <!-- <network-check-ping-command>ping -c 1 -t %d 
%s</network-check-ping-command> -->
+
+      <!-- use this to customize the ping used for ipv6 addresses -->
+      <!-- <network-check-ping6-command>ping6 -c 1 
%2$s</network-check-ping6-command> -->
+
+
+      <!-- how often we are looking for how many bytes are being used on the 
disk in ms -->
+      <disk-scan-period>5000</disk-scan-period>
+
+      <!-- once the disk hits this limit the system will block, or close the 
connection in certain protocols
+           that won't support flow control. -->
+      <max-disk-usage>90</max-disk-usage>
+
+      <!-- the system will enter into page mode once you hit this limit.
+           This is an estimate in bytes of how much the messages are using in 
memory
+
+            The system will use half of the available memory (-Xmx) by default 
for the global-max-size.
+            You may specify a different value here if you need to customize it 
to your needs.
+
+            <global-max-size>100Mb</global-max-size>
+
+      -->
+
+      <acceptors>
+
+         <!-- useEpoll means: it will use Netty epoll if you are on a system 
(Linux) that supports it -->
+         <!-- useKQueue means: it will use Netty kqueue if you are on a system 
(MacOS) that supports it -->
+         <!-- amqpCredits: The number of credits sent to AMQP producers -->
+         <!-- amqpLowCredits: The server will send the # credits specified at 
amqpCredits at this low mark -->
+
+         <!-- Acceptor for every supported protocol -->
+         <acceptor name="artemis">
+            
tcp://0.0.0.0:61617?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=CORE,AMQP,STOMP,HORNETQ,MQTT,OPENWIRE;useEpoll=true;useKQueue;amqpCredits=1000;amqpLowCredits=300
+         </acceptor>
+      </acceptors>
+
+      <security-settings>
+         <security-setting match="#">
+            <permission type="createNonDurableQueue" roles="guest"/>
+            <permission type="deleteNonDurableQueue" roles="guest"/>
+            <permission type="createDurableQueue" roles="guest"/>
+            <permission type="deleteDurableQueue" roles="guest"/>
+            <permission type="createAddress" roles="guest"/>
+            <permission type="deleteAddress" roles="guest"/>
+            <permission type="consume" roles="guest"/>
+            <permission type="browse" roles="guest"/>
+            <permission type="send" roles="guest"/>
+            <!-- we need this otherwise ./artemis data imp wouldn't work -->
+            <permission type="manage" roles="guest"/>
+         </security-setting>
+      </security-settings>
+
+      <address-settings>
+         <!-- if you define auto-create on certain queues, management has to 
be auto-create -->
+         <address-setting match="activemq.management#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <!--default for catch all-->
+         <address-setting match="#">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+         <address-setting match="myQueue">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+
+         <address-setting match="myTopic">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>-1</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+
+         <address-setting match="myTopicPaging">
+            <dead-letter-address>DLQ</dead-letter-address>
+            <expiry-address>ExpiryQueue</expiry-address>
+            <redelivery-delay>0</redelivery-delay>
+            <!-- with -1 only the global-max-size is in use for limiting -->
+            <max-size-bytes>100K</max-size-bytes>
+            <default-max-consumers>1</default-max-consumers>
+            
<message-counter-history-day-limit>10</message-counter-history-day-limit>
+            <address-full-policy>PAGE</address-full-policy>
+            <auto-create-queues>true</auto-create-queues>
+            <auto-create-addresses>true</auto-create-addresses>
+         </address-setting>
+      </address-settings>
+
+      <addresses>
+         <address name="DLQ">
+            <anycast>
+               <queue name="DLQ"/>
+            </anycast>
+         </address>
+         <address name="ExpiryQueue">
+            <anycast>
+               <queue name="ExpiryQueue"/>
+            </anycast>
+         </address>
+         <address name="myQueue">
+            <anycast>
+               <!-- this should be maxed from the default -->
+               <queue name="myQueue">
+               </queue>
+            </anycast>
+         </address>
+         <address name="myTopic">
+            <multicast/>
+         </address>
+         <address name="myTopicPaging">
+            <multicast/>
+         </address>
+
+      </addresses>
+
+   </core>
+</configuration>
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java
new file mode 100644
index 0000000000..69242281a9
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/brokerConnection/MirroredSubscriptionTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <br>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <br>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.brokerConnection;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import java.lang.invoke.MethodHandles;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.activemq.artemis.tests.smoke.common.SimpleManagement;
+import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
+import org.apache.activemq.artemis.tests.util.CFUtil;
+import org.apache.activemq.artemis.util.ServerUtil;
+import org.apache.activemq.artemis.utils.Wait;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MirroredSubscriptionTest extends SmokeTestBase {
+   /*
+    * Smoke test for durable topic subscriptions on the two server instances
+    * "mirrored-subscriptions/broker1" and "mirrored-subscriptions/broker2".
+    * Messages are produced and consumed over tcp://localhost:61616 only, while
+    * the subscription queue depths are checked on both tcp://localhost:61616
+    * and tcp://localhost:61617.
+    */
+   public static final String SERVER_NAME_A = "mirrored-subscriptions/broker1";
+   public static final String SERVER_NAME_B = "mirrored-subscriptions/broker2";
+   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   // Change this to true to generate a print-data in certain cases on this 
test
+   private static final boolean PRINT_DATA = false;
+   private static final String JMX_SERVER_HOSTNAME = "localhost";
+   private static final int JMX_SERVER_PORT = 11099;
+   // NOTE(review): PRINT_DATA, JMX_SERVER_HOSTNAME and JMX_SERVER_PORT are not referenced anywhere in this class.
+   Process processB;
+   Process processA;
+   /**
+    * Runs before each test (JUnit {@code @Before}, despite the method name).
+    * Calls cleanupData for both server instances, starts broker2 and then
+    * broker1, and waits for both through ServerUtil.waitForServerToStart
+    * (the 30000 argument is presumably a timeout in milliseconds - confirm).
+    */
+   @Before
+   public void beforeClass() throws Exception {
+      cleanupData(SERVER_NAME_A);
+      cleanupData(SERVER_NAME_B);
+      processB = startServer(SERVER_NAME_B, 1, 0);
+      processA = startServer(SERVER_NAME_A, 0, 0);
+
+      ServerUtil.waitForServerToStart(1, "B", "B", 30000);
+      ServerUtil.waitForServerToStart(0, "A", "A", 30000);
+   }
+   /**
+    * Creates two durable subscriptions on "myTopic", publishes 500 messages,
+    * verifies that each subscription queue reports 500 messages on both
+    * brokers, consumes everything through mainURI on two threads, and finally
+    * verifies that each subscription queue reports 0 messages on both brokers.
+    * The last check is the one that fails if the consumer acknowledgements do
+    * not reach the broker behind secondURI.
+    */
+   @Test
+   public void testSend() throws Throwable {
+      // Producer and consumers commit whenever the message index is a multiple of COMMIT_INTERVAL (index 0 included) and once more at the end.
+      int COMMIT_INTERVAL = 100;
+      int NUMBER_OF_MESSAGES = 500;
+      int CLIENTS = 2;
+      String mainURI = "tcp://localhost:61616";
+      String secondURI = "tcp://localhost:61617";
+
+      String topicName = "myTopic";
+
+      ConnectionFactory cf = CFUtil.createConnectionFactory("amqp", mainURI);
+      // Create one durable subscription per client and close the connection again, so the subscriptions exist before anything is sent.
+      for (int i = 0; i < CLIENTS; i++) {
+         try (Connection connection = cf.createConnection()) {
+            connection.setClientID("client" + i);
+            Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+            Topic topic = session.createTopic(topicName);
+            session.createDurableSubscriber(topic, "subscription" + i);
+         }
+      }
+      // Publish NUMBER_OF_MESSAGES text messages to the topic in a transacted session.
+      try (Connection connection = cf.createConnection()) {
+         Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+         Topic topic = session.createTopic(topicName);
+         MessageProducer producer = session.createProducer(topic);
+         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
+            producer.send(session.createTextMessage("hello " + i));
+            if (i % COMMIT_INTERVAL == 0) {
+               session.commit();
+            }
+         }
+         session.commit();
+      }
+      // Dump the queue counts reported by mainURI to stdout (diagnostic only; nothing is asserted on this output).
+      Map<String, Integer> result = SimpleManagement.listQueues(mainURI, null, 
null, 100);
+      result.entrySet().forEach(entry -> System.out.println("Queue " + 
entry.getKey() + "=" + entry.getValue()));
+      // Every subscription queue must report the full backlog on both brokers before consumption starts.
+      checkMessages(NUMBER_OF_MESSAGES, CLIENTS, mainURI, secondURI);
+      // Consumers run on a fixed pool of CLIENTS threads; shutdownNow is registered through runAfter.
+      ExecutorService executorService = Executors.newFixedThreadPool(CLIENTS);
+      runAfter(executorService::shutdownNow);
+
+      CountDownLatch done = new CountDownLatch(CLIENTS);
+      AtomicInteger errors = new AtomicInteger(0);
+      // One consumer task per client, reconnecting with the client ID and subscription name used when the subscription was created.
+      for (int i = 0; i < CLIENTS; i++) {
+         final int clientID = i;
+         executorService.execute(() -> {
+            try (Connection connection = cf.createConnection()) {
+               connection.setClientID("client" + clientID);
+               connection.start();
+               Session session = connection.createSession(true, 
Session.SESSION_TRANSACTED);
+               Topic topic = session.createTopic(topicName);
+               TopicSubscriber subscriber = 
session.createDurableSubscriber(topic, "subscription" + clientID);
+               for (int messageI = 0; messageI < NUMBER_OF_MESSAGES; 
messageI++) {
+                  TextMessage message = (TextMessage) subscriber.receive(5000);
+                  Assert.assertNotNull(message);
+                  if (messageI % COMMIT_INTERVAL == 0) {
+                     session.commit();
+                  }
+               }
+               session.commit();
+            } catch (Throwable e) { // also catches the AssertionError raised when receive(5000) returns null
+               logger.warn(e.getMessage(), e);
+               errors.incrementAndGet();
+            } finally {
+               done.countDown();
+            }
+         });
+      }
+      // Wait for all consumers, fail on any consumer error, then require every subscription queue to be empty on both brokers.
+      Assert.assertTrue(done.await(300, TimeUnit.SECONDS));
+      Assert.assertEquals(0, errors.get());
+      checkMessages(0, CLIENTS, mainURI, secondURI);
+   }
+   /**
+    * For each client index i below CLIENTS, asserts through Wait.assertEquals
+    * that the queue "client" + i + ".subscription" + i reports
+    * NUMBER_OF_MESSAGES, first on mainURI and then on secondURI.
+    * Wait.assertEquals presumably retries until the values match or its
+    * default timeout expires - confirm against the Wait utility.
+    */
+   private void checkMessages(int NUMBER_OF_MESSAGES, int CLIENTS, String 
mainURI, String secondURI) throws Exception {
+      for (int i = 0; i < CLIENTS; i++) {
+         final int clientID = i;
+         Wait.assertEquals(NUMBER_OF_MESSAGES, () -> getMessageCount(mainURI, 
"client" + clientID + ".subscription" + clientID));
+         Wait.assertEquals(NUMBER_OF_MESSAGES, () -> 
getMessageCount(secondURI, "client" + clientID + ".subscription" + clientID));
+      }
+   }
+   /**
+    * Lists the queues of the broker at uri (maxRows = 100) and returns the
+    * message count reported for queueName, or 0 when the listing has no entry
+    * for that name.
+    */
+   int getMessageCount(String uri, String queueName) throws Exception {
+      Map<String, Integer> result = SimpleManagement.listQueues(uri, null, 
null, 100);
+      Integer resultReturn = result.get(queueName);
+
+      logger.debug("Result = {}, queueName={}, returnValue = {}", result, 
queueName, resultReturn);
+      return resultReturn == null ? 0 : resultReturn;
+
+   }
+
+}
diff --git 
a/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java
 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java
new file mode 100644
index 0000000000..5b9a34b2df
--- /dev/null
+++ 
b/tests/smoke-tests/src/test/java/org/apache/activemq/artemis/tests/smoke/common/SimpleManagement.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.smoke.common;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.JsonUtil;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.json.JsonArray;
+import org.apache.activemq.artemis.json.JsonObject;
+
+public class SimpleManagement {
+
+   private static final String SIMPLE_OPTIONS  = 
"{\"field\":\"\",\"value\":\"\",\"operation\":\"\"}";
+
+   /** Simple management function that will return a list of Pair<Name of 
Queue, Number of Messages> */
+   public static Map<String, Integer> listQueues(String uri, String user, 
String password, int maxRows) throws Exception {
+      Map<String, Integer> queues = new HashMap<>();
+      ManagementHelper.doManagement(uri, user, password, t -> 
setupListQueue(t, maxRows), t -> listQueueResult(t, queues), 
SimpleManagement::failed);
+      return queues;
+   }
+
+   private static void setupListQueue(ClientMessage m, int maxRows) throws 
Exception {
+      ManagementHelper.putOperationInvocation(m, "broker", "listQueues", 
SIMPLE_OPTIONS, 1, maxRows);
+   }
+
+   private static void listQueueResult(ClientMessage message, Map<String, 
Integer> mapQueues) throws Exception {
+
+      final String result = (String) ManagementHelper.getResult(message, 
String.class);
+
+
+      JsonObject queuesAsJsonObject = JsonUtil.readJsonObject(result);
+      JsonArray array = queuesAsJsonObject.getJsonArray("data");
+
+      for (int i = 0; i < array.size(); i++) {
+         JsonObject object = array.getJsonObject(i);
+         String name = object.getString("name");
+         String messageCount = object.getString("messageCount");
+         mapQueues.put(name, Integer.parseInt(messageCount));
+      }
+
+   }
+
+   private static void failed(ClientMessage message) throws Exception {
+
+      final String result = (String) ManagementHelper.getResult(message, 
String.class);
+
+      throw new Exception("Failed " + result);
+   }
+
+}
diff --git 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index bb779ac2a0..cd30f4b054 100644
--- 
a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ 
b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -40,8 +40,8 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AckReason;
 import org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.ReferenceCounter;
-import org.apache.activemq.artemis.utils.collections.NodeStore;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
+import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
 import org.apache.activemq.artemis.utils.critical.CriticalComponentImpl;
 import org.apache.activemq.artemis.utils.critical.EmptyCriticalAnalyzer;
 
@@ -143,7 +143,7 @@ public class FakeQueue extends CriticalComponentImpl 
implements Queue {
    }
 
    @Override
-   public MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStore<MessageReference> nodeStore) {
+   public MessageReference removeWithSuppliedID(String serverID, long id, 
NodeStoreFactory<MessageReference> nodeStore) {
       return null;
    }
 

Reply via email to