[ 
https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171220#comment-14171220
 ] 

Davy De Waele commented on AMQ-4585:
------------------------------------

I think there is still an issue with the FuseSource client (even the latest 
version), and the timing issue is easy to reproduce. See the GitHub issue for 
more details. In the latest FuseSource client, a PUBACK can be processed 
before the corresponding PUBLISH has completed.

When the client sends a PUBLISH over the wire, it can start processing the 
PUBACK before it has finished its internal bookkeeping for that PUBLISH. 
FuseSource then treats the PUBACK as carrying an invalid message id, because 
the PUBLISH has not been fully registered yet.
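
To illustrate the ordering problem, here is a minimal single-threaded sketch. 
It is not the FuseSource code: the InFlight table, sendWithImmediateAck and 
both publish methods are hypothetical names made up for this example, and the 
"broker" simply acknowledges during the send call to model an ack that 
arrives before the client's bookkeeping is done.

```java
import java.util.HashMap;
import java.util.Map;

public class PubAckOrderingSketch {

    /** Hypothetical stand-in for the client's table of in-flight publishes. */
    public static final class InFlight {
        private final Map<Integer, String> pending = new HashMap<>();

        void register(int messageId, String topic) {
            pending.put(messageId, topic);
        }

        /** Returns false when the ack names a message id that is not registered. */
        boolean complete(int messageId) {
            return pending.remove(messageId) != null;
        }
    }

    /** Hypothetical wire whose peer acknowledges during the send call itself. */
    private static void sendWithImmediateAck(Runnable onPubAck) {
        onPubAck.run();
    }

    /** Problem ordering: write first, do the bookkeeping afterwards. */
    public static boolean publishSendFirst(InFlight table, int messageId) {
        boolean[] ackMatched = {false};
        sendWithImmediateAck(() -> ackMatched[0] = table.complete(messageId));
        table.register(messageId, "TopicA"); // too late, the ack was already handled
        return ackMatched[0];
    }

    /** Safer ordering: finish the bookkeeping before anything reaches the wire. */
    public static boolean publishRegisterFirst(InFlight table, int messageId) {
        boolean[] ackMatched = {false};
        table.register(messageId, "TopicA");
        sendWithImmediateAck(() -> ackMatched[0] = table.complete(messageId));
        return ackMatched[0];
    }

    public static void main(String[] args) {
        System.out.println("send first:     ack matched = "
                + publishSendFirst(new InFlight(), 1));
        System.out.println("register first: ack matched = "
                + publishRegisterFirst(new InFlight(), 1));
    }
}
```

In the send-first variant the ack lookup misses, which corresponds to the 
"invalid message id" case; registering before sending makes the lookup 
succeed. Whether the real client can be fixed by reordering alone is an 
assumption I have not verified.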

> MQTT BlockingConnection.receive fails when receiving pending messages after 
> reconnect without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4585
>                 URL: https://issues.apache.org/jira/browse/AMQ-4585
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>             Fix For: 5.10.0
>
>         Attachments: MQTTTest.java
>
>
> The system throws at least three different types of exceptions when a 
> subscriber receives the first pending message without cleaning the session. 
> The test case receives several messages from a publisher, closes the 
> subscriber connection, then reconnects with setCleanSession(false) and 
> attempts to read the messages published while the subscriber was 
> disconnected.
> The exceptions thrown:
> {code}
> java.net.ProtocolException: Command from server contained an invalid message id: 1
>       at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
>       at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
>       at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>       at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 0
>       at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
>       at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
>       at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
>       at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>       at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.net.ProtocolException: Unexpected MQTT command type: 0
>       at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
>       at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>       at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>       at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>       at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>       at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>       at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>       at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>       at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> No message is shown in the server. The problem doesn't always occur, but it 
> happens most of the time on the first reconnection attempt. With 
> setCleanSession(true) the system works fine.
> Code sample (publisher, permanently running):
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
>       BlockingConnection connection = mqtt.blockingConnection();
>       connection.connect();
>       String message = "TestMessage: " + i;
>       connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, 
> false);
>       System.out.println("Vendor: Sent message.");
>       Thread.sleep(2500);
>       connection.disconnect();
>       Thread.sleep(2500);
>       i++;
> }
> {code}
> Code sample (subscriber, fails multiple times when restarting after the 
> connection is closed):
> {code}
> BlockingConnection connection = null;
> try {
>     MQTT mqtt = new MQTT();
>     mqtt.setHost(url);
>     mqtt.setClientId(clientId);
>     mqtt.setUserName(user);
>     mqtt.setPassword(password);
>     mqtt.setCleanSession(false);
>     connection = mqtt.blockingConnection();
>     connection.connect();
>     Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
>     byte[] qoses = connection.subscribe(topics);
>     int numMessages = 1;
>     while (numMessages % 10 != 0) {
>         Message message = connection.receive();
>         byte[] payload = message.getPayload();
>         String messageContent = new String(payload);
>         System.out.println("Received message from topic: " + 
> message.getTopic() + " Message content: " + messageContent);
>         message.ack();
>         numMessages++;
>     }
> } finally {
>     if(connection != null) {
>         try {
>             connection.disconnect();
>         } catch (Exception e) {
>             // TODO Auto-generated catch block
>             e.printStackTrace();
>         }
>     }
> }
> {code}
> The test failed when using the current fusesource client (1.5) on ActiveMQ 
> 5.9. On Mosquitto the same code works correctly.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
