Hi, here is a JUnit test (see below) that I wrote to simulate the crash of a worker and the expected redelivery behavior. To run it, you have to replace _connectionProvider.getConnection(queueName); with some code of your own that returns a Connection to the queue.
The CrashingWorker does not commit or rollback the opened session, therefore the dropped connection should be detected and after the timeout the message should be redelivered. But after waiting for 1 minute there is still no message available. Here are the used connection Urls for server and client: Server: java.naming.provider.url=vm:(broker:(tcp://0.0.0.0:61616?jms.useAsyncSen d=true&wireFormat.maxInactivityDuration=20000)?persistent=true)?marshal= false Client: _default_.url = tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0&wireFormat.maxI nactivityDuration=20000 Any ideas ? Anything that I do wrong ? Besides, I also added some code from the InactivityMonitorTest to my test ( the clientTransport from testClientHang()). About 10 seconds after t.join(); is executed onException(IOException error) is called with an java.io.EOFException Client transport error: at java.io.DataInputStream.readInt(DataInputStream.java:375) at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.jav a:269) at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport. java:211) at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:2 03) at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186 ) at java.lang.Thread.run(Thread.java:619) So the connection drop can be detected in some way, but what about the automatic redelivery ? Bye, Daniel import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import junit.framework.TestCase; /** * Test case for queue communication and worker failure. */ public class TestQueueWorkerFailure extends TestCase { /** * JMS Connection Provider. 
*/ private JmsConnectionProvider _connectionProvider; /** * {...@inheritdoc} */ public void setUp() throws Exception { super.setUp(); _connectionProvider = new JmsConnectionProvider(); _connectionProvider.start(); } /** * {...@inheritdoc} */ public void tearDown() throws Exception { super.tearDown(); if (_connectionProvider != null) { _connectionProvider.stop(); } } /** * Test send/receive with a crashing consumer. * * @throws Exception * if any error occurs */ public void testSendReceiveOnCrash() throws Exception { final String queueName = "myQueue"; final String sendMsg = "test message"; String rcvMsg = receive(queueName); assertNull(rcvMsg); send(queueName, sendMsg); final Thread t = new Thread(new CrashingWorker(queueName)); t.start(); t.join(); Thread.sleep(60000); rcvMsg = receive(queueName); assertNotNull(rcvMsg); // test fails here because rcvMsg is null assertEquals(sendMsg, rcvMsg); Thread.sleep(60000); rcvMsg = receive(queueName); assertNull(rcvMsg); } /** * Send a message to the given queue. * * @param queueName * the name of the queue * @param msg * the msg text * @throws Exception * if any error occurs */ private void send(final String queueName, final String msg) throws Exception { Session session = null; try { final Connection connection = _connectionProvider.getConnection(queueName); session = connection.createSession(true, Session.SESSION_TRANSACTED); final Message message = session.createTextMessage(); message.setStringProperty("MSG_TEXT", msg); final Destination destination = session.createQueue(queueName); final MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); producer.send(message); session.commit(); } catch (final Exception e) { e.printStackTrace(); rollbackQuietly(session); throw e; } finally { if (session != null) { closeQuietly(session); } } } /** * Receive a message from the given queue. 
* * @param queueName * name of the queue * @return the received message text or null * @throws Exception * if any error occurs */ private String receive(final String queueName) throws Exception { Session session = null; try { final Connection connection = _connectionProvider.getConnection(queueName); session = connection.createSession(true, Session.SESSION_TRANSACTED); final Destination destination = session.createQueue(queueName); final MessageConsumer consumer = session.createConsumer(destination); String msgText = null; final Message message = consumer.receiveNoWait(); if (message != null) { msgText = message.getStringProperty("MSG_TEXT"); } session.commit(); return msgText; } catch (final Exception e) { e.printStackTrace(); session.rollback(); throw e; } finally { if (session != null) { closeQuietly(session); } } } /** * Close the given session quietly. * * @param session * the session to close */ private void closeQuietly(final Session session) { if (session != null) { try { session.close(); } catch (final Throwable e) { e.printStackTrace(); } } } /** * Rollback the given session quietly. * * @param session * the session to rollback */ private void rollbackQuietly(final Session session) { if (session != null) { try { session.rollback(); } catch (final Throwable e) { e.printStackTrace(); } } } /** * Worker that simulates a crash by not committing or rolling back or closing the session it received a message from. */ private class CrashingWorker implements Runnable { /** * The name of the queue to receive messages from. */ private String _queueName; /** * Conversion constructor. * * @param queueName * the name of the queue to receive messages from. 
*/ public CrashingWorker(final String queueName) { _queueName = queueName; } /** * {...@inheritdoc} */ public void run() { Session session = null; try { final Connection connection = _connectionProvider.getConnection(_queueName); session = connection.createSession(true, Session.SESSION_TRANSACTED); final Destination destination = session.createQueue(_queueName); final MessageConsumer consumer = session.createConsumer(destination); final Message message = consumer.receiveNoWait(); if (message != null) { System.out.println("received message: " + message.getStringProperty("MSG_TEXT")); } else { System.out.println("received no message"); } } catch (final Exception e) { e.printStackTrace(); } finally { System.out.println("Simulate worker crash, session is not committed, rolled back or closed"); } } } }