This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git

commit 5b2c47567dbc318c1c4c1ff821e072e74675ad34
Author: Francesco Nigro <nigro....@gmail.com>
AuthorDate: Mon Mar 11 18:00:30 2019 +0100

    ARTEMIS-1604 Artemis deadlock using MQTT Protocol
    
    MQTT and STOMP shouldn't support direct deliveries
---
 .../core/protocol/mqtt/MQTTSessionCallback.java    |  5 ++++
 .../artemis/core/protocol/stomp/StompSession.java  |  5 ++++
 .../artemis/core/server/impl/QueueImpl.java        |  2 +-
 .../tests/integration/mqtt/imported/MQTTTest.java  | 20 ++++++++++++++
 .../artemis/tests/integration/stomp/StompTest.java | 32 ++++++++++++++++++++++
 5 files changed, 63 insertions(+), 1 deletion(-)

diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
index a49cf11..50d5732 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionCallback.java
@@ -37,6 +37,11 @@ public class MQTTSessionCallback implements SessionCallback {
    }
 
    @Override
+   public boolean supportsDirectDelivery() {
+      return false;
+   }
+
+   @Override
    public boolean isWritable(ReadyListener callback, Object protocolContext) {
       return connection.isWritable(callback);
    }
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
index b355168..80bbbe8 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompSession.java
@@ -80,6 +80,11 @@ public class StompSession implements SessionCallback {
    }
 
    @Override
+   public boolean supportsDirectDelivery() {
+      return false;
+   }
+
+   @Override
    public boolean isWritable(ReadyListener callback, Object protocolContext) {
       return connection.isWritable(callback);
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 92fc9bf..7e736ca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2348,7 +2348,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public boolean isDirectDeliver() {
-      return directDeliver;
+      return directDeliver && supportsDirectDeliver;
    }
 
    /**
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 9fc6cfd..03bcddd 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -43,6 +43,8 @@ import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.config.Configuration;
 import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
 import org.apache.activemq.artemis.core.config.CoreQueueConfiguration;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
 import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
@@ -63,6 +65,7 @@ import org.fusesource.mqtt.client.Topic;
 import org.fusesource.mqtt.client.Tracer;
 import org.fusesource.mqtt.codec.MQTTFrame;
 import org.fusesource.mqtt.codec.PUBLISH;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -152,6 +155,23 @@ public class MQTTTest extends MQTTTestSupport {
    }
 
    @Test(timeout = 60 * 1000)
+   public void testDirectDeliverFalse() throws Exception {
+      final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+      initializeConnection(subscriptionProvider);
+
+      subscriptionProvider.subscribe("foo/bah", AT_MOST_ONCE);
+
+
+      for (Binding b : server.getPostOffice().getAllBindings().values()) {
+         if (b instanceof QueueBinding) {
+            Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
+         }
+      }
+
+      subscriptionProvider.disconnect();
+   }
+
+   @Test(timeout = 60 * 1000)
    public void testUnsubscribeMQTT() throws Exception {
       final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
       initializeConnection(subscriptionProvider);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
index 16ffca2..ba0616f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java
@@ -46,6 +46,8 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
 import org.apache.activemq.artemis.api.core.management.ManagementHelper;
 import org.apache.activemq.artemis.api.core.management.ResourceNames;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
+import org.apache.activemq.artemis.core.postoffice.Binding;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.core.postoffice.impl.LocalQueueBinding;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
@@ -2009,4 +2011,34 @@ public class StompTest extends StompTestBase {
       assertTrue(server.getAddressInfo(simpleQueueName).getRoutingTypes().contains(RoutingType.MULTICAST));
       Assert.assertNull(server.locateQueue(simpleQueueName));
    }
+
+
+
+   @Test
+   public void directDeliverDisabledOnStomp() throws Exception {
+      String payload = "This is a test message";
+
+      // Set up STOMP subscription
+      conn.connect(defUser, defPass);
+      subscribe(conn, null, Stomp.Headers.Subscribe.AckModeValues.AUTO);
+
+      for (Binding b : server.getPostOffice().getAllBindings().values()) {
+         if (b instanceof QueueBinding) {
+            Assert.assertFalse("Queue " + ((QueueBinding) b).getQueue().getName(), ((QueueBinding)b).getQueue().isDirectDeliver());
+         }
+      }
+
+      // Send MQTT Message
+      MQTTClientProvider clientProvider = new FuseMQTTClientProvider();
+      clientProvider.connect("tcp://" + hostname + ":" + port);
+      clientProvider.publish(getQueuePrefix() + getQueueName(), payload.getBytes(), 0);
+      clientProvider.disconnect();
+
+      // Receive STOMP Message
+      ClientStompFrame frame = conn.receiveFrame();
+      assertTrue(frame.getBody()
+                    .contains(payload));
+
+   }
+
 }

Reply via email to