This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new de0f6ac8f5 ARTEMIS-4760 creating MQTT consumer should work if auto-create-queues is false
de0f6ac8f5 is described below
commit de0f6ac8f54eb699a175c700c044f4f0bf8b6ef2
Author: Justin Bertram <[email protected]>
AuthorDate: Fri May 3 23:26:34 2024 -0500
ARTEMIS-4760 creating MQTT consumer should work if auto-create-queues is false
---
.../protocol/mqtt/MQTTSubscriptionManager.java | 16 ++---
.../jms/multiprotocol/JMSTopicSubscriberTest.java | 75 ++++++++++++++++++++++
.../artemis/tests/integration/mqtt5/MQTT5Test.java | 15 +++++
.../artemis/tests/integration/stomp/StompTest.java | 13 ++++
4 files changed, 111 insertions(+), 8 deletions(-)
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index e66c880d19..377d19c9d1 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -104,8 +104,9 @@ public class MQTTSubscriptionManager {
private void addSubscription(MqttTopicSubscription subscription, Integer
subscriptionIdentifier, boolean initialStart) throws Exception {
String rawTopicName =
CompositeAddress.extractAddressName(subscription.topicName());
String parsedTopicName =
MQTTUtil.decomposeSharedSubscriptionTopicFilter(rawTopicName).getB();
+ boolean isFullyQualified =
CompositeAddress.isFullyQualified(subscription.topicName());
- Queue q = createQueueForSubscription(rawTopicName, parsedTopicName);
+ Queue q = createQueueForSubscription(rawTopicName, parsedTopicName,
isFullyQualified);
int qos = subscription.qualityOfService().value();
@@ -146,20 +147,20 @@ public class MQTTSubscriptionManager {
}
}
- private Queue createQueueForSubscription(String rawTopicName, String
parsedTopicName) throws Exception {
+ private Queue createQueueForSubscription(String rawTopicName, String
parsedTopicName, boolean isFullyQualified) throws Exception {
String coreAddress =
MQTTUtil.getCoreAddressFromMqttTopic(parsedTopicName,
session.getWildcardConfiguration());
String coreQueue = MQTTUtil.getCoreQueueFromMqttTopic(rawTopicName,
session.getState().getClientId(), session.getWildcardConfiguration());
- // check to see if a subscription queue already exists.
+ // check to see if a subscription queue already exists
Queue q = session.getServer().locateQueue(coreQueue);
- // The queue does not exist so we need to create it.
+ // the subscription queue does not exist so we need to create it
if (q == null) {
SimpleString sAddress = SimpleString.toSimpleString(coreAddress);
- // Check we can auto create queues.
+ // only check if we can auto create queues if it's FQQN
BindingQueryResult bindingQueryResult =
session.getServerSession().executeBindingQuery(sAddress);
- if (!bindingQueryResult.isAutoCreateQueues()) {
+ if (isFullyQualified && !bindingQueryResult.isAutoCreateQueues()) {
throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(sAddress);
}
@@ -169,8 +170,7 @@ public class MQTTSubscriptionManager {
if (!bindingQueryResult.isAutoCreateAddresses()) {
throw
ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(sAddress);
}
- addressInfo = session.getServerSession().createAddress(sAddress,
-
RoutingType.MULTICAST, true);
+ addressInfo = session.getServerSession().createAddress(sAddress,
RoutingType.MULTICAST, true);
}
return findOrCreateQueue(bindingQueryResult, addressInfo, coreQueue);
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberTest.java
new file mode 100644
index 0000000000..8a86c7e9b4
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/multiprotocol/JMSTopicSubscriberTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.jms.multiprotocol;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+import java.lang.invoke.MethodHandles;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class JMSTopicSubscriberTest extends MultiprotocolJMSClientTestSupport {
+
+ protected static final Logger logger =
LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Override
+ protected void addConfiguration(ActiveMQServer server) throws Exception {
+
server.getAddressSettingsRepository().getMatch(getTopicName()).setAutoCreateQueues(false);
+ }
+
+ @Test
+ @Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
+ public void testCoreSubscriptionQueueCreatedWhenAutoCreateDisabled() throws
Exception {
+ Connection connection = createCoreConnection();
+ testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
+ }
+
+ @Test
+ @Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
+ public void testOpenWireSubscriptionQueueCreatedWhenAutoCreateDisabled()
throws Exception {
+ Connection connection = createOpenWireConnection();
+ testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
+ }
+
+ @Test
+ @Timeout(value = 30_000, unit = TimeUnit.MILLISECONDS)
+ public void testAMQPSubscriptionQueueCreatedWhenAutoCreateDisabled() throws
Exception {
+ Connection connection = createConnection();
+ testSubscriptionQueueCreatedWhenAutoCreateDisabled(connection);
+ }
+
+ private void testSubscriptionQueueCreatedWhenAutoCreateDisabled(Connection
connection) throws Exception {
+ try {
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Topic topic = session.createTopic(getTopicName());
+ assertEquals(0,
server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(getTopicName())).size());
+ session.createConsumer(topic);
+ Wait.assertEquals(1, () ->
server.getPostOffice().getBindingsForAddress(SimpleString.toSimpleString(getTopicName())).size(),
2000, 100);
+ } finally {
+ connection.close();
+ }
+ }
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
index 6a6ac7b0dd..18012a59a3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt5/MQTT5Test.java
@@ -698,4 +698,19 @@ public class MQTT5Test extends MQTT5TestSupport {
client.disconnect();
client.close();
}
+
+ @Test
+ @Timeout(value = DEFAULT_TIMEOUT, unit = TimeUnit.MILLISECONDS)
+ public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws
Exception {
+ final String topic = "a/b";
+ final String clientID = "myClientID";
+
server.getAddressSettingsRepository().getMatch(topic).setAutoCreateQueues(false);
+
+ MqttClient client = createPahoClient(clientID);
+ client.connect();
+ client.subscribe(topic, 1);
+ Wait.assertTrue(() -> getSubscriptionQueue(topic, clientID) != null,
2000, 100);
+ client.disconnect();
+ client.close();
+ }
}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index e136a06667..af4a4d4fe8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -830,6 +830,19 @@ public class StompTest extends StompTestBase {
conn.disconnect();
}
+ @TestTemplate
+ public void testSubscriptionQueueCreatedWhenAutoCreateDisabled() throws
Exception {
+ SimpleString topic = SimpleString.toSimpleString(getTopicPrefix() +
getTopicName());
+
server.getAddressSettingsRepository().getMatch(topic.toString()).setAutoCreateQueues(false);
+ conn.connect(defUser, defPass);
+
+ assertEquals(0,
server.getPostOffice().getBindingsForAddress(topic).size());
+ subscribeTopic(conn, null, null, null, true);
+ Wait.assertEquals(1, () ->
server.getPostOffice().getBindingsForAddress(topic).size(), 2000, 100);
+
+ conn.disconnect();
+ }
+
@TestTemplate
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
conn.connect(defUser, defPass);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
For further information, visit: https://activemq.apache.org/contact