Pedro Marques created AMQ-4585:
----------------------------------

             Summary: MQTT BlockingConnection.receive fails when receiving 
pending messages after reconnect without cleaning session
                 Key: AMQ-4585
                 URL: https://issues.apache.org/jira/browse/AMQ-4585
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.8.0
            Reporter: Pedro Marques


The system throws at least three different types of exceptions when a 
subscriber receives the first pending message without cleaning the session. The 
test case corresponds to receiving several messages from a publisher then 
closing the subscriber connection and finally reconnecting with 
setCleanSession(false) and attempt to read the messages published while the 
subscriber was disconnected.
The exceptions thrown:
{code}
java.net.ProtocolException: Command from server contained an invalid message 
id: 1
        at 
org.fusesource.mqtt.client.CallbackConnection.completeRequest(CallbackConnection.java:723)
        at 
org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:762)
        at 
org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
        at 
org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
        at 
org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
{code}
java.lang.ArrayIndexOutOfBoundsException: 0
        at 
org.fusesource.mqtt.codec.MessageSupport$AckBase.decode(MessageSupport.java:81)
        at org.fusesource.mqtt.codec.PUBREC.decode(PUBREC.java:40)
        at 
org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:749)
        at 
org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
        at 
org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
        at 
org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
{code}
java.net.ProtocolException: Unexpected MQTT command type: 0
        at 
org.fusesource.mqtt.client.CallbackConnection.processFrame(CallbackConnection.java:775)
        at 
org.fusesource.mqtt.client.CallbackConnection.access$1500(CallbackConnection.java:51)
        at 
org.fusesource.mqtt.client.CallbackConnection$6.onTransportCommand(CallbackConnection.java:392)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport.drainInbound(TcpTransport.java:659)
        at 
org.fusesource.hawtdispatch.transport.SslTransport.drainInbound(SslTransport.java:264)
        at 
org.fusesource.hawtdispatch.transport.TcpTransport$6.run(TcpTransport.java:538)
        at 
org.fusesource.hawtdispatch.internal.NioDispatchSource$3.run(NioDispatchSource.java:209)
        at 
org.fusesource.hawtdispatch.internal.SerialDispatchQueue.run(SerialDispatchQueue.java:100)
        at 
org.fusesource.hawtdispatch.internal.pool.SimpleThread.run(SimpleThread.java:77)
{code}
The problem doesn't occur always but most of the times the first reconnection 
attempt is made. With setCleanSession(true) the system works fine.
Code sample (publisher, permanently running):
{code}
MQTT mqtt = new MQTT();
mqtt.setHost(url);
mqtt.setUserName(user);
mqtt.setPassword(password);
mqtt.setClientId("test_id");

int i = 0;
while (true) {
        BlockingConnection connection = mqtt.blockingConnection();
        connection.connect();
        String message = "TestMessage: " + i;
        connection.publish("TopicA", message.getBytes(), QoS.AT_LEAST_ONCE, 
false);
        System.out.println("Vendor: Sent message.");

        Thread.sleep(2500);
        connection.disconnect();
        Thread.sleep(2500);
        i++;
}
{code}
Code sample (subscriber, fails multiple times when restarting after the 
connection is closed):
{code}
try {
    MQTT = new MQTT();
    mqtt.setHost(url);
    mqtt.setClientId(clientId);
    mqtt.setUserName(user);
    mqtt.setPassword(password);
    mqtt.setCleanSession(false);

    BlockingConnection connection = mqtt.blockingConnection();
    connection.connect();
    Topic[] topics = {new Topic("TopicA", QoS.EXACTLY_ONCE)};
    byte[] qoses = connection.subscribe(topics);
    int numMessages = 1;
    while (numMessages % 10 != 0) {
        Message message = connection.receive();
        byte[] payload = message.getPayload();
        String messageContent = new String(payload);
        System.out.println("Received message from topic: " + message.getTopic() 
+ " Message content: " + messageContent);
        message.ack();
        numMessages++;
    }
} finally {
    if(connection != null) {
    try {
        connection.disconnect();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}
{code}


--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to