[
https://issues.apache.org/jira/browse/ARTEMIS-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080568#comment-16080568
]
ASF GitHub Bot commented on ARTEMIS-1272:
-----------------------------------------
Github user mtaylor commented on a diff in the pull request:
https://github.com/apache/activemq-artemis/pull/1389#discussion_r126467367
--- Diff:
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.java
---
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.mqtt;
+
+import java.util.LinkedList;
+import java.util.concurrent.TimeUnit;
+
+import
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.jgroups.util.UUID;
+import org.junit.After;
+import org.junit.Test;
+
+public class MqttAcknowledgementTest extends MQTTTestSupport {
+
+ private volatile LinkedList<Integer> messageIds = new LinkedList<>();
+ private volatile boolean messageArrived = false;
+
+ private MqttClient subscriber;
+ MqttClient sender;
+
+ @After
+ public void clean() throws MqttException {
+ messageArrived = false;
+ messageIds.clear();
+ if (subscriber.isConnected()) {
+ subscriber.disconnect();
+ }
+ if (sender.isConnected()) {
+ sender.disconnect();
+ }
+ subscriber.close();
+ sender.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testAcknowledgementQOS1() throws MqttException {
+ test(1);
+ }
+
+ @Test(timeout = 300000, expected = ConditionTimeoutException.class)
+ public void testAcknowledgementQOS0() throws MqttException {
+ test(0);
+ }
+
+ private void test(int qos) throws MqttException {
+ String subscriberId = UUID.randomUUID().toString();
+ String senderId = UUID.randomUUID().toString();
+ String topic = UUID.randomUUID().toString();
+
+ subscriber = createMqttClient(subscriberId);
+ subscriber.subscribe(topic, qos);
+
+ sender = createMqttClient(senderId);
+ sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos,
false);
+ sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos,
false);
+
+ Awaitility.await().atMost(5_000, TimeUnit.MILLISECONDS).until(() ->
messageIds.size() == 2);
+
+ subscriber.messageArrivedComplete(messageIds.getLast(), qos);
+ subscriber.disconnect();
+ subscriber.close();
+ messageArrived = false;
+
+ Awaitility.await().atMost(60_000, TimeUnit.MILLISECONDS).until(() ->
{
+ try {
+ subscriber = createMqttClient(subscriberId);
+ return true;
+ } catch (MqttException e) {
+ return false;
+ }
+ });
+
+ Awaitility.await().atMost(5_000, TimeUnit.MILLISECONDS).until(() ->
messageArrived == true);
+ }
+
--- End diff --
Same comment as above.
> Artemis incorrectly handle MQTT acknowledgement
> -----------------------------------------------
>
> Key: ARTEMIS-1272
> URL: https://issues.apache.org/jira/browse/ARTEMIS-1272
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Components: MQTT
> Affects Versions: 1.5.3, 2.1.0
> Reporter: Odyldzhon Toshbekov
>
> When MQTT client send acknowledgement Artemis acknowledge previous sent
> messages.
> Test case:
> 1. Connect to Broker (client 1)
> 2. Connect to Broker (client 2)
> 2. Send two messages with QOS = 1
> 3. Send acknowledgement for second message
> A.R. Artemis will remove from queue first and second message
> E.R. Artemis should remove only second message
> 4. Send acknowledgement for first message
> A.R. WARN message "attempted to Ack already Ack'd message"
> The cause of the problem located in the class MQTTPublisherManager methods
> "handlePubRec", "handlePubComp", "handlePubAck" and "sendMessage"
> Fix: replace "session.getServerSession().acknowledge" to
> session.getServerSession().individualAcknowledge
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)