This is an automated email from the ASF dual-hosted git repository.

michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 4484d05  ARTEMIS-2226 last consumer connection should close the 
previous consumer connection
     new ed05bbf  This closes #2528
4484d05 is described below

commit 4484d05cf0d15c9b9388ea014b91d482b2f14a7b
Author: onlyMIT <liangshipi...@qq.com>
AuthorDate: Tue Jan 29 17:58:36 2019 +0800

    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    When multiple consumers in the cluster use the same clientId, the last 
consumer connection should close the previous consumer connection.
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    to address apache-rat-plugin:0.12:check
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    to address checkstyle
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    adjust the code structure
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    adjust the code structure
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    adjust the code structure
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    adjust the code structure
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    adjust the code structure
    
    ARTEMIS-2226 last consumer connection should close the previous consumer 
connection
    
    add javadoc
---
 .../api/core/management/ManagementHelper.java      |    4 +
 .../core/protocol/mqtt/MQTTConnectionManager.java  |    9 +-
 .../core/protocol/mqtt/MQTTProtocolManager.java    |   60 +-
 .../protocol/mqtt/MQTTProtocolManagerFactory.java  |    6 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java    |   10 +-
 .../management/impl/ActiveMQServerControlImpl.java |   12 +-
 .../core/remoting/impl/AbstractAcceptor.java       |    5 +
 .../cluster/impl/ClusterConnectionBridge.java      |   16 +-
 .../server/cluster/impl/ClusterConnectionImpl.java |   17 +
 .../core/server/impl/ServerSessionImpl.java        |   17 +
 .../impl/NotificationActiveMQServerPlugin.java     |   46 -
 .../integration/management/NotificationTest.java   |    1 -
 .../tests/integration/mqtt/imported/MQTTTest.java  |   12 +-
 .../integration/mqtt/imported/MQTTTestSupport.java |   18 +
 .../imported/MqttClusterRemoteSubscribeTest.java   | 1032 ++++++++++++++++++--
 15 files changed, 1093 insertions(+), 172 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
index bba8419..53cb087 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ManagementHelper.java
@@ -84,6 +84,10 @@ public final class ManagementHelper {
 
    public static final SimpleString HDR_MESSAGE_ID = new 
SimpleString("_AMQ_Message_ID");
 
+   public static final SimpleString HDR_PROTOCOL_NAME = new 
SimpleString("_AMQ_Protocol_Name");
+
+   public static final SimpleString HDR_CLIENT_ID = new 
SimpleString("_AMQ_Client_ID");
+
    // Attributes ----------------------------------------------------
 
    // Static --------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index bc511ea..8efea0a 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -162,14 +162,7 @@ public class MQTTConnectionManager {
    }
 
    private MQTTSessionState getSessionState(String clientId) {
-      /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise 
create a new one. */
-      MQTTSessionState state = MQTTSession.SESSIONS.get(clientId);
-      if (state == null) {
-         state = new MQTTSessionState(clientId);
-         MQTTSession.SESSIONS.put(clientId, state);
-      }
-
-      return state;
+      return session.getProtocolManager().getSessionState(clientId);
    }
 
    private String validateClientId(String clientId, boolean cleanSession) {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 71d30d8..6e91443 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -18,9 +18,9 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
@@ -30,6 +30,9 @@ import io.netty.handler.codec.mqtt.MqttEncoder;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import 
org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.management.Notification;
@@ -40,11 +43,12 @@ import 
org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.collections.TypedProperties;
 
 /**
  * MQTTProtocolManager
  */
-class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, 
MQTTInterceptor, MQTTConnection> implements NotificationListener {
+public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, 
MQTTInterceptor, MQTTConnection> implements NotificationListener {
 
    private static final List<String> websocketRegistryNames = 
Arrays.asList("mqtt", "mqttv3.1");
 
@@ -55,18 +59,53 @@ class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQTTInter
    private final List<MQTTInterceptor> outgoingInterceptors = new 
ArrayList<>();
 
    //TODO Read in a list of existing client IDs from stored Sessions.
-   private Map<String, MQTTConnection> connectedClients = new 
ConcurrentHashMap<>();
+   private final Map<String, MQTTConnection> connectedClients;
+   private final Map<String, MQTTSessionState> sessionStates;
 
    MQTTProtocolManager(ActiveMQServer server,
+                       Map<String, MQTTConnection> connectedClients,
+                       Map<String, MQTTSessionState> sessionStates,
                        List<BaseInterceptor> incomingInterceptors,
                        List<BaseInterceptor> outgoingInterceptors) {
       this.server = server;
+      this.connectedClients = connectedClients;
+      this.sessionStates = sessionStates;
       this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
+      server.getManagementService().addNotificationListener(this);
    }
 
    @Override
    public void onNotification(Notification notification) {
-      // TODO handle notifications
+      if (!(notification.getType() instanceof CoreNotificationType))
+         return;
+
+      CoreNotificationType type = (CoreNotificationType) 
notification.getType();
+      if (type != CoreNotificationType.SESSION_CREATED)
+         return;
+
+      TypedProperties props = notification.getProperties();
+
+      SimpleString protocolName = 
props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
+
+      //Only process SESSION_CREATED notifications for the MQTT protocol
+      if (protocolName == null || 
!protocolName.toString().equals(MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME))
+         return;
+
+      int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
+
+      //distance > 0 means only processing notifications which are received 
from other nodes in the cluster
+      if (distance > 0) {
+         String clientId = 
props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
+         /*
+          * If there is a connection in the node with the same clientId as the 
value of the "_AMQ_Client_ID" attribute
+          * in the SESSION_CREATED notification, you need to close this 
connection.
+          * Avoid consumers with the same client ID in the cluster appearing 
at different nodes at the same time.
+          */
+         MQTTConnection mqttConnection = connectedClients.get(clientId);
+         if (mqttConnection != null) {
+            mqttConnection.destroy();
+         }
+      }
    }
 
    @Override
@@ -201,4 +240,17 @@ class MQTTProtocolManager extends 
AbstractProtocolManager<MqttMessage, MQTTInter
    public MQTTConnection addConnectedClient(String clientId, MQTTConnection 
connection) {
       return connectedClients.put(clientId, connection);
    }
+
+   public MQTTSessionState getSessionState(String clientId) {
+      /* [MQTT-3.1.2-4] Attach an existing session if one exists otherwise 
create a new one. */
+      return sessionStates.computeIfAbsent(clientId, MQTTSessionState::new);
+   }
+
+   public MQTTSessionState removeSessionState(String clientId) {
+      return sessionStates.remove(clientId);
+   }
+
+   public Map<String, MQTTSessionState> getSessionStates() {
+      return new HashMap<>(sessionStates);
+   }
 }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
index 453b267..74a29e6 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManagerFactory.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.artemis.api.core.BaseInterceptor;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -37,12 +38,15 @@ public class MQTTProtocolManagerFactory extends 
AbstractProtocolManagerFactory<M
 
    private static final String[] SUPPORTED_PROTOCOLS = {MQTT_PROTOCOL_NAME};
 
+   private final Map<String, MQTTConnection> connectedClients  = new 
ConcurrentHashMap<>();
+   private final Map<String, MQTTSessionState> sessionStates = new 
ConcurrentHashMap<>();
+
    @Override
    public ProtocolManager createProtocolManager(ActiveMQServer server,
                                                 final Map<String, Object> 
parameters,
                                                 List<BaseInterceptor> 
incomingInterceptors,
                                                 List<BaseInterceptor> 
outgoingInterceptors) throws Exception {
-      return BeanSupport.setData(new MQTTProtocolManager(server, 
incomingInterceptors, outgoingInterceptors), parameters);
+      return BeanSupport.setData(new MQTTProtocolManager(server, 
connectedClients, sessionStates, incomingInterceptors, outgoingInterceptors), 
parameters);
    }
 
    @Override
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index 640b893..b788f36 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -17,10 +17,7 @@
 
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
-import java.util.HashMap;
-import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.message.impl.CoreMessageObjectPools;
@@ -30,8 +27,6 @@ import 
org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
 
 public class MQTTSession {
 
-   static Map<String, MQTTSessionState> SESSIONS = new ConcurrentHashMap<>();
-
    private final String id = UUID.randomUUID().toString();
 
    private MQTTProtocolHandler protocolHandler;
@@ -108,7 +103,7 @@ public class MQTTSession {
 
          if (isClean()) {
             clean();
-            SESSIONS.remove(connection.getClientID());
+            protocolManager.removeSessionState(connection.getClientID());
          }
       }
       stopped = true;
@@ -201,7 +196,4 @@ public class MQTTSession {
       return coreMessageObjectPools;
    }
 
-   public static Map<String, MQTTSessionState> getSessions() {
-      return new HashMap<>(SESSIONS);
-   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 318d880..26b4eca 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -59,6 +59,7 @@ import 
org.apache.activemq.artemis.api.core.management.AddressControl;
 import org.apache.activemq.artemis.api.core.management.BridgeControl;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.DivertControl;
+import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.Parameter;
 import org.apache.activemq.artemis.api.core.management.QueueControl;
 import org.apache.activemq.artemis.core.client.impl.Topology;
@@ -2967,7 +2968,16 @@ public class ActiveMQServerControlImpl extends 
AbstractControl implements Active
       if (!(notification.getType() instanceof CoreNotificationType))
          return;
       CoreNotificationType type = (CoreNotificationType) 
notification.getType();
-      TypedProperties prop = notification.getProperties();
+      if (type == CoreNotificationType.SESSION_CREATED) {
+         TypedProperties props = notification.getProperties();
+         /*
+          * If the SESSION_CREATED notification is received from another node 
in the cluster, no broadcast call is made.
+          * To keep the original logic to avoid calling the broadcast multiple 
times for the same SESSION_CREATED notification in the cluster.
+          */
+         if (props.getIntProperty(ManagementHelper.HDR_DISTANCE) > 0) {
+            return;
+         }
+      }
 
       this.broadcaster.sendNotification(new Notification(type.toString(), 
this, notifSeq.incrementAndGet(), notification.toString()));
    }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
index a2f30f3..1aa1dff 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/remoting/impl/AbstractAcceptor.java
@@ -17,6 +17,7 @@
 
 package org.apache.activemq.artemis.core.remoting.impl;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -43,4 +44,8 @@ public abstract class AbstractAcceptor implements Acceptor {
       }
    }
 
+   public Map<String, ProtocolManager> getProtocolMap() {
+      return Collections.unmodifiableMap(protocolMap);
+   }
+
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
index a9d80e5..f7e2817 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionBridge.java
@@ -233,6 +233,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                                    " AND " +
                                                    
ManagementHelper.HDR_NOTIFICATION_TYPE +
                                                    " IN ('" +
+                                                   
CoreNotificationType.SESSION_CREATED +
+                                                   "','" +
                                                    
CoreNotificationType.BINDING_ADDED +
                                                    "','" +
                                                    
CoreNotificationType.BINDING_REMOVED +
@@ -252,6 +254,8 @@ public class ClusterConnectionBridge extends BridgeImpl {
                                                    flowRecord.getMaxHops() +
                                                    " AND (" +
                                                    
createSelectorFromAddress(appendIgnoresToFilter(flowRecord.getAddress())) +
+                                                   ") AND (" +
+                                                   
createPermissiveManagementNotificationToFilter() +
                                                    ")");
 
          sessionConsumer.createTemporaryQueue(managementNotificationAddress, 
notifQueueName, filter);
@@ -351,10 +355,20 @@ public class ClusterConnectionBridge extends BridgeImpl {
       }
       filterString += "!" + storeAndForwardPrefix;
       filterString += ",!" + managementAddress;
-      filterString += ",!" + managementNotificationAddress;
       return filterString;
    }
 
+   /**
+    * Creates a filter clause that accepts SESSION_CREATED notifications and 
rejects every other notification whose address starts with
+    * managementNotificationAddress.
+    * @return the filter clause as a selector string
+    */
+   private String createPermissiveManagementNotificationToFilter() {
+      StringBuilder filterBuilder = new 
StringBuilder(ManagementHelper.HDR_NOTIFICATION_TYPE).append(" = '")
+              .append(CoreNotificationType.SESSION_CREATED).append("' OR 
(").append(ManagementHelper.HDR_ADDRESS)
+              .append(" NOT LIKE 
'").append(managementNotificationAddress).append("%')");
+      return filterBuilder.toString();
+   }
 
    @Override
    protected void nodeUP(TopologyMember member, boolean last) {
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
index 4b884b5..aa68b81 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java
@@ -1078,6 +1078,10 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
                doUnProposalReceived(message);
                break;
             }
+            case SESSION_CREATED: {
+               doSessionCreated(message);
+               break;
+            }
             default: {
                throw ActiveMQMessageBundle.BUNDLE.invalidType(ntype);
             }
@@ -1303,6 +1307,19 @@ public final class ClusterConnectionImpl implements 
ClusterConnection, AfterConn
          binding.disconnect();
       }
 
+      private synchronized void doSessionCreated(final ClientMessage message) 
throws Exception {
+         if (logger.isTraceEnabled()) {
+            logger.trace(ClusterConnectionImpl.this + " session created " + 
message);
+         }
+         TypedProperties props = new TypedProperties();
+         props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, 
message.getSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME));
+         props.putSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS, 
message.getSimpleStringProperty(ManagementHelper.HDR_REMOTE_ADDRESS));
+         props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, 
message.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID));
+         props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, 
message.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME));
+         props.putIntProperty(ManagementHelper.HDR_DISTANCE, 
message.getIntProperty(ManagementHelper.HDR_DISTANCE) + 1);
+         managementService.sendNotification(new Notification(null, 
CoreNotificationType.SESSION_CREATED, props));
+      }
+
       private synchronized void doConsumerCreated(final ClientMessage message) 
throws Exception {
          if (logger.isTraceEnabled()) {
             logger.trace(ClusterConnectionImpl.this + " Consumer created " + 
message);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 6464eee..04322df 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -272,6 +272,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       if (!xa) {
          tx = newTransaction();
       }
+      //When the ServerSessionImpl initialization is complete, need to create 
and send a SESSION_CREATED notification.
+      sendSessionNotification(CoreNotificationType.SESSION_CREATED);
    }
 
    // ServerSession implementation 
---------------------------------------------------------------------------
@@ -422,6 +424,8 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
          }
 
          closed = true;
+         //When the ServerSessionImpl is closed, need to create and send a 
SESSION_CLOSED notification.
+         sendSessionNotification(CoreNotificationType.SESSION_CLOSED);
 
          if (server.hasBrokerSessionPlugins()) {
             server.callBrokerSessionPlugins(plugin -> 
plugin.afterCloseSession(this, failed));
@@ -429,6 +433,19 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       }
    }
 
+   private void sendSessionNotification(final CoreNotificationType type) 
throws Exception {
+      final TypedProperties props = new TypedProperties();
+      props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, 
SimpleString.toSimpleString(this.getConnectionID().toString()));
+      props.putSimpleStringProperty(ManagementHelper.HDR_USER, 
SimpleString.toSimpleString(this.getUsername()));
+      props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, 
SimpleString.toSimpleString(this.getName()));
+
+      props.putSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID, 
SimpleString.toSimpleString(this.remotingConnection.getClientID()));
+      props.putSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME, 
SimpleString.toSimpleString(this.remotingConnection.getProtocolName()));
+      props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, 
managementService.getManagementNotificationAddress());
+      props.putIntProperty(ManagementHelper.HDR_DISTANCE, 0);
+      managementService.sendNotification(new Notification(null, type, props));
+   }
+
    private void securityCheck(SimpleString address, CheckType checkType, 
SecurityAuth auth) throws Exception {
       if (securityEnabled) {
          securityStore.check(address, checkType, auth);
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
index 29846aa..880f970 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/plugin/impl/NotificationActiveMQServerPlugin.java
@@ -26,7 +26,6 @@ import 
org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
-import org.apache.activemq.artemis.core.server.ServerSession;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.core.server.management.ManagementService;
 import org.apache.activemq.artemis.core.server.management.Notification;
@@ -43,13 +42,11 @@ public class NotificationActiveMQServerPlugin implements 
ActiveMQServerPlugin {
    private static final Logger logger = 
Logger.getLogger(NotificationActiveMQServerPlugin.class);
 
    public static final String SEND_CONNECTION_NOTIFICATIONS = 
"SEND_CONNECTION_NOTIFICATIONS";
-   public static final String SEND_SESSION_NOTIFICATIONS = 
"SEND_SESSION_NOTIFICATIONS";
    public static final String SEND_ADDRESS_NOTIFICATIONS = 
"SEND_ADDRESS_NOTIFICATIONS";
    public static final String SEND_DELIVERED_NOTIFICATIONS = 
"SEND_DELIVERED_NOTIFICATIONS";
    public static final String SEND_EXPIRED_NOTIFICATIONS = 
"SEND_EXPIRED_NOTIFICATIONS";
 
    private boolean sendConnectionNotifications;
-   private boolean sendSessionNotifications;
    private boolean sendAddressNotifications;
    private boolean sendDeliveredNotifications;
    private boolean sendExpiredNotifications;
@@ -66,8 +63,6 @@ public class NotificationActiveMQServerPlugin implements 
ActiveMQServerPlugin {
    public void init(Map<String, String> properties) {
       sendConnectionNotifications = 
Boolean.parseBoolean(properties.getOrDefault(SEND_CONNECTION_NOTIFICATIONS,
             Boolean.FALSE.toString()));
-      sendSessionNotifications = 
Boolean.parseBoolean(properties.getOrDefault(SEND_SESSION_NOTIFICATIONS,
-            Boolean.FALSE.toString()));
       sendAddressNotifications = 
Boolean.parseBoolean(properties.getOrDefault(SEND_ADDRESS_NOTIFICATIONS,
             Boolean.FALSE.toString()));
       sendDeliveredNotifications = 
Boolean.parseBoolean(properties.getOrDefault(SEND_DELIVERED_NOTIFICATIONS,
@@ -97,16 +92,6 @@ public class NotificationActiveMQServerPlugin implements 
ActiveMQServerPlugin {
    }
 
    @Override
-   public void afterCreateSession(ServerSession session) throws 
ActiveMQException {
-      sendSessionNotification(session, CoreNotificationType.SESSION_CREATED);
-   }
-
-   @Override
-   public void afterCloseSession(ServerSession session, boolean failed) throws 
ActiveMQException {
-      sendSessionNotification(session, CoreNotificationType.SESSION_CLOSED);
-   }
-
-   @Override
    public void afterAddAddress(AddressInfo addressInfo, boolean reload) throws 
ActiveMQException {
       sendAddressNotification(addressInfo, CoreNotificationType.ADDRESS_ADDED);
    }
@@ -196,23 +181,6 @@ public class NotificationActiveMQServerPlugin implements 
ActiveMQServerPlugin {
       }
    }
 
-   private void sendSessionNotification(final ServerSession session, final 
CoreNotificationType type) {
-      final ManagementService managementService = getManagementService();
-
-      if (managementService != null && sendSessionNotifications) {
-         try {
-            final TypedProperties props = new TypedProperties();
-            
props.putSimpleStringProperty(ManagementHelper.HDR_CONNECTION_NAME, 
SimpleString.toSimpleString(session.getConnectionID().toString()));
-            props.putSimpleStringProperty(ManagementHelper.HDR_USER, 
SimpleString.toSimpleString(session.getUsername()));
-            props.putSimpleStringProperty(ManagementHelper.HDR_SESSION_NAME, 
SimpleString.toSimpleString(session.getName()));
-
-            managementService.sendNotification(new Notification(null, type, 
props));
-         } catch (Exception e) {
-            logger.warn("Error sending notification: " + type, e.getMessage(), 
e);
-         }
-      }
-   }
-
    /**
     * @return the sendConnectionNotifications
     */
@@ -228,20 +196,6 @@ public class NotificationActiveMQServerPlugin implements 
ActiveMQServerPlugin {
    }
 
    /**
-    * @return the sendSessionNotifications
-    */
-   public boolean isSendSessionNotifications() {
-      return sendSessionNotifications;
-   }
-
-   /**
-    * @param sendSessionNotifications the sendSessionNotifications to set
-    */
-   public void setSendSessionNotifications(boolean sendSessionNotifications) {
-      this.sendSessionNotifications = sendSessionNotifications;
-   }
-
-   /**
     * @return the sendDeliveredNotifications
     */
    public boolean isSendDeliveredNotifications() {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
index ed5713e..196e939 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/NotificationTest.java
@@ -362,7 +362,6 @@ public class NotificationTest extends ActiveMQTestBase {
       NotificationActiveMQServerPlugin notificationPlugin = new 
NotificationActiveMQServerPlugin();
       notificationPlugin.setSendAddressNotifications(true);
       notificationPlugin.setSendConnectionNotifications(true);
-      notificationPlugin.setSendSessionNotifications(true);
       notificationPlugin.setSendDeliveredNotifications(true);
       notificationPlugin.setSendExpiredNotifications(true);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 03bcddd..5b35f33 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1120,7 +1120,7 @@ public class MQTTTest extends MQTTTestSupport {
       notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
       notClean.disconnect();
 
-      assertEquals(1, MQTTSession.getSessions().size());
+      assertEquals(1, getSessions().size());
 
       // MUST receive message from existing subscription from previous not 
clean session
       notClean = mqttNotClean.blockingConnection();
@@ -1132,7 +1132,7 @@ public class MQTTTest extends MQTTTestSupport {
       notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
       notClean.disconnect();
 
-      assertEquals(1, MQTTSession.getSessions().size());
+      assertEquals(1, getSessions().size());
 
       // MUST NOT receive message from previous not clean session as existing 
subscription should be gone
       final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
@@ -1144,7 +1144,7 @@ public class MQTTTest extends MQTTTestSupport {
       clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
       clean.disconnect();
 
-      assertEquals(0, MQTTSession.getSessions().size());
+      assertEquals(0, getSessions().size());
 
       // MUST NOT receive message from previous clean session as existing 
subscription should be gone
       notClean = mqttNotClean.blockingConnection();
@@ -1153,7 +1153,7 @@ public class MQTTTest extends MQTTTestSupport {
       assertNull(msg);
       notClean.disconnect();
 
-      assertEquals(1, MQTTSession.getSessions().size());
+      assertEquals(1, getSessions().size());
    }
 
    @Test(timeout = 60 * 1000)
@@ -1167,7 +1167,7 @@ public class MQTTTest extends MQTTTestSupport {
       notClean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
       notClean.disconnect();
 
-      assertEquals(1, MQTTSession.getSessions().size());
+      assertEquals(1, getSessions().size());
 
       // MUST NOT receive message from previous not clean session even when 
creating a new subscription
       final MQTT mqttClean = createMQTTConnection(CLIENTID, true);
@@ -1179,7 +1179,7 @@ public class MQTTTest extends MQTTTestSupport {
       clean.publish(TOPIC, TOPIC.getBytes(), QoS.EXACTLY_ONCE, false);
       clean.disconnect();
 
-      assertEquals(0, MQTTSession.getSessions().size());
+      assertEquals(0, getSessions().size());
    }
 
    @Test(timeout = 60 * 1000)
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index e49ec92..83871e3 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -28,6 +28,7 @@ import java.security.ProtectionDomain;
 import java.security.SecureRandom;
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -42,13 +43,18 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import 
org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
 import org.fusesource.mqtt.client.MQTT;
@@ -366,6 +372,18 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       return mqtt;
    }
 
+   public Map<String, MQTTSessionState> getSessions() {
+      Acceptor acceptor = server.getRemotingService().getAcceptor("MQTT");
+      if (acceptor instanceof AbstractAcceptor) {
+         ProtocolManager protocolManager = ((AbstractAcceptor) 
acceptor).getProtocolMap().get("MQTT");
+         if (protocolManager instanceof MQTTProtocolManager) {
+            return ((MQTTProtocolManager) protocolManager).getSessionStates();
+         }
+
+      }
+      return Collections.emptyMap();
+   }
+
    private MQTT createMQTTSslConnection(String clientId, boolean clean) throws 
Exception {
       MQTT mqtt = new MQTT();
       mqtt.setConnectAttemptsMax(1);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
index 8caba17..19360b1 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MqttClusterRemoteSubscribeTest.java
@@ -24,6 +24,7 @@ import 
org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import 
org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import 
org.apache.activemq.artemis.tests.integration.cluster.distribution.ClusterTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
 import org.fusesource.mqtt.client.BlockingConnection;
 import org.fusesource.mqtt.client.MQTT;
 import org.fusesource.mqtt.client.Message;
@@ -43,29 +44,127 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
    }
 
    @Test
-   public void unsubscribeRemoteQueue() throws Exception {
-      final String TOPIC = "test/1/some/la";
+   public void useSameClientIdAndAnycastSubscribeRemoteQueue() throws 
Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
 
-      setupServers(TOPIC);
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", 
subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", 
pubClientId);
+
+         //Waiting for the first sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection2.subscribe(topics);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+
+         subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message11);
+         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+
+      } finally {
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(topics);
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(topics);
+            subConnection2.disconnect();
+         }
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useDiffClientIdAndAnycastSubscribeRemoteQueue() throws 
Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
+
+      setupServers(ANYCAST_TOPIC);
 
       startServers(0, 1);
 
       BlockingConnection connection1 = null;
       BlockingConnection connection2 = null;
       try {
-
-         connection1 = retrieveMQTTConnection("tcp://localhost:61616");
-         connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId1);
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId2);
          // Subscribe to topics
-         Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
          connection1.subscribe(topics);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
          connection2.subscribe(topics);
 
-         waitForBindings(0, TOPIC, 1, 1, true);
-         waitForBindings(1, TOPIC, 1, 1, true);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
 
-         waitForBindings(0, TOPIC, 1, 1, false);
-         waitForBindings(1, TOPIC, 1, 1, false);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
 
          // Publish Messages
@@ -73,9 +172,9 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          String payload2 = "This is message 2";
          String payload3 = "This is message 3";
 
-         connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
-         connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, 
false);
+         connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
 
          Message message1 = connection1.receive(5, TimeUnit.SECONDS);
@@ -89,12 +188,17 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          assertEquals(payload2, new String(message2.getPayload()));
          assertEquals(payload3, new String(message3.getPayload()));
 
+         connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
 
-         connection2.unsubscribe(new String[]{TOPIC});
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
-         connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
-         connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, 
false);
+         connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
          Message message11 = connection1.receive(5, TimeUnit.SECONDS);
          message11.ack();
@@ -103,7 +207,6 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          Message message31 = connection1.receive(5, TimeUnit.SECONDS);
          message31.ack();
 
-
          String message11String = new String(message11.getPayload());
          String message21String = new String(message21.getPayload());
          String message31String = new String(message31.getPayload());
@@ -111,14 +214,13 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          assertTrue(payload2.equals(message11String) || 
payload2.equals(message21String) || payload2.equals(message31String) );
          assertTrue(payload3.equals(message11String) || 
payload3.equals(message21String) || payload3.equals(message31String) );
 
-
       } finally {
-         String[] topics = new String[]{TOPIC};
-         if (connection1 != null) {
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (connection1 != null && connection1.isConnected()) {
             connection1.unsubscribe(topics);
             connection1.disconnect();
          }
-         if (connection2 != null) {
+         if (connection2 != null && connection2.isConnected()) {
             connection2.unsubscribe(topics);
             connection2.disconnect();
          }
@@ -127,29 +229,320 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
    }
 
    @Test
-   public void unsubscribeRemoteQueueWildCard() throws Exception {
-      final String TOPIC = "test/+/some/#";
+   public void useSameClientIdAndMulticastSubscribeRemoteQueue() throws 
Exception {
+      final String MULTICAST_TOPIC = "multicast/test/1/some/la";
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", 
subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", 
pubClientId);
+
+         //Waiting for the first sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection2.subscribe(topics);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+
+         subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+         pubConnection.publish(MULTICAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(MULTICAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(MULTICAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message11);
+         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+
+      } finally {
+         String[] topics = new String[]{MULTICAST_TOPIC};
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(topics);
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(topics);
+            subConnection2.disconnect();
+         }
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useDiffClientIdAndMulticastSubscribeRemoteQueue() throws 
Exception {
+      final String MULTICAST_TOPIC = "multicast/test/1/some/la";
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
 
-      setupServers(TOPIC);
+      setupServers(ANYCAST_TOPIC);
 
       startServers(0, 1);
 
       BlockingConnection connection1 = null;
       BlockingConnection connection2 = null;
       try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId1);
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId2);
+         // Subscribe to topics
+         connection1.subscribe(topics);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+         connection2.subscribe(topics);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message11 = connection1.receive(5, TimeUnit.SECONDS);
+         message11.ack();
+         Message message12 = connection1.receive(5, TimeUnit.SECONDS);
+         message12.ack();
+         Message message13 = connection1.receive(5, TimeUnit.SECONDS);
+         message13.ack();
+
+         assertEquals(payload1, new String(message11.getPayload()));
+         assertEquals(payload2, new String(message12.getPayload()));
+         assertEquals(payload3, new String(message13.getPayload()));
+
+         Message message21 = connection2.receive(5, TimeUnit.SECONDS);
+         message21.ack();
+         Message message22 = connection2.receive(5, TimeUnit.SECONDS);
+         message22.ack();
+         Message message23 = connection2.receive(5, TimeUnit.SECONDS);
+         message23.ack();
+
+         assertEquals(payload1, new String(message21.getPayload()));
+         assertEquals(payload2, new String(message22.getPayload()));
+         assertEquals(payload3, new String(message23.getPayload()));
+
+         connection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+         connection1.publish(MULTICAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(MULTICAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(MULTICAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message31 = connection1.receive(5, TimeUnit.SECONDS);
+         message31.ack();
+         Message message32 = connection1.receive(5, TimeUnit.SECONDS);
+         message32.ack();
+         Message message33 = connection1.receive(5, TimeUnit.SECONDS);
+         message33.ack();
+
+         assertEquals(payload1, new String(message31.getPayload()));
+         assertEquals(payload2, new String(message32.getPayload()));
+         assertEquals(payload3, new String(message33.getPayload()));
+
+      } finally {
+         String[] topics = new String[]{MULTICAST_TOPIC};
+         if (connection1 != null && connection1.isConnected()) {
+            connection1.unsubscribe(topics);
+            connection1.disconnect();
+         }
+         if (connection2 != null && connection2.isConnected()) {
+            connection2.unsubscribe(topics);
+            connection2.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useSameClientIdAndAnycastSubscribeRemoteQueueWildCard() throws 
Exception {
+      final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", 
subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", 
pubClientId);
+
+         //Waiting for the first sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection2.subscribe(topics);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+
+         subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         pubConnection.publish("anycast/test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message11);
+         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+
+      } finally {
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(topics);
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(topics);
+            subConnection2.disconnect();
+         }
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.disconnect();
+         }
+      }
+
+   }
 
-         connection1 = retrieveMQTTConnection("tcp://localhost:61616");
-         connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+   @Test
+   public void useDiffClientIdAndAnycastSubscribeRemoteQueueWildCard() throws 
Exception {
+      final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection connection1 = null;
+      BlockingConnection connection2 = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId1);
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId2);
          // Subscribe to topics
-         Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
          connection1.subscribe(topics);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
          connection2.subscribe(topics);
 
-         waitForBindings(0, TOPIC, 1, 1, true);
-         waitForBindings(1, TOPIC, 1, 1, true);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
 
-         waitForBindings(0, TOPIC, 1, 1, false);
-         waitForBindings(1, TOPIC, 1, 1, false);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
 
          // Publish Messages
@@ -157,9 +550,9 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          String payload2 = "This is message 2";
          String payload3 = "This is message 3";
 
-         connection1.publish("test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-         connection1.publish("test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
-         connection1.publish("test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("anycast/test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
 
          Message message1 = connection1.receive(5, TimeUnit.SECONDS);
@@ -174,11 +567,17 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          assertEquals(payload3, new String(message3.getPayload()));
 
 
-         connection2.unsubscribe(new String[]{TOPIC});
+         connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
 
-         connection1.publish("test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
-         connection1.publish("test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
-         connection1.publish("test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
+         connection1.publish("anycast/test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish("anycast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("anycast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
          Message message11 = connection1.receive(5, TimeUnit.SECONDS);
          message11.ack();
@@ -187,22 +586,215 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          Message message31 = connection1.receive(5, TimeUnit.SECONDS);
          message31.ack();
 
+
          String message11String = new String(message11.getPayload());
          String message21String = new String(message21.getPayload());
          String message31String = new String(message31.getPayload());
+         assertTrue(payload1.equals(message11String) || 
payload1.equals(message21String) || payload1.equals(message31String) );
+         assertTrue(payload2.equals(message11String) || 
payload2.equals(message21String) || payload2.equals(message31String) );
+         assertTrue(payload3.equals(message11String) || 
payload3.equals(message21String) || payload3.equals(message31String) );
 
-         assertTrue(payload1.equals(message11String) || 
payload1.equals(message21String) || payload1.equals(message31String));
-         assertTrue(payload2.equals(message11String) || 
payload2.equals(message21String) || payload2.equals(message31String));
-         assertTrue(payload3.equals(message11String) || 
payload3.equals(message21String) || payload3.equals(message31String));
+      } finally {
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (connection1 != null && connection1.isConnected()) {
+            connection1.unsubscribe(topics);
+            connection1.disconnect();
+         }
+         if (connection2 != null && connection2.isConnected()) {
+            connection2.unsubscribe(topics);
+            connection2.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useSameClientIdAndMulticastSubscribeRemoteQueueWildCard() 
throws Exception {
+      final String MULTICAST_TOPIC = "multicast/test/+/some/#";
+      final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", 
subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", 
pubClientId);
+
+         //Waiting for the first sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection2.subscribe(topics);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         pubConnection.publish("multicast/test/1/some/la", 
payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish("multicast/test/1/some/la", 
payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+         pubConnection.publish("multicast/test/1/some/la", 
payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+         Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+
+         subConnection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, true);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, false);
+
+         pubConnection.publish("multicast/test/1/some/la", 
payload1.getBytes(), QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish("multicast/test/1/some/la", 
payload2.getBytes(), QoS.AT_MOST_ONCE, false);
+         pubConnection.publish("multicast/test/1/some/la", 
payload3.getBytes(), QoS.AT_MOST_ONCE, false);
+
+         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message11);
+         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+
+      } finally {
+         String[] topics = new String[]{MULTICAST_TOPIC};
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(topics);
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(topics);
+            subConnection2.disconnect();
+         }
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useDiffClientIdAndMulticastSubscribeRemoteQueueWildCard() 
throws Exception {
+      final String MULTICAST_TOPIC = "multicast/test/+/some/#";
+      final String ANYCAST_TOPIC = "anycast/test/+/some/#";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection connection1 = null;
+      BlockingConnection connection2 = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         Topic[] topics = {new Topic(MULTICAST_TOPIC, QoS.AT_MOST_ONCE)};
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId1);
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId2);
+         // Subscribe to topics
+         connection1.subscribe(topics);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+         connection2.subscribe(topics);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, false);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+
+         connection1.publish("multicast/test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish("multicast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("multicast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message11 = connection1.receive(5, TimeUnit.SECONDS);
+         message11.ack();
+         Message message12 = connection1.receive(5, TimeUnit.SECONDS);
+         message12.ack();
+         Message message13 = connection1.receive(5, TimeUnit.SECONDS);
+         message13.ack();
+
+         assertEquals(payload1, new String(message11.getPayload()));
+         assertEquals(payload2, new String(message12.getPayload()));
+         assertEquals(payload3, new String(message13.getPayload()));
+
+         Message message21 = connection2.receive(5, TimeUnit.SECONDS);
+         message21.ack();
+         Message message22 = connection2.receive(5, TimeUnit.SECONDS);
+         message22.ack();
+         Message message23 = connection2.receive(5, TimeUnit.SECONDS);
+         message23.ack();
+
+         assertEquals(payload1, new String(message21.getPayload()));
+         assertEquals(payload2, new String(message22.getPayload()));
+         assertEquals(payload3, new String(message23.getPayload()));
+
+         connection2.unsubscribe(new String[]{MULTICAST_TOPIC});
+
+         waitForBindings(0, MULTICAST_TOPIC, 1, 1, true);
+         waitForBindings(1, MULTICAST_TOPIC, 0, 0, true);
+
+         waitForBindings(0, MULTICAST_TOPIC, 0, 0, false);
+         waitForBindings(1, MULTICAST_TOPIC, 1, 1, false);
+
+         connection1.publish("multicast/test/1/some/la", payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish("multicast/test/1/some/la", payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish("multicast/test/1/some/la", payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message31 = connection1.receive(5, TimeUnit.SECONDS);
+         message31.ack();
+         Message message32 = connection1.receive(5, TimeUnit.SECONDS);
+         message32.ack();
+         Message message33 = connection1.receive(5, TimeUnit.SECONDS);
+         message33.ack();
+
+         assertEquals(payload1, new String(message31.getPayload()));
+         assertEquals(payload2, new String(message32.getPayload()));
+         assertEquals(payload3, new String(message33.getPayload()));
 
 
       } finally {
-         String[] topics = new String[]{TOPIC};
-         if (connection1 != null) {
+         String[] topics = new String[]{MULTICAST_TOPIC};
+         if (connection1 != null && connection1.isConnected()) {
             connection1.unsubscribe(topics);
             connection1.disconnect();
          }
-         if (connection2 != null) {
+         if (connection2 != null && connection2.isConnected()) {
             connection2.unsubscribe(topics);
             connection2.disconnect();
          }
@@ -211,31 +803,39 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
    }
 
    @Test
-   public void unsubscribeRemoteQueueMultipleSubscriptions() throws Exception {
-      final String TOPIC = "test/1/some/la";
+   public void useDiffClientIdSubscribeRemoteQueueMultipleSubscriptions() 
throws Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
       final String TOPIC2 = "sample";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
 
-      setupServers(TOPIC);
+      setupServers(ANYCAST_TOPIC);
 
       startServers(0, 1);
 
       BlockingConnection connection1 = null;
       BlockingConnection connection2 = null;
       try {
-
-         connection1 = retrieveMQTTConnection("tcp://localhost:61616");
-         connection2 = retrieveMQTTConnection("tcp://localhost:61617");
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId1);
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId2);
          // Subscribe to topics
-         connection1.subscribe(new Topic[]{new Topic(TOPIC, 
QoS.AT_MOST_ONCE)});
-         connection2.subscribe(new Topic[]{new Topic(TOPIC, QoS.AT_MOST_ONCE), 
new Topic(TOPIC2, QoS.AT_MOST_ONCE)});
+         connection1.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, 
QoS.AT_MOST_ONCE)});
 
-         waitForBindings(0, TOPIC, 1, 1, true);
-         waitForBindings(1, TOPIC, 1, 1, true);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
 
-         waitForBindings(0, TOPIC, 1, 1, false);
-         waitForBindings(1, TOPIC, 1, 1, false);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
+         connection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, 
QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)});
 
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
          // Publish Messages
          String payload1 = "This is message 1";
@@ -243,9 +843,9 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          String payload3 = "This is message 3";
          String payload4 = "This is message 4";
 
-         connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
-         connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, 
false);
+         connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
          connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, 
false);
 
 
@@ -263,11 +863,17 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          assertEquals(payload3, new String(message3.getPayload()));
          assertEquals(payload4, new String(message4.getPayload()));
 
-         connection2.unsubscribe(new String[]{TOPIC});
+         connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
-         connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
-         connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, 
false);
+         connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
          connection1.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, 
false);
 
          Message message11 = connection1.receive(5, TimeUnit.SECONDS);
@@ -289,12 +895,12 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
 
 
       } finally {
-         if (connection1 != null) {
-            connection1.unsubscribe(new String[]{TOPIC});
+         if (connection1 != null && connection1.isConnected()) {
+            connection1.unsubscribe(new String[]{ANYCAST_TOPIC});
             connection1.disconnect();
          }
-         if (connection2 != null) {
-            connection2.unsubscribe(new String[]{TOPIC, TOPIC2});
+         if (connection2 != null && connection2.isConnected()) {
+            connection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2});
             connection2.disconnect();
          }
       }
@@ -302,33 +908,258 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
    }
 
    @Test
-   public void unsubscribeExistingQueue() throws Exception {
-      final String TOPIC = "test/1/some/la";
+   public void useSameClientIdSubscribeRemoteQueueMultipleSubscriptions() 
throws Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String TOPIC2 = "sample";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", 
subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", 
pubClientId);
+
+         //Waiting for the first sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection2.subscribe(new Topic[]{new Topic(ANYCAST_TOPIC, 
QoS.AT_MOST_ONCE), new Topic(TOPIC2, QoS.AT_MOST_ONCE)});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+         String payload4 = "This is message 4";
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, 
false);
+
+         Message message1 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+         Message message4 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message4.ack();
+
+         String messageStr1 = new String(message1.getPayload());
+         String messageStr2 = new String(message2.getPayload());
+         String messageStr3 = new String(message3.getPayload());
+         String messageStr4 = new String(message4.getPayload());
+         assertTrue(payload1.equals(messageStr1) || 
payload1.equals(messageStr2) || payload1.equals(messageStr3) || 
payload1.equals(messageStr4));
+         assertTrue(payload2.equals(messageStr1) || 
payload2.equals(messageStr2) || payload2.equals(messageStr3) || 
payload2.equals(messageStr4));
+         assertTrue(payload3.equals(messageStr1) || 
payload3.equals(messageStr2) || payload3.equals(messageStr3) || 
payload3.equals(messageStr4));
+         assertTrue(payload4.equals(messageStr1) || 
payload4.equals(messageStr2) || payload4.equals(messageStr3) || 
payload4.equals(messageStr4));
+
+         subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(TOPIC2, payload4.getBytes(), QoS.AT_MOST_ONCE, 
false);
+
+         Message message11 = subConnection2.receive(5, TimeUnit.SECONDS);
+         message11.ack();
+         assertEquals(payload4, new String(message11.getPayload()));
+
+         Message message21 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+         Message message41 = subConnection2.receive(5, TimeUnit.SECONDS);
+         assertNull(message41);
+
+      } finally {
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(new String[]{ANYCAST_TOPIC});
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(new String[]{ANYCAST_TOPIC, TOPIC2});
+            subConnection2.disconnect();
+         }
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useSameClientIdSubscribeExistingQueue() throws Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String subClientId = "subClientId";
+      final String pubClientId = "pubClientId";
+
+      setupServers(ANYCAST_TOPIC);
+
+      startServers(0, 1);
+
+      BlockingConnection subConnection1 = null;
+      BlockingConnection subConnection2 = null;
+      BlockingConnection subConnection3 = null;
+      BlockingConnection pubConnection = null;
+      try {
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         pubConnection = retrieveMQTTConnection("tcp://localhost:61616", 
pubClientId);
+
+         subConnection1 = retrieveMQTTConnection("tcp://localhost:61616", 
subClientId);
+         subConnection2 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+
+         //Waiting for the first sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection1));
+
+         subConnection3 = retrieveMQTTConnection("tcp://localhost:61617", 
subClientId);
+
+         //Waiting for the second sub connection to be closed
+         assertTrue(waitConnectionClosed(subConnection2));
+
+         // Subscribe to topics
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
+
+         subConnection3.subscribe(topics);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         // Publish Messages
+         String payload1 = "This is message 1";
+         String payload2 = "This is message 2";
+         String payload3 = "This is message 3";
+         String payload4 = "This is message 4";
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload4.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message1 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message1.ack();
+         Message message2 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message2.ack();
+         Message message3 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message3.ack();
+         Message message4 = subConnection3.receive(5, TimeUnit.SECONDS);
+         message4.ack();
+
+         assertEquals(payload1, new String(message1.getPayload()));
+         assertEquals(payload2, new String(message2.getPayload()));
+         assertEquals(payload3, new String(message3.getPayload()));
+         assertEquals(payload4, new String(message4.getPayload()));
+
+         subConnection3.unsubscribe(new String[]{ANYCAST_TOPIC});
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, false);
+
+         pubConnection.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         pubConnection.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+
+         Message message11 = subConnection3.receive(5, TimeUnit.SECONDS);
+         assertNull(message11);
+         Message message21 = subConnection3.receive(5, TimeUnit.SECONDS);
+         assertNull(message21);
+         Message message31 = subConnection3.receive(5, TimeUnit.SECONDS);
+         assertNull(message31);
+
+
+      } finally {
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (subConnection1 != null && subConnection1.isConnected()) {
+            subConnection1.unsubscribe(topics);
+            subConnection1.disconnect();
+         }
+         if (subConnection2 != null && subConnection2.isConnected()) {
+            subConnection2.unsubscribe(topics);
+            subConnection2.disconnect();
+         }
+         if (subConnection3 != null && subConnection3.isConnected()) {
+            subConnection3.unsubscribe(topics);
+            subConnection3.disconnect();
+         }
+         if (pubConnection != null && pubConnection.isConnected()) {
+            pubConnection.unsubscribe(topics);
+            pubConnection.disconnect();
+         }
+      }
+
+   }
+
+   @Test
+   public void useDiffClientIdSubscribeExistingQueue() throws Exception {
+      final String ANYCAST_TOPIC = "anycast/test/1/some/la";
+      final String clientId1 = "clientId1";
+      final String clientId2 = "clientId2";
+      final String clientId3 = "clientId3";
 
-      setupServers(TOPIC);
+      setupServers(ANYCAST_TOPIC);
 
       startServers(0, 1);
       BlockingConnection connection1 = null;
       BlockingConnection connection2 = null;
       BlockingConnection connection3 = null;
       try {
-
-         connection1 = retrieveMQTTConnection("tcp://localhost:61616");
-         connection2 = retrieveMQTTConnection("tcp://localhost:61617");
-         connection3 = retrieveMQTTConnection("tcp://localhost:61617");
+         //Waiting for resource initialization to complete
+         Thread.sleep(5000);
+         connection1 = retrieveMQTTConnection("tcp://localhost:61616", 
clientId1);
+         connection2 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId2);
+         connection3 = retrieveMQTTConnection("tcp://localhost:61617", 
clientId3);
          // Subscribe to topics
-         Topic[] topics = {new Topic(TOPIC, QoS.AT_MOST_ONCE)};
+         Topic[] topics = {new Topic(ANYCAST_TOPIC, QoS.AT_MOST_ONCE)};
          connection1.subscribe(topics);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 0, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 0, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
          connection2.subscribe(topics);
-         connection3.subscribe(topics);
 
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
+
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
-         waitForBindings(0, TOPIC, 1, 1, true);
-         waitForBindings(1, TOPIC, 1, 2, true);
+         connection3.subscribe(topics);
 
-         waitForBindings(0, TOPIC, 1, 2, false);
-         waitForBindings(1, TOPIC, 1, 1, false);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 2, true);
 
+         waitForBindings(0, ANYCAST_TOPIC, 1, 2, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
 
          // Publish Messages
          String payload1 = "This is message 1";
@@ -336,10 +1167,10 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          String payload3 = "This is message 3";
          String payload4 = "This is message 4";
 
-         connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
-         connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload4.getBytes(), QoS.AT_MOST_ONCE, 
false);
+         connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload4.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
          Message message1 = connection1.receive(5, TimeUnit.SECONDS);
          message1.ack();
@@ -355,12 +1186,17 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          assertEquals(payload3, new String(message3.getPayload()));
          assertEquals(payload4, new String(message4.getPayload()));
 
+         connection2.unsubscribe(new String[]{ANYCAST_TOPIC});
 
-         connection2.unsubscribe(new String[]{TOPIC});
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, true);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, true);
 
-         connection1.publish(TOPIC, payload1.getBytes(), QoS.AT_LEAST_ONCE, 
false);
-         connection1.publish(TOPIC, payload2.getBytes(), QoS.AT_MOST_ONCE, 
false);
-         connection1.publish(TOPIC, payload3.getBytes(), QoS.AT_MOST_ONCE, 
false);
+         waitForBindings(0, ANYCAST_TOPIC, 1, 1, false);
+         waitForBindings(1, ANYCAST_TOPIC, 1, 1, false);
+
+         connection1.publish(ANYCAST_TOPIC, payload1.getBytes(), 
QoS.AT_LEAST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload2.getBytes(), 
QoS.AT_MOST_ONCE, false);
+         connection1.publish(ANYCAST_TOPIC, payload3.getBytes(), 
QoS.AT_MOST_ONCE, false);
 
          Message message11 = connection1.receive(5, TimeUnit.SECONDS);
          message11.ack();
@@ -377,16 +1213,16 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
          assertTrue(payload3.equals(message11String) || 
payload3.equals(message21String) || payload3.equals(message31String));
 
       } finally {
-         String[] topics = new String[]{TOPIC};
-         if (connection1 != null) {
+         String[] topics = new String[]{ANYCAST_TOPIC};
+         if (connection1 != null && connection1.isConnected()) {
             connection1.unsubscribe(topics);
             connection1.disconnect();
          }
-         if (connection2 != null) {
+         if (connection2 != null && connection2.isConnected()) {
             connection2.unsubscribe(topics);
             connection2.disconnect();
          }
-         if (connection3 != null) {
+         if (connection3 != null && connection3.isConnected()) {
             connection3.unsubscribe(topics);
             connection3.disconnect();
          }
@@ -395,9 +1231,12 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
 
    }
 
-   private static BlockingConnection retrieveMQTTConnection(String host) 
throws Exception {
+   private static BlockingConnection retrieveMQTTConnection(String host, 
String clientId) throws Exception {
       MQTT mqtt = new MQTT();
       mqtt.setHost(host);
+      mqtt.setClientId(clientId);
+      mqtt.setConnectAttemptsMax(0);
+      mqtt.setReconnectAttemptsMax(0);
       BlockingConnection connection = mqtt.blockingConnection();
       connection.connect();
       return connection;
@@ -450,4 +1289,7 @@ public class MqttClusterRemoteSubscribeTest extends 
ClusterTestBase {
       return wildcardConfiguration;
    }
 
+   private boolean waitConnectionClosed(BlockingConnection connection) throws 
Exception {
+      return Wait.waitFor(() -> !connection.isConnected());
+   }
 }

Reply via email to