Hi,

here is a JUnit test (see below) I wrote to simulate the crash of a
worker and the expected redelivery behavior:
You have to replace _connectionProvider.getConnection(queueName); with
some code of you own to get you a Connection to the Queue.

The CrashingWorker does not commit or rollback the opened session,
therefore the dropped connection should be detected and after the
timeout the message should be redelivered. But after waiting for 1
minute there is still no message available.

Here are the used connection Urls for server and client:
Server:
java.naming.provider.url=vm:(broker:(tcp://0.0.0.0:61616?jms.useAsyncSen
d=true&wireFormat.maxInactivityDuration=20000)?persistent=true)?marshal=
false
Client: _default_.url =
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0&wireFormat.maxI
nactivityDuration=20000

Any ideas ? Anything that I do wrong ?

Besides, I also added some code from the InactivityMonitorTest to my
test ( the clientTransport from testClientHang()). About 10 seconds
after t.join(); is executed onException(IOException error) is called
with an java.io.EOFException
Client transport error:
        at java.io.DataInputStream.readInt(DataInputStream.java:375)
        at
org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.jav
a:269)
        at
org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.
java:211)
        at
org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:2
03)
        at
org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:186
)
        at java.lang.Thread.run(Thread.java:619)

So the connection drop can be detected in some way, but what about the
automatic redelivery ?


Bye,
Daniel


import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;

import junit.framework.TestCase;

/**
 * Test case for queue communication and worker failure.
 */
public class TestQueueWorkerFailure extends TestCase {

  /**
   * JMS Connection Provider.
   */
  private JmsConnectionProvider _connectionProvider;

  /**
   * {...@inheritdoc}
   */
  public void setUp() throws Exception {
    super.setUp();
    _connectionProvider = new JmsConnectionProvider();
    _connectionProvider.start();
  }

  /**
   * {...@inheritdoc}
   */
  public void tearDown() throws Exception {
    super.tearDown();
    if (_connectionProvider != null) {
      _connectionProvider.stop();
    }
  }
  

  /**
   * Test send/receive with a crashing consumer.
   * 
   * @throws Exception
   *           if any error occurs
   */
  public void testSendReceiveOnCrash() throws Exception {
    final String queueName = "myQueue";
    final String sendMsg = "test message";

    String rcvMsg = receive(queueName);
    assertNull(rcvMsg);

    send(queueName, sendMsg);

    final Thread t = new Thread(new CrashingWorker(queueName));
    t.start();
    t.join();

    Thread.sleep(60000);
    rcvMsg = receive(queueName);
    assertNotNull(rcvMsg); // test fails here because rcvMsg is null
    assertEquals(sendMsg, rcvMsg);

    Thread.sleep(60000);
    rcvMsg = receive(queueName);
    assertNull(rcvMsg);
  }

  /**
   * Send a message to the given queue.
   * 
   * @param queueName
   *          the name of the queue
   * @param msg
   *          the msg text
   * @throws Exception
   *           if any error occurs
   */
  private void send(final String queueName, final String msg) throws
Exception {
    Session session = null;
    try {
      final Connection connection =
_connectionProvider.getConnection(queueName);
      session = connection.createSession(true,
Session.SESSION_TRANSACTED);

      final Message message = session.createTextMessage();
      message.setStringProperty("MSG_TEXT", msg);

      final Destination destination = session.createQueue(queueName);
      final MessageProducer producer =
session.createProducer(destination);
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
      producer.send(message);
      session.commit();
    } catch (final Exception e) {
      e.printStackTrace();
      rollbackQuietly(session);
      throw e;
    } finally {
      if (session != null) {
        closeQuietly(session);
      }
    }
  }

  /**
   * Receive a message from the given queue.
   * 
   * @param queueName
   *          name of the queue
   * @return the received message text or null
   * @throws Exception
   *           if any error occurs
   */
  private String receive(final String queueName) throws Exception {
    Session session = null;
    try {
      final Connection connection =
_connectionProvider.getConnection(queueName);
      session = connection.createSession(true,
Session.SESSION_TRANSACTED);
      final Destination destination = session.createQueue(queueName);
      final MessageConsumer consumer =
session.createConsumer(destination);

      String msgText = null;
      final Message message = consumer.receiveNoWait();
      if (message != null) {
        msgText = message.getStringProperty("MSG_TEXT");
      }
      session.commit();
      return msgText;
    } catch (final Exception e) {
      e.printStackTrace();
      session.rollback();
      throw e;
    } finally {
      if (session != null) {
        closeQuietly(session);
      }
    }
  }

  /**
   * Close the given session quietly.
   * 
   * @param session
   *          the session to close
   */
  private void closeQuietly(final Session session) {
    if (session != null) {
      try {
        session.close();
      } catch (final Throwable e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Rollback the given session quietly.
   * 
   * @param session
   *          the session to rollback
   */
  private void rollbackQuietly(final Session session) {
    if (session != null) {
      try {
        session.rollback();
      } catch (final Throwable e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * Worker that simulates a crash by not committing or rolling back or
closing the session it received a message from.
   */
  private class CrashingWorker implements Runnable {

    /**
     * The name of the queue to receive messages from.
     */
    private String _queueName;

    /**
     * Conversion constructor.
     * 
     * @param queueName
     *          the name of the queue to receive messages from.
     */
    public CrashingWorker(final String queueName) {
      _queueName = queueName;
    }

    /**
     * {...@inheritdoc}
     */
    public void run() {
      Session session = null;
      try {
        final Connection connection =
_connectionProvider.getConnection(_queueName);
        session = connection.createSession(true,
Session.SESSION_TRANSACTED);
        final Destination destination = session.createQueue(_queueName);
        final MessageConsumer consumer =
session.createConsumer(destination);

        final Message message = consumer.receiveNoWait();
        if (message != null) {
          System.out.println("received message: " +
message.getStringProperty("MSG_TEXT"));
        } else {
          System.out.println("received no message");
        }
      } catch (final Exception e) {
        e.printStackTrace();
      } finally {
        System.out.println("Simulate worker crash, session is not
committed, rolled back or closed");
      }
    }
  }
}

Reply via email to