This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 1c394e4bdf ARTEMIS-5637 reduce MQTT-to-Core session ratio
1c394e4bdf is described below

commit 1c394e4bdf4cce8824503613d92ed15f705f0ac5
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Aug 26 09:36:30 2025 -0500

    ARTEMIS-5637 reduce MQTT-to-Core session ratio
    
    This commit includes the following changes:
    
     - Eliminate the "internal" Core session used by MQTT.
     - Consolidate MQTT-specific logic to send a message directly to a
       queue.
     - Rename some variables in MQTTPublishManager for clarity.
     - Refactor some methods in MQTTPublishManager for clarity.
     - Move createPubRelMessage method from MQTTUtil to MQTTPublishManager.
     - Add new methods to ServerSession to allow creating a consumer with no
       security checks (i.e. an "internal" consumer).
     - Add a test to verify 1:1 MQTT-connection:Core-session ratio.
---
 .../core/protocol/mqtt/MQTTConnectionManager.java  |  5 +-
 .../artemis/core/protocol/mqtt/MQTTLogger.java     |  2 +-
 .../core/protocol/mqtt/MQTTPublishManager.java     | 94 +++++++++++-----------
 .../protocol/mqtt/MQTTRetainMessageManager.java    |  8 +-
 .../artemis/core/protocol/mqtt/MQTTSession.java    | 14 +---
 .../core/protocol/mqtt/MQTTStateManager.java       |  9 +--
 .../artemis/core/protocol/mqtt/MQTTUtil.java       | 42 ++++++----
 .../artemis/core/server/ServerSession.java         |  9 +++
 .../core/server/impl/ServerSessionImpl.java        | 44 +++++++---
 .../artemis/tests/integration/mqtt/MQTTTest.java   |  2 +-
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 10 +++
 .../tests/integration/mqtt5/spec/QoSTests.java     |  4 +-
 12 files changed, 136 insertions(+), 107 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index ebc9dd6e5a..1a4031262d 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -73,10 +73,7 @@ public class MQTTConnectionManager {
       sessionState.setFailed(false);
       ServerSessionImpl serverSession = createServerSession(username, 
password, validatedUser);
       serverSession.start();
-      ServerSessionImpl internalServerSession = createServerSession(username, 
password, validatedUser);
-      internalServerSession.disableSecurity();
-      internalServerSession.start();
-      session.setServerSession(serverSession, internalServerSession);
+      session.setServerSession(serverSession);
 
       if (cleanStart) {
          /*
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index 8af4ca74d3..7a2ef08b2c 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -17,9 +17,9 @@
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.logs.BundleFactory;
 import org.apache.activemq.artemis.logs.annotation.LogBundle;
 import org.apache.activemq.artemis.logs.annotation.LogMessage;
-import org.apache.activemq.artemis.logs.BundleFactory;
 
 /**
  * Logger Codes 830000 - 839999
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index c75e148326..1165709893 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.lang.invoke.MethodHandles;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
@@ -50,8 +52,6 @@ import 
org.apache.activemq.artemis.core.transaction.Transaction;
 import org.apache.activemq.artemis.utils.UUIDGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-import java.util.Objects;
 
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
@@ -68,6 +68,7 @@ import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_PAYLO
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_RESPONSE_TOPIC_KEY;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_USER_PROPERTY_EXISTS_KEY;
 import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_USER_PROPERTY_KEY_PREFIX_SIMPLE;
+import static 
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.createServerMessage;
 
 /**
  * Handles MQTT Exactly Once (QoS level 2) Protocol.
@@ -76,15 +77,17 @@ public class MQTTPublishManager {
 
    private static final Logger logger = 
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-   private SimpleString managementAddress;
+   private SimpleString qos2ManagementAddress;
+
+   private Queue qos2ManagementQueue;
 
    private final String senderName = 
UUIDGenerator.getInstance().generateUUID().toString();
 
    private boolean createProducer = true;
 
-   private ServerConsumer managementConsumer;
+   private ServerConsumer qos2ManagementConsumer;
 
-   private MQTTSession session;
+   private final MQTTSession session;
 
    private final Object lock = new Object();
 
@@ -109,42 +112,21 @@ public class MQTTPublishManager {
       if (serversession != null) {
          serversession.removeProducer(serversession.getName());
       }
-      if (managementConsumer != null) {
-         managementConsumer.removeItself();
-         managementConsumer.setStarted(false);
-         managementConsumer.close(false);
+      if (qos2ManagementConsumer != null) {
+         qos2ManagementConsumer.removeItself();
+         qos2ManagementConsumer.setStarted(false);
+         qos2ManagementConsumer.close(false);
       }
    }
 
    void clean() throws Exception {
-      SimpleString managementAddress = createManagementAddress();
-      Queue queue = session.getServer().locateQueue(managementAddress);
-      if (queue != null) {
-         queue.deleteQueue();
-      }
-   }
-
-   private void createManagementConsumer() throws Exception {
-      long consumerId = session.getServer().getStorageManager().generateID();
-      managementConsumer = 
session.getInternalServerSession().createConsumer(consumerId, 
managementAddress, null, false, false, -1);
-      managementConsumer.setStarted(true);
-   }
-
-   private SimpleString createManagementAddress() {
-      return SimpleString.of(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
session.getState().getClientId());
-   }
-
-   private void createManagementQueue() throws Exception {
-      Queue q = session.getServer().locateQueue(managementAddress);
-      if (q == null) {
-         
session.getServer().createQueue(QueueConfiguration.of(managementAddress)
-                                            
.setRoutingType(RoutingType.ANYCAST)
-                                            
.setDurable(MQTTUtil.DURABLE_MESSAGES));
+      if (qos2ManagementQueue != null) {
+         qos2ManagementQueue.deleteQueue();
       }
    }
 
-   boolean isManagementConsumer(ServerConsumer consumer) {
-      return consumer == managementConsumer;
+   boolean isQos2ManagementConsumer(ServerConsumer consumer) {
+      return consumer == qos2ManagementConsumer;
    }
 
    /**
@@ -156,7 +138,7 @@ public class MQTTPublishManager {
     */
    protected void sendMessage(ICoreMessage message, ServerConsumer consumer, 
int deliveryCount) throws Exception {
       // This is to allow retries of PubRel.
-      if (isManagementConsumer(consumer)) {
+      if (isQos2ManagementConsumer(consumer)) {
          sendPubRelMessage(message);
       } else {
          int qos = decideQoS(message, consumer);
@@ -312,22 +294,12 @@ public class MQTTPublishManager {
       session.getProtocolHandler().sendPubRel(messageId);
    }
 
-   private SimpleString getManagementAddress() throws Exception {
-      if (managementAddress == null) {
-         managementAddress = createManagementAddress();
-         createManagementQueue();
-         createManagementConsumer();
-      }
-      return managementAddress;
-   }
-
    void handlePubRec(int messageId) throws Exception {
       try {
          Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
          if (ref != null) {
-            Message m = MQTTUtil.createPubRelMessage(session, 
getManagementAddress(), messageId);
-            //send the management message via the internal server session to 
bypass security.
-            session.getInternalServerSession().send(m, true, senderName);
+            initQos2Resources();
+            
MQTTUtil.sendMessageDirectlyToQueue(session.getServer().getStorageManager(), 
session.getServer().getPostOffice(), createPubRelMessage(session, messageId), 
qos2ManagementQueue, null);
             session.getServerSession().individualAcknowledge(ref.getB(), 
ref.getA());
             releaseFlowControl(ref.getB());
          } else {
@@ -338,6 +310,32 @@ public class MQTTPublishManager {
       }
    }
 
+   /*
+    * Only create these resources if we actually need them (i.e. we're sending 
a message to a subscriber via QoS 2)
+    */
+   private void initQos2Resources() throws Exception {
+      if (qos2ManagementAddress == null) {
+         qos2ManagementAddress = 
SimpleString.of(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX + 
session.getState().getClientId());
+      }
+      if (qos2ManagementQueue == null) {
+         qos2ManagementQueue = 
session.getServer().createQueue(QueueConfiguration.of(qos2ManagementAddress)
+                                                                  
.setRoutingType(RoutingType.ANYCAST)
+                                                                  
.setDurable(MQTTUtil.DURABLE_MESSAGES),
+                                                               true);
+         qos2ManagementConsumer = 
session.getServerSession().createInternalConsumer(qos2ManagementAddress);
+         qos2ManagementConsumer.setStarted(true);
+      }
+   }
+
+   private Message createPubRelMessage(MQTTSession session, int messageId) {
+      MqttFixedHeader fixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+      MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, 
null, null);
+      Message message = createServerMessage(session, qos2ManagementAddress, 
publishMessage)
+         .putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId)
+         .putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, 
MqttMessageType.PUBREL.value());
+      return message;
+   }
+
    /**
     * Once we get an acknowledgement for a QoS 1 or 2 message we allow 
messages to flow
     */
@@ -352,7 +350,7 @@ public class MQTTPublishManager {
       Pair<Long, Long> ref = 
session.getState().getOutboundStore().publishComplete(messageId);
       if (ref != null) {
          // ack the message via the internal server session to bypass security.
-         
session.getInternalServerSession().individualAcknowledge(managementConsumer.getID(),
 ref.getA());
+         
session.getServerSession().individualAcknowledge(qos2ManagementConsumer.getID(),
 ref.getA());
       }
    }
 
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index b37931a74f..6dc6d9ac29 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import 
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.MessageReference;
@@ -58,8 +59,9 @@ public class MQTTRetainMessageManager {
       queue.deleteAllReferences();
 
       if (!reset) {
-         Message message = 
LargeServerMessageImpl.checkLargeMessage(messageParameter, 
session.getServer().getStorageManager());
-         
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()), 
queue, tx);
+         StorageManager storageManager = 
session.getServer().getStorageManager();
+         Message message = 
LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
+         MQTTUtil.sendMessageDirectlyToQueue(storageManager, 
session.getServer().getPostOffice(), message.copy(storageManager.generateID()), 
queue, tx);
       }
    }
 
@@ -84,7 +86,7 @@ public class MQTTRetainMessageManager {
                   }
                   Message message = 
ref.getMessage().copy(session.getServer().getStorageManager().generateID());
                   
message.putStringProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY, 
(String) null);
-                  sendToQueue(message, queue, tx);
+                  
MQTTUtil.sendMessageDirectlyToQueue(session.getServer().getStorageManager(), 
session.getServer().getPostOffice(), message, queue, tx);
                }
             }
          }
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index a681875eae..551ab41626 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -49,8 +49,6 @@ public class MQTTSession {
 
    private ServerSessionImpl serverSession;
 
-   private ServerSessionImpl internalServerSession;
-
    private MQTTPublishManager mqttPublishManager;
 
    private MQTTConnectionManager mqttConnectionManager;
@@ -129,11 +127,6 @@ public class MQTTSession {
             serverSession.close(failure);
          }
 
-         if (internalServerSession != null) {
-            internalServerSession.stop();
-            internalServerSession.close(failure);
-         }
-
          state.setAttached(false);
          state.setDisconnectedTime(System.currentTimeMillis());
          state.clearTopicAliases();
@@ -193,10 +186,6 @@ public class MQTTSession {
       return serverSession;
    }
 
-   ServerSessionImpl getInternalServerSession() {
-      return internalServerSession;
-   }
-
    ActiveMQServer getServer() {
       return protocolHandler.getServer();
    }
@@ -213,9 +202,8 @@ public class MQTTSession {
       return sessionCallback;
    }
 
-   void setServerSession(ServerSessionImpl serverSession, ServerSessionImpl 
internalServerSession) {
+   void setServerSession(ServerSessionImpl serverSession) {
       this.serverSession = serverSession;
-      this.internalServerSession = internalServerSession;
    }
 
    void setSessionState(MQTTSessionState state) {
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index 7d536a884b..700d6f4978 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -33,11 +33,10 @@ import 
org.apache.activemq.artemis.api.core.QueueConfiguration;
 import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -179,10 +178,8 @@ public class MQTTStateManager {
    public void storeDurableSubscriptionState(MQTTSessionState state) throws 
Exception {
       if (subscriptionPersistenceEnabled) {
          logger.debug("Adding durable MQTT subscription record for: {}", 
state.getClientId());
-         Transaction tx = new TransactionImpl(server.getStorageManager());
-         tx.setAsync(true);
-         server.getPostOffice().route(serializeState(state, 
server.getStorageManager().generateID()), tx, false);
-         tx.commit();
+         StorageManager storageManager = server.getStorageManager();
+         MQTTUtil.sendMessageDirectlyToQueue(storageManager, 
server.getPostOffice(), serializeState(state, storageManager.generateID()), 
sessionStore, null);
       }
    }
 
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index d3777a63c0..3316e55f82 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -16,9 +16,11 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.lang.invoke.MethodHandles;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufUtil;
@@ -27,16 +29,13 @@ import 
io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
 import io.netty.handler.codec.mqtt.MqttConnectMessage;
 import io.netty.handler.codec.mqtt.MqttConnectPayload;
 import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
-import io.netty.handler.codec.mqtt.MqttFixedHeader;
 import io.netty.handler.codec.mqtt.MqttMessage;
 import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttProperties;
 import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
 import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
-import io.netty.handler.codec.mqtt.MqttQoS;
 import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
 import io.netty.handler.codec.mqtt.MqttSubAckMessage;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
@@ -48,12 +47,17 @@ import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.commons.text.CaseUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-import java.util.Objects;
 
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
 import static 
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
@@ -116,7 +120,7 @@ public class MQTTUtil {
 
    public static final SimpleString MQTT_CONTENT_TYPE_KEY = 
SimpleString.of("mqtt.content.type");
 
-   public static final String MANAGEMENT_QUEUE_PREFIX = DOLLAR + 
"sys.mqtt.queue.qos2.";
+   public static final String QOS2_MANAGEMENT_QUEUE_PREFIX = DOLLAR + 
"sys.mqtt.queue.qos2.";
 
    public static final String SHARED_SUBSCRIPTION_PREFIX = DOLLAR + "share/";
 
@@ -214,7 +218,7 @@ public class MQTTUtil {
       return wildcardConfiguration.convert(address, MQTT_WILDCARD);
    }
 
-   private static ICoreMessage createServerMessage(MQTTSession session, 
SimpleString address, MqttPublishMessage mqttPublishMessage) {
+   public static ICoreMessage createServerMessage(MQTTSession session, 
SimpleString address, MqttPublishMessage mqttPublishMessage) {
       long id = session.getServer().getStorageManager().generateID();
 
       CoreMessage message = new CoreMessage(id, 
mqttPublishMessage.fixedHeader().remainingLength(), 
session.getCoreMessageObjectPools());
@@ -285,15 +289,6 @@ public class MQTTUtil {
       return message;
    }
 
-   public static Message createPubRelMessage(MQTTSession session, SimpleString 
address, int messageId) {
-      MqttFixedHeader fixedHeader = new 
MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
-      MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader, 
null, null);
-      Message message = createServerMessage(session, address, publishMessage)
-         .putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId)
-         .putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY, 
MqttMessageType.PUBREL.value());
-      return message;
-   }
-
    public static void logMessage(MQTTSessionState state, MqttMessage message, 
boolean inbound, MQTTVersion version) {
       if (logger.isTraceEnabled()) {
          StringBuilder log = new StringBuilder("MQTT(");
@@ -590,4 +585,19 @@ public class MQTTUtil {
          return false;
       }
    }
+
+   public static void sendMessageDirectlyToQueue(StorageManager 
storageManager, PostOffice postOffice, Message message, Queue queue, final 
Transaction incomingTx) throws Exception {
+      Transaction tx = incomingTx;
+      if (incomingTx == null) {
+         tx = new TransactionImpl(storageManager);
+         tx.setAsync(true);
+      }
+      RoutingContext context = new RoutingContextImpl(tx);
+      queue.route(message, context);
+      postOffice.processRoute(message, context, false);
+      if (incomingTx == null) {
+         // commit the transaction we created otherwise leave it for the 
caller to commit
+         tx.commit();
+      }
+   }
 }
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index d3b03d7216..cc281bba45 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -122,6 +122,15 @@ public interface ServerSession extends SecurityAuth {
                                  boolean supportLargeMessage,
                                  Integer credits) throws Exception;
 
+   ServerConsumer createConsumer(long consumerID,
+                                 SimpleString queueName,
+                                 SimpleString filterString,
+                                 int priority,
+                                 boolean browseOnly,
+                                 boolean supportLargeMessage,
+                                 Integer credits,
+                                 boolean enforceSecurity) throws Exception;
+
    /**
     * To be used by protocol heads that needs to control the transaction 
outside the session context.
     */
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index adccaaac72..4c112e5990 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -554,6 +554,22 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
                                         final boolean browseOnly,
                                         final boolean supportLargeMessage,
                                         final Integer credits) throws 
Exception {
+      return this.createConsumer(consumerID, queueName, filterString, 
priority, browseOnly, supportLargeMessage, credits, true);
+   }
+
+   public ServerConsumer createInternalConsumer(final SimpleString queueName) 
throws Exception {
+      return this.createConsumer(storageManager.generateID(), queueName, null, 
ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), false, false, -1, 
false);
+   }
+
+   @Override
+   public ServerConsumer createConsumer(final long consumerID,
+                                        final SimpleString queueName,
+                                        final SimpleString filterString,
+                                        final int priority,
+                                        final boolean browseOnly,
+                                        final boolean supportLargeMessage,
+                                        final Integer credits,
+                                        final boolean enforceSecurity) throws 
Exception {
       if (AuditLogger.isBaseLoggingEnabled()) {
          AuditLogger.createCoreConsumer(this, remotingConnection.getSubject(), 
remotingConnection.getRemoteAddress(), consumerID, queueName, filterString, 
priority, browseOnly, supportLargeMessage, credits);
       }
@@ -566,19 +582,21 @@ public class ServerSessionImpl implements ServerSession, 
FailureListener {
       }
 
       SimpleString address = removePrefix(binding.getAddress());
-      try {
-         securityCheck(address, unPrefixedQueueName, browseOnly ? 
CheckType.BROWSE : CheckType.CONSUME, this);
-      } catch (Exception e) {
-         /*
-          * This is here for backwards compatibility with the pre-FQQN syntax 
from ARTEMIS-592.
-          * We only want to do this check if an exact match exists in the 
security-settings.
-          * This code is deprecated and should be removed at the release of 
the next major version.
-          */
-         SimpleString exactMatch = 
address.concat(".").concat(unPrefixedQueueName);
-         if 
(server.getSecurityRepository().containsExactMatch(exactMatch.toString())) {
-            securityCheck(exactMatch, unPrefixedQueueName, browseOnly ? 
CheckType.BROWSE : CheckType.CONSUME, this);
-         } else {
-            throw e;
+      if (enforceSecurity) {
+         try {
+            securityCheck(address, unPrefixedQueueName, browseOnly ? 
CheckType.BROWSE : CheckType.CONSUME, this);
+         } catch (Exception e) {
+            /*
+             * This is here for backwards compatibility with the pre-FQQN 
syntax from ARTEMIS-592.
+             * We only want to do this check if an exact match exists in the 
security-settings.
+             * This code is deprecated and should be removed at the release of 
the next major version.
+             */
+            SimpleString exactMatch = 
address.concat(".").concat(unPrefixedQueueName);
+            if 
(server.getSecurityRepository().containsExactMatch(exactMatch.toString())) {
+               securityCheck(exactMatch, unPrefixedQueueName, browseOnly ? 
CheckType.BROWSE : CheckType.CONSUME, this);
+            } else {
+               throw e;
+            }
          }
       }
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index fd57a70a63..2e979fa707 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -332,7 +332,7 @@ public class MQTTTest extends MQTTTestSupport {
          assertEquals(payload, new String(message));
       }
 
-      final Queue queue = 
server.locateQueue(SimpleString.of(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
clientId));
+      final Queue queue = 
server.locateQueue(SimpleString.of(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX + 
clientId));
 
       Wait.waitFor(() -> queue.getMessageCount() == 0, 1000, 100);
 
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 187b76fc7c..f66af71846 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -914,4 +914,14 @@ public class MQTT5Test extends MQTT5TestSupport {
       assertTrue(latch.await(1, TimeUnit.MINUTES), "not all tasks finished");
       assertFalse(failed.get());
    }
+
+   @Test
+   @Timeout(DEFAULT_TIMEOUT_SEC)
+   public void testSessionCount() throws Exception {
+      MqttClient subscriber = createPahoClient("subscriber");
+      subscriber.connect();
+      MqttClient producer = createPahoClient("producer");
+      producer.connect();
+      assertEquals(2, server.getActiveMQServerControl().getSessionCount());
+   }
 }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
index 5e4ef175cd..92ff22721d 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
@@ -370,8 +370,8 @@ public class QoSTests extends MQTT5TestSupport {
          if (packet.fixedHeader().messageType() == MqttMessageType.PUBCOMP) {
             try {
                // ensure the message is still in the management queue before 
we get the PUBCOMP from the client
-               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_ID).getMessageCount(), 2000, 100);
-               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_ID).getDeliveringCount(), 2000, 100);
+               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_ID).getMessageCount(), 2000, 100);
+               Wait.assertEquals(1L, () -> 
server.locateQueue(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX + 
CONSUMER_ID).getDeliveringCount(), 2000, 100);
             } catch (Exception e) {
                return false;
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to