This is an automated email from the ASF dual-hosted git repository.
tabish pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 1c394e4bdf ARTEMIS-5637 reduce MQTT-to-Core session ratio
1c394e4bdf is described below
commit 1c394e4bdf4cce8824503613d92ed15f705f0ac5
Author: Justin Bertram <[email protected]>
AuthorDate: Tue Aug 26 09:36:30 2025 -0500
ARTEMIS-5637 reduce MQTT-to-Core session ratio
This commit includes the following changes:
- Eliminate the "internal" Core session used by MQTT.
- Consolidate MQTT-specific logic to send a message directly to a
queue.
- Rename some variables in MQTTPublishManager to be more clear.
- Refactor some methods in MQTTPublishManager to be more clear.
- Move createPubRelMessage method from MQTTUtil to MQTTPublishManager.
- Add new methods to ServerSession to allow creating a consumer with no
security checks (i.e. an "internal" consumer).
- Add a test to verify 1:1 MQTT-connection:Core-session ratio.
---
.../core/protocol/mqtt/MQTTConnectionManager.java | 5 +-
.../artemis/core/protocol/mqtt/MQTTLogger.java | 2 +-
.../core/protocol/mqtt/MQTTPublishManager.java | 94 +++++++++++-----------
.../protocol/mqtt/MQTTRetainMessageManager.java | 8 +-
.../artemis/core/protocol/mqtt/MQTTSession.java | 14 +---
.../core/protocol/mqtt/MQTTStateManager.java | 9 +--
.../artemis/core/protocol/mqtt/MQTTUtil.java | 42 ++++++----
.../artemis/core/server/ServerSession.java | 9 +++
.../core/server/impl/ServerSessionImpl.java | 44 +++++++---
.../artemis/tests/integration/mqtt/MQTTTest.java | 2 +-
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 10 +++
.../tests/integration/mqtt5/spec/QoSTests.java | 4 +-
12 files changed, 136 insertions(+), 107 deletions(-)
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
index ebc9dd6e5a..1a4031262d 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java
@@ -73,10 +73,7 @@ public class MQTTConnectionManager {
sessionState.setFailed(false);
ServerSessionImpl serverSession = createServerSession(username,
password, validatedUser);
serverSession.start();
- ServerSessionImpl internalServerSession = createServerSession(username,
password, validatedUser);
- internalServerSession.disableSecurity();
- internalServerSession.start();
- session.setServerSession(serverSession, internalServerSession);
+ session.setServerSession(serverSession);
if (cleanStart) {
/*
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index 8af4ca74d3..7a2ef08b2c 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -17,9 +17,9 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.core.server.MessageReference;
+import org.apache.activemq.artemis.logs.BundleFactory;
import org.apache.activemq.artemis.logs.annotation.LogBundle;
import org.apache.activemq.artemis.logs.annotation.LogMessage;
-import org.apache.activemq.artemis.logs.BundleFactory;
/**
* Logger Codes 830000 - 839999
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index c75e148326..1165709893 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
+import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
@@ -50,8 +52,6 @@ import
org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-import java.util.Objects;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
@@ -68,6 +68,7 @@ import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_PAYLO
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_RESPONSE_TOPIC_KEY;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_USER_PROPERTY_EXISTS_KEY;
import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.MQTT_USER_PROPERTY_KEY_PREFIX_SIMPLE;
+import static
org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil.createServerMessage;
/**
* Handles MQTT Exactly Once (QoS level 2) Protocol.
@@ -76,15 +77,17 @@ public class MQTTPublishManager {
private static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- private SimpleString managementAddress;
+ private SimpleString qos2ManagementAddress;
+
+ private Queue qos2ManagementQueue;
private final String senderName =
UUIDGenerator.getInstance().generateUUID().toString();
private boolean createProducer = true;
- private ServerConsumer managementConsumer;
+ private ServerConsumer qos2ManagementConsumer;
- private MQTTSession session;
+ private final MQTTSession session;
private final Object lock = new Object();
@@ -109,42 +112,21 @@ public class MQTTPublishManager {
if (serversession != null) {
serversession.removeProducer(serversession.getName());
}
- if (managementConsumer != null) {
- managementConsumer.removeItself();
- managementConsumer.setStarted(false);
- managementConsumer.close(false);
+ if (qos2ManagementConsumer != null) {
+ qos2ManagementConsumer.removeItself();
+ qos2ManagementConsumer.setStarted(false);
+ qos2ManagementConsumer.close(false);
}
}
void clean() throws Exception {
- SimpleString managementAddress = createManagementAddress();
- Queue queue = session.getServer().locateQueue(managementAddress);
- if (queue != null) {
- queue.deleteQueue();
- }
- }
-
- private void createManagementConsumer() throws Exception {
- long consumerId = session.getServer().getStorageManager().generateID();
- managementConsumer =
session.getInternalServerSession().createConsumer(consumerId,
managementAddress, null, false, false, -1);
- managementConsumer.setStarted(true);
- }
-
- private SimpleString createManagementAddress() {
- return SimpleString.of(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
session.getState().getClientId());
- }
-
- private void createManagementQueue() throws Exception {
- Queue q = session.getServer().locateQueue(managementAddress);
- if (q == null) {
-
session.getServer().createQueue(QueueConfiguration.of(managementAddress)
-
.setRoutingType(RoutingType.ANYCAST)
-
.setDurable(MQTTUtil.DURABLE_MESSAGES));
+ if (qos2ManagementQueue != null) {
+ qos2ManagementQueue.deleteQueue();
}
}
- boolean isManagementConsumer(ServerConsumer consumer) {
- return consumer == managementConsumer;
+ boolean isQos2ManagementConsumer(ServerConsumer consumer) {
+ return consumer == qos2ManagementConsumer;
}
/**
@@ -156,7 +138,7 @@ public class MQTTPublishManager {
*/
protected void sendMessage(ICoreMessage message, ServerConsumer consumer,
int deliveryCount) throws Exception {
// This is to allow retries of PubRel.
- if (isManagementConsumer(consumer)) {
+ if (isQos2ManagementConsumer(consumer)) {
sendPubRelMessage(message);
} else {
int qos = decideQoS(message, consumer);
@@ -312,22 +294,12 @@ public class MQTTPublishManager {
session.getProtocolHandler().sendPubRel(messageId);
}
- private SimpleString getManagementAddress() throws Exception {
- if (managementAddress == null) {
- managementAddress = createManagementAddress();
- createManagementQueue();
- createManagementConsumer();
- }
- return managementAddress;
- }
-
void handlePubRec(int messageId) throws Exception {
try {
Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
if (ref != null) {
- Message m = MQTTUtil.createPubRelMessage(session,
getManagementAddress(), messageId);
- //send the management message via the internal server session to
bypass security.
- session.getInternalServerSession().send(m, true, senderName);
+ initQos2Resources();
+
MQTTUtil.sendMessageDirectlyToQueue(session.getServer().getStorageManager(),
session.getServer().getPostOffice(), createPubRelMessage(session, messageId),
qos2ManagementQueue, null);
session.getServerSession().individualAcknowledge(ref.getB(),
ref.getA());
releaseFlowControl(ref.getB());
} else {
@@ -338,6 +310,32 @@ public class MQTTPublishManager {
}
}
+ /*
+ * Only create these resources if we actually need them (i.e. we're sending
a message to a subscriber via QoS 2)
+ */
+ private void initQos2Resources() throws Exception {
+ if (qos2ManagementAddress == null) {
+ qos2ManagementAddress =
SimpleString.of(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX +
session.getState().getClientId());
+ }
+ if (qos2ManagementQueue == null) {
+ qos2ManagementQueue =
session.getServer().createQueue(QueueConfiguration.of(qos2ManagementAddress)
+
.setRoutingType(RoutingType.ANYCAST)
+
.setDurable(MQTTUtil.DURABLE_MESSAGES),
+ true);
+ qos2ManagementConsumer =
session.getServerSession().createInternalConsumer(qos2ManagementAddress);
+ qos2ManagementConsumer.setStarted(true);
+ }
+ }
+
+ private Message createPubRelMessage(MQTTSession session, int messageId) {
+ MqttFixedHeader fixedHeader = new
MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
+ MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader,
null, null);
+ Message message = createServerMessage(session, qos2ManagementAddress,
publishMessage)
+ .putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId)
+ .putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY,
MqttMessageType.PUBREL.value());
+ return message;
+ }
+
/**
* Once we get an acknowledgement for a QoS 1 or 2 message we allow
messages to flow
*/
@@ -352,7 +350,7 @@ public class MQTTPublishManager {
Pair<Long, Long> ref =
session.getState().getOutboundStore().publishComplete(messageId);
if (ref != null) {
// ack the message via the internal server session to bypass security.
-
session.getInternalServerSession().individualAcknowledge(managementConsumer.getID(),
ref.getA());
+
session.getServerSession().individualAcknowledge(qos2ManagementConsumer.getID(),
ref.getA());
}
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index b37931a74f..6dc6d9ac29 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.protocol.mqtt;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import
org.apache.activemq.artemis.core.persistence.impl.journal.LargeServerMessageImpl;
import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference;
@@ -58,8 +59,9 @@ public class MQTTRetainMessageManager {
queue.deleteAllReferences();
if (!reset) {
- Message message =
LargeServerMessageImpl.checkLargeMessage(messageParameter,
session.getServer().getStorageManager());
-
sendToQueue(message.copy(session.getServer().getStorageManager().generateID()),
queue, tx);
+ StorageManager storageManager =
session.getServer().getStorageManager();
+ Message message =
LargeServerMessageImpl.checkLargeMessage(messageParameter, storageManager);
+ MQTTUtil.sendMessageDirectlyToQueue(storageManager,
session.getServer().getPostOffice(), message.copy(storageManager.generateID()),
queue, tx);
}
}
@@ -84,7 +86,7 @@ public class MQTTRetainMessageManager {
}
Message message =
ref.getMessage().copy(session.getServer().getStorageManager().generateID());
message.putStringProperty(MQTT_MESSAGE_RETAIN_INITIAL_DISTRIBUTION_KEY,
(String) null);
- sendToQueue(message, queue, tx);
+
MQTTUtil.sendMessageDirectlyToQueue(session.getServer().getStorageManager(),
session.getServer().getPostOffice(), message, queue, tx);
}
}
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index a681875eae..551ab41626 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -49,8 +49,6 @@ public class MQTTSession {
private ServerSessionImpl serverSession;
- private ServerSessionImpl internalServerSession;
-
private MQTTPublishManager mqttPublishManager;
private MQTTConnectionManager mqttConnectionManager;
@@ -129,11 +127,6 @@ public class MQTTSession {
serverSession.close(failure);
}
- if (internalServerSession != null) {
- internalServerSession.stop();
- internalServerSession.close(failure);
- }
-
state.setAttached(false);
state.setDisconnectedTime(System.currentTimeMillis());
state.clearTopicAliases();
@@ -193,10 +186,6 @@ public class MQTTSession {
return serverSession;
}
- ServerSessionImpl getInternalServerSession() {
- return internalServerSession;
- }
-
ActiveMQServer getServer() {
return protocolHandler.getServer();
}
@@ -213,9 +202,8 @@ public class MQTTSession {
return sessionCallback;
}
- void setServerSession(ServerSessionImpl serverSession, ServerSessionImpl
internalServerSession) {
+ void setServerSession(ServerSessionImpl serverSession) {
this.serverSession = serverSession;
- this.internalServerSession = internalServerSession;
}
void setSessionState(MQTTSessionState state) {
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
index 7d536a884b..700d6f4978 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTStateManager.java
@@ -33,11 +33,10 @@ import
org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.filter.impl.FilterImpl;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.core.transaction.Transaction;
-import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -179,10 +178,8 @@ public class MQTTStateManager {
public void storeDurableSubscriptionState(MQTTSessionState state) throws
Exception {
if (subscriptionPersistenceEnabled) {
logger.debug("Adding durable MQTT subscription record for: {}",
state.getClientId());
- Transaction tx = new TransactionImpl(server.getStorageManager());
- tx.setAsync(true);
- server.getPostOffice().route(serializeState(state,
server.getStorageManager().generateID()), tx, false);
- tx.commit();
+ StorageManager storageManager = server.getStorageManager();
+ MQTTUtil.sendMessageDirectlyToQueue(storageManager,
server.getPostOffice(), serializeState(state, storageManager.generateID()),
sessionStore, null);
}
}
diff --git
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index d3777a63c0..3316e55f82 100644
---
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -16,9 +16,11 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
+import java.lang.invoke.MethodHandles;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
@@ -27,16 +29,13 @@ import
io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttConnectPayload;
import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
-import io.netty.handler.codec.mqtt.MqttFixedHeader;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
-import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
-import io.netty.handler.codec.mqtt.MqttQoS;
import io.netty.handler.codec.mqtt.MqttReasonCodeAndPropertiesVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
@@ -48,12 +47,17 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
+import org.apache.activemq.artemis.core.persistence.StorageManager;
+import org.apache.activemq.artemis.core.postoffice.PostOffice;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.RoutingContext;
+import org.apache.activemq.artemis.core.server.impl.RoutingContextImpl;
+import org.apache.activemq.artemis.core.transaction.Transaction;
+import org.apache.activemq.artemis.core.transaction.impl.TransactionImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.commons.text.CaseUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.lang.invoke.MethodHandles;
-import java.util.Objects;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CONTENT_TYPE;
import static
io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.CORRELATION_DATA;
@@ -116,7 +120,7 @@ public class MQTTUtil {
public static final SimpleString MQTT_CONTENT_TYPE_KEY =
SimpleString.of("mqtt.content.type");
- public static final String MANAGEMENT_QUEUE_PREFIX = DOLLAR +
"sys.mqtt.queue.qos2.";
+ public static final String QOS2_MANAGEMENT_QUEUE_PREFIX = DOLLAR +
"sys.mqtt.queue.qos2.";
public static final String SHARED_SUBSCRIPTION_PREFIX = DOLLAR + "share/";
@@ -214,7 +218,7 @@ public class MQTTUtil {
return wildcardConfiguration.convert(address, MQTT_WILDCARD);
}
- private static ICoreMessage createServerMessage(MQTTSession session,
SimpleString address, MqttPublishMessage mqttPublishMessage) {
+ public static ICoreMessage createServerMessage(MQTTSession session,
SimpleString address, MqttPublishMessage mqttPublishMessage) {
long id = session.getServer().getStorageManager().generateID();
CoreMessage message = new CoreMessage(id,
mqttPublishMessage.fixedHeader().remainingLength(),
session.getCoreMessageObjectPools());
@@ -285,15 +289,6 @@ public class MQTTUtil {
return message;
}
- public static Message createPubRelMessage(MQTTSession session, SimpleString
address, int messageId) {
- MqttFixedHeader fixedHeader = new
MqttFixedHeader(MqttMessageType.PUBREL, false, MqttQoS.AT_LEAST_ONCE, false, 0);
- MqttPublishMessage publishMessage = new MqttPublishMessage(fixedHeader,
null, null);
- Message message = createServerMessage(session, address, publishMessage)
- .putIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY, messageId)
- .putIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY,
MqttMessageType.PUBREL.value());
- return message;
- }
-
public static void logMessage(MQTTSessionState state, MqttMessage message,
boolean inbound, MQTTVersion version) {
if (logger.isTraceEnabled()) {
StringBuilder log = new StringBuilder("MQTT(");
@@ -590,4 +585,19 @@ public class MQTTUtil {
return false;
}
}
+
+ public static void sendMessageDirectlyToQueue(StorageManager
storageManager, PostOffice postOffice, Message message, Queue queue, final
Transaction incomingTx) throws Exception {
+ Transaction tx = incomingTx;
+ if (incomingTx == null) {
+ tx = new TransactionImpl(storageManager);
+ tx.setAsync(true);
+ }
+ RoutingContext context = new RoutingContextImpl(tx);
+ queue.route(message, context);
+ postOffice.processRoute(message, context, false);
+ if (incomingTx == null) {
+ // commit the transaction we created otherwise leave it for the
caller to commit
+ tx.commit();
+ }
+ }
}
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index d3b03d7216..cc281bba45 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -122,6 +122,15 @@ public interface ServerSession extends SecurityAuth {
boolean supportLargeMessage,
Integer credits) throws Exception;
+ ServerConsumer createConsumer(long consumerID,
+ SimpleString queueName,
+ SimpleString filterString,
+ int priority,
+ boolean browseOnly,
+ boolean supportLargeMessage,
+ Integer credits,
+ boolean enforceSecurity) throws Exception;
+
/**
* To be used by protocol heads that needs to control the transaction
outside the session context.
*/
diff --git
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index adccaaac72..4c112e5990 100644
---
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -554,6 +554,22 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
final boolean browseOnly,
final boolean supportLargeMessage,
final Integer credits) throws
Exception {
+ return this.createConsumer(consumerID, queueName, filterString,
priority, browseOnly, supportLargeMessage, credits, true);
+ }
+
+ public ServerConsumer createInternalConsumer(final SimpleString queueName)
throws Exception {
+ return this.createConsumer(storageManager.generateID(), queueName, null,
ActiveMQDefaultConfiguration.getDefaultConsumerPriority(), false, false, -1,
false);
+ }
+
+ @Override
+ public ServerConsumer createConsumer(final long consumerID,
+ final SimpleString queueName,
+ final SimpleString filterString,
+ final int priority,
+ final boolean browseOnly,
+ final boolean supportLargeMessage,
+ final Integer credits,
+ final boolean enforceSecurity) throws
Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.createCoreConsumer(this, remotingConnection.getSubject(),
remotingConnection.getRemoteAddress(), consumerID, queueName, filterString,
priority, browseOnly, supportLargeMessage, credits);
}
@@ -566,19 +582,21 @@ public class ServerSessionImpl implements ServerSession,
FailureListener {
}
SimpleString address = removePrefix(binding.getAddress());
- try {
- securityCheck(address, unPrefixedQueueName, browseOnly ?
CheckType.BROWSE : CheckType.CONSUME, this);
- } catch (Exception e) {
- /*
- * This is here for backwards compatibility with the pre-FQQN syntax
from ARTEMIS-592.
- * We only want to do this check if an exact match exists in the
security-settings.
- * This code is deprecated and should be removed at the release of
the next major version.
- */
- SimpleString exactMatch =
address.concat(".").concat(unPrefixedQueueName);
- if
(server.getSecurityRepository().containsExactMatch(exactMatch.toString())) {
- securityCheck(exactMatch, unPrefixedQueueName, browseOnly ?
CheckType.BROWSE : CheckType.CONSUME, this);
- } else {
- throw e;
+ if (enforceSecurity) {
+ try {
+ securityCheck(address, unPrefixedQueueName, browseOnly ?
CheckType.BROWSE : CheckType.CONSUME, this);
+ } catch (Exception e) {
+ /*
+ * This is here for backwards compatibility with the pre-FQQN
syntax from ARTEMIS-592.
+ * We only want to do this check if an exact match exists in the
security-settings.
+ * This code is deprecated and should be removed at the release of
the next major version.
+ */
+ SimpleString exactMatch =
address.concat(".").concat(unPrefixedQueueName);
+ if
(server.getSecurityRepository().containsExactMatch(exactMatch.toString())) {
+ securityCheck(exactMatch, unPrefixedQueueName, browseOnly ?
CheckType.BROWSE : CheckType.CONSUME, this);
+ } else {
+ throw e;
+ }
}
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
index fd57a70a63..2e979fa707 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MQTTTest.java
@@ -332,7 +332,7 @@ public class MQTTTest extends MQTTTestSupport {
assertEquals(payload, new String(message));
}
- final Queue queue =
server.locateQueue(SimpleString.of(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
clientId));
+ final Queue queue =
server.locateQueue(SimpleString.of(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX +
clientId));
Wait.waitFor(() -> queue.getMessageCount() == 0, 1000, 100);
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 187b76fc7c..f66af71846 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -914,4 +914,14 @@ public class MQTT5Test extends MQTT5TestSupport {
assertTrue(latch.await(1, TimeUnit.MINUTES), "not all tasks finished");
assertFalse(failed.get());
}
+
+ @Test
+ @Timeout(DEFAULT_TIMEOUT_SEC)
+ public void testSessionCount() throws Exception {
+ MqttClient subscriber = createPahoClient("subscriber");
+ subscriber.connect();
+ MqttClient producer = createPahoClient("producer");
+ producer.connect();
+ assertEquals(2, server.getActiveMQServerControl().getSessionCount());
+ }
}
diff --git
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
index 5e4ef175cd..92ff22721d 100644
---
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
+++
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/QoSTests.java
@@ -370,8 +370,8 @@ public class QoSTests extends MQTT5TestSupport {
if (packet.fixedHeader().messageType() == MqttMessageType.PUBCOMP) {
try {
// ensure the message is still in the management queue before
we get the PUBCOMP from the client
- Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
CONSUMER_ID).getMessageCount(), 2000, 100);
- Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.MANAGEMENT_QUEUE_PREFIX +
CONSUMER_ID).getDeliveringCount(), 2000, 100);
+ Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX +
CONSUMER_ID).getMessageCount(), 2000, 100);
+ Wait.assertEquals(1L, () ->
server.locateQueue(MQTTUtil.QOS2_MANAGEMENT_QUEUE_PREFIX +
CONSUMER_ID).getDeliveringCount(), 2000, 100);
} catch (Exception e) {
return false;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact