[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Pedro Marques updated AMQ-4585: ------------------------------- Description: The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case corresponds to receiving several messages from a publisher then closing the subscriber connection and finally reconnecting with setCleanSession(false) and attempt to read the messages published while the subscriber was disconnected. The exceptions thrown: {code} java.net.ProtocolException: Command from server contained an invalid message id: 1 at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723) at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762) at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) {code} {code} java.lang.ArrayIndexOutOfBoundsException: 0 at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81) at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40) at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749) at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) {code} {code} java.net.ProtocolException: Unexpected MQTT command type: 0 at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775) at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) {code} The problem does not always occur, but it does occur most of the time when the first reconnection attempt is made. With setCleanSession(true) the system works fine. 
Code sample (publisher, permanently running): {code} MQTT mqtt = new MQTT(); mqtt.setHost(url); mqtt.setUserName(user); mqtt.setPassword(password); mqtt.setClientId("test_id"); int i = 0; while (true) { BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); String message = "TestMessage: " + i; connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false); System.out.println("Vendor: Sent message."); Thread.sleep(2500); connection.disconnect(); Thread.sleep(2500); i++; } {code} Code sample (subscriber, fails multiple times when restarting after the connection is closed): {code} BlockingConnection connection = null; try { MQTT = new MQTT(); mqtt.setHost(url); mqtt.setClientId(clientId); mqtt.setUserName(user); mqtt.setPassword(password); mqtt.setCleanSession(false); connection = mqtt.blockingConnection(); connection.connect(); Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)}; byte[] qoses = connection.subscribe(topics); int numMessages = 1; while (numMessages % 10 != 0) { Message message = connection.receive(); byte[] payload = message.getPayload(); String messageContent = new String(payload); System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent); message.ack(); numMessages++; } } finally { if(connection != null) { try { connection.disconnect(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } } {code} The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9; on Mosquitto MQTT the code works correctly. was: The system throws at least three different types of exceptions when a subscriber receives the first pending message without cleaning the session. The test case corresponds to receiving several messages from a publisher then closing the subscriber connection and finally reconnecting with setCleanSession(false) and attempt to read the messages published while the subscriber was disconnected. 
The exceptions thrown: {code} java.net.ProtocolException: Command from server contained an invalid message id: 1 at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723) at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762) at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) {code} {code} java.lang.ArrayIndexOutOfBoundsException: 0 at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81) at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40) at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749) at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) {code} {code} java.net.ProtocolException: Unexpected MQTT command type: 0 at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775) at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) {code} The problem does not always occur, but it does occur most of the time when the first reconnection attempt is made. With setCleanSession(true) the system works fine. 
Code sample (publisher, permanently running): {code} MQTT mqtt = new MQTT(); mqtt.setHost(url); mqtt.setUserName(user); mqtt.setPassword(password); mqtt.setClientId("test_id"); int i = 0; while (true) { BlockingConnection connection = mqtt.blockingConnection(); connection.connect(); String message = "TestMessage: " + i; connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, false); System.out.println("Vendor: Sent message."); Thread.sleep(2500); connection.disconnect(); Thread.sleep(2500); i++; } {code} Code sample (subscriber, fails multiple times when restarting after the connection is closed): {code} BlockingConnection connection = null; try { MQTT = new MQTT(); mqtt.setHost(url); mqtt.setClientId(clientId); mqtt.setUserName(user); mqtt.setPassword(password); mqtt.setCleanSession(false); connection = mqtt.blockingConnection(); connection.connect(); Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)}; byte[] qoses = connection.subscribe(topics); int numMessages = 1; while (numMessages % 10 != 0) { Message message = connection.receive(); byte[] payload = message.getPayload(); String messageContent = new String(payload); System.out.println("Received message from topic: " + message.getTopic() + " Message content: " + messageContent); message.ack(); numMessages++; } } finally { if(connection != null) { try { connection.disconnect(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } {code} The test failed when using the current fusesource client (1.5) on ActiveMQ 5.9, on Mosquitto mqtt the code works correctly > MQTT BlockingConnection.receive fails when receiving pending messages after > reconnect without cleaning session > -------------------------------------------------------------------------------------------------------------- > > Key: AMQ-4585 > URL: https://issues.apache.org/jira/browse/AMQ-4585 > Project: ActiveMQ > Issue Type: Bug > Affects Versions: 5.8.0 > Reporter: Pedro Marques > > The system throws at 
least three different types of exceptions when a > subscriber receives the first pending message without cleaning the session. > The test case corresponds to receiving several messages from a publisher then > closing the subscriber connection and finally reconnecting with > setCleanSession(false) and attempt to read the messages published while the > subscriber was disconnected. > The exceptions thrown: > {code} > java.net.ProtocolException: Command from server contained an invalid message > id: 1 > at > org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723) > at > org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762) > at > org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) > at > org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) > at > org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) > at > org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) > at > org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) > at > org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) > at > org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) > at > org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) > {code} > {code} > java.lang.ArrayIndexOutOfBoundsException: 0 > at > org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81) > at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40) > at > org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749) > at > org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) > at > org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) > at > 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) > at > org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) > at > org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) > at > org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) > at > org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) > at > org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) > {code} > {code} > java.net.ProtocolException: Unexpected MQTT command type: 0 > at > org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775) > at > org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51) > at > org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392) > at > org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659) > at > org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264) > at > org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538) > at > org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209) > at > org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100) > at > org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77) > {code} > The problem doesn't occur always but most of the times the first reconnection > attempt is made. With setCleanSession(true) the system works fine. 
> Code sample (publisher, permanently running): > {code} > MQTT mqtt = new MQTT(); > mqtt.setHost(url); > mqtt.setUserName(user); > mqtt.setPassword(password); > mqtt.setClientId("test_id"); > int i = 0; > while (true) { > BlockingConnection connection = mqtt.blockingConnection(); > connection.connect(); > String message = "TestMessage: " + i; > connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, > false); > System.out.println("Vendor: Sent message."); > Thread.sleep(2500); > connection.disconnect(); > Thread.sleep(2500); > i++; > } > {code} > Code sample (subscriber, fails multiple times when restarting after the > connection is closed): > {code} > BlockingConnection connection = null; > try { > MQTT = new MQTT(); > mqtt.setHost(url); > mqtt.setClientId(clientId); > mqtt.setUserName(user); > mqtt.setPassword(password); > mqtt.setCleanSession(false); > connection = mqtt.blockingConnection(); > connection.connect(); > Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)}; > byte[] qoses = connection.subscribe(topics); > int numMessages = 1; > while (numMessages % 10 != 0) { > Message message = connection.receive(); > byte[] payload = message.getPayload(); > String messageContent = new String(payload); > System.out.println("Received message from topic: " + > message.getTopic() + " Message content: " + messageContent); > message.ack(); > numMessages++; > } > } finally { > if(connection != null) { > try { > connection.disconnect(); > } catch (Exception e) { > // TODO Auto-generated catch block > e.printStackTrace(); > } > } > } > {code} > The test failed when using the current fusesource client (1.5) on ActiveMQ > 5.9, on Mosquitto mqtt the code works correctly -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators For more information on JIRA, see: http://www.atlassian.com/software/jira