Hi!
I am using qpid-jms-client version 0.40.0 to connect to Azure Service Bus,
but the same happens with the latest, 0.46.0. I'm having issues with the
acknowledge process.
My application receives a message from a queue or topic, and then processes
it. It then does a manual ack.
If this processing takes longer than 30 seconds, the acknowledge() call
fails, and the message goes to the dead letter queue.
If the processing takes less time (say, 20 seconds), then the ack works
fine and the message is consumed.
Here is a sample code that simulates this scenario, using
Thread.sleep(30000). Changing the value to 20000 will make it work as
expected.
If you leave it at 30000, the app gets no exception, and after a few
retries, the number of messages in the queue stays the same, but the DLQ
counter gets increased by one.
package org.apache.qpid.jms.example;import
org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsQueue;import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;public class Receiver { private static
String user = "user";
private static String password = "password";
private static String address =
"amqps://test.servicebus.windows.net"; public static void
main(String[] args) throws Exception {
try {
ConnectionFactory factory = new JmsConnectionFactory(user,
password, address);
Connection connection = factory.createConnection(user, password);
connection.setExceptionListener(new MyExceptionListener());
connection.start();
Session session = connection.createSession(false, 101);
MessageConsumer messageConsumer =
session.createConsumer(new JmsQueue("queue_test"));
Message message = messageConsumer.receive();
System.out.println("Message: ");
System.out.println(message.toString());
System.out.println("Sleep...");
Thread.sleep(30000); // 20000 works, 30000 does not.
System.out.println("Wake up");
System.out.println("Do manual ACK");
message.acknowledge();
connection.close(); } catch (Exception exp) {
exp.printStackTrace(System.out);
}
} private static class MyExceptionListener implements ExceptionListener {
@Override
public void onException(JMSException exception) {
System.out.println("Connection ExceptionListener fired, exiting.");
exception.printStackTrace(System.out);
System.exit(1);
}
}
}
It seems that there is an internal "timeout" value that is returning the
message to the queue before we are done processing it. Can this value be
somehow increased?
Thanks in advance!
Martín Paoloni