[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171232#comment-14171232 ]

Timothy Bish commented on AMQ-4585:
-----------------------------------

You'll need to try and raise the priority of that with the Fuse client project. I believe the broker side troubles have been resolved in this area.

> MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4585
>                 URL: https://issues.apache.org/jira/browse/AMQ-4585
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Pedro Marques
>             Fix For: 5.10.0
>
>         Attachments: MQTTTest.java
>
>
> The system throws at least three different types of exceptions when a
> subscriber receives the first pending message without cleaning the session.
> The test case corresponds to receiving several messages from a publisher then
> closing the subscriber connection and finally reconnecting with
> setCleanSession(false) and attempt to read the messages published while the
> subscriber was disconnected.
> The exceptions thrown:
> {code}
> java.net.ProtocolException: Command from server contained an invalid message id: 1
>     at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
>     at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
>     at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>     at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>     at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>     at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>     at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>     at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>     at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>     at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.lang.ArrayIndexOutOfBoundsException: 0
>     at org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
>     at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
>     at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
>     at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>     at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>     at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>     at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>     at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>     at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>     at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>     at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> {code}
> java.net.ProtocolException: Unexpected MQTT command type: 0
>     at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
>     at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
>     at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
>     at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
>     at org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
>     at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
>     at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
>     at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
>     at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
> {code}
> No message is shown in the server. The problem doesn't occur always but most
> of the times the first reconnection attempt is made. With
> setCleanSession(true) the system works fine.
> Code sample (publisher, permanently running):
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
>
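
As background for the third exception quoted above ("Unexpected MQTT command type: 0"): in MQTT 3.1 the command type is carried in the upper four bits of the first fixed-header byte, and the values 0 and 15 are reserved. The sketch below only illustrates that bit layout. The class and method names are hypothetical and are not taken from the FuseSource client, and the sketch makes no claim about why the client saw a type of 0 in this issue.

```java
import java.net.ProtocolException;

// Hypothetical helper, not part of org.fusesource.mqtt: shows how an
// MQTT 3.1 fixed-header byte maps to a command type.
final class FixedHeaderSketch {
    // Command names indexed by the 4-bit type value defined in MQTT 3.1.
    private static final String[] TYPES = {
        "RESERVED_0", "CONNECT", "CONNACK", "PUBLISH", "PUBACK", "PUBREC",
        "PUBREL", "PUBCOMP", "SUBSCRIBE", "SUBACK", "UNSUBSCRIBE", "UNSUBACK",
        "PINGREQ", "PINGRESP", "DISCONNECT", "RESERVED_15"
    };

    // The type lives in bits 7..4 of the first header byte.
    static int commandType(byte header) {
        return (header & 0xF0) >>> 4;
    }

    // Rejects the two reserved values, mirroring the wording of the
    // exception message quoted in the issue (an assumption for illustration).
    static String describe(byte header) throws ProtocolException {
        int type = commandType(header);
        if (type == 0 || type == 15) {
            throw new ProtocolException("Unexpected MQTT command type: " + type);
        }
        return TYPES[type];
    }

    public static void main(String[] args) {
        byte[] samples = {(byte) 0x90, (byte) 0x40, (byte) 0x00};
        for (byte b : samples) {
            try {
                System.out.println(describe(b));
            } catch (ProtocolException e) {
                System.out.println(e.getMessage());
            }
        }
    }
}
```

A first byte of 0x00 is therefore never a valid command, which is one reason a misread or misaligned frame would surface as this kind of error.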
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14171220#comment-14171220 ]

Davy De Waele commented on AMQ-4585:
------------------------------------

I think there is still an issue with the FuseSource client (even the latest version). It's easy to reproduce the timing issue. See the GitHub issue for more details.

The issue in the latest FuseSource client is that a PUBACK can be processed before the PUB was completed. When the client sends a PUB over the wire, it can immediately start processing the PUBACK before it has finished its internal bookkeeping on the PUB. (FuseSource thinks the PUBACK carries an invalid message id because it hasn't fully registered the PUB yet.)
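
The ordering problem Davy De Waele describes can be sketched with a small model. This is an illustration under stated assumptions, not the FuseSource client's actual internals: `InFlightSketch`, `Wire`, and both send methods are hypothetical names, and the "wire" here acknowledges synchronously so that the outcome is deterministic.

```java
import java.net.ProtocolException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a client's in-flight bookkeeping.
final class InFlightSketch {
    // Stand-in for the transport; send() may trigger the ack immediately.
    interface Wire {
        void send(int messageId) throws ProtocolException;
    }

    private final Map<Integer, String> pending = new HashMap<>();

    // Called when an acknowledgement for messageId arrives.
    void onAck(int messageId) throws ProtocolException {
        if (pending.remove(messageId) == null) {
            throw new ProtocolException(
                "Command from server contained an invalid message id: " + messageId);
        }
    }

    // Problematic order: the frame leaves before the id is recorded,
    // so a fast ack finds nothing to complete.
    void sendThenRegister(int messageId, Wire wire) throws ProtocolException {
        wire.send(messageId);
        pending.put(messageId, "PUBLISH");
    }

    // Safer order: record the id first, then put the frame on the wire.
    void registerThenSend(int messageId, Wire wire) throws ProtocolException {
        pending.put(messageId, "PUBLISH");
        wire.send(messageId);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) throws ProtocolException {
        InFlightSketch safe = new InFlightSketch();
        safe.registerThenSend(1, safe::onAck); // loopback wire: ack arrives at once
        System.out.println("register-then-send left pending=" + safe.pendingCount());

        InFlightSketch unsafe = new InFlightSketch();
        try {
            unsafe.sendThenRegister(1, unsafe::onAck);
        } catch (ProtocolException e) {
            System.out.println("send-then-register failed: " + e.getMessage());
        }
    }
}
```

In a real client the ack arrives on another thread or dispatch queue, so the failure is intermittent rather than guaranteed, which matches the "timing issue" wording in the comment.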
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170740#comment-14170740 ]

Timothy Bish commented on AMQ-4585:
-----------------------------------

There are fixes to both the client and the MQTT support on the broker. You should run with the latest release of both.
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14170566#comment-14170566 ]

Davy De Waele commented on AMQ-4585:
------------------------------------

[~tabish121] Can you confirm that work was done on ActiveMQ for this? I also feel like this is an issue in the MQTT client (FuseSource).

We're seeing the exact same issue on our stack:
- Apache ActiveMQ 5.10.0
- Camel 2.14
- MQTT Client 1.10.0 (FuseSource)

There appears to be a GitHub issue for it as well: https://github.com/fusesource/mqtt-client/issues/26
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13688443#comment-13688443 ]

Timothy Bish commented on AMQ-4585:
-----------------------------------

The problem is a tricky one; it looks like there might be an issue in the MQTT client. The issue happens on subscribe when for some reason the client receives two SUBACK frames when there appears to only have been one sent. You can see it in this trace log where the SUBSCRIBE followed by a PUBLISH results in two SUBACK frames and the second one causes the client to bail.

{noformat}
2013-06-19 16:50:40,556 | DEBUG | MQTTProtocolConverter | MQTT Client MQTT-Sub-Client connected.
2013-06-19 16:50:40,557 | INFO | MQTTTest | Client Received: MQTTFrame { type: CONNACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,557 | INFO | MQTTTest | MQTT login accepted
2013-06-19 16:50:40,557 | INFO | MQTTTest | Client Sent: MQTTFrame { type: SUBSCRIBE, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,559 | TRACE | MQTTIO | Received: MQTTFrame { type: SUBSCRIBE, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,562 | TRACE | MQTTIO | Sending: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,562 | TRACE | MQTTIO | Sending: MQTTFrame { type: PUBLISH, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,562 | INFO | MQTTTest | Client Received: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,562 | INFO | MQTTTest | Client Received: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }
2013-06-19 16:50:40,562 | INFO | MQTTTest | Fatal connection failure: %s
java.net.ProtocolException: Command from server contained an invalid message id: 1
    at org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
    at org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
    at org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
    at org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
    at org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
    at org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
    at org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
    at org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
    at org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
2013-06-19 16:50:40,563 | INFO | MQTTTest | Client Received: MQTTFrame { type: PUBLISH, qos: AT_LEAST_ONCE, dup:false }
2013-06-19 16:50:40,564 | INFO | BrokerService | Apache ActiveMQ 5.9-SNAPSHOT (localhost, ID:OfficePC-37216-1371675037999-0:1) is shutting down
{noformat}
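
The mismatch Timothy Bish points out in his trace log (one SUBACK sent by the broker, two logged as received by the client) can be checked mechanically. The following is a minimal sketch, assuming the `timestamp | LEVEL | logger | message` layout seen in that log. `FrameLogCounter` is a hypothetical helper written for this illustration, and the sample lines are copied from the comment.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical log helper: tallies MQTT frames per direction and type.
final class FrameLogCounter {
    // Captures the direction ("Sending" on the broker, "Client Received" in
    // the test client) and the frame type from one log line.
    private static final Pattern FRAME = Pattern.compile(
        "\\|\\s+(Sending|Client Received): MQTTFrame \\{ type: (\\w+),");

    static Map<String, Integer> count(String[] lines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : lines) {
            Matcher m = FRAME.matcher(line);
            if (m.find()) {
                counts.merge(m.group(1) + " " + m.group(2), 1, Integer::sum);
            }
        }
        return counts;
    }

    // Log lines taken from the trace in the comment.
    static final String[] SAMPLE = {
        "2013-06-19 16:50:40,559 | TRACE | MQTTIO | Received: MQTTFrame { type: SUBSCRIBE, qos: AT_LEAST_ONCE, dup:false }",
        "2013-06-19 16:50:40,562 | TRACE | MQTTIO | Sending: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }",
        "2013-06-19 16:50:40,562 | TRACE | MQTTIO | Sending: MQTTFrame { type: PUBLISH, qos: AT_LEAST_ONCE, dup:false }",
        "2013-06-19 16:50:40,562 | INFO | MQTTTest | Client Received: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }",
        "2013-06-19 16:50:40,562 | INFO | MQTTTest | Client Received: MQTTFrame { type: SUBACK, qos: AT_MOST_ONCE, dup:false }",
        "2013-06-19 16:50:40,563 | INFO | MQTTTest | Client Received: MQTTFrame { type: PUBLISH, qos: AT_LEAST_ONCE, dup:false }"
    };

    public static void main(String[] args) {
        // Prints one "direction type = count" line per combination seen.
        count(SAMPLE).forEach((key, n) -> System.out.println(key + " = " + n));
    }
}
```

Comparing the "Sending SUBACK" and "Client Received SUBACK" tallies makes the duplicate visible without reading the log line by line.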
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13687882#comment-13687882 ]

Pedro Marques commented on AMQ-4585:
------------------------------------

Forget about the locking problem when setCleanSession(true)... It was just a distraction on my part; obviously, if I clean the session there are no messages pending and the receive blocks...
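
Pedro Marques's observation, that with a clean session there is nothing pending after a reconnect, can be illustrated with a toy broker model. This is a deliberately simplified sketch: `SessionSketch` and its methods are hypothetical, subscriptions and QoS levels are ignored, and only the `"test_id"` client id comes from the issue's code sample.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of per-client session state on a broker (hypothetical).
final class SessionSketch {
    private final Map<String, Queue<String>> sessions = new HashMap<>();

    // Connects a client and returns the messages queued while it was away.
    // A clean session discards any stored state; a persistent one keeps it.
    Queue<String> connect(String clientId, boolean cleanSession) {
        if (cleanSession) {
            sessions.remove(clientId);
            return new ArrayDeque<>();
        }
        Queue<String> pending =
            sessions.computeIfAbsent(clientId, id -> new ArrayDeque<>());
        Queue<String> delivered = new ArrayDeque<>(pending);
        pending.clear();
        return delivered;
    }

    // A message published while the client is offline is only retained
    // if the client left a persistent session behind.
    void publishWhileOffline(String clientId, String payload) {
        Queue<String> pending = sessions.get(clientId);
        if (pending != null) {
            pending.add(payload);
        }
    }

    public static void main(String[] args) {
        SessionSketch broker = new SessionSketch();
        broker.connect("test_id", false);
        broker.publishWhileOffline("test_id", "message 1");
        System.out.println("persistent reconnect: "
            + broker.connect("test_id", false));

        broker.connect("test_id", true);
        broker.publishWhileOffline("test_id", "message 2");
        System.out.println("clean reconnect: "
            + broker.connect("test_id", true));
    }
}
```

With the clean-session reconnect nothing is delivered, so an untimed blocking receive simply waits for the next live publish.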
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13687013#comment-13687013 ]

Pedro Marques commented on AMQ-4585:
------------------------------------

I was actually trying to replicate the problem but I was having some problems making the test fail consistently... I attached the code I was trying (testReceivePendingMessages); even with high values for "numberOfRuns" and "messagesPerRun" the system only fails sometimes.

I also had problems with the tests never stopping... I don't know if it's because I am using Windows or if it's something in my test code, but the tests easily ended up not stopping, especially when I changed setCleanSession to true.
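
One common way to keep a test like the one Pedro Marques describes from running forever is to bound every wait. The sketch below uses a plain `BlockingQueue` as a stand-in for the subscriber's inbox, so it runs without a broker. `BoundedReceiveSketch` and `drain` are hypothetical names, and whether a given MQTT client version offers a timed receive is something to verify against that client's documentation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-in for a subscriber loop that must not hang when nothing is pending.
final class BoundedReceiveSketch {
    // Takes up to `expected` messages, giving up as soon as a single wait
    // exceeds timeoutMillis; returns how many messages were received.
    static int drain(BlockingQueue<String> inbox, int expected, long timeoutMillis)
            throws InterruptedException {
        int received = 0;
        while (received < expected) {
            String message = inbox.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                break; // nothing arrived in time: stop instead of blocking forever
            }
            received++;
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
        inbox.add("pending 1");
        inbox.add("pending 2");
        // Asks for more messages than are queued; the timeout ends the loop.
        System.out.println("received " + drain(inbox, 5, 100));
    }
}
```

A test can then assert on the returned count, so a missing message shows up as a failed assertion rather than a hung build.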
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13686995#comment-13686995 ]

Timothy Bish commented on AMQ-4585:
---
Was able to recreate the issue in a test, added test to trunk.

> No message is shown in the server. The problem doesn't always occur, but it
> does most of the time on the first reconnection attempt. With
> setCleanSession(true) the system works fine.
> Code sample (publisher, permanently running):
> {code}
> MQTT mqtt = new MQTT();
> mqtt.setHost(url);
> mqtt.setUserName(user);
> mqtt.setPassword(password);
> mqtt.setClientId("test_id");
> int i = 0;
> while (true) {
> BlockingConnection connection = mqtt.blockingConnection();
> connection.connect();
> String messag
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13685855#comment-13685855 ]

Timothy Bish commented on AMQ-4585:
---
The test code is easy to work from, so I'd try to create a unit test. I won't have time to do that before I leave for vacation, but if you can create a test case, the fix could be simple and I might get it in quickly.
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13685840#comment-13685840 ]

Pedro Marques commented on AMQ-4585:
---
Yes, well, the code is not simple because I used two classes running simultaneously: the publisher always sending messages, and the subscriber receiving 10 messages on each "run". The problem occurs when restarting the subscriber after the first run (leaving some time between runs to allow the publisher to publish some messages).

Theoretically, it would be simple to use only one class: publish some messages, receive them on the subscriber, close the subscriber, publish more messages, then attempt to reconnect and receive those messages. I haven't tested this method, though.

I should stress that the problem, although frequent, doesn't occur every time; I had multiple "runs" of the subscriber that processed the pending messages correctly.
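The single-class sequence described in the comment above (publish, receive, close the subscriber, publish more, reconnect) can be sketched with a toy in-memory model of the behaviour the reporter expected. This is not the ActiveMQ broker or the mqtt-client API. `ToyBroker`, `pendingAfterReconnect`, and the message names are hypothetical, the subscription is assumed to be implicit at connect time, and delivery to online clients is treated as immediate and is not modelled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PersistentSessionSketch {
    /** Toy broker with one implicit topic; sessions are keyed by client id. */
    static class ToyBroker {
        private final Map<String, Deque<String>> stored = new HashMap<>();
        private final Set<String> online = new HashSet<>();
        private final Set<String> cleanClients = new HashSet<>();

        /** Connects a subscriber and returns the messages queued while it was offline. */
        List<String> connect(String clientId, boolean cleanSession) {
            if (cleanSession) {
                stored.remove(clientId);          // discard any previous session state
                cleanClients.add(clientId);
            } else {
                cleanClients.remove(clientId);
            }
            Deque<String> queue = stored.computeIfAbsent(clientId, id -> new ArrayDeque<>());
            List<String> pending = new ArrayList<>(queue);
            queue.clear();
            online.add(clientId);
            return pending;
        }

        /** Disconnects a subscriber; a clean session is dropped entirely. */
        void disconnect(String clientId) {
            online.remove(clientId);
            if (cleanClients.remove(clientId)) {
                stored.remove(clientId);
            }
        }

        /** Queues the message for every known session whose client is offline. */
        void publish(String message) {
            for (Map.Entry<String, Deque<String>> entry : stored.entrySet()) {
                if (!online.contains(entry.getKey())) {
                    entry.getValue().add(message);
                }
            }
        }
    }

    /** Runs the sequence from the comment and returns what the reconnect yields. */
    static List<String> pendingAfterReconnect(boolean cleanSession) {
        ToyBroker broker = new ToyBroker();
        broker.connect("sub", cleanSession);
        broker.publish("m1");                     // subscriber online: not queued
        broker.disconnect("sub");
        broker.publish("m2");                     // published while subscriber is away
        broker.publish("m3");
        return broker.connect("sub", cleanSession);
    }

    public static void main(String[] args) {
        System.out.println(pendingAfterReconnect(false));
        System.out.println(pendingAfterReconnect(true));
    }
}
```

In this model, reconnecting with `cleanSession` set to false yields the two messages published during the gap, while a clean session yields none. The bug report concerns the client failing while receiving that first batch of pending messages.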
[jira] [Commented] (AMQ-4585) MQTT BlockingConnection.receive fails when receiving pending messages after reconnect without cleaning session
[ https://issues.apache.org/jira/browse/AMQ-4585?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13685808#comment-13685808 ]

Timothy Bish commented on AMQ-4585:
---
I'm not fully following the code to reproduce. The best bet is to create a unit test similar to those in the activemq-mqtt module, so we can run it, see the issue, and ensure that the fix is preserved into the future.