Hi Hiram,
Thanks for your answer, and sorry for taking so long to reply. Attached
you will find two tests:
1 - Class MessageListenerRedeliveryTestActiveMQ3: tests redelivery in
ActiveMQ 3 when a message is rolled back in a MessageListener. It shows
that the redelivery policy (including the initial timeout and the
back-off mode) is not followed. The only policy setting that works is
the maximum number of tries. Tested with 3.2.2.
2 - Class MessageListenerRedeliveryTestActiveMQ4: tests redelivery in
ActiveMQ 4 when a message is rolled back in a MessageListener (same as
above). The redelivery policy does not work at all in this version when
the message is rolled back in a MessageListener. The test checks the
number of retries as well as whether the initial timeout and the
back-off mode are honored. Tested with ActiveMQ 4 RC2.
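To make the timing that both tests expect concrete, here is a minimal sketch of the delay schedule. It is not ActiveMQ code: the class and method names are hypothetical, and the formula (initial delay multiplied by the back-off multiplier for each further redelivery) is my assumption about how the policy is meant to behave.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of ActiveMQ: computes the redelivery delays
// the tests assume for an exponential back-off policy.
class RedeliveryDelaySketch {

    // Delay in milliseconds before the n-th redelivery (n starts at 1),
    // assuming delay = initialDelayMs * multiplier^(n - 1).
    static long delayForRedelivery(long initialDelayMs, int multiplier, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        long delay = initialDelayMs;
        for (int i = 1; i < n; i++) {
            delay *= multiplier;
        }
        return delay;
    }

    // Delays for redeliveries 1..maxRedeliveries, in order.
    static List<Long> schedule(long initialDelayMs, int multiplier, int maxRedeliveries) {
        List<Long> delays = new ArrayList<>();
        for (int n = 1; n <= maxRedeliveries; n++) {
            delays.add(delayForRedelivery(initialDelayMs, multiplier, n));
        }
        return delays;
    }

    public static void main(String[] args) {
        // Same values as the policy in the tests: 1000 ms initial delay,
        // multiplier 2, at most 2 redeliveries.
        System.out.println(schedule(1000, 2, 2)); // prints [1000, 2000]
    }
}
```

With the values used in the tests, the first redelivery is expected about 1 second after the first rollback and the second about 2 seconds after the second rollback, which is what the sleep intervals in the test methods are sized for.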
I would like to contribute these classes to Apache ActiveMQ. I will be
happy to hear comments and make any necessary adjustments in this code.
Cheers,
Rodrigo
Hiram Chirino wrote:
Hi Rodrigo,
Is that JUnit test a patch you want to contribute to Apache? If so,
I'll add it to our test suite asap.
Regards,
Hiram
On 4/15/06, Rodrigo S de Castro <[EMAIL PROTECTED]> wrote:
Hi,
1) I have a MessageListener implementation that rolls back the session if
something goes wrong. I expected the message to be redelivered to this
listener, but that does not happen. It is never redelivered.
This code does NOT work:
private class MessageListenerTest implements MessageListener {
    private Session session;
    public int counter = 0;

    // Takes the transacted Session that created the consumer.
    public MessageListenerTest(Session session) {
        this.session = session;
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Message: " + message);
            counter++;
            if (counter <= 2) {
                // Roll back the first two deliveries to force a redelivery.
                System.out.println("ROLLBACK");
                session.rollback();
            } else {
                System.out.println("COMMIT");
                message.acknowledge();
                session.commit();
            }
        } catch (JMSException e) {
            System.err.println("Error when rolling back transaction");
        }
    }
}
2) The only way I managed to get the message redelivered was to pass a
reference to the MessageConsumer to the MessageListener implementation,
cast it to ActiveMQMessageConsumer and call its rollback method.
The code below DOES work:
private class MessageListenerTest implements MessageListener {
    private ActiveMQMessageConsumer consumer;
    public int counter = 0;

    public MessageListenerTest(ActiveMQMessageConsumer consumer) {
        this.consumer = consumer;
    }

    public void onMessage(Message message) {
        try {
            System.out.println("Message: " + message);
            counter++;
            if (counter <= 2) {
                // Roll back through the consumer instead of the session.
                System.out.println("ROLLBACK");
                consumer.rollback();
            } else {
                System.out.println("COMMIT");
                message.acknowledge();
                consumer.commit();
            }
        } catch (JMSException e) {
            System.err.println("Error when rolling back transaction");
        }
    }
}
Is this right? I think that session.rollback() should work as well.
Below is a JUnit test that shows this problem:
[...]
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageListenerRedeliveryTestActiveMQ4 extends TestCase {
private Log log = LogFactory.getLog(getClass());
private Connection connection;
protected void setUp() throws Exception {
connection = createConnection();
}
/**
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
}
protected RedeliveryPolicy getRedeliveryPolicy() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(1000);
redeliveryPolicy.setBackOffMultiplier((short) 2);
redeliveryPolicy.setMaximumRedeliveries(2);
redeliveryPolicy.setUseExponentialBackOff(true);
return redeliveryPolicy;
}
protected Connection createConnection() throws Exception {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory(
"vm://localhost?broker.persistent=false");
factory.setRedeliveryPolicy(getRedeliveryPolicy());
return factory.createConnection();
}
private class ConsumerMessageListenerTest implements MessageListener {
private ActiveMQMessageConsumer consumer;
public int counter = 0;
public ConsumerMessageListenerTest(ActiveMQMessageConsumer
consumer) {
this.consumer = consumer;
}
public void onMessage(Message message) {
try {
log.info("Message Received: " + message);
counter++;
if (counter <= 3) {
log.info("Message Rollback.");
consumer.rollback();
} else {
log.info("Message Commit.");
message.acknowledge();
consumer.commit();
}
} catch (JMSException e) {
log.error("Error when rolling back transaction", e);
}
}
}
private class SessionMessageListenerTest implements MessageListener {
private Session session;
public int counter = 0;
public SessionMessageListenerTest(Session session) {
this.session = session;
}
public void onMessage(Message message) {
try {
log.info("Message Received: " + message);
counter++;
if (counter <= 3) {
log.info("Message Rollback.");
session.rollback();
} else {
log.info("Message Commit.");
message.acknowledge();
session.commit();
}
} catch (JMSException e) {
log.error("Error when rolling back transaction", e);
}
}
}
public void testQueueRollbackMessageListener() throws JMSException {
connection.start();
Session session = connection.createSession(true,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-" + getName());
MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session);
producer.send(message);
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
ActiveMQMessageConsumer mc = (ActiveMQMessageConsumer) consumer;
mc.setRedeliveryPolicy(getRedeliveryPolicy());
SessionMessageListenerTest listener = new
SessionMessageListenerTest(session);
consumer.setMessageListener(listener);
// redelivery works with the code below, which rolls back through the
// consumer (mc) instead of the session
/*
* ConsumerMessageListenerTest listener = new
* ConsumerMessageListenerTest(mc);
* consumer.setMessageListener(listener);
*/
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
// first try
assertEquals(1, listener.counter);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
}
// second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
// third try (redelivery after 2 seconds) - it should give up after that
assertEquals(3, listener.counter);
// create new message
producer.send(createTextMessage(session));
session.commit();
try {
Thread.sleep(500);
} catch (InterruptedException e) {
// ignore
}
// it should be committed, so no redelivery
assertEquals(4, listener.counter);
try {
Thread.sleep(1500);
} catch (InterruptedException e) {
// ignore
}
// no redelivery, counter should still be 4
assertEquals(4, listener.counter);
session.close();
}
private TextMessage createTextMessage(Session session) throws
JMSException {
return session.createTextMessage("Hello");
}
private MessageProducer createProducer(Session session, Destination
queue)
throws JMSException {
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(getDeliveryMode());
return producer;
}
protected int getDeliveryMode() {
return DeliveryMode.PERSISTENT;
}
}

/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import java.util.Date;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.service.RedeliveryPolicy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MessageListenerRedeliveryTestActiveMQ3 extends TestCase {
private Log log = LogFactory.getLog(getClass());
private Connection connection;
protected void setUp() throws Exception {
ActiveMQConnectionFactory factory = new
ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
factory.setUseEmbeddedBroker(true);
RedeliveryPolicy redeliveryPolicy =
factory.getEmbeddedBroker().getRedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryTimeout(1000);
redeliveryPolicy.setBackOffIncreaseRate(2);
redeliveryPolicy.setMaximumRetryCount(2);
redeliveryPolicy.setBackOffMode(true);
connection = factory.createConnection();
}
/**
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
}
private class SessionMessageListenerTest implements MessageListener {
private Session session;
public int counter = 0;
public SessionMessageListenerTest(Session session) {
this.session = session;
}
public void onMessage(Message message) {
try {
counter++;
log.info("Message Received: " + message);
if (counter <= 3) {
log.info("Message Rollback.");
session.rollback();
} else {
log.info("Message Commit.");
message.acknowledge();
session.commit();
}
} catch(JMSException e) {
log.error("Error when rolling back transaction", e);
}
}
}
public void testQueueRollbackMessageListener() throws JMSException {
connection.start();
Session session = connection.createSession(true,
Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue("queue-"+getName());
MessageProducer producer = createProducer(session, queue);
Message message = createTextMessage(session);
producer.send(message);
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
SessionMessageListenerTest listener = new
SessionMessageListenerTest(session);
consumer.setMessageListener(listener);
try {
Thread.sleep(1500);
} catch(InterruptedException e) {
}
// first try
assertEquals(1, listener.counter);
try {
Thread.sleep(1500);
} catch(InterruptedException e) {
}
// second try (redelivery after 1 sec)
assertEquals(2, listener.counter);
try {
Thread.sleep(3000);
} catch(InterruptedException e) {
}
// third try (redelivery after 2 seconds) - it should give up after that
assertEquals(3, listener.counter);
// create new message
producer.send(createTextMessage(session));
session.commit();
try {
Thread.sleep(500);
} catch(InterruptedException e) {
// ignore
}
// it should be committed, so no redelivery
assertEquals(4, listener.counter);
try {
Thread.sleep(1500);
} catch(InterruptedException e) {
// ignore
}
// no redelivery, counter should still be 4
assertEquals(4, listener.counter);
session.close();
}
private TextMessage createTextMessage(Session session) throws JMSException {
return session.createTextMessage("Hello");
}
private MessageProducer createProducer(Session session, Destination queue)
throws JMSException {
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(getDeliveryMode());
return producer;
}
protected int getDeliveryMode() {
return DeliveryMode.PERSISTENT;
}
}