Hi,

I've created a test case that tries to reproduce your problem
(attached), but it passes against the trunk version of the code.

What version of ActiveMQ are you using? Can you assure that this test
case simulates the desired behavior?

Cheers

-- 
Dejan Bosanac


http://www.ttmsolutions.com - get a free ActiveMQ user guide

ActiveMQ in Action - http://www.manning.com/snyder/
Scripting in Java - http://www.scriptinginjava.net


sakkew wrote:
> I need to integrate functionality to an existing system so that operators can
> manually reprocess messages from a dead letter queue, that is to put them
> back into the original queue.
>
> I use the QueueViewMBean as shown below. The message is put back to the
> original queue and processed by the ActiveMQ server as it should, but if the
> message fails again, the message disappears. When I restart the server the
> message appears in the DLQ as it should.
>
> Here's the code (I used the Web Monitor as base):
> QueueViewMBean queue = broker.getQueue(dlq);
> queue.moveMessageTo(mapMessage.getJMSMessageID(),
> mapMessage.originalDestination());       
>
> Is there something in the message that should be reset to avoid confusing
> the reprocessing? 
>
>
> Both client and server uses ActiveMQ 5.1
>                                    
>
> cheers, 
> Sakke
>
>   

package org.apache.activemq.broker.policy;

import javax.jms.Message;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class ReprocessingTest extends DeadLetterTest {
    private static final Log LOG = LogFactory.getLog(DeadLetterTest.class);

    private int rollbackCount;

    protected void doTest() throws Exception {
        connection.start();

        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1;
        LOG.info("Will redeliver messages: " + rollbackCount + " times");

        makeConsumer();
        makeDlqConsumer();

        sendMessages();

        // now lets receive and rollback N times
        for (int i = 0; i < messageCount; i++) {
            consumeAndRollback(i);
        }

        // move messages back to the original queue
       
        QueueViewMBean queue = getQueue();
        assertEquals(messageCount, queue.getQueueSize());
        CompositeData[] compdatalist = queue.browse();
        String[] messageIDs = new String[messageCount];
        for (int i = 0; i < messageCount; i++) {
            CompositeData cdata = compdatalist[i];
            String messageID = (String) cdata.get("JMSMessageID");
            messageIDs[i] = messageID;
        }
        
        for (String messageID : messageIDs) {
            queue.moveMessageTo(messageID, getDestinationString());
        }
        assertEquals(0, queue.getQueueSize());
        
        
        rollbackCount = 1;
        // now consume again
        for (int i = 0; i < messageCount; i++) {
            consumeAndRollback(i);
        }
        assertEquals(messageCount, queue.getQueueSize());
     
        
    }
    
    protected QueueViewMBean getQueue() throws Exception {
    	MBeanServer mbeanServer = broker.getManagementContext().getMBeanServer();
    	return (QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer, new ObjectName("org.apache.activemq:BrokerName=localhost,Type=Queue,Destination=ActiveMQ.DLQ"), QueueViewMBean.class, true);
    }

    protected void consumeAndRollback(int messageCounter) throws Exception {
        for (int i = 0; i < rollbackCount; i++) {
            Message message = consumer.receive(5000);
            assertNotNull("No message received for message: " + messageCounter + " and rollback loop: " + i, message);
            assertMessage(message, messageCounter);

            session.rollback();
        }
        LOG.info("Rolled back: " + rollbackCount + " times");
    }
    
    public void testTransientTopicMessage() throws Exception {
    	// skip
    }

    public void testDurableTopicMessage() throws Exception {
    	//skip
    }
	
}

Reply via email to