Hi,

I've created a test to check whether a message can be consumed again after its transaction was rolled back and the client reconnected, using a transacted durable topic subscription and JDBC persistence. I would appreciate it if someone could verify it.


package org.apache.activemq;

import java.util.ArrayList;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import junit.framework.TestCase;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.FixedCountSubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.derby.jdbc.EmbeddedDataSource;

/**
 * Verifies that a message whose receiving transaction was rolled back is
 * delivered again to a durable topic subscriber after the client closes its
 * connection and reconnects, with the broker persisting through JDBC.
 *
 * <p>Each test starts its own {@link BrokerService} in {@link #setUp()} and
 * releases the JMS connection and the broker in {@link #tearDown()}.
 */
public class JmsDurableTopicTransactionRedeliverTest extends TestCase {

    private static final Log LOG = LogFactory.getLog(JmsDurableTopicTransactionRedeliverTest.class);

    /** Name of the durable subscription shared by every consumer this test creates. */
    private static final String SUBSCRIPTION_NAME = "testsub";

    // NOTE(review): "create=false" presumably stops the VM transport from
    // auto-creating a broker, so the broker started in setUp() is used - confirm.
    private String serverUri = "vm://localhost?create=false";
    protected BrokerService brokerService;
    protected ConnectionFactory connectionFactory;
    protected Connection connection;
    protected Session session;
    protected MessageConsumer consumer;
    protected MessageProducer producer;
    protected Destination destination;
    protected boolean topic = true;

    /**
     * Sends a batch of messages and validates that the rolled-back message is
     * received again after reconnection.
     *
     * @throws Exception if any JMS operation fails
     */
    public void testReceiveRollback() throws Exception {
        Message[] outbound = new Message[] {
            session.createTextMessage("First Message"),
            session.createTextMessage("Second Message")
        };

        // Drain any outstanding messages left over from previous test runs.
        beginTx();
        while (consumer.receive(1000) != null) {
            // discard
        }
        commitTx();

        // Send both messages in a single transaction.
        beginTx();
        producer.send(outbound[0]);
        producer.send(outbound[1]);
        commitTx();

        LOG.info("Sent 0: " + outbound[0]);
        LOG.info("Sent 1: " + outbound[1]);

        ArrayList<Message> messages = new ArrayList<Message>();

        // Receive and commit the first message.
        beginTx();
        Message message = consumer.receive(1000);
        assertNotNull("Should have received the first message", message);
        messages.add(message);
        assertEquals(outbound[0], message);
        commitTx();

        // Receive the second message but roll the transaction back.
        beginTx();
        message = consumer.receive(1000);
        assertNotNull(message);
        assertEquals(outbound[1], message);
        rollbackTx();
        LOG.info("Rollback 1: " + message);

        reconnect();

        // Consume again: the rolled-back message must be delivered once more.
        beginTx();
        message = consumer.receive(9000);
        assertNotNull("Should have re-received the message again!", message);
        messages.add(message);
        commitTx();

        Message[] inbound = new Message[messages.size()];
        messages.toArray(inbound);
        assertTextMessagesEqual("Rollback did not work", outbound, inbound);
    }

    /**
     * Builds a broker backed by a JDBC persistence adapter on an embedded
     * Derby database named {@code derbyDb}, deleting previously stored messages.
     *
     * @return the configured, not yet started broker
     * @throws Exception if the persistence adapter cannot be prepared
     */
    protected BrokerService createBroker() throws Exception {
        BrokerService broker = new BrokerService();
        JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
        EmbeddedDataSource dataSource = new EmbeddedDataSource();
        dataSource.setDatabaseName("derbyDb");
        dataSource.setCreateDatabase("create");
        jdbc.setDataSource(dataSource);
        jdbc.deleteAllMessages();
        broker.setPersistenceAdapter(jdbc);
        return broker;
    }

    /**
     * Starts the broker with the default destination policy and opens the
     * initial connection, session, producer and durable subscriber.
     *
     * @throws Exception if the broker or the connection cannot be started
     */
    @Override
    protected void setUp() throws Exception {
        super.setUp();
        brokerService = createBroker();
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(getDefaultPolicy());
        brokerService.setDestinationPolicy(policyMap);
        brokerService.start();

        connectionFactory = createConnectionFactory();
        reconnect();
    }

    /**
     * Creates the destination policy applied to every destination: round-robin
     * dispatch with a fixed-count subscription recovery policy.
     *
     * @return the default policy entry
     */
    protected PolicyEntry getDefaultPolicy() {
        PolicyEntry policy = new PolicyEntry();
        policy.setDispatchPolicy(new RoundRobinDispatchPolicy());
        policy.setSubscriptionRecoveryPolicy(new FixedCountSubscriptionRecoveryPolicy());
        return policy;
    }

    /**
     * Closes the JMS connection and stops the broker. Every step is guarded so
     * that a failure (or a partially completed {@link #setUp()}) does not
     * prevent the remaining resources from being released.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @Override
    protected void tearDown() throws Exception {
        try {
            if (connection != null) {
                // Closing the connection also closes its session, producer and consumer.
                connection.close();
            }
        } finally {
            connection = null;
            session = null;
            producer = null;
            consumer = null;
            try {
                if (brokerService != null) {
                    brokerService.stop();
                }
            } finally {
                brokerService = null;
                super.tearDown();
            }
        }
    }

    /** Commits the current local transaction on the session. */
    protected void commitTx() throws Exception {
        session.commit();
    }

    /** Rolls back the current local transaction on the session. */
    protected void rollbackTx() throws Exception {
        session.rollback();
    }

    /**
     * Creates the factory used to connect to the broker at {@code serverUri}.
     *
     * @return a new connection factory
     * @throws Exception declared so that overriding implementations may fail
     */
    public ConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory(serverUri);
    }

    /**
     * Creates a connection whose client ID is this test's class name; a fixed
     * client ID lets the durable subscription be found again after reconnecting.
     *
     * @param cf the factory to create the connection from
     * @return the new, not yet started connection
     * @throws JMSException if the connection cannot be created or configured
     */
    public Connection createConnection(ConnectionFactory cf) throws JMSException {
        Connection newConnection = cf.createConnection();
        newConnection.setClientID(getClass().getName());
        return newConnection;
    }

    /**
     * Recreates the connection: closes the previous one (if any), opens a new
     * one, rebuilds the session objects and starts message delivery.
     *
     * @throws Exception if closing or creating the connection fails
     */
    protected void reconnect() throws Exception {
        if (connection != null) {
            // Close the previous connection; this also closes its session.
            connection.close();
            connection = null;
        }
        session = null;
        connection = createConnection(connectionFactory);
        reconnectSession();
        connection.start();
    }

    /**
     * Recreates the transacted session together with its destination,
     * persistent producer and durable subscriber.
     *
     * @throws JMSException if any of the JMS objects cannot be created
     */
    protected void reconnectSession() throws JMSException {
        if (session != null) {
            session.close();
        }

        session = connection.createSession(true, Session.SESSION_TRANSACTED);
        destination = session.createTopic("TOPIC." + getClass().getName() + "." + getName());
        producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        consumer = createConsumer(session, destination);
    }

    /**
     * Creates the durable subscriber for the given topic destination.
     *
     * @param session the session to create the subscriber on
     * @param destination the destination to subscribe to; must be a {@link Topic}
     * @return the durable subscriber
     * @throws JMSException if the subscriber cannot be created
     */
    public MessageConsumer createConsumer(Session session, Destination destination) throws JMSException {
        return session.createDurableSubscriber((Topic)destination, SUBSCRIPTION_NAME);
    }

    /**
     * Asserts that both arrays have the same length and that the text
     * messages at each index carry equal text.
     *
     * @param messsage context added to the length-mismatch failure message
     * @param firstSet the expected messages
     * @param secondSet the actual messages
     * @throws JMSException if a message body cannot be read
     */
    protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet) throws JMSException {
        assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length);

        for (int i = 0; i < secondSet.length; i++) {
            TextMessage m1 = (TextMessage)firstSet[i];
            TextMessage m2 = (TextMessage)secondSet[i];
            assertTextMessageEqual("Message " + (i + 1) + " did not match : ", m1, m2);
        }
    }

    /**
     * Asserts that two text messages are either both {@code null} or carry
     * equal text.
     *
     * @param message prefix for the failure message
     * @param m1 the expected message, may be {@code null}
     * @param m2 the actual message, may be {@code null}
     * @throws JMSException if a message body cannot be read
     */
    protected void assertTextMessageEqual(String message, TextMessage m1, TextMessage m2) throws JMSException {
        // Fail when exactly one of the two messages is null.
        assertFalse(message + ": expected {" + m1 + "}, but was {" + m2 + "}", m1 == null ^ m2 == null);

        if (m1 == null) {
            // Both are null here, so there is nothing left to compare.
            return;
        }

        assertEquals(message, m1.getText(), m2.getText());
    }

    /** Marks the start of a transaction; a no-op because local transactions begin implicitly. */
    protected void beginTx() throws Exception {
        //no-op for local tx
    }
}


Reply via email to