This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 682f505e32 ARTEMIS-3942 use session instead of direct routing for MQTT
LWT messages
new 05b3879cba This closes #4180
682f505e32 is described below
commit 682f505e32f9b6472665212acd6f58c32c7bf98d
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Aug 17 11:33:15 2022 -0500
ARTEMIS-3942 use session instead of direct routing for MQTT LWT messages
Using direct routing skips authorization for "Last Will and Testament"
messages (a.k.a. "will" messages). This commit fixes that problem by
using the internal session that is established for normal message
production and consumption.
---
.../artemis/core/protocol/mqtt/MQTTLogger.java | 4 ++
.../core/protocol/mqtt/MQTTPublishManager.java | 9 ++---
.../artemis/core/protocol/mqtt/MQTTSession.java | 3 ++
.../controlpackets/PublishTestsWithSecurity.java | 47 ++++++++++++++++++++++
4 files changed, 58 insertions(+), 5 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index 17a1295eb6..a765de1e78 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger {
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 834006, value = "Failed to publish MQTT message: {0}.", format = Message.Format.MESSAGE_FORMAT)
void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable t);
+
+ @LogMessage(level = Logger.Level.ERROR)
+ @Message(id = 834007, value = "Authorization failure sending will message: {0}", format = Message.Format.MESSAGE_FORMAT)
+ void authorizationFailureSendingWillMessage(String message);
}
\ No newline at end of file
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index ee1dfdd211..38d6e4cc44 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -214,11 +214,7 @@ public class MQTTPublishManager {
Transaction tx = session.getServerSession().newTransaction();
try {
- if (internal) {
- session.getServer().getPostOffice().route(serverMessage, tx,
true);
- } else {
- session.getServerSession().send(tx, serverMessage, true,
false);
- }
+ session.getServerSession().send(tx, serverMessage, true, false);
if (message.fixedHeader().isRetain()) {
ByteBuf payload = message.payload();
@@ -228,6 +224,9 @@ public class MQTTPublishManager {
tx.commit();
} catch (ActiveMQSecurityException e) {
tx.rollback();
+ if (internal) {
+ throw e;
+ }
if (session.getVersion() == MQTTVersion.MQTT_5) {
sendMessageAck(internal, qos, messageId,
MQTTReasonCodes.NOT_AUTHORIZED);
return;
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index d434ca55ba..db4295b100 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -277,6 +278,8 @@ public class MQTTSession {
getMqttPublishManager().sendToQueue(publishMessage, true);
state.setWillSent(true);
state.setWillMessage(null);
+ } catch (ActiveMQSecurityException e) {
+ MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage());
} catch (Exception e) {
MQTTLogger.LOGGER.errorSendingWillMessage(e);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
index ba107493ec..5b902fbcf7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
@@ -18,13 +18,17 @@
package
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.eclipse.paho.mqttv5.client.MqttClient;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
import org.jboss.logging.Logger;
import org.junit.Test;
@@ -83,4 +87,47 @@ public class PublishTestsWithSecurity extends MQTT5TestSupport {
client.isConnected();
}
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testWillAuthorizationSuccess() throws Exception {
+ internalTestWillAuthorization(fullUser, fullPass, true);
+ }
+
+ @Test(timeout = DEFAULT_TIMEOUT)
+ public void testWillAuthorizationFailure() throws Exception {
+ internalTestWillAuthorization(noprivUser, noprivPass, false);
+ }
+
+ private void internalTestWillAuthorization(String username, String password, boolean succeed) throws Exception {
+ final byte[] WILL = RandomUtil.randomBytes();
+ final String TOPIC = RandomUtil.randomString();
+
+ // consumer of the will message
+ MqttClient client1 = createPahoClient("willConsumer");
+ CountDownLatch latch = new CountDownLatch(1);
+ client1.setCallback(new DefaultMqttCallback() {
+ @Override
+ public void messageArrived(String topic, MqttMessage message) {
+ latch.countDown();
+ }
+ });
+ MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+ .username(fullUser)
+ .password(fullPass.getBytes(StandardCharsets.UTF_8))
+ .build();
+ client1.connect(options);
+ client1.subscribe(TOPIC, 1);
+
+ // consumer to generate the will
+ MqttClient client2 = createPahoClient("willGenerator");
+ options = new MqttConnectionOptionsBuilder()
+ .username(username)
+ .password(password.getBytes(StandardCharsets.UTF_8))
+ .will(TOPIC, new MqttMessage(WILL))
+ .build();
+ client2.connect(options);
+ client2.disconnectForcibly(0, 0, false);
+
+ assertEquals(succeed, latch.await(2, TimeUnit.SECONDS));
+ }
}