Greg Garlak created AMQ-5107:
--------------------------------
Summary: In-flight queue message redelivered to multiple listeners
upon broker shutdown
Key: AMQ-5107
URL: https://issues.apache.org/jira/browse/AMQ-5107
Project: ActiveMQ
Issue Type: Bug
Components: Transport
Affects Versions: 5.9.0
Environment: Windows 7 64Bit - Java "1.6.0_20"
CentOS 6.0 - Java "1.7.0_09-icedtea"
Reporter: Greg Garlak
Fix For: NEEDS_REVIEW
To reproduce:
1) Start 3 or more listener processes (see listener code below)
2) Run producer to push one message on queue (see producer code below)
3) One of the listeners will pick-up the message and sleep for one minute
before auto acknowledging the message
4) Start a shutdown sequence of the broker within the 60 second window (Ctrl-C
or issue Terminate jvm(int) command from Hawtio console)
5) All other idle listeners should get the same message redelivered
simultaneously, each one having deliveryCount incremented
Listener code:
--------------
package com.test;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestListener {
public static void main(String[] args) {
try {
ActiveMQConnectionFactory connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection =
connectionFactory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination =
session.createQueue("TEST.QUEUE");
MessageConsumer consumer =
session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
TextMessage textMessage =
(TextMessage) message;
System.out.print("\nReceived "
+ textMessage.getText());
System.out.print(", Redelivery:
" + message.getJMSRedelivered());
System.out.print(", Count: " +
message.getLongProperty("JMSXDeliveryCount"));
Thread.sleep(60000);
System.out.print("... finished
after sleep");
} catch (Exception e) {
e.printStackTrace();
}
}
});
connection.start();
} catch (Exception e) {
e.printStackTrace();
}
}
public TestListener() {
super();
}
}
Producer code:
--------------
package com.test;
import java.util.Date;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class TestProducer {
public static void main(String[] args) {
try {
thread(new HelloWorldProducer(), false);
} catch (Exception e) {
e.printStackTrace();
}
}
public static class HelloWorldProducer implements Runnable {
public void run() {
try {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection =
connectionFactory.createConnection();
connection.start();
Session session =
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination =
session.createQueue("TEST.QUEUE");
MessageProducer producer =
session.createProducer(destination);
String text = "test message created on " + new
Date();
TextMessage message =
session.createTextMessage(text);
System.out.println("Sent " + text);
producer.send(message);
session.close();
connection.close();
}
catch (Exception e) {
e.printStackTrace();
}
}
public HelloWorldProducer() {}
}
public static void thread(Runnable runnable, boolean daemon) {
Thread brokerThread = new Thread(runnable);
brokerThread.setDaemon(daemon);
brokerThread.start();
}
public TestProducer() {
super();
}
}
--
This message was sent by Atlassian JIRA
(v6.2#6252)