This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/main by this push:
     new de0f6ac8f5 ARTEMIS-4760 creating MQTT consumer should work if auto-create-queues is false
de0f6ac8f5 is described below

commit de0f6ac8f54eb699a175c700c044f4f0bf8b6ef2
Author: Justin Bertram <[email protected]>
AuthorDate: Fri May 3 23:26:34 2024 -0500

    ARTEMIS-4760 creating MQTT consumer should work if auto-create-queues is false
---
 .../protocol/mqtt/MQTTSubscriptionManager.java     | 16 ++---
 .../jms/multiprotocol/JMSTopicSubscriberTest.java  | 75 ++++++++++++++++++++++
 .../artemis/tests/integration/mqtt5/MQTT5Test.java | 15 +++++
 .../artemis/tests/integration/stomp/StompTest.java | 13 ++++
 4 files changed, 111 insertions(+), 8 deletions(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index e66c880d19..377d19c9d1 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -104,8 +104,9 @@ public class MQTTSubscriptionManager {
    private void addSubscription(MqttTopicSubscription subscription, Integer subscriptionIdentifier, boolean initialStart) throws Exception {
       String rawTopicName = CompositeAddress.extractAddressName(subscription.topicName());
       String parsedTopicName = MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
+      boolean isFullyQualified = CompositeAddress.isFullyQualified(subscription.topicName());
 
-      Queue q = createQueueForSubscription(rawTopicName, parsedTopicName);
+      Queue q = createQueueForSubscription(rawTopicName, parsedTopicName, isFullyQualified);
 
       int qos = subscription.qualityOfService().value();
 
@@ -146,20 +147,20 @@ public class MQTTSubscriptionManager {
       }
    }
 
-   private Queue createQueueForSubscription(String rawTopicName, String parsedTopicName) throws Exception {
+   private Queue createQueueForSubscription(String rawTopicName, String parsedTopicName, boolean isFullyQualified) throws Exception {
       String coreAddress = MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName, session.getWildcardConfiguration());
       String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName, session.getState().getClientId(), session.getWildcardConfiguration());
 
-      // check to see if a subscription queue already exists.
+      // check to see if a subscription queue already exists
       Queue q = session.getServer().locateQueue(coreQueue);
 
-      // The queue does not exist so we need to create it.
+      // the subscription queue does not exist so we need to create it
       if (q == null) {
          SimpleString sAddress = SimpleString.toSimpleString(coreAddress);
 
-         // Check we can auto create queues.
+         // only check if we can auto create queues if it's FQQN
          BindingQueryResult bindingQueryResult = session.getServerSession().executeBindingQuery(sAddress);
-         if (!bindingQueryResult.isAutoCreateQueues()) {
+         if (isFullyQualified && !bindingQueryResult.isAutoCreateQueues()) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress);
          }
 
@@ -169,8 +170,7 @@ public class MQTTSubscriptionManager {
             if (!bindingQueryResult.isAutoCreateAddresses()) {
                throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(sAddress);
             }
-            addressInfo = session.getServerSession().createAddress(sAddress,
-                                                                   RoutingType.MULTICAST, true);
+            addressInfo = session.getServerSession().createAddress(sAddress, RoutingType.MULTICAST, true);
          }
          return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue);
       }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberTest.java
new file mode 100644
index 0000000000..8a86c7e9b4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JMSTopicSubscriberTest extends MultiprotocolJMSClientTestSupport {
+
+   protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+   @Override
+   protected void addConfiguration(ActiveMQServer server) throws Exception {
+      server.getAddressSettingsRepository().getMatch(getTopicName()).setAutoCreateQueues(false);
+   }
+
+   @Test
+   @Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
+   public void testCoreSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
+      Connection connection =  createCoreConnection();
+      testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
+   }
+
+   @Test
+   @Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
+   public void testOpenWireSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
+      Connection connection =  createOpenWireConnection();
+      testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
+   }
+
+   @Test
+   @Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
+   public void testAMQPSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
+      Connection connection =  createConnection();
+      testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
+   }
+
+   private void testSubscriptionQueueCreatedWhenAutoCreateDisabled(Connection connection) throws Exception {
+      try {
+         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         javax.jms.Topic topic = session.createTopic(getTopicName());
+         assertEquals(0, server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(getTopicName())).size());
+         session.createConsumer(topic);
+         Wait.assertEquals(1, () -> server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(getTopicName())).size(), 2000, 100);
+      } finally {
+         connection.close();
+      }
+   }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 6a6ac7b0dd..18012a59a3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -698,4 +698,19 @@ public class MQTT5Test extends MQTT5TestSupport {
       client.disconnect();
       client.close();
    }
+
+   @Test
+   @Timeout(value = DEFAULT_TIMEOUT, unit = TimeUnit.MILLISECONDS)
+   public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
+      final String topic = "a/b";
+      final String clientID = "myClientID";
+      server.getAddressSettingsRepository().getMatch(topic).setAutoCreateQueues(false);
+
+      MqttClient client = createPahoClient(clientID);
+      client.connect();
+      client.subscribe(topic, 1);
+      Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) != null, 2000, 100);
+      client.disconnect();
+      client.close();
+   }
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index e136a06667..af4a4d4fe8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -830,6 +830,19 @@ public class StompTest extends StompTestBase {
       conn.disconnect();
    }
 
+   @TestTemplate
+   public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws Exception {
+      SimpleString topic = SimpleString.toSimpleString(getTopicPrefix() + getTopicName());
+      server.getAddressSettingsRepository().getMatch(topic.toString()).setAutoCreateQueues(false);
+      conn.connect(defUser, defPass);
+
+      assertEquals(0, server.getPostOffice().getBindingsForAddress(topic).size());
+      subscribeTopic(conn, null, null, null, true);
+      Wait.assertEquals(1, () -> server.getPostOffice().getBindingsForAddress(topic).size(), 2000, 100);
+
+      conn.disconnect();
+   }
+
    @TestTemplate
    public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
       conn.connect(defUser, defPass);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact


Reply via email to