This is an automated email from the ASF dual-hosted git repository.

jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new 682f505e32 ARTEMIS-3942 use session instead of direct routing for MQTT 
LWT messages
     new 05b3879cba This closes #4180
682f505e32 is described below

commit 682f505e32f9b6472665212acd6f58c32c7bf98d
Author: Justin Bertram <[email protected]>
AuthorDate: Wed Aug 17 11:33:15 2022 -0500

    ARTEMIS-3942 use session instead of direct routing for MQTT LWT messages
    
    Using direct routing skips authorization for "Last Will and Testament"
    messages (a.k.a. "will" messages). This commit fixes that problem by
    using the internal session that is established for normal message
    production and consumption.
---
 .../artemis/core/protocol/mqtt/MQTTLogger.java     |  4 ++
 .../core/protocol/mqtt/MQTTPublishManager.java     |  9 ++---
 .../artemis/core/protocol/mqtt/MQTTSession.java    |  3 ++
 .../controlpackets/PublishTestsWithSecurity.java   | 47 ++++++++++++++++++++++
 4 files changed, 58 insertions(+), 5 deletions(-)

diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
index 17a1295eb6..a765de1e78 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTLogger.java
@@ -80,4 +80,8 @@ public interface MQTTLogger extends BasicLogger {
    @LogMessage(level = Logger.Level.ERROR)
    @Message(id = 834006, value = "Failed to publish MQTT message: {0}.", 
format = Message.Format.MESSAGE_FORMAT)
    void failedToPublishMqttMessage(String exceptionMessage, @Cause Throwable 
t);
+
+   @LogMessage(level = Logger.Level.ERROR)
+   @Message(id = 834007, value = "Authorization failure sending will message: 
{0}", format = Message.Format.MESSAGE_FORMAT)
+   void authorizationFailureSendingWillMessage(String message);
 }
\ No newline at end of file
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index ee1dfdd211..38d6e4cc44 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -214,11 +214,7 @@ public class MQTTPublishManager {
 
             Transaction tx = session.getServerSession().newTransaction();
             try {
-               if (internal) {
-                  session.getServer().getPostOffice().route(serverMessage, tx, 
true);
-               } else {
-                  session.getServerSession().send(tx, serverMessage, true, 
false);
-               }
+               session.getServerSession().send(tx, serverMessage, true, false);
 
                if (message.fixedHeader().isRetain()) {
                   ByteBuf payload = message.payload();
@@ -228,6 +224,9 @@ public class MQTTPublishManager {
                tx.commit();
             } catch (ActiveMQSecurityException e) {
                tx.rollback();
+               if (internal) {
+                  throw e;
+               }
                if (session.getVersion() == MQTTVersion.MQTT_5) {
                   sendMessageAck(internal, qos, messageId, 
MQTTReasonCodes.NOT_AUTHORIZED);
                   return;
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
index d434ca55ba..db4295b100 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSession.java
@@ -25,6 +25,7 @@ import io.netty.handler.codec.mqtt.MqttMessageBuilders;
 import io.netty.handler.codec.mqtt.MqttProperties;
 import io.netty.handler.codec.mqtt.MqttPublishMessage;
 import io.netty.handler.codec.mqtt.MqttQoS;
+import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.core.config.WildcardConfiguration;
 import org.apache.activemq.artemis.core.persistence.CoreMessageObjectPools;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -277,6 +278,8 @@ public class MQTTSession {
          getMqttPublishManager().sendToQueue(publishMessage, true);
          state.setWillSent(true);
          state.setWillMessage(null);
+      } catch (ActiveMQSecurityException e) {
+         
MQTTLogger.LOGGER.authorizationFailureSendingWillMessage(e.getMessage());
       } catch (Exception e) {
          MQTTLogger.LOGGER.errorSendingWillMessage(e);
       }
diff --git 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
index ba107493ec..5b902fbcf7 100644
--- 
a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
+++ 
b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/spec/controlpackets/PublishTestsWithSecurity.java
@@ -18,13 +18,17 @@
 package 
org.apache.activemq.artemis.tests.integration.mqtt5.spec.controlpackets;
 
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTReasonCodes;
 import org.apache.activemq.artemis.tests.integration.mqtt5.MQTT5TestSupport;
+import org.apache.activemq.artemis.tests.util.RandomUtil;
 import org.eclipse.paho.mqttv5.client.MqttClient;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptions;
 import org.eclipse.paho.mqttv5.client.MqttConnectionOptionsBuilder;
 import org.eclipse.paho.mqttv5.common.MqttException;
+import org.eclipse.paho.mqttv5.common.MqttMessage;
 import org.jboss.logging.Logger;
 import org.junit.Test;
 
@@ -83,4 +87,47 @@ public class PublishTestsWithSecurity extends 
MQTT5TestSupport {
 
       client.isConnected();
    }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testWillAuthorizationSuccess() throws Exception {
+      internalTestWillAuthorization(fullUser, fullPass, true);
+   }
+
+   @Test(timeout = DEFAULT_TIMEOUT)
+   public void testWillAuthorizationFailure() throws Exception {
+      internalTestWillAuthorization(noprivUser, noprivPass, false);
+   }
+
+   private void internalTestWillAuthorization(String username, String 
password, boolean succeed) throws Exception {
+      final byte[] WILL = RandomUtil.randomBytes();
+      final String TOPIC = RandomUtil.randomString();
+
+      // consumer of the will message
+      MqttClient client1 = createPahoClient("willConsumer");
+      CountDownLatch latch = new CountDownLatch(1);
+      client1.setCallback(new DefaultMqttCallback() {
+         @Override
+         public void messageArrived(String topic, MqttMessage message) {
+            latch.countDown();
+         }
+      });
+      MqttConnectionOptions options = new MqttConnectionOptionsBuilder()
+         .username(fullUser)
+         .password(fullPass.getBytes(StandardCharsets.UTF_8))
+         .build();
+      client1.connect(options);
+      client1.subscribe(TOPIC, 1);
+
+      // consumer to generate the will
+      MqttClient client2 = createPahoClient("willGenerator");
+      options = new MqttConnectionOptionsBuilder()
+         .username(username)
+         .password(password.getBytes(StandardCharsets.UTF_8))
+         .will(TOPIC, new MqttMessage(WILL))
+         .build();
+      client2.connect(options);
+      client2.disconnectForcibly(0, 0, false);
+
+      assertEquals(succeed, latch.await(2, TimeUnit.SECONDS));
+   }
 }

Reply via email to