This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 513b7826a4 ARTEMIS-4532 MQTT-to-core wildcard conversion is broken
513b7826a4 is described below

commit 513b7826a440eecbfa0f3d7000360186e35bc43d
Author: Justin Bertram <jbert...@apache.org>
AuthorDate: Wed Dec 13 10:36:11 2023 -0600

    ARTEMIS-4532 MQTT-to-core wildcard conversion is broken
    
    Currently when an MQTT topic filter contains characters from the
    configured wildcard syntax the conversion to/from this syntax breaks.
    
    For example, when using the default wildcard syntax if an MQTT topic
    filter contains a . the conversion from the MQTT wildcard syntax to the
    core wildcard syntax and back will result in the `.` being replaced with
    a `/.`.
    
    This commit fixes that plus a few other things...
    
     - Implements proper conversions to/from one WildcardConfiguration to
       another.
     - Refactors the MQTT code which invokes these conversion methods. This
       includes simplifying a lot of test code.
     - Adds lots of tests for everything.
     - Clarifies some variable naming to better distinguish between core and
       MQTT.
---
 .../core/protocol/mqtt/MQTTPublishManager.java     |  16 +--
 .../protocol/mqtt/MQTTRetainMessageManager.java    |   4 +-
 .../core/protocol/mqtt/MQTTSessionState.java       |   4 +-
 .../protocol/mqtt/MQTTSubscriptionManager.java     |  51 +++-----
 .../artemis/core/protocol/mqtt/MQTTUtil.java       | 110 +++++++++++------
 .../artemis/core/protocol/mqtt/MQTTUtilTest.java   | 110 ++++++++++++++++-
 .../core/protocol/openwire/util/OpenWireUtil.java  |  10 +-
 .../artemis/core/config/WildcardConfiguration.java | 130 ++++++++++++++++++---
 .../core/config/WildcardConfigurationTest.java     | 124 ++++++++++++++++++++
 docs/user-manual/mqtt.adoc                         |  20 +++-
 docs/user-manual/versions.adoc                     |  22 ++++
 .../artemis/tests/integration/mqtt/MQTTTest.java   |   8 +-
 .../tests/integration/mqtt/MQTTTestSupport.java    |   3 +-
 .../mqtt/MqttWildCardSubAutoCreateTest.java        |   6 +-
 .../integration/mqtt/PahoMQTTQOS2SecurityTest.java |   3 +-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java |  40 +++++--
 .../tests/integration/mqtt5/MQTT5TestSupport.java  |  41 ++-----
 .../mqtt5/spec/ControlPacketFormatTests.java       |  25 ++--
 .../mqtt5/spec/MessageReceiptTests.java            |   5 +-
 .../tests/integration/mqtt5/spec/QoSTests.java     |  60 +++++-----
 .../mqtt5/spec/controlpackets/ConnectTests.java    |   4 +-
 .../mqtt5/spec/controlpackets/PublishTests.java    |  64 ++++++----
 .../controlpackets/PublishTestsWithSecurity.java   |   6 +-
 .../controlpackets/SubscribeTestsWithSecurity.java |   6 +-
 .../ssl/CertificateAuthenticationSslTests.java     |   9 +-
 25 files changed, 641 insertions(+), 240 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 5c79a53b43..eec146287d 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -216,7 +216,7 @@ public class MQTTPublishManager {
                }
             }
          }
-         String coreAddress = MQTTUtil.convertMqttTopicFilterToCore(topic, 
session.getWildcardConfiguration());
+         String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(topic, 
session.getWildcardConfiguration());
          SimpleString address = SimpleString.toSimpleString(coreAddress, 
session.getCoreMessageObjectPools().getAddressStringSimpleStringPool());
          Message serverMessage = 
MQTTUtil.createServerMessageFromByteBuf(session, address, message);
          int qos = message.fixedHeader().qosLevel().value();
@@ -392,7 +392,7 @@ public class MQTTPublishManager {
    }
 
    private boolean publishToClient(int messageId, ICoreMessage message, int 
deliveryCount, int qos, long consumerId) throws Exception {
-      String address = 
MQTTUtil.convertCoreAddressToMqttTopicFilter(message.getAddress() == null ? "" 
: message.getAddress(), session.getWildcardConfiguration());
+      String topic = MQTTUtil.getMqttTopicFromCoreAddress(message.getAddress() 
== null ? "" : message.getAddress(), session.getWildcardConfiguration());
 
       ByteBuf payload;
       switch (message.getType()) {
@@ -418,29 +418,29 @@ public class MQTTPublishManager {
 
       if (session.getVersion() == MQTTVersion.MQTT_5) {
          if (!isRetain && message.getBooleanProperty(MQTT_MESSAGE_RETAIN_KEY)) 
{
-            MqttTopicSubscription sub = 
session.getState().getSubscription(message.getAddress());
+            MqttTopicSubscription sub = 
session.getState().getSubscription(topic);
             if (sub != null && sub.option().isRetainAsPublished()) {
                isRetain = true;
             }
          }
 
          if (session.getState().getClientTopicAliasMaximum() != null) {
-            Integer alias = session.getState().getServerTopicAlias(address);
+            Integer alias = session.getState().getServerTopicAlias(topic);
             if (alias == null) {
-               alias = session.getState().addServerTopicAlias(address);
+               alias = session.getState().addServerTopicAlias(topic);
                if (alias != null) {
                   mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
                }
             } else {
                mqttProperties.add(new 
MqttProperties.IntegerProperty(TOPIC_ALIAS.value(), alias));
-               address = "";
+               topic = "";
             }
          }
       }
 
-      int remainingLength = MQTTUtil.calculateRemainingLength(address, 
mqttProperties, payload);
+      int remainingLength = MQTTUtil.calculateRemainingLength(topic, 
mqttProperties, payload);
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.PUBLISH, 
redelivery, MqttQoS.valueOf(qos), isRetain, remainingLength);
-      MqttPublishVariableHeader varHeader = new 
MqttPublishVariableHeader(address, messageId, mqttProperties);
+      MqttPublishVariableHeader varHeader = new 
MqttPublishVariableHeader(topic, messageId, mqttProperties);
       MqttPublishMessage publish = new MqttPublishMessage(header, varHeader, 
payload);
 
       int maxSize = session.getState().getClientMaxPacketSize();
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 26be4bc53f..b04f09f784 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -48,7 +48,7 @@ public class MQTTRetainMessageManager {
     * the retained queue and the previous retain message consumed to remove it 
from the queue.
     */
    void handleRetainedMessage(Message messageParameter, String address, 
boolean reset, Transaction tx) throws Exception {
-      SimpleString retainAddress = new 
SimpleString(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 address, session.getWildcardConfiguration()));
+      String retainAddress = 
MQTTUtil.getCoreRetainAddressFromMqttTopic(address, 
session.getWildcardConfiguration());
 
       Queue queue = session.getServer().locateQueue(retainAddress);
       if (queue == null) {
@@ -65,7 +65,7 @@ public class MQTTRetainMessageManager {
 
    void addRetainedMessagesToQueue(Queue queue, String address) throws 
Exception {
       // The address filter that matches all retained message queues.
-      String retainAddress = 
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, 
address, session.getWildcardConfiguration());
+      String retainAddress = 
MQTTUtil.getCoreRetainAddressFromMqttTopic(address, 
session.getWildcardConfiguration());
       BindingQueryResult bindingQueryResult = 
session.getServerSession().executeBindingQuery(new SimpleString(retainAddress));
 
       // Iterate over all matching retain queues and add the queue
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index 2570cabcd6..ad354345ec 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -199,7 +199,7 @@ public class MQTTSessionState {
    public boolean addSubscription(MqttTopicSubscription subscription, 
WildcardConfiguration wildcardConfiguration, Integer subscriptionIdentifier) 
throws Exception {
       // synchronized to prevent race with removeSubscription
       synchronized (subscriptions) {
-         
addressMessageMap.putIfAbsent(MQTTUtil.convertMqttTopicFilterToCore(subscription.topicName(),
 wildcardConfiguration).toString(), new ConcurrentHashMap<>());
+         
addressMessageMap.putIfAbsent(MQTTUtil.getCoreAddressFromMqttTopic(subscription.topicName(),
 wildcardConfiguration), new ConcurrentHashMap<>());
 
          Pair<MqttTopicSubscription, Integer> existingSubscription = 
subscriptions.get(subscription.topicName());
          if (existingSubscription != null) {
@@ -237,7 +237,7 @@ public class MQTTSessionState {
    }
 
    public List<Integer> getMatchingSubscriptionIdentifiers(String address) {
-      address = MQTTUtil.convertCoreAddressToMqttTopicFilter(address, 
session.getServer().getConfiguration().getWildcardConfiguration());
+      address = MQTTUtil.getMqttTopicFromCoreAddress(address, 
session.getServer().getConfiguration().getWildcardConfiguration());
       List<Integer> result = null;
       for (Pair<MqttTopicSubscription, Integer> pair : subscriptions.values()) 
{
          Pattern pattern = Match.createPattern(pair.getA().topicName(), 
MQTTUtil.MQTT_WILDCARD, true);
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index 5ca6679dc0..e66c880d19 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -28,7 +28,6 @@ import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
-import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
@@ -40,7 +39,6 @@ import 
org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 
 import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.DOLLAR;
-import static org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.SLASH;
 import static 
org.apache.activemq.artemis.reader.MessageUtil.CONNECTION_ID_PROPERTY_NAME_STRING;
 
 public class MQTTSubscriptionManager {
@@ -106,11 +104,10 @@ public class MQTTSubscriptionManager {
    private void addSubscription(MqttTopicSubscription subscription, Integer 
subscriptionIdentifier, boolean initialStart) throws Exception {
       String rawTopicName = 
CompositeAddress.extractAddressName(subscription.topicName());
       String parsedTopicName = 
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
-      int qos = subscription.qualityOfService().value();
-      String coreAddress = 
MQTTUtil.convertMqttTopicFilterToCore(parsedTopicName, 
session.getWildcardConfiguration());
-      String coreQueue = getQueueNameForTopic(rawTopicName).toString();
 
-      Queue q = createQueueForSubscription(coreAddress, coreQueue);
+      Queue q = createQueueForSubscription(rawTopicName, parsedTopicName);
+
+      int qos = subscription.qualityOfService().value();
 
       try {
          if (initialStart) {
@@ -140,16 +137,6 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private String parseTopicName(String rawTopicName) {
-      String parsedTopicName = rawTopicName;
-
-      // if using a shared subscription then parse
-      if (rawTopicName.startsWith(MQTTUtil.SHARED_SUBSCRIPTION_PREFIX)) {
-         parsedTopicName = rawTopicName.substring(rawTopicName.indexOf(SLASH, 
rawTopicName.indexOf(SLASH) + 1) + 1);
-      }
-      return parsedTopicName;
-   }
-
    synchronized void stop() throws Exception {
       for (ServerConsumer consumer : consumers.values()) {
          consumer.setStarted(false);
@@ -159,13 +146,16 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private Queue createQueueForSubscription(String address, String queueName) 
throws Exception {
+   private Queue createQueueForSubscription(String rawTopicName, String 
parsedTopicName) throws Exception {
+      String coreAddress = 
MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName, 
session.getWildcardConfiguration());
+      String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName, 
session.getState().getClientId(), session.getWildcardConfiguration());
+
       // check to see if a subscription queue already exists.
-      Queue q = session.getServer().locateQueue(queueName);
+      Queue q = session.getServer().locateQueue(coreQueue);
 
       // The queue does not exist so we need to create it.
       if (q == null) {
-         SimpleString sAddress = SimpleString.toSimpleString(address);
+         SimpleString sAddress = SimpleString.toSimpleString(coreAddress);
 
          // Check we can auto create queues.
          BindingQueryResult bindingQueryResult = 
session.getServerSession().executeBindingQuery(sAddress);
@@ -182,7 +172,7 @@ public class MQTTSubscriptionManager {
             addressInfo = session.getServerSession().createAddress(sAddress,
                                                                    
RoutingType.MULTICAST, true);
          }
-         return findOrCreateQueue(bindingQueryResult, addressInfo, queueName);
+         return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue);
       }
       return q;
    }
@@ -233,13 +223,13 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private void createConsumerForSubscriptionQueue(Queue queue, String topic, 
int qos, boolean noLocal, Long existingConsumerId) throws Exception {
+   private void createConsumerForSubscriptionQueue(Queue queue, String 
topicFilter, int qos, boolean noLocal, Long existingConsumerId) throws 
Exception {
       long cid = existingConsumerId != null ? existingConsumerId : 
session.getServer().getStorageManager().generateID();
 
       // for noLocal support we use the MQTT *client id* rather than the 
connection ID, but we still use the existing property name
       ServerConsumer consumer = session.getServerSession().createConsumer(cid, 
queue.getName(), noLocal ? 
SimpleString.toSimpleString(CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + 
session.getState().getClientId() + "'") : null, false, false, -1);
 
-      ServerConsumer existingConsumer = 
consumers.put(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic).getB(), 
consumer);
+      ServerConsumer existingConsumer = consumers.put(topicFilter, consumer);
       if (existingConsumer != null) {
          existingConsumer.setStarted(false);
          existingConsumer.close(false);
@@ -257,7 +247,7 @@ public class MQTTSubscriptionManager {
       synchronized (state) {
          reasonCodes = new short[topics.size()];
          for (int i = 0; i < topics.size(); i++) {
-            if (session.getState().getSubscription(topics.get(i)) == null) {
+            if (state.getSubscription(topics.get(i)) == null) {
                reasonCodes[i] = MQTTReasonCodes.NO_SUBSCRIPTION_EXISTED;
                continue;
             }
@@ -265,14 +255,14 @@ public class MQTTSubscriptionManager {
             short reasonCode = MQTTReasonCodes.SUCCESS;
 
             try {
-               session.getState().removeSubscription(topics.get(i));
+               state.removeSubscription(topics.get(i));
                ServerConsumer removed = 
consumers.remove(MQTTUtil.decomposeSharedSubscriptionTopicFilter(topics.get(i)).getB());
                if (removed != null) {
                   removed.close(false);
                   consumerQoSLevels.remove(removed.getID());
                }
 
-               SimpleString internalQueueName = 
SimpleString.toSimpleString(getQueueNameForTopic(topics.get(i)));
+               SimpleString internalQueueName = 
SimpleString.toSimpleString(MQTTUtil.getCoreQueueFromMqttTopic(topics.get(i), 
state.getClientId(), 
session.getServer().getConfiguration().getWildcardConfiguration()));
                Queue queue = 
session.getServer().locateQueue(internalQueueName);
                if (queue != null) {
                   if (queue.isConfigurationManaged()) {
@@ -296,17 +286,6 @@ public class MQTTSubscriptionManager {
       return reasonCodes;
    }
 
-   private String getQueueNameForTopic(String topic) {
-      String queueName;
-      if (MQTTUtil.isSharedSubscription(topic)) {
-         Pair<String, String> decomposed = 
MQTTUtil.decomposeSharedSubscriptionTopicFilter(topic);
-         queueName = decomposed.getA().concat(".").concat(decomposed.getB());
-      } else {
-         queueName = 
session.getState().getClientId().concat(".").concat(topic);
-      }
-      return MQTTUtil.convertMqttTopicFilterToCore(queueName, 
session.getWildcardConfiguration());
-   }
-
    /**
     * As per MQTT Spec. Subscribes this client to a number of MQTT topics.
     *
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index 976e958005..73bdd8d06a 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -53,6 +53,7 @@ import org.apache.commons.text.CaseUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.lang.invoke.MethodHandles;
+import java.util.Objects;
 
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
@@ -123,7 +124,7 @@ public class MQTTUtil {
 
    public static final int TWO_BYTE_INT_MAX = Integer.decode("0xFFFF"); // 
65_535
 
-    // 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
+   // 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901011
    public static final int VARIABLE_BYTE_INT_MAX = 268_435_455;
 
    public static final int MAX_PACKET_SIZE = VARIABLE_BYTE_INT_MAX;
@@ -138,26 +139,70 @@ public class MQTTUtil {
 
    public static final int DEFAULT_MAXIMUM_PACKET_SIZE = MAX_PACKET_SIZE;
 
-   public static String convertMqttTopicFilterToCore(String filter, 
WildcardConfiguration wildcardConfiguration) {
-      return convertMqttTopicFilterToCore(null, filter, wildcardConfiguration);
-   }
+   public static final WildcardConfiguration MQTT_WILDCARD = new 
WildcardConfiguration().setDelimiter(SLASH).setAnyWords(HASH).setSingleWord(PLUS);
 
-   public static String convertMqttTopicFilterToCore(String prefixToAdd, 
String filter, WildcardConfiguration wildcardConfiguration) {
-      if (filter == null) {
-         return "";
-      }
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core subscription queue. The
+    * {@code topicFilter} may be either for a shared subscription in the 
format {@code $share/<shareName>/<topicFilter>}
+    * or a normal MQTT topic filter (e.g. {@code a/b/#}, {@code a/+/c}, {@code 
a/b/c}, etc.).
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param clientId the MQTT client ID, used for normal (i.e. non-shared) 
subscriptions
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core subscription queue based on the input
+    */
+   public static String getCoreQueueFromMqttTopic(String topicFilter, String 
clientId, WildcardConfiguration wildcardConfiguration) {
+      Objects.requireNonNull(topicFilter, "MQTT topic filter must not be 
null");
+      Objects.requireNonNull(wildcardConfiguration, "Broker wildcard 
configuration must not be null");
 
-      String converted = MQTT_WILDCARD.convert(filter, wildcardConfiguration);
-      if (prefixToAdd != null) {
-         converted = prefixToAdd + converted;
+      if (isSharedSubscription(topicFilter)) {
+         Pair<String, String> decomposed = 
decomposeSharedSubscriptionTopicFilter(topicFilter);
+         return new 
StringBuilder().append(decomposed.getA()).append(".").append(getCoreAddressFromMqttTopic(decomposed.getB(),
 wildcardConfiguration)).toString();
+      } else {
+         Objects.requireNonNull(clientId, "MQTT client ID must not be null");
+         return new 
StringBuilder().append(clientId).append(".").append(getCoreAddressFromMqttTopic(topicFilter,
 wildcardConfiguration)).toString();
       }
-      return converted;
    }
 
-   public static String convertCoreAddressToMqttTopicFilter(String address, 
WildcardConfiguration wildcardConfiguration) {
-      if (address == null) {
-         return "";
-      }
+   /**
+    * This method takes the MQTT-related input and translates it into the 
proper name for a core address. The
+    * {@code topicFilter} must be normal (i.e. non-shared). It should not be 
in the format
+    * {@code $share/<shareName>/<topicFilter>}.
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core address based on the input
+    */
+   public static String getCoreAddressFromMqttTopic(String topicFilter, 
WildcardConfiguration wildcardConfiguration) {
+      Objects.requireNonNull(topicFilter, "MQTT topic filter must not be 
null");
+      Objects.requireNonNull(wildcardConfiguration, "Broker wildcard 
configuration must not be null");
+
+      return MQTT_WILDCARD.convert(topicFilter, wildcardConfiguration);
+   }
+
+   /**
+    * This is exactly the same as {@link #getCoreAddressFromMqttTopic(String, 
WildcardConfiguration)} except that it
+    * also prefixes the return with
+    * {@link 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
+    *
+    * @param topicFilter the MQTT topic filter
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the core address based on the input, prefixed with
+    *         {@link 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil#MQTT_RETAIN_ADDRESS_PREFIX}
+    */
+   public static String getCoreRetainAddressFromMqttTopic(String topicFilter, 
WildcardConfiguration wildcardConfiguration) {
+      return MQTT_RETAIN_ADDRESS_PREFIX + 
getCoreAddressFromMqttTopic(topicFilter, wildcardConfiguration);
+   }
+
+   /**
+    *
+    * @param address the core address
+    * @param wildcardConfiguration the {@code WildcardConfiguration} governing 
the core broker
+    * @return the name of the MQTT topic based on the input
+    */
+   public static String getMqttTopicFromCoreAddress(String address, 
WildcardConfiguration wildcardConfiguration) {
+      Objects.requireNonNull(address, "Address must not be null");
+      Objects.requireNonNull(wildcardConfiguration, "Broker wildcard 
configuration must not be null");
 
       if (address.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
          address = address.substring(MQTT_RETAIN_ADDRESS_PREFIX.length());
@@ -166,16 +211,6 @@ public class MQTTUtil {
       return wildcardConfiguration.convert(address, MQTT_WILDCARD);
    }
 
-   public static class MQTTWildcardConfiguration extends WildcardConfiguration 
{
-      public MQTTWildcardConfiguration() {
-         setDelimiter(SLASH);
-         setSingleWord(PLUS);
-         setAnyWords(HASH);
-      }
-   }
-
-   public static final WildcardConfiguration MQTT_WILDCARD = new 
MQTTWildcardConfiguration();
-
    private static ICoreMessage createServerMessage(MQTTSession session, 
SimpleString address, MqttPublishMessage mqttPublishMessage) {
       long id = session.getServer().getStorageManager().generateID();
 
@@ -530,25 +565,30 @@ public class MQTTUtil {
       return defaultReturnValue == null ? null : defaultReturnValue;
    }
 
-
-
-   /*
-    * MQTT shared subscriptions are specified with the syntax from
-    * 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250:
-    *   $share/<shareName>/<topicFilter>
-    * This method takes this syntax and returns the shareName and the 
topicFilter.
+   /**
+    * MQTT shared subscriptions are specified with
+    * <a 
href="https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901250">this
 syntax</a>.
+    *
+    * @param topicFilter String in the format {@code 
$share/<shareName>/<topicFilter>}
+    * @return {@code Pair<String, String>} with {@code shareName} and {@code 
topicFilter} respectively or {@code null}
+    *         and {@code topicFilter} if not in the shared-subscription format.
     */
    public static Pair<String, String> 
decomposeSharedSubscriptionTopicFilter(String topicFilter) {
       if (isSharedSubscription(topicFilter)) {
          int prefix = SHARED_SUBSCRIPTION_PREFIX.length();
          String shareName = topicFilter.substring(prefix, 
topicFilter.indexOf(SLASH, prefix));
          String parsedTopicName = 
topicFilter.substring(topicFilter.indexOf(SLASH, prefix) + 1);
-         return new Pair(shareName, parsedTopicName);
+         return new Pair<>(shareName, parsedTopicName);
       } else {
-         return new Pair(null, topicFilter);
+         return new Pair<>(null, topicFilter);
       }
    }
 
+   /**
+    *
+    * @param topicFilter the topic filter
+    * @return {@code true} if the input starts with {@code $share/}, {@code 
false} otherwise
+    */
    public static boolean isSharedSubscription(String topicFilter) {
       if (topicFilter.startsWith(SHARED_SUBSCRIPTION_PREFIX)) {
          return true;
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
index 910b24851e..d24666e842 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtilTest.java
@@ -18,13 +18,14 @@
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 public class MQTTUtilTest {
-
    @Test
    public void testDecompose() {
       String shareName = RandomUtil.randomString();
@@ -34,4 +35,111 @@ public class MQTTUtilTest {
       assertEquals(shareName, decomposed.getA());
       assertEquals(topicFilter, decomposed.getB());
    }
+
+   @Test
+   public void testGetCoreQueueFromMqttTopic() {
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic(null, null, null));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic(null, null, new WildcardConfiguration()));
+
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic("", null, null));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic("", null, new WildcardConfiguration()));
+
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic("", "", null));
+
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic(null, "", null));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreQueueFromMqttTopic(null, "", new WildcardConfiguration()));
+
+      final String clientId = RandomUtil.randomString().replace("-", "");
+
+      WildcardConfiguration defaultWildCardConfig = new 
WildcardConfiguration();
+      assertEquals(clientId + ".a.b.c", 
MQTTUtil.getCoreQueueFromMqttTopic("a/b/c", clientId, defaultWildCardConfig));
+      assertEquals(clientId + ".a.*.c", 
MQTTUtil.getCoreQueueFromMqttTopic("a/+/c", clientId, defaultWildCardConfig));
+      assertEquals(clientId + ".a.*.#", 
MQTTUtil.getCoreQueueFromMqttTopic("a/+/#", clientId, defaultWildCardConfig));
+      assertEquals(clientId + ".1\\.0.device", 
MQTTUtil.getCoreQueueFromMqttTopic("1.0/device", clientId, 
defaultWildCardConfig));
+      assertEquals(clientId + ".*", MQTTUtil.getCoreQueueFromMqttTopic("+", 
clientId, defaultWildCardConfig));
+      assertEquals(clientId + "..", MQTTUtil.getCoreQueueFromMqttTopic("/", 
clientId, defaultWildCardConfig));
+      assertEquals(clientId + ".#", MQTTUtil.getCoreQueueFromMqttTopic("#", 
clientId, defaultWildCardConfig));
+
+      WildcardConfiguration customWildCardConfig = new 
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+      assertEquals(clientId + ".a|b|c", 
MQTTUtil.getCoreQueueFromMqttTopic("a/b/c", clientId, customWildCardConfig));
+      assertEquals(clientId + ".a|$|c", 
MQTTUtil.getCoreQueueFromMqttTopic("a/+/c", clientId, customWildCardConfig));
+      assertEquals(clientId + ".a|$|!", 
MQTTUtil.getCoreQueueFromMqttTopic("a/+/#", clientId, customWildCardConfig));
+      assertEquals(clientId + ".1.0|device", 
MQTTUtil.getCoreQueueFromMqttTopic("1.0/device", clientId, 
customWildCardConfig));
+      assertEquals(clientId + ".$", MQTTUtil.getCoreQueueFromMqttTopic("+", 
clientId, customWildCardConfig));
+      assertEquals(clientId + ".|", MQTTUtil.getCoreQueueFromMqttTopic("/", 
clientId, customWildCardConfig));
+      assertEquals(clientId + ".!", MQTTUtil.getCoreQueueFromMqttTopic("#", 
clientId, customWildCardConfig));
+   }
+
+   @Test
+   public void testGetCoreQueueFromMqttTopicWithSharedSubscription() {
+      final String clientId = RandomUtil.randomString().replace("-", "");
+
+      WildcardConfiguration defaultWildCardConfig = new 
WildcardConfiguration();
+      assertEquals("shareName.a.b.c", 
MQTTUtil.getCoreQueueFromMqttTopic("$share/shareName/a/b/c", clientId, 
defaultWildCardConfig));
+
+      WildcardConfiguration customWildCardConfig = new 
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+      assertEquals("shareName.a|b|c", 
MQTTUtil.getCoreQueueFromMqttTopic("$share/shareName/a/b/c", clientId, 
customWildCardConfig));
+
+   }
+
+   @Test
+   public void testGetCoreAddressFromMqttTopic() {
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreAddressFromMqttTopic(null, null));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreAddressFromMqttTopic(null, new WildcardConfiguration()));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreAddressFromMqttTopic("", null));
+
+      WildcardConfiguration defaultWildCardConfig = new 
WildcardConfiguration();
+      assertEquals("a.b.c", MQTTUtil.getCoreAddressFromMqttTopic("a/b/c", 
defaultWildCardConfig));
+      assertEquals("a.*.c", MQTTUtil.getCoreAddressFromMqttTopic("a/+/c", 
defaultWildCardConfig));
+      assertEquals("a.*.#", MQTTUtil.getCoreAddressFromMqttTopic("a/+/#", 
defaultWildCardConfig));
+      assertEquals("1\\.0.device", 
MQTTUtil.getCoreAddressFromMqttTopic("1.0/device", defaultWildCardConfig));
+      assertEquals("*", MQTTUtil.getCoreAddressFromMqttTopic("+", 
defaultWildCardConfig));
+      assertEquals(".", MQTTUtil.getCoreAddressFromMqttTopic("/", 
defaultWildCardConfig));
+      assertEquals("#", MQTTUtil.getCoreAddressFromMqttTopic("#", 
defaultWildCardConfig));
+
+      WildcardConfiguration customWildCardConfig = new 
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+      assertEquals("a|b|c", MQTTUtil.getCoreAddressFromMqttTopic("a/b/c", 
customWildCardConfig));
+      assertEquals("a|$|c", MQTTUtil.getCoreAddressFromMqttTopic("a/+/c", 
customWildCardConfig));
+      assertEquals("a|$|!", MQTTUtil.getCoreAddressFromMqttTopic("a/+/#", 
customWildCardConfig));
+      assertEquals("1.0|device", 
MQTTUtil.getCoreAddressFromMqttTopic("1.0/device", customWildCardConfig));
+      assertEquals("$", MQTTUtil.getCoreAddressFromMqttTopic("+", 
customWildCardConfig));
+      assertEquals("|", MQTTUtil.getCoreAddressFromMqttTopic("/", 
customWildCardConfig));
+      assertEquals("!", MQTTUtil.getCoreAddressFromMqttTopic("#", 
customWildCardConfig));
+   }
+
+   @Test
+   public void testGetCoreRetainAddressFromMqttTopic() {
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreRetainAddressFromMqttTopic(null, null));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreRetainAddressFromMqttTopic(null, new WildcardConfiguration()));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getCoreRetainAddressFromMqttTopic("", null));
+
+      final String retainPrefix = "$sys.mqtt.retain.";
+      WildcardConfiguration defaultWildCardConfig = new 
WildcardConfiguration();
+      assertEquals(retainPrefix + "a.b.c", 
MQTTUtil.getCoreRetainAddressFromMqttTopic("a/b/c", defaultWildCardConfig));
+   }
+
+   @Test
+   public void testGetMqttTopicFromCoreAddress() {
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getMqttTopicFromCoreAddress(null, null));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getMqttTopicFromCoreAddress(null, new WildcardConfiguration()));
+      assertThrows(NullPointerException.class, () -> 
MQTTUtil.getMqttTopicFromCoreAddress("", null));
+
+      WildcardConfiguration defaultWildCardConfig = new 
WildcardConfiguration();
+      assertEquals("a/b/c", MQTTUtil.getMqttTopicFromCoreAddress("a.b.c", 
defaultWildCardConfig));
+      assertEquals("a/+/c", MQTTUtil.getMqttTopicFromCoreAddress("a.*.c", 
defaultWildCardConfig));
+      assertEquals("a/+/#", MQTTUtil.getMqttTopicFromCoreAddress("a.*.#", 
defaultWildCardConfig));
+      assertEquals("1.0/device", 
MQTTUtil.getMqttTopicFromCoreAddress("1\\.0.device", defaultWildCardConfig));
+      assertEquals("+", MQTTUtil.getMqttTopicFromCoreAddress("*", 
defaultWildCardConfig));
+      assertEquals("/", MQTTUtil.getMqttTopicFromCoreAddress(".", 
defaultWildCardConfig));
+      assertEquals("#", MQTTUtil.getMqttTopicFromCoreAddress("#", 
defaultWildCardConfig));
+
+      WildcardConfiguration customWildCardConfig = new 
WildcardConfiguration().setDelimiter('|').setSingleWord('$').setAnyWords('!');
+      assertEquals("a/b/c", MQTTUtil.getMqttTopicFromCoreAddress("a|b|c", 
customWildCardConfig));
+      assertEquals("a/+/c", MQTTUtil.getMqttTopicFromCoreAddress("a|$|c", 
customWildCardConfig));
+      assertEquals("a/+/#", MQTTUtil.getMqttTopicFromCoreAddress("a|$|!", 
customWildCardConfig));
+      assertEquals("1.0/device", 
MQTTUtil.getMqttTopicFromCoreAddress("1.0|device", customWildCardConfig));
+      assertEquals("+", MQTTUtil.getMqttTopicFromCoreAddress("$", 
customWildCardConfig));
+      assertEquals("/", MQTTUtil.getMqttTopicFromCoreAddress("|", 
customWildCardConfig));
+      assertEquals("#", MQTTUtil.getMqttTopicFromCoreAddress("!", 
customWildCardConfig));
+   }
 }
diff --git 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
index 197e130c67..074e729dd0 100644
--- 
a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
+++ 
b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/util/OpenWireUtil.java
@@ -28,15 +28,7 @@ import org.apache.activemq.command.XATransactionId;
 
 public class OpenWireUtil {
 
-   public static class OpenWireWildcardConfiguration extends 
WildcardConfiguration {
-      public OpenWireWildcardConfiguration() {
-         setDelimiter('.');
-         setSingleWord('*');
-         setAnyWords('>');
-      }
-   }
-
-   public static final WildcardConfiguration OPENWIRE_WILDCARD = new 
OpenWireWildcardConfiguration();
+   public static final WildcardConfiguration OPENWIRE_WILDCARD = new 
WildcardConfiguration().setDelimiter('.').setAnyWords('>').setSingleWord('*');
 
    public static final String SELECTOR_AWARE_OPTION = "selectorAware";
 
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
index bdaf36ef17..3ee2f7b686 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/WildcardConfiguration.java
@@ -28,6 +28,8 @@ public class WildcardConfiguration implements Serializable {
 
    static final char DELIMITER = '.';
 
+   static final char ESCAPE = '\\';
+
    boolean routingEnabled = true;
 
    char singleWord = SINGLE_WORD;
@@ -42,19 +44,33 @@ public class WildcardConfiguration implements Serializable {
 
    String delimiterString = String.valueOf(delimiter);
 
+   String escapeString = String.valueOf(ESCAPE);
+
 
    @Override
    public boolean equals(Object o) {
-      if (this == o) return true;
-      if (!(o instanceof WildcardConfiguration)) return false;
+      if (this == o) {
+         return true;
+      }
+      if (!(o instanceof WildcardConfiguration)) {
+         return false;
+      }
 
       WildcardConfiguration that = (WildcardConfiguration) o;
 
-      if (routingEnabled != that.routingEnabled) return false;
-      if (singleWord != that.singleWord) return false;
-      if (anyWords != that.anyWords) return false;
-      return delimiter == that.delimiter;
-
+      if (routingEnabled != that.routingEnabled) {
+         return false;
+      }
+      if (singleWord != that.singleWord) {
+         return false;
+      }
+      if (anyWords != that.anyWords) {
+         return false;
+      }
+      if (delimiter != that.delimiter) {
+         return false;
+      }
+      return true;
    }
 
    @Override
@@ -80,8 +96,9 @@ public class WildcardConfiguration implements Serializable {
       return routingEnabled;
    }
 
-   public void setRoutingEnabled(boolean routingEnabled) {
+   public WildcardConfiguration setRoutingEnabled(boolean routingEnabled) {
       this.routingEnabled = routingEnabled;
+      return this;
    }
 
    public char getAnyWords() {
@@ -93,9 +110,10 @@ public class WildcardConfiguration implements Serializable {
    }
 
 
-   public void setAnyWords(char anyWords) {
+   public WildcardConfiguration setAnyWords(char anyWords) {
       this.anyWords = anyWords;
       this.anyWordsString = String.valueOf(anyWords);
+      return this;
    }
 
    public char getDelimiter() {
@@ -106,9 +124,10 @@ public class WildcardConfiguration implements Serializable 
{
       return delimiterString;
    }
 
-   public void setDelimiter(char delimiter) {
+   public WildcardConfiguration setDelimiter(char delimiter) {
       this.delimiter = delimiter;
       this.delimiterString = String.valueOf(delimiter);
+      return this;
    }
 
    public char getSingleWord() {
@@ -119,19 +138,94 @@ public class WildcardConfiguration implements 
Serializable {
       return singleWordString;
    }
 
-   public void setSingleWord(char singleWord) {
+   public WildcardConfiguration setSingleWord(char singleWord) {
       this.singleWord = singleWord;
       this.singleWordString = String.valueOf(singleWord);
+      return this;
    }
 
-   public String convert(String filter, WildcardConfiguration to) {
-      if (this.equals(to)) {
-         return filter;
+   /**
+    * Convert the input from this WildcardConfiguration into the specified 
WildcardConfiguration.
+    *
+    * If the input already contains characters defined in the target 
WildcardConfiguration then those characters will
+    * be escaped and preserved as such in the returned String. That said, 
wildcard characters which are the same
+    * between the two configurations will not be escaped.
+    *
+    * If the input already contains escaped characters defined in this 
WildcardConfiguration then those characters will
+    * be unescaped after conversion and restored in the returned String.
+    *
+    * @param input  the String to convert
+    * @param target the WildcardConfiguration to convert the input into
+    * @return the converted String
+    */
+   public String convert(final String input, final WildcardConfiguration 
target) {
+      if (this.equals(target)) {
+         return input;
       } else {
-         return filter
-            .replace(getDelimiter(), to.getDelimiter())
-            .replace(getSingleWord(), to.getSingleWord())
-            .replace(getAnyWords(), to.getAnyWords());
+         boolean escaped = isEscaped(input);
+         StringBuilder result;
+         if (!escaped) {
+            result = new StringBuilder(target.escape(input, this));
+         } else {
+            result = new StringBuilder(input);
+         }
+         replaceChar(result, getDelimiter(), target.getDelimiter());
+         replaceChar(result, getSingleWord(), target.getSingleWord());
+         replaceChar(result, getAnyWords(), target.getAnyWords());
+         if (escaped) {
+            return unescape(result.toString());
+         } else {
+            return result.toString();
+         }
+      }
+   }
+
+   private String escape(final String input, WildcardConfiguration from) {
+      String result = input.replace(escapeString, escapeString + escapeString);
+      if (delimiter != from.getDelimiter()) {
+         result = result.replace(getDelimiterString(), escapeString + 
getDelimiterString());
+      }
+      if (singleWord != from.getSingleWord()) {
+         result = result.replace(getSingleWordString(), escapeString + 
getSingleWordString());
+      }
+      if (anyWords != from.getAnyWords()) {
+         result = result.replace(getAnyWordsString(), escapeString + 
getAnyWordsString());
+      }
+      return result;
+   }
+
+   private String unescape(final String input) {
+      return input
+         .replace(escapeString + escapeString, escapeString)
+         .replace(ESCAPE + getDelimiterString(), getDelimiterString())
+         .replace(ESCAPE + getSingleWordString(), getSingleWordString())
+         .replace(ESCAPE + getAnyWordsString(), getAnyWordsString());
+   }
+
+   private boolean isEscaped(final String input) {
+      for (int i = 0; i < input.length() - 1; i++) {
+         if (input.charAt(i) == ESCAPE && (input.charAt(i + 1) == 
getDelimiter() || input.charAt(i + 1) == getSingleWord() || input.charAt(i + 1) 
== getAnyWords())) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   /**
+    * This will replace one character with another while ignoring escaped 
characters (i.e. those preceded by '\').
+    *
+    * @param result the final result of the replacement
+    * @param replace the character to replace
+    * @param replacement the replacement character to use
+    */
+   private void replaceChar(StringBuilder result, char replace, char 
replacement) {
+      if (replace == replacement) {
+         return;
+      }
+      for (int i = 0; i < result.length(); i++) {
+         if (result.charAt(i) == replace && (i == 0 || result.charAt(i - 1) != 
ESCAPE)) {
+            result.setCharAt(i, replacement);
+         }
       }
    }
 }
diff --git 
a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
new file mode 100644
index 0000000000..394541b558
--- /dev/null
+++ 
b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/WildcardConfigurationTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.config;
+
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class WildcardConfigurationTest extends Assert {
+
+   private static final WildcardConfiguration MQTT_WILDCARD = new 
WildcardConfiguration().setDelimiter('/').setAnyWords('#').setSingleWord('+');
+   private static final WildcardConfiguration DEFAULT_WILDCARD = new 
WildcardConfiguration();
+
+   @Test
+   public void testDefaultWildcard() {
+      assertEquals('.', DEFAULT_WILDCARD.getDelimiter());
+      assertEquals('*', DEFAULT_WILDCARD.getSingleWord());
+      assertEquals('#', DEFAULT_WILDCARD.getAnyWords());
+   }
+
+   @Test
+   public void testToFromCoreMQTT() {
+      testToFromCoreMQTT("foo.foo", "foo/foo");
+      testToFromCoreMQTT("foo.*.foo", "foo/+/foo");
+      testToFromCoreMQTT("foo.#", "foo/#");
+      testToFromCoreMQTT("foo.*.foo.#", "foo/+/foo/#");
+      testToFromCoreMQTT("foo\\.foo.foo", "foo.foo/foo");
+   }
+
+   private void testToFromCoreMQTT(String coreAddress, String mqttTopicFilter) 
{
+      assertEquals(coreAddress, MQTT_WILDCARD.convert(mqttTopicFilter, 
DEFAULT_WILDCARD));
+      assertEquals(mqttTopicFilter, DEFAULT_WILDCARD.convert(coreAddress, 
MQTT_WILDCARD));
+   }
+
+   @Test
+   public void testEquality() {
+      WildcardConfiguration a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      WildcardConfiguration b = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+
+      assertEquals(a, b);
+      assertEquals(b, a);
+      assertEquals(a.hashCode(), b.hashCode());
+
+      String toConvert = RandomUtil.randomString();
+      assertSame(toConvert, a.convert(toConvert, b));
+      assertSame(toConvert, a.convert(toConvert, a));
+   }
+
+   @Test
+   public void testEqualityNegative() {
+      WildcardConfiguration a;
+      WildcardConfiguration b;
+
+      // none equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('x').setAnyWords('y').setSingleWord('z');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+
+      // only delimiter equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('y').setSingleWord('z');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+
+      // only anyWords equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('x').setAnyWords('b').setSingleWord('z');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+
+      // only singleWord equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('x').setAnyWords('y').setSingleWord('c');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+
+      // only delimiter not equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('x').setAnyWords('b').setSingleWord('c');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+
+      // only anyWords not equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('y').setSingleWord('c');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+
+      // only singleWord not equal
+      a = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('c');
+      b = new 
WildcardConfiguration().setDelimiter('a').setAnyWords('b').setSingleWord('z');
+
+      assertNotEquals(a, b);
+      assertNotEquals(b, a);
+      assertNotEquals(a.hashCode(), b.hashCode());
+   }
+}
\ No newline at end of file
diff --git a/docs/user-manual/mqtt.adoc b/docs/user-manual/mqtt.adoc
index 0c02c47969..8b74af49bc 100644
--- a/docs/user-manual/mqtt.adoc
+++ b/docs/user-manual/mqtt.adoc
@@ -129,7 +129,8 @@ If you perform some custom validation of the client ID you 
can reject the client
 
 == Wildcard subscriptions
 
-MQTT addresses are hierarchical much like a file system, and they use a 
special character (i.e. `/` by default) to separate hierarchical levels.
+MQTT defines a special wildcard syntax for topic filters. This definition is 
found in section 4.7.1 of both the 
http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc398718107[3.1.1]
 and 
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901242[5] 
specs.
+MQTT topics are hierarchical much like a file system, and they use a special 
character (i.e. `/` by default) to separate hierarchical levels.
 Subscribers are able to subscribe to specific topics or to whole branches of a 
hierarchy.
 
 To subscribe to branches of an address hierarchy a subscriber can use wild 
cards.
@@ -147,9 +148,20 @@ This can be useful, but should be done so with care since 
it has significant per
 Matches a single level in the address hierarchy.
 For example `/uk/+/stores` would match `/uk/newcastle/stores` but not 
`/uk/cities/newcastle/stores`.
 
-These MQTT-specific wildcards are automatically _translated_ into the wildcard 
syntax used by ActiveMQ Artemis.
-These wildcards are configurable.
-See the xref:wildcard-syntax.adoc#customizing-the-syntax[Wildcard Syntax] 
chapter for details about how to configure custom wildcards.
+This is _close_ to the default 
xref:wildcard-syntax.adoc#wildcard-syntax[wildcard syntax], but not exactly the 
same.
+Therefore, some conversion is necessary.
+This conversion isn't free so *if you want the best MQTT performance* use 
`broker.xml` to configure the wildcard syntax to match MQTT's, e.g.:
+
+[,xml]
+----
+<wildcard-addresses>
+   <delimiter>/</delimiter>
+   <any-words>#</any-words>
+   <single-word>+</single-word>
+</wildcard-addresses>
+----
+
+Of course, changing the default syntax also means other clients on other 
protocols will need to follow this same syntax as well as the `match` values of 
your `address-setting` configuration elements.
 
 == Web Sockets
 
diff --git a/docs/user-manual/versions.adoc b/docs/user-manual/versions.adoc
index e0be84f7c3..610d681c38 100644
--- a/docs/user-manual/versions.adoc
+++ b/docs/user-manual/versions.adoc
@@ -12,6 +12,28 @@ NOTE: If the upgrade spans multiple versions then the steps 
from *each* version
 
 NOTE: Follow the general upgrade procedure outlined in the 
xref:upgrading.adoc#upgrading-the-broker[Upgrading the Broker]  chapter in 
addition to any version-specific upgrade instructions outlined here.
 
+== 2.33.0
+
+https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=...[Full
 release notes]
+
+=== Highlights
+
+* highlight 1
+* highlight 2
+
+=== Upgrading from 2.32.0
+
+* Due to https://issues.apache.org/jira/browse/ARTEMIS-4532[ARTEMIS-4532] the 
names of addresses and queues related to MQTT topics and subscriptions 
respectively may change.
+This will only impact you if *both* of the following are true:
++
+. The broker is configured to use a xref:wildcard-syntax.adoc[wildcard syntax] 
which _doesn't match_ the xref:mqtt.adoc#wildcard-subscriptions[MQTT wildcard syntax] 
(e.g. the default wildcard syntax).
+. You are using characters from the broker's wildcard syntax in your MQTT 
topic name or filter.
+For example, suppose you are using the default wildcard syntax and an MQTT topic 
named `1.0/group/device`.
+The dot (`.`) character here is part of the broker's wildcard syntax, and it 
is being used in the name of an MQTT topic.
++
+In this case the characters from the broker's wildcard syntax that do not 
match the characters in the MQTT wildcard syntax will be escaped with a 
backslash (i.e. `\`).
+To avoid this conversion you can configure the broker to use the MQTT wildcard 
syntax or change the MQTT topic name or filter.
+
 == 2.32.0
 
 
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315920&version=12353769[Full
 release notes]
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index 91ba3d732d..4575e29aa7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -1903,10 +1903,10 @@ public class MQTTTest extends MQTTTestSupport {
       Exception peerDisconnectedException = null;
       try {
          String clientId = "test.client";
-         String coreAddress = MQTTUtil.convertMqttTopicFilterToCore("foo/bar", 
server.getConfiguration().getWildcardConfiguration());
+         String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic("foo/bar", 
server.getConfiguration().getWildcardConfiguration());
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", 
QoS.AT_LEAST_ONCE)};
 
-         getServer().createQueue(new QueueConfiguration(new 
SimpleString(clientId + "." + 
coreAddress)).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
+         getServer().createQueue(new 
QueueConfiguration(MQTTUtil.getCoreQueueFromMqttTopic("foo/bar", clientId, 
server.getConfiguration().getWildcardConfiguration())).setAddress(coreAddress).setRoutingType(RoutingType.MULTICAST).setDurable(false).setTemporary(true).setMaxConsumers(0));
 
          MQTT mqtt = createMQTTConnection();
          mqtt.setClientId(clientId);
@@ -2151,11 +2151,11 @@ public class MQTTTest extends MQTTTestSupport {
    @Test(timeout = 60 * 1000)
    public void testAutoDeleteRetainedQueue() throws Exception {
       final String TOPIC = "/abc/123";
-      final String RETAINED_QUEUE = 
MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX, 
TOPIC, server.getConfiguration().getWildcardConfiguration());
+      final String RETAINED_QUEUE = 
MQTTUtil.getCoreRetainAddressFromMqttTopic(TOPIC, 
server.getConfiguration().getWildcardConfiguration());
       final MQTTClientProvider publisher = getMQTTClientProvider();
       final MQTTClientProvider subscriber = getMQTTClientProvider();
 
-      
server.getAddressSettingsRepository().addMatch(MQTTUtil.convertMqttTopicFilterToCore("#",
 server.getConfiguration().getWildcardConfiguration()), new 
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
+      
server.getAddressSettingsRepository().addMatch(MQTTUtil.getCoreAddressFromMqttTopic("#",
 server.getConfiguration().getWildcardConfiguration()), new 
AddressSettings().setExpiryDelay(500L).setAutoDeleteQueues(true).setAutoDeleteAddresses(true));
 
       initializeConnection(publisher);
       initializeConnection(subscriber);
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
index 491c065fe3..a15360f115 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTestSupport.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
@@ -201,7 +202,7 @@ public class MQTTTestSupport extends ActiveMQTestBase {
          value.add(new Role("browser", false, false, false, false, false, 
false, false, true, false, false));
          value.add(new Role("guest", false, true, false, false, false, false, 
false, true, false, false));
          value.add(new Role("full", true, true, true, true, true, true, true, 
true, true, true));
-         securityRepository.addMatch(getQueueName(), value);
+         
securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(),
 server.getConfiguration().getWildcardConfiguration()), value);
 
          server.getConfiguration().setSecurityEnabled(true);
       } else {
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
index 8a6341a8b0..01d2e63b7f 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttWildCardSubAutoCreateTest.java
@@ -81,8 +81,8 @@ public class MqttWildCardSubAutoCreateTest extends MQTTTestSupport {
 
       String subscriberId = UUID.randomUUID().toString();
       String senderId = UUID.randomUUID().toString();
-      String subscribeTo = "A.*";
-      String publishTo = "A.a";
+      String subscribeTo = "A/+";
+      String publishTo = "A/a";
 
       subscriber = createMqttClient(subscriberId);
       subscriber.subscribe(subscribeTo, 2);
@@ -93,7 +93,7 @@ public class MqttWildCardSubAutoCreateTest extends 
MQTTTestSupport {
       sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, 
false);
       sender.publish(publishTo, UUID.randomUUID().toString().getBytes(), 2, 
false);
 
-      assertTrue(server.getPagingManager().getPageStore(new SimpleString(subscribeTo)).isPaging());
+      assertTrue(server.getPagingManager().getPageStore(new SimpleString(MQTTUtil.getCoreAddressFromMqttTopic(subscribeTo, server.getConfiguration().getWildcardConfiguration()))).isPaging());
 
       subscriber = createMqttClient(subscriberId);
       subscriber.subscribe(subscribeTo, 2);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
index b13b2b5ef4..0af994e89c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/PahoMQTTQOS2SecurityTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt;
 
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.security.Role;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@@ -53,7 +54,7 @@ public class PahoMQTTQOS2SecurityTest extends MQTTTestSupport {
       HashSet<Role> value = new HashSet<>();
       value.add(new Role("addressOnly", true, true, true, true, false, false, 
false, false, true, true));
 
-      securityRepository.addMatch(getQueueName(), value);
+      securityRepository.addMatch(MQTTUtil.getCoreAddressFromMqttTopic(getQueueName(), server.getConfiguration().getWildcardConfiguration()), value);
    }
 
    @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 247fdb68ae..05f7eb9b31 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
@@ -83,6 +84,27 @@ public class MQTT5Test extends MQTT5TestSupport {
       assertTrue(latch.await(500, TimeUnit.MILLISECONDS));
    }
 
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testTopicNameEscape() throws Exception {
+      final String topic = "foo1.0/bar/baz";
+      AtomicReference<String> receivedTopic = new AtomicReference<>();
+
+      MqttClient subscriber = createPahoClient("subscriber");
+      subscriber.connect();
+      subscriber.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String t, MqttMessage message) {
+            receivedTopic.set(t);
+         }
+      });
+      subscriber.subscribe(topic, AT_LEAST_ONCE);
+
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      producer.publish(topic, "myMessage".getBytes(StandardCharsets.UTF_8), 1, false);
+      Wait.assertEquals(topic, receivedTopic::get, 500, 50);
+   }
+
    /*
     * Ensure that the broker adds a timestamp on the message when sending via 
MQTT
     */
@@ -333,7 +355,7 @@ public class MQTT5Test extends MQTT5TestSupport {
       consumer1.subscribe(SHARED_SUB1, 1);
 
       
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
-      Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+      Queue q1 = getSharedSubscriptionQueue(SHARED_SUB1);
       assertNotNull(q1);
       assertEquals(TOPIC1, q1.getAddress().toString());
       assertEquals(1, q1.getConsumerCount());
@@ -344,7 +366,7 @@ public class MQTT5Test extends MQTT5TestSupport {
       consumer2.subscribe(SHARED_SUB2, 1);
 
       
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
-      Queue q2 = getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME);
+      Queue q2 = getSharedSubscriptionQueue(SHARED_SUB2);
       assertNotNull(q2);
       assertEquals(TOPIC2, q2.getAddress().toString());
       assertEquals(1, q2.getConsumerCount());
@@ -360,10 +382,10 @@ public class MQTT5Test extends MQTT5TestSupport {
       assertTrue(ackLatch2.await(2, TimeUnit.SECONDS));
 
       consumer1.unsubscribe(SHARED_SUB1);
-      assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
+      assertNull(getSharedSubscriptionQueue(SHARED_SUB1));
 
       consumer2.unsubscribe(SHARED_SUB2);
-      assertNull(getSubscriptionQueue(TOPIC2, "consumer2", SUB_NAME));
+      assertNull(getSharedSubscriptionQueue(SHARED_SUB2));
 
       consumer1.disconnect();
       consumer1.close();
@@ -388,13 +410,13 @@ public class MQTT5Test extends MQTT5TestSupport {
       consumer.subscribe(SHARED_SUBS, new int[]{1, 1});
 
       
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC1)));
-      Queue q1 = getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME);
+      Queue q1 = getSharedSubscriptionQueue(SHARED_SUBS[0]);
       assertNotNull(q1);
       assertEquals(TOPIC1, q1.getAddress().toString());
       assertEquals(1, q1.getConsumerCount());
 
       
assertNotNull(server.getAddressInfo(SimpleString.toSimpleString(TOPIC2)));
-      Queue q2 = getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME);
+      Queue q2 = getSharedSubscriptionQueue(SHARED_SUBS[1]);
       assertNotNull(q2);
       assertEquals(TOPIC2, q2.getAddress().toString());
       assertEquals(1, q2.getConsumerCount());
@@ -409,8 +431,8 @@ public class MQTT5Test extends MQTT5TestSupport {
       assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
 
       consumer.unsubscribe(SHARED_SUBS);
-      assertNull(getSubscriptionQueue(TOPIC1, "consumer1", SUB_NAME));
-      assertNull(getSubscriptionQueue(TOPIC2, "consumer1", SUB_NAME));
+      assertNull(getSharedSubscriptionQueue(SHARED_SUBS[0]));
+      assertNull(getSharedSubscriptionQueue(SHARED_SUBS[1]));
 
       consumer.disconnect();
       consumer.close();
@@ -644,7 +666,7 @@ public class MQTT5Test extends MQTT5TestSupport {
       MqttClient client = createPahoClient(clientID);
       client.connect();
       client.subscribe(topic, 1);
-      Wait.assertTrue(() -> 
server.locateQueue(SimpleString.toSimpleString(clientID.concat(".").concat(topic.replace('/',
 '.')))) != null, 2000, 100);
+      Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) != null, 
2000, 100);
       client.disconnect();
       client.close();
    }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
index a4f106a2fd..e2cd5e66b9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5TestSupport.java
@@ -37,11 +37,10 @@ import 
org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.TransportConfiguration;
 import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.postoffice.Binding;
-import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManager;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSessionState;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.remoting.impl.AbstractAcceptor;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
 import org.apache.activemq.artemis.core.security.Role;
@@ -107,7 +106,7 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
       return new MqttAsyncClient(TCP + "://localhost:" + (isUseSsl() ? 
getSslPort() : getPort()), clientId, new MemoryPersistence());
    }
 
-   private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+   protected static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    protected static final long DEFAULT_TIMEOUT = 300000;
    protected ActiveMQServer server;
 
@@ -345,40 +344,16 @@ public class MQTT5TestSupport extends ActiveMQTestBase {
       return null;
    }
 
-   protected Queue getSubscriptionQueue(String TOPIC) {
-      try {
-         Object[] array = 
server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(TOPIC)).getBindings().toArray();
-         if (array.length == 0) {
-            return null;
-         } else {
-            return ((LocalQueueBinding)array[0]).getQueue();
-         }
-      } catch (Exception e) {
-         e.printStackTrace();
-         return null;
-      }
+   protected Queue getSharedSubscriptionQueue(String mqttTopicFilter) {
+      return getSubscriptionQueue(mqttTopicFilter, null);
    }
 
-   protected Queue getSubscriptionQueue(String TOPIC, String clientId) {
-      return getSubscriptionQueue(TOPIC, clientId, null);
+   protected Queue getSubscriptionQueue(String mqttTopicFilter, String clientId) {
+      return server.locateQueue(MQTTUtil.getCoreQueueFromMqttTopic(mqttTopicFilter, clientId, server.getConfiguration().getWildcardConfiguration()));
    }
 
-   protected Queue getSubscriptionQueue(String TOPIC, String clientId, String 
sharedSubscriptionName) {
-      try {
-         for (Binding b : 
server.getPostOffice().getMatchingBindings(SimpleString.toSimpleString(TOPIC))) 
{
-            if (sharedSubscriptionName != null) {
-               if 
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(sharedSubscriptionName)))
 {
-                  return ((LocalQueueBinding)b).getQueue();
-               }
-            } else if 
(((LocalQueueBinding)b).getQueue().getName().startsWith(SimpleString.toSimpleString(clientId)))
 {
-               return ((LocalQueueBinding)b).getQueue();
-            }
-         }
-         return null;
-      } catch (Exception e) {
-         e.printStackTrace();
-         return null;
-      }
+   protected Queue getRetainedMessageQueue(String mqttTopicFilter) {
+      return server.locateQueue(MQTTUtil.getCoreRetainAddressFromMqttTopic(mqttTopicFilter, server.getConfiguration().getWildcardConfiguration()));
    }
 
    protected void setAcceptorProperty(String property) throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
index 3340df263d..2113fb194a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/ControlPacketFormatTests.java
@@ -55,10 +55,11 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testPacketIdQoSZero() throws Exception {
       final String TOPIC = this.getTopicName();
+      final String CONSUMER_CLIENT_ID = "consumer";
       final int MESSAGE_COUNT = 100;
 
       final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
-      MqttClient consumer = createPahoClient("consumer");
+      MqttClient consumer = createPahoClient(CONSUMER_CLIENT_ID);
       consumer.setCallback(new DefaultMqttCallback() {
          @Override
          public void messageArrived(String topic, MqttMessage message) throws 
Exception {
@@ -75,7 +76,7 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          producer.publish(TOPIC, ("foo" + i).getBytes(), 0, false);
       }
-      Wait.assertEquals(MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded());
+      Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_CLIENT_ID).getMessagesAdded());
       producer.disconnect();
       producer.close();
 
@@ -111,15 +112,15 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
       });
       consumer.connect();
       consumer.subscribe(TOPIC, 2);
-      Wait.assertTrue(() -> getSubscriptionQueue(TOPIC) != null);
-      Wait.assertEquals(1, () -> 
getSubscriptionQueue(TOPIC).getConsumerCount());
+      Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, CONSUMER_ID) != null);
+      Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getConsumerCount());
 
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          producer.publish(TOPIC, ("foo" + i).getBytes(), 
(RandomUtil.randomPositiveInt() % 2) + 1, false);
       }
-      Wait.assertEquals(MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded());
+      Wait.assertEquals(MESSAGE_COUNT, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAdded());
       producer.disconnect();
       producer.close();
 
@@ -173,7 +174,8 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
       final String TOPIC = this.getTopicName();
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient("consumer");
+      final String CONSUMER_ID = "consumer";
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       consumer.setCallback(new DefaultMqttCallback() {
          @Override
          public void messageArrived(String topic, MqttMessage message) throws 
Exception {
@@ -186,11 +188,11 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 2, 
false);
-      Wait.assertEquals((long) 1, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+      Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAdded(), 2000, 100);
       producer.disconnect();
       producer.close();
 
-      Wait.assertEquals(1L, () -> 
getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 15000, 100);
+      Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAcknowledged(), 15000, 100);
       assertTrue(latch.await(15, TimeUnit.SECONDS));
       Wait.assertFalse(() -> failed.get(), 2000, 100);
       Wait.assertEquals(8, () -> packetCount.get());
@@ -243,7 +245,8 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
       final String TOPIC = this.getTopicName();
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient("consumer");
+      final String CONSUMER_ID = "consumer";
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       consumer.setCallback(new DefaultMqttCallback() {
          @Override
          public void messageArrived(String topic, MqttMessage message) throws 
Exception {
@@ -256,11 +259,11 @@ public class ControlPacketFormatTests extends 
MQTT5TestSupport {
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 1, 
false);
-      Wait.assertEquals((long) 1, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+      Wait.assertEquals((long) 1, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAdded(), 2000, 100);
       producer.disconnect();
       producer.close();
 
-      Wait.assertEquals(1L, () -> 
getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 15000, 100);
+      Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAcknowledged(), 15000, 100);
       assertTrue(latch.await(15, TimeUnit.SECONDS));
       Wait.assertFalse(() -> failed.get(), 2000, 100);
       Wait.assertEquals(4, () -> packetCount.get());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
index 35b1dabeed..3798edf8a1 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/MessageReceiptTests.java
@@ -46,12 +46,13 @@ public class MessageReceiptTests extends MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testMessageReceipt() throws Exception {
       final String TOPIC = RandomUtil.randomString();
+      final String CONSUMER_ID = "consumer";
       final int CONSUMER_COUNT = 25;
       final MqttClient[] consumers = new MqttClient[CONSUMER_COUNT];
 
       final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT);
       for (int i = 0; i < CONSUMER_COUNT; i++) {
-         MqttClient consumer = createPahoClient(RandomUtil.randomString());
+         MqttClient consumer = createPahoClient(CONSUMER_ID + i);
          consumers[i] = consumer;
          consumer.connect();
          int finalI = i;
@@ -75,7 +76,7 @@ public class MessageReceiptTests extends MQTT5TestSupport {
       Wait.assertEquals((long) CONSUMER_COUNT, () -> {
          int totalMessagesAdded = 0;
          for (int i = 0; i < CONSUMER_COUNT; i++) {
-            totalMessagesAdded += getSubscriptionQueue(TOPIC + 
i).getMessagesAdded();
+            totalMessagesAdded += getSubscriptionQueue(TOPIC + i, CONSUMER_ID 
+ i).getMessagesAdded();
          }
          return totalMessagesAdded;
       }, 2000, 100);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
index e28c6cf54c..9e307b2de4 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
@@ -101,14 +101,15 @@ public class QoSTests extends MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testQoS1PubAck() throws Exception {
       final String TOPIC = RandomUtil.randomString();
+      final String CONSUMER_ID = "consumer";
       final CountDownLatch ackLatch = new CountDownLatch(1);
       final AtomicInteger packetId = new AtomicInteger();
 
       MQTTInterceptor incomingInterceptor = (packet, connection) -> {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBACK) {
             // ensure the message is still in the queue before we get the ack 
from the client
-            assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount());
-            assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount());
+            assertEquals(1, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+            assertEquals(1, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
 
             // ensure the ids match so we know this is the "corresponding" 
PUBACK for the previous PUBLISH
             assertEquals(packetId.get(), 
((MqttPubReplyMessageVariableHeader)packet.variableHeader()).messageId());
@@ -129,7 +130,7 @@ public class QoSTests extends MQTT5TestSupport {
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient("consumer");
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       consumer.connect();
       consumer.setCallback(new LatchedMqttCallback(latch));
       consumer.subscribe(TOPIC, 1);
@@ -142,8 +143,8 @@ public class QoSTests extends MQTT5TestSupport {
 
       assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
       assertTrue(latch.await(2, TimeUnit.SECONDS));
-      assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount());
-      assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount());
+      assertEquals(0, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+      assertEquals(0, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
       consumer.disconnect();
       consumer.close();
    }
@@ -241,14 +242,15 @@ public class QoSTests extends MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testQoS2PubRec() throws Exception {
       final String TOPIC = RandomUtil.randomString();
+      final String CONSUMER_ID = "consumer";
       final CountDownLatch ackLatch = new CountDownLatch(1);
       final AtomicInteger packetId = new AtomicInteger();
 
       MQTTInterceptor incomingInterceptor = (packet, connection) -> {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
             // ensure the message is still in the queue before we get the ack 
from the client
-            assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount());
-            assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount());
+            assertEquals(1, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+            assertEquals(1, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
 
             // ensure the ids match so we know this is the "corresponding" 
PUBREC for the previous PUBLISH
             assertEquals(packetId.get(), 
((MqttPubReplyMessageVariableHeader)packet.variableHeader()).messageId());
@@ -269,7 +271,7 @@ public class QoSTests extends MQTT5TestSupport {
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient("consumer");
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       consumer.connect();
       consumer.setCallback(new LatchedMqttCallback(latch));
       consumer.subscribe(TOPIC, 2);
@@ -282,8 +284,8 @@ public class QoSTests extends MQTT5TestSupport {
 
       assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
       assertTrue(latch.await(2, TimeUnit.SECONDS));
-      assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount());
-      assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount());
+      assertEquals(0, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+      assertEquals(0, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
       consumer.disconnect();
       consumer.close();
    }
@@ -348,7 +350,7 @@ public class QoSTests extends MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testQoS2PubRel() throws Exception {
       final String TOPIC = RandomUtil.randomString();
-      final String CONSUMER_CLIENT_ID = "consumer";
+      final String CONSUMER_ID = "consumer";
       final CountDownLatch ackLatch = new CountDownLatch(1);
       final AtomicInteger packetId = new AtomicInteger();
 
@@ -356,8 +358,8 @@ public class QoSTests extends MQTT5TestSupport {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBCOMP) {
             try {
                // ensure the message is still in the management queue before 
we get the PUBCOMP from the client
-               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_CLIENT_ID).getMessageCount(), 2000, 100);
-               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_CLIENT_ID).getDeliveringCount(), 2000, 100);
+               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_ID).getMessageCount(), 2000, 100);
+               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_ID).getDeliveringCount(), 2000, 100);
             } catch (Exception e) {
                return false;
             }
@@ -381,7 +383,7 @@ public class QoSTests extends MQTT5TestSupport {
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient(CONSUMER_CLIENT_ID);
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       consumer.connect();
       consumer.setCallback(new LatchedMqttCallback(latch));
       consumer.subscribe(TOPIC, 2);
@@ -394,8 +396,8 @@ public class QoSTests extends MQTT5TestSupport {
 
       assertTrue(ackLatch.await(2, TimeUnit.SECONDS));
       assertTrue(latch.await(2, TimeUnit.SECONDS));
-      assertEquals(0, getSubscriptionQueue(TOPIC).getMessageCount());
-      assertEquals(0, getSubscriptionQueue(TOPIC).getDeliveringCount());
+      assertEquals(0, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+      assertEquals(0, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
       consumer.disconnect();
       consumer.close();
    }
@@ -412,6 +414,7 @@ public class QoSTests extends MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testQoS2WithExpiration() throws Exception {
       final String TOPIC = "myTopic";
+      final String CONSUMER_ID = "consumer";
       final CountDownLatch ackLatch = new CountDownLatch(1);
       final CountDownLatch expireRefsLatch = new CountDownLatch(1);
       final long messageExpiryInterval = 2;
@@ -419,12 +422,12 @@ public class QoSTests extends MQTT5TestSupport {
       MQTTInterceptor incomingInterceptor = (packet, connection) -> {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
             // ensure the message is still in the queue before we get the 
PUBREC from the client
-            assertEquals(1, getSubscriptionQueue(TOPIC).getMessageCount());
-            assertEquals(1, getSubscriptionQueue(TOPIC).getDeliveringCount());
+            assertEquals(1, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+            assertEquals(1, getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
             try {
                // ensure enough time has passed for the message to expire
                Thread.sleep(messageExpiryInterval * 1500);
-               
getSubscriptionQueue(TOPIC).expireReferences(expireRefsLatch::countDown);
+               getSubscriptionQueue(TOPIC, 
CONSUMER_ID).expireReferences(expireRefsLatch::countDown);
                assertTrue(expireRefsLatch.await(2, TimeUnit.SECONDS));
             } catch (InterruptedException e) {
                e.printStackTrace();
@@ -438,7 +441,7 @@ public class QoSTests extends MQTT5TestSupport {
       server.getRemotingService().addIncomingInterceptor(incomingInterceptor);
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient("consumer");
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       consumer.connect();
       consumer.setCallback(new DefaultMqttCallback() {
          @Override
@@ -462,9 +465,9 @@ public class QoSTests extends MQTT5TestSupport {
 
       assertTrue(ackLatch.await(messageExpiryInterval * 2, TimeUnit.SECONDS));
       assertTrue(latch.await(messageExpiryInterval * 2, TimeUnit.SECONDS));
-      Wait.assertEquals(0, () -> 
getSubscriptionQueue(TOPIC).getMessageCount());
-      Wait.assertEquals(0, () -> 
getSubscriptionQueue(TOPIC).getDeliveringCount());
-      Wait.assertEquals(0, () -> 
getSubscriptionQueue(TOPIC).getMessagesExpired());
+      Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount());
+      Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getDeliveringCount());
+      Wait.assertEquals(0, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesExpired());
       consumer.disconnect();
       consumer.close();
    }
@@ -628,7 +631,10 @@ public class QoSTests extends MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testQoS2WithExpiration2() throws Exception {
       final String TOPIC = "myTopic";
-      server.createQueue(new QueueConfiguration(RandomUtil.randomString()).setAddress(TOPIC).setRoutingType(RoutingType.MULTICAST));
+      final String CONSUMER_ID = "consumer";
+      server.createQueue(new QueueConfiguration(MQTTUtil.getCoreQueueFromMqttTopic(TOPIC, CONSUMER_ID, server.getConfiguration().getWildcardConfiguration()))
+                            .setAddress(MQTTUtil.getCoreAddressFromMqttTopic(TOPIC, server.getConfiguration().getWildcardConfiguration()))
+                            .setRoutingType(RoutingType.MULTICAST));
       final CountDownLatch ackLatch = new CountDownLatch(1);
       final CountDownLatch expireRefsLatch = new CountDownLatch(1);
       final long messageExpiryInterval = 1;
@@ -636,11 +642,11 @@ public class QoSTests extends MQTT5TestSupport {
       MQTTInterceptor outgoingInterceptor = (packet, connection) -> {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBREC) {
             // ensure the message is in the queue before trying to expire
-            Wait.assertTrue(() -> 
getSubscriptionQueue(TOPIC).getMessageCount() == 1, 2000, 100);
+            Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount() == 1, 2000, 100);
             try {
                // ensure enough time has passed for the message to expire
                Thread.sleep(messageExpiryInterval * 1500);
-               
getSubscriptionQueue(TOPIC).expireReferences(expireRefsLatch::countDown);
+               getSubscriptionQueue(TOPIC, 
CONSUMER_ID).expireReferences(expireRefsLatch::countDown);
                assertTrue(expireRefsLatch.await(2, TimeUnit.SECONDS));
             } catch (InterruptedException e) {
                e.printStackTrace();
@@ -666,6 +672,6 @@ public class QoSTests extends MQTT5TestSupport {
       producer.close();
 
       assertTrue(ackLatch.await(messageExpiryInterval * 2, TimeUnit.SECONDS));
-      Wait.assertEquals(1, () -> 
getSubscriptionQueue(TOPIC).getMessagesExpired());
+      Wait.assertEquals(1, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesExpired());
    }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
index 6cea9dd86d..612882525d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/ConnectTests.java
@@ -480,13 +480,13 @@ public class ConnectTests extends MQTT5TestSupport {
       producer.publish(TOPIC, bytes, 2, false);
       producer.disconnect();
       producer.close();
-      Wait.assertEquals(1L, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+      Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAdded(), 2000, 100);
 
       // the client should *not* receive the message
       assertFalse(latch.await(2, TimeUnit.SECONDS));
 
       // the broker should acknowledge the message since it exceeded the 
client's max packet size
-      Wait.assertEquals(1L, () -> 
getSubscriptionQueue(TOPIC).getMessagesAcknowledged(), 2000, 100);
+      Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessagesAcknowledged(), 2000, 100);
       consumer.disconnect();
       consumer.close();
    }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
index 655d5495ed..caa34d47c7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTests.java
@@ -16,6 +16,7 @@
  */
 package 
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
 
+import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,7 +30,6 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
-import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.apache.activemq.artemis.tests.util.Wait;
@@ -48,7 +48,6 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
 
 /**
  * Fulfilled by client or Netty codec (i.e. not tested here):
@@ -260,12 +259,14 @@ public class PublishTests extends MQTT5TestSupport {
       final String CONSUMER_ID = RandomUtil.randomString();
       final String TOPIC = this.getTopicName();
 
+      assertNull(getRetainedMessageQueue(TOPIC));
+
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       // send first retained message
       producer.publish(TOPIC, "retain1".getBytes(), 2, true);
 
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 1, 2000, 100);
 
       // send second retained message; should *remove* the first
       producer.publish(TOPIC, new byte[0], 2, true);
@@ -273,7 +274,7 @@ public class PublishTests extends MQTT5TestSupport {
       producer.disconnect();
       producer.close();
 
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 0, 2000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 0, 2000, 100);
 
       final CountDownLatch latch = new CountDownLatch(1);
       MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -302,12 +303,14 @@ public class PublishTests extends MQTT5TestSupport {
       final String RETAINED_PAYLOAD = RandomUtil.randomString();
       final String UNRETAINED_PAYLOAD = RandomUtil.randomString();
 
+      assertNull(getRetainedMessageQueue(TOPIC));
+
       MqttClient producer = createPahoClient("producer");
       producer.connect();
 
       // send retained message
       producer.publish(TOPIC, RETAINED_PAYLOAD.getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 1000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 1, 1000, 100);
 
       // send an unretained message; should *not* remove the existing retained 
message
       producer.publish(TOPIC, UNRETAINED_PAYLOAD.getBytes(), 2, false);
@@ -315,7 +318,7 @@ public class PublishTests extends MQTT5TestSupport {
       producer.disconnect();
       producer.close();
 
-      Wait.assertFalse(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() > 1, 1000, 100);
+      Wait.assertFalse(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
> 1, 1000, 100);
 
       final CountDownLatch latch = new CountDownLatch(1);
       MqttClient consumer = createPahoClient(CONSUMER_ID);
@@ -395,13 +398,17 @@ public class PublishTests extends MQTT5TestSupport {
          retainedPayloads[i] = RandomUtil.randomString();
       }
 
+      for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
+         assertNull(getRetainedMessageQueue(topicNames[i]));
+      }
+
       // send retained messages
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       for (int i = 0; i < SUBSCRIPTION_COUNT; i++) {
          final String topicName = topicNames[i];
          producer.publish(topicName, retainedPayloads[i].getBytes(), 2, true);
-         Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 topicName, 
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 
2000, 100);
+         Wait.assertTrue(() -> 
getRetainedMessageQueue(topicName).getMessageCount() == 1, 2000, 100);
       }
       producer.disconnect();
       producer.close();
@@ -458,12 +465,14 @@ public class PublishTests extends MQTT5TestSupport {
       final String CONSUMER_ID = RandomUtil.randomString();
       final String TOPIC = this.getTopicName();
 
+      assertNull(getRetainedMessageQueue(TOPIC));
+
       // send retained messages
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
 
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, 
server.getConfiguration().getWildcardConfiguration())).getMessageCount() == 1, 
2000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 1, 2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -491,7 +500,7 @@ public class PublishTests extends MQTT5TestSupport {
       assertTrue(latch.await(2, TimeUnit.SECONDS));
 
       // ensure the retained message has been successfully acknowledge and 
removed from the subscription queue
-      Wait.assertTrue(() -> getSubscriptionQueue(TOPIC).getMessageCount() == 
0, 2000, 100);
+      Wait.assertTrue(() -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount() == 0, 2000, 100);
 
       consumer.disconnect();
 
@@ -522,11 +531,13 @@ public class PublishTests extends MQTT5TestSupport {
       final String CONSUMER_ID = RandomUtil.randomString();
       final String TOPIC = this.getTopicName();
 
+      assertNull(getRetainedMessageQueue(TOPIC));
+
       // send first retained message
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 1, 2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -578,11 +589,13 @@ public class PublishTests extends MQTT5TestSupport {
       subscription.setRetainAsPublished(false);
       consumer.subscribe(new MqttSubscription[]{subscription});
 
+      assertNull(getRetainedMessageQueue(TOPIC));
+
       // send retained message
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 1, 2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -627,11 +640,13 @@ public class PublishTests extends MQTT5TestSupport {
       subscription.setRetainAsPublished(true);
       consumer.subscribe(new MqttSubscription[]{subscription});
 
+      assertNull(getRetainedMessageQueue(TOPIC));
+
       // send retained message
       MqttClient producer = createPahoClient("producer");
       producer.connect();
       producer.publish(TOPIC, "retained".getBytes(), 2, true);
-      Wait.assertTrue(() -> 
server.locateQueue(MQTTUtil.convertMqttTopicFilterToCore(MQTTUtil.MQTT_RETAIN_ADDRESS_PREFIX,
 TOPIC, MQTTUtil.MQTT_WILDCARD)).getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> getRetainedMessageQueue(TOPIC).getMessageCount() 
== 1, 2000, 100);
       producer.disconnect();
       producer.close();
 
@@ -821,9 +836,9 @@ public class PublishTests extends MQTT5TestSupport {
       producer.disconnect();
       producer.close();
 
-      Wait.assertEquals(1L, () -> 
getSubscriptionQueue(TOPIC).getMessageCount(), 1000, 100);
+      Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount(), 1000, 100);
       Wait.assertEquals(1L, () -> 
server.locateQueue("EXPIRY").getMessageCount(), 3000, 100);
-      Wait.assertEquals(0L, () -> 
getSubscriptionQueue(TOPIC).getMessageCount(), 1000, 100);
+      Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount(), 1000, 100);
 
       consumer.connect(options);
       assertFalse(latch.await(1, TimeUnit.SECONDS));
@@ -874,7 +889,7 @@ public class PublishTests extends MQTT5TestSupport {
       producer.disconnect();
       producer.close();
 
-      Wait.assertEquals(1L, () -> 
getSubscriptionQueue(TOPIC).getMessageCount(), 500, 100);
+      Wait.assertEquals(1L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount(), 500, 100);
 
       Thread.sleep(SLEEP);
 
@@ -1571,7 +1586,8 @@ public class PublishTests extends MQTT5TestSupport {
       final String TOPIC = this.getTopicName();
 
       final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
-      MqttAsyncClient consumer = 
createAsyncPahoClient(RandomUtil.randomString());
+      final String CONSUMER_ID = "consumer";
+      MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
       MqttConnectionOptions options = new MqttConnectionOptions();
       options.setReceiveMaximum(RECEIVE_MAXIMUM);
       consumer.connect(options).waitForCompletion();
@@ -1589,11 +1605,11 @@ public class PublishTests extends MQTT5TestSupport {
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 
(RandomUtil.randomPositiveInt() % 2) + 1, false);
       }
-      Wait.assertEquals((long) MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+      Wait.assertEquals((long) MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100);
       producer.disconnect();
       producer.close();
 
-      Wait.assertEquals(0L, () -> 
getSubscriptionQueue(TOPIC).getMessageCount(), 15000, 100);
+      Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount(), 15000, 100);
       assertTrue(latch.await(15, TimeUnit.SECONDS));
       assertFalse(failed.get());
       consumer.disconnect();
@@ -1633,7 +1649,8 @@ public class PublishTests extends MQTT5TestSupport {
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
       final CountDownLatch latch = new CountDownLatch(MESSAGE_COUNT);
-      MqttAsyncClient consumer = 
createAsyncPahoClient(RandomUtil.randomString());
+      final String CONSUMER_ID = "consumer";
+      MqttAsyncClient consumer = createAsyncPahoClient(CONSUMER_ID);
       MqttConnectionOptions options = new MqttConnectionOptions();
       options.setReceiveMaximum(RECEIVE_MAXIMUM);
       consumer.connect(options).waitForCompletion();
@@ -1651,11 +1668,11 @@ public class PublishTests extends MQTT5TestSupport {
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          producer.publish(TOPIC, ("foo" + i).getBytes(StandardCharsets.UTF_8), 
0, false);
       }
-      Wait.assertEquals((long) MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+      Wait.assertEquals((long) MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100);
       producer.disconnect();
       producer.close();
 
-      Wait.assertEquals(0L, () -> 
getSubscriptionQueue(TOPIC).getMessageCount(), 8000, 100);
+      Wait.assertEquals(0L, () -> getSubscriptionQueue(TOPIC, 
CONSUMER_ID).getMessageCount(), 8000, 100);
       assertTrue(latch.await(8, TimeUnit.SECONDS));
       assertTrue(succeeded.get());
       consumer.disconnect();
@@ -1675,6 +1692,7 @@ public class PublishTests extends MQTT5TestSupport {
       final int MESSAGE_COUNT = 2;
       final int RECEIVE_MAXIMUM = 1;
       final String TOPIC = this.getTopicName();
+      final String CONSUMER_ID = "consumer";
       final AtomicBoolean messageArrived = new AtomicBoolean(false);
 
       MQTTInterceptor outgoingInterceptor = (packet, connection) -> {
@@ -1686,7 +1704,7 @@ public class PublishTests extends MQTT5TestSupport {
       server.getRemotingService().addOutgoingInterceptor(outgoingInterceptor);
 
       final CountDownLatch latch = new CountDownLatch(1);
-      MqttClient consumer = createPahoClient("consumer");
+      MqttClient consumer = createPahoClient(CONSUMER_ID);
       MqttConnectionOptions options = new MqttConnectionOptions();
       options.setReceiveMaximum(RECEIVE_MAXIMUM);
       options.setKeepAliveInterval(2);
@@ -1705,7 +1723,7 @@ public class PublishTests extends MQTT5TestSupport {
       for (int i = 0; i < MESSAGE_COUNT; i++) {
          producer.publish(TOPIC, "foo".getBytes(StandardCharsets.UTF_8), 2, 
false);
       }
-      Wait.assertEquals((long) MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC).getMessagesAdded(), 2000, 100);
+      Wait.assertEquals((long) MESSAGE_COUNT, () -> 
getSubscriptionQueue(TOPIC, CONSUMER_ID).getMessagesAdded(), 2000, 100);
       producer.disconnect();
       producer.close();
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
index a00c30a9f1..e2e6961250 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
@@ -24,6 +24,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.security.CheckType;
 import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
 import org.apache.activemq.artemis.tests.util.RandomUtil;
@@ -76,6 +77,7 @@ public class PublishTestsWithSecurity extends 
MQTT5TestSupport {
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testSendAuthorizationFailure() throws Exception {
       final String CLIENT_ID = "publisher";
+      final String TOPIC = "/foo";
       final CountDownLatch latch = new CountDownLatch(1);
       MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
          .username(createAddressUser)
@@ -91,7 +93,7 @@ public class PublishTestsWithSecurity extends 
MQTT5TestSupport {
       });
 
       try {
-         client.publish("/foo", new byte[0], 2, false);
+         client.publish(TOPIC, new byte[0], 2, false);
          fail("Publishing should have failed with a security problem");
       } catch (MqttException e) {
          assertEquals(MQTTReasonCodes.NOT_AUTHORIZED, (byte) 
e.getReasonCode());
@@ -103,7 +105,7 @@ public class PublishTestsWithSecurity extends 
MQTT5TestSupport {
 
       assertFalse(client.isConnected());
 
-      Wait.assertTrue(() -> 
server.getAddressInfo(SimpleString.toSimpleString(".foo")) != null, 2000, 100);
+      Wait.assertTrue(() -> 
server.getAddressInfo(SimpleString.toSimpleString(MQTTUtil.getCoreAddressFromMqttTopic(TOPIC,
 server.getConfiguration().getWildcardConfiguration()))) != null, 2000, 100);
    }
 
    @Test(timeout = DEFAULT_TIMEOUT)
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
index 9eaad18e9b..14d7de37b0 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/SubscribeTestsWithSecurity.java
@@ -92,17 +92,17 @@ public class SubscribeTestsWithSecurity extends 
MQTT5TestSupport {
 
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testSubscriptionQueueRemoved() throws Exception {
-      final String CLIENT_ID = "consumer";
+      final String CONSUMER_ID = "consumer";
       MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
          .username(noDeleteUser)
          .password(noDeletePass.getBytes(StandardCharsets.UTF_8))
          .build();
-      MqttClient client = createPahoClient(CLIENT_ID);
+      MqttClient client = createPahoClient(CONSUMER_ID);
       client.connect(options);
 
       client.subscribe(getTopicName(), 0).waitForCompletion();
       client.disconnect();
 
-      Wait.assertTrue(() -> getSubscriptionQueue(getTopicName()) == null, 
2000, 100);
+      Wait.assertTrue(() -> getSubscriptionQueue(getTopicName(), CONSUMER_ID) 
== null, 2000, 100);
    }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
index d1514b0076..2f06512d2d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/ssl/CertificateAuthenticationSslTests.java
@@ -92,11 +92,12 @@ public class CertificateAuthenticationSslTests extends 
MQTT5TestSupport {
     */
    @Test(timeout = DEFAULT_TIMEOUT)
    public void testSimpleSendReceive() throws Exception {
-      String topic = RandomUtil.randomString();
+      final String topic = RandomUtil.randomString();
+      final String clientId = "subscriber";
       byte[] body = RandomUtil.randomBytes(32);
 
       CountDownLatch latch = new CountDownLatch(1);
-      MqttClient subscriber = createPahoClient(protocol,"subscriber");
+      MqttClient subscriber = createPahoClient(protocol, clientId);
       subscriber.connect(getSslMqttConnectOptions());
       subscriber.setCallback(new DefaultMqttCallback() {
          @Override
@@ -107,8 +108,8 @@ public class CertificateAuthenticationSslTests extends 
MQTT5TestSupport {
       });
       subscriber.subscribe(topic, AT_LEAST_ONCE);
 
-      Wait.assertTrue(() -> getSubscriptionQueue(topic) != null, 2000, 100);
-      Wait.assertEquals(1, () -> 
getSubscriptionQueue(topic).getConsumerCount(), 2000, 100);
+      Wait.assertTrue(() -> getSubscriptionQueue(topic, clientId) != null, 
2000, 100);
+      Wait.assertEquals(1, () -> getSubscriptionQueue(topic, 
clientId).getConsumerCount(), 2000, 100);
 
       MqttClient producer = createPahoClient(protocol,"producer");
       producer.connect(getSslMqttConnectOptions());

Reply via email to