Interrupting Consumer.close() thread puts queue into unusable state
-------------------------------------------------------------------
Key: AMQ-2648
URL: https://issues.apache.org/activemq/browse/AMQ-2648
Project: ActiveMQ
Issue Type: Bug
Components: JMS client
Affects Versions: 5.2.0
Reporter: Parasoft Corporation
We have built a client program for sending and receiving JMS messages. Each
send/receive operation is performed in a thread, so that we can handle timeouts
properly. However, if the thread which is performing the receive() gets
interrupted, the queue no longer responds to receive() requests, even from
another client with a separate JVM.
To reproduce, use two separate programs:
-----------[QueueSendReceiveActiveMQInterrupt.java]--------
import java.util.*;
import javax.jms.*;
import javax.naming.*;
public class QueueSendReceiveActiveMQInterrupt implements MessageListener {
public static void main(String[] args) throws Exception {
useConnectionFactory();
}
private static void useConnectionFactory() throws Exception, JMSException {
ConnectionFactory factory = getConnectionFactoryUsingJNDI();
Connection connect = null;
Session session = null;
connect = factory.createConnection(/*"Admin", "Admin"*/);
session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination entryDest = session.createQueue("soatest.demo.queue");
Destination exitDest = entryDest;
MessageProducer producer = session.createProducer(entryDest);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = session.createConsumer(exitDest);
connect.start();
TextMessage txtMessage = session.createTextMessage();
txtMessage.setJMSReplyTo(exitDest);
txtMessage.setText("Hello 1 from standalone program!");
producer.send(txtMessage);
System.out.println("message 1 sent");
Message msg;
// threaded receive with a kill
ReceiverRunner runner = (new QueueSendReceiveActiveMQInterrupt()).new
ReceiverRunner(consumer);
Thread t = new Thread(runner);
t.setDaemon(true);
t.start();
t.join(1000);
t.interrupt();
msg = runner.getMessage();
if (msg != null) {
System.out.println("msg 1 received: " +
((TextMessage)msg).getText());
} else {
System.out.println("got no message 1");
}
producer.close();
// consumer.close();
// session.close();
// connect.close();
}
private static ConnectionFactory getConnectionFactoryUsingJNDI() throws
Exception {
Object ret = null;
Properties props = new Properties();
props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
InitialContext ictx = new javax.naming.InitialContext(props);
Object obj = ictx.lookup("QueueConnectionFactory");
if (obj instanceof Reference) {
Reference ref = (Reference)obj;
String className = ref.getClassName();
System.out.println("Connection factory class name: " + className);
Class cls = Class.forName(className);
ret = cls.newInstance();
} else {
ret = obj;
}
ictx.close();
return (ConnectionFactory)ret;
}
public void onMessage(Message msg) {
if (msg != null) {
try {
System.out.println("msg = " + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println("got nothing");
}
}
public class ReceiverRunner implements Runnable {
private MessageConsumer consumer;
private Message msg;
public ReceiverRunner(MessageConsumer consumer) {
this.consumer = consumer;
}
public void run() {
try {
msg = consumer.receive(500);
// change the following to a very small amount like 500 and
notice how everything works
consumer.receive(10000); // another receive just so it blocks
and get the thread to stop
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
consumer.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
public Message getMessage() {
return msg;
}
}
}
--
-----------[QueueSendReceiveActiveMQ.java]--------
import java.util.*;
import javax.jms.*;
import javax.naming.*;
public class QueueSendReceiveActiveMQ implements MessageListener {
public static void main(String[] args) throws Exception {
useConnectionFactory();
}
private static void useConnectionFactory() throws Exception, JMSException {
ConnectionFactory factory = getConnectionFactoryUsingJNDI();
Connection connect = null;
Session session = null;
connect = factory.createConnection(/*"Admin", "Admin"*/);
session = connect.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination entryDest = session.createQueue("soatest.demo.queue");
Destination exitDest = entryDest;
MessageProducer producer = session.createProducer(entryDest);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
MessageConsumer consumer = session.createConsumer(exitDest);
connect.start();
TextMessage txtMessage = session.createTextMessage();
txtMessage.setJMSReplyTo(exitDest);
txtMessage.setText("without thread interrupt: Hello 1 from standalone
program!");
producer.send(txtMessage);
System.out.println("without thread interrupt: message 1 sent");
Message msg;
// regular receive
msg = consumer.receive(2000);
if (msg != null) {
System.out.println("msg 1 received: " +
((TextMessage)msg).getText());
} else {
System.out.println("without thread interrupt: got no message 1");
}
producer.close();
consumer.close();
session.close();
connect.close();
}
private static ConnectionFactory getConnectionFactoryUsingJNDI() throws
Exception {
Object ret = null;
Properties props = new Properties();
props.put(javax.naming.Context.PROVIDER_URL, "tcp://skynet:61616");
props.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY,
"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
InitialContext ictx = new javax.naming.InitialContext(props);
Object obj = ictx.lookup("QueueConnectionFactory");
if (obj instanceof Reference) {
Reference ref = (Reference)obj;
String className = ref.getClassName();
System.out.println("Connection factory class name: " + className);
Class cls = Class.forName(className);
ret = cls.newInstance();
} else {
ret = obj;
}
ictx.close();
return (ConnectionFactory)ret;
}
public void onMessage(Message msg) {
if (msg != null) {
try {
System.out.println("msg = " + ((TextMessage)msg).getText());
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println("got nothing");
}
}
}
--
1) Run QueueSendReceiveActiveMQ alone, notice how it works in sending receiving
messages from the queue.
2) Run QueueSendReceiveActiveMQInterrupt will result in the program halting
(due to some non-daemon thread created by ActiveMQ), then while it is running
run QueueSendReceiveActiveMQ and notice how it fails to retrieve messages from
the queue. If JMS Consumer.close() is excuted in a thread that is interrupted,
it fails and throws an exception and leaves the consumer in some bad state.
Note that the same code does not exhibit this behavior when using other
vendors' MQ solutions.
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.