[ 
https://issues.apache.org/jira/browse/ARTEMIS-1272?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16080568#comment-16080568
 ] 

ASF GitHub Bot commented on ARTEMIS-1272:
-----------------------------------------

Github user mtaylor commented on a diff in the pull request:

    https://github.com/apache/activemq-artemis/pull/1389#discussion_r126467367
  
    --- Diff: tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/MqttAcknowledgementTest.java ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.activemq.artemis.tests.integration.mqtt;
    +
    +import java.util.LinkedList;
    +import java.util.concurrent.TimeUnit;
    +
    +import 
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTestSupport;
    +import org.awaitility.Awaitility;
    +import org.awaitility.core.ConditionTimeoutException;
    +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    +import org.eclipse.paho.client.mqttv3.MqttCallback;
    +import org.eclipse.paho.client.mqttv3.MqttClient;
    +import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    +import org.eclipse.paho.client.mqttv3.MqttException;
    +import org.eclipse.paho.client.mqttv3.MqttMessage;
    +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    +import org.jgroups.util.UUID;
    +import org.junit.After;
    +import org.junit.Test;
    +
    +public class MqttAcknowledgementTest extends MQTTTestSupport {
    +
    +   private volatile LinkedList<Integer> messageIds = new LinkedList<>();
    +   private volatile boolean messageArrived = false;
    +
    +   private MqttClient subscriber;
    +   MqttClient sender;
    +
    +   @After
    +   public void clean() throws MqttException {
    +      messageArrived = false;
    +      messageIds.clear();
    +      if (subscriber.isConnected()) {
    +         subscriber.disconnect();
    +      }
    +      if (sender.isConnected()) {
    +         sender.disconnect();
    +      }
    +      subscriber.close();
    +      sender.close();
    +   }
    +
    +   @Test(timeout = 300000)
    +   public void testAcknowledgementQOS1() throws MqttException {
    +      test(1);
    +   }
    +
    +   @Test(timeout = 300000, expected = ConditionTimeoutException.class)
    +   public void testAcknowledgementQOS0() throws MqttException {
    +      test(0);
    +   }
    +
    +   private void test(int qos) throws MqttException {
    +      String subscriberId = UUID.randomUUID().toString();
    +      String senderId = UUID.randomUUID().toString();
    +      String topic = UUID.randomUUID().toString();
    +
    +      subscriber = createMqttClient(subscriberId);
    +      subscriber.subscribe(topic, qos);
    +
    +      sender = createMqttClient(senderId);
    +      sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, 
false);
    +      sender.publish(topic, UUID.randomUUID().toString().getBytes(), qos, 
false);
    +
    +      Awaitility.await().atMost(5_000, TimeUnit.MILLISECONDS).until(() -> 
messageIds.size() == 2);
    +
    +      subscriber.messageArrivedComplete(messageIds.getLast(), qos);
    +      subscriber.disconnect();
    +      subscriber.close();
    +      messageArrived = false;
    +
    +      Awaitility.await().atMost(60_000, TimeUnit.MILLISECONDS).until(() -> 
{
    +         try {
    +            subscriber = createMqttClient(subscriberId);
    +            return true;
    +         } catch (MqttException e) {
    +            return false;
    +         }
    +      });
    +
    +      Awaitility.await().atMost(5_000, TimeUnit.MILLISECONDS).until(() -> 
messageArrived == true);
    +   }
    +
    --- End diff --
    
    Same comment as above.


> Artemis incorrectly handle MQTT acknowledgement
> -----------------------------------------------
>
>                 Key: ARTEMIS-1272
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-1272
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>          Components: MQTT
>    Affects Versions: 1.5.3, 2.1.0
>            Reporter: Odyldzhon Toshbekov
>
> When an MQTT client sends an acknowledgement, Artemis also acknowledges
> previously sent messages.
> Test case:
> 1. Connect to the broker (client 1)
> 2. Connect to the broker (client 2)
> 3. Send two messages with QoS = 1
> 4. Send an acknowledgement for the second message
> Actual result: Artemis removes both the first and the second message from
> the queue
> Expected result: Artemis should remove only the second message
> 5. Send an acknowledgement for the first message
> Actual result: WARN message "attempted to Ack already Ack'd message"
> The cause of the problem is located in the class MQTTPublisherManager, in
> the methods "handlePubRec", "handlePubComp", "handlePubAck" and "sendMessage".
> Fix: replace "session.getServerSession().acknowledge" with
> "session.getServerSession().individualAcknowledge".



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to