Hi,
I've created a test to check whether a message can be consumed again after it
was rolled back and the client reconnected, using a transacted durable topic
subscription with JDBC persistence. I would appreciate it if someone could verify it.
package org.apache.activemq;
import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import
org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JmsDurableTopicTransactionRedeliverTest extends TestCase {
private static final Log LOG =
LogFactory.getLog(JmsDurableTopicTransactionRedeliverTest.class);
private String serverUri = "vm://localhost?create=false";
protected BrokerService brokerService;
protected ConnectionFactory connectionFactory;
protected Connection connection;
protected Session session;
protected MessageConsumer consumer;
protected MessageProducer producer;
protected Destination destination;
protected boolean topic = true;
/**
* Sends a batch of messages and validates the rollbacked message
exists after
* reconnection.
*
* @throws Exception
*/
public void testReceiveRollback() throws Exception {
Message[] outbound = new Message[]
{session.createTextMessage("First Message"),
session.createTextMessage("Second Message")};
// lets consume any outstanding messages from prev test runs
beginTx();
while (consumer.receive(1000) != null) {
}
commitTx();
// sent both messages
beginTx();
producer.send(outbound[0]);
producer.send(outbound[1]);
commitTx();
LOG.info("Sent 0: " + outbound[0]);
LOG.info("Sent 1: " + outbound[1]);
ArrayList<Message> messages = new ArrayList<Message>();
beginTx();
Message message = consumer.receive(1000);
messages.add(message);
assertEquals(outbound[0], message);
commitTx();
beginTx();
message = consumer.receive(1000);
assertNotNull(message);
assertEquals(outbound[1], message);
rollbackTx();
LOG.info("Rollback 1: " + message);
reconnect();
// Consume again.. the prev message
beginTx();
message = consumer.receive(9000);
assertNotNull("Should have re-received the message again!",
message);
messages.add(message);
commitTx();
Message inbound[] = new Message[messages.size()];
messages.toArray(inbound);
assertTextMessagesEqual("Rollback did not work", outbound,
inbound);
}
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
jdbc.setDataSource(dataSource);
jdbc.deleteAllMessages();
broker.setPersistenceAdapter(jdbc);
return broker;
}
protected void setUp() throws Exception {
super.setUp();
brokerService = createBroker();
PolicyMap policyMap = new PolicyMap();
policyMap.setDefaultEntry(getDefaultPolicy());
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
connectionFactory = createConnectionFactory();
reconnect();
}
protected PolicyEntry getDefaultPolicy() {
PolicyEntry policy = new PolicyEntry();
policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
policy.setSubscriptionRecoveryPolicy(new
FixedCountSubscriptionRecoveryPolicy());
return policy;
}
protected void tearDown() throws Exception {
brokerService.stop();
brokerService = null;
super.tearDown();
}
protected void commitTx() throws Exception {
session.commit();
}
protected void rollbackTx() throws Exception {
session.rollback();
}
public ConnectionFactory createConnectionFactory() throws Exception {
return new ActiveMQConnectionFactory(serverUri);
}
public Connection createConnection(ConnectionFactory cf) throws
JMSException {
Connection connection = cf.createConnection();
if (getClass().getName() != null) {
connection.setClientID(getClass().getName());
}
return connection;
}
/**
* Recreates the connection.
*
* @throws JMSException
*/
protected void reconnect() throws Exception {
if (connection != null) {
// Close the prev connection.
connection.close();
}
session = null;
connection = createConnection(connectionFactory);
reconnectSession();
connection.start();
}
/**
* Recreates the connection.
*
* @throws JMSException
*/
protected void reconnectSession() throws JMSException {
if (session != null) {
session.close();
}
session = connection.createSession(true,
Session.SESSION_TRANSACTED);
destination = session.createTopic("TOPIC." +
getClass().getName() + "." + getName());
producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
consumer = session.createDurableSubscriber((Topic)destination,
"testsub");
}
public MessageConsumer createConsumer(Session session, Destination
destination) throws JMSException {
return session.createDurableSubscriber((Topic)destination,
"testsub");
}
protected void assertTextMessagesEqual(String messsage, Message[]
firstSet, Message[] secondSet) throws JMSException {
assertEquals("Message count does not match: " + messsage,
firstSet.length, secondSet.length);
for (int i = 0; i < secondSet.length; i++) {
TextMessage m1 = (TextMessage)firstSet[i];
TextMessage m2 = (TextMessage)secondSet[i];
assertTextMessageEqual("Message " + (i + 1) + " did not
match : ", m1, m2);
}
}
protected void assertTextMessageEqual(String message, TextMessage
m1, TextMessage m2) throws JMSException {
assertFalse(message + ": expected {" + m1 + "}, but was {" + m2
+ "}", m1 == null ^ m2 == null);
if (m1 == null) {
return;
}
assertEquals(message, m1.getText(), m2.getText());
}
protected void beginTx() throws Exception {
//no-op for local tx
}
}