https://issues.apache.org/jira/browse/AMQ-6059

Ensure that a message sent to the store for the DLQ is rewritten so that
its updated values are written to prevent exirpation loops and loss of
reollback cause etc.
(cherry picked from commit 505a76a8bb7180debbd36637dce1b9101150d0b4)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b04cfeb8
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b04cfeb8
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b04cfeb8

Branch: refs/heads/activemq-5.13.x
Commit: b04cfeb8af2cdf7d9ccbcd65a309478d4e01db9f
Parents: b4405be
Author: Timothy Bish <tabish...@gmail.com>
Authored: Tue Jan 12 11:51:35 2016 -0500
Committer: Timothy Bish <tabish...@gmail.com>
Committed: Tue Jan 12 12:00:41 2016 -0500

----------------------------------------------------------------------
 .../org/apache/activemq/util/BrokerSupport.java |  13 +-
 .../org/apache/activemq/bugs/AMQ6059Test.java   | 199 +++++++++++++++++++
 2 files changed, 206 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b04cfeb8/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java 
b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
index f3f3b78..df3b658 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/util/BrokerSupport.java
@@ -31,24 +31,24 @@ import org.apache.activemq.state.ProducerState;
  */
 public final class BrokerSupport {
 
-    private BrokerSupport() {        
+    private BrokerSupport() {
     }
-    
+
     public static void resendNoCopy(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
         doResend(context, originalMessage, deadLetterDestination, false);
     }
-    
+
     /**
      * @param context
-     * @param originalMessage 
+     * @param originalMessage
      * @param deadLetterDestination
      * @throws Exception
      */
     public static void resend(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination) throws Exception {
         doResend(context, originalMessage, deadLetterDestination, true);
     }
-    
-    public static void doResend(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) 
throws Exception {       
+
+    public static void doResend(final ConnectionContext context, Message 
originalMessage, ActiveMQDestination deadLetterDestination, boolean copy) 
throws Exception {
         Message message = copy ? originalMessage.copy() : originalMessage;
         message.setOriginalDestination(message.getDestination());
         message.setOriginalTransactionId(message.getTransactionId());
@@ -56,6 +56,7 @@ public final class BrokerSupport {
         message.setTransactionId(null);
         message.setMemoryUsage(null);
         message.setRedeliveryCounter(0);
+        message.getMessageId().setDataLocator(null);
         boolean originalFlowControl = context.isProducerFlowControl();
         try {
             context.setProducerFlowControl(false);

http://git-wip-us.apache.org/repos/asf/activemq/blob/b04cfeb8/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
new file mode 100644
index 0000000..049d683
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ6059Test.java
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Enumeration;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.leveldb.LevelDBStore;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Once the wire format is completed we can test against real persistence 
storage.
+ */
+public class AMQ6059Test {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(AMQ6059Test.class);
+
+    private BrokerService broker;
+
+    @Before
+    public void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    @Test
+    public void testDLQRecovery() throws Exception {
+
+        sendMessage(new ActiveMQQueue("leveldbQueue"));
+        TimeUnit.SECONDS.sleep(3);
+
+        LOG.info("### Check for expired message moving to DLQ.");
+
+        Queue dlqQueue = (Queue) createDlqDestination();
+        verifyIsDlq(dlqQueue);
+
+        final QueueViewMBean queueViewMBean = 
getProxyToQueue(dlqQueue.getQueueName());
+
+        assertTrue("The message expired", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("DLQ stats: Enqueues {}, Dispatches {}, Expired {}, 
Inflight {}",
+                    new Object[] { queueViewMBean.getEnqueueCount(),
+                                   queueViewMBean.getDispatchCount(),
+                                   queueViewMBean.getExpiredCount(),
+                                   queueViewMBean.getInFlightCount()});
+                return queueViewMBean.getEnqueueCount() == 1;
+            }
+        }));
+
+        verifyMessageIsRecovered(dlqQueue);
+        restartBroker(broker);
+        verifyMessageIsRecovered(dlqQueue);
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        return createBrokerWithDLQ(true);
+    }
+
+    private BrokerService createBrokerWithDLQ(boolean purge) throws Exception {
+        BrokerService broker = new BrokerService();
+
+        File directory = new File("target/activemq-data/leveldb");
+        if (purge) {
+            IOHelper.deleteChildren(directory);
+        }
+
+        LevelDBStore levelDBStore = new LevelDBStore();
+        levelDBStore.setDirectory(directory);
+        if (purge) {
+            levelDBStore.deleteAllMessages();
+        }
+
+        PolicyMap pMap = new PolicyMap();
+
+        SharedDeadLetterStrategy sharedDLQStrategy = new 
SharedDeadLetterStrategy();
+        sharedDLQStrategy.setProcessNonPersistent(true);
+        sharedDLQStrategy.setProcessExpired(true);
+        sharedDLQStrategy.setDeadLetterQueue(new 
ActiveMQQueue("ActiveMQ.DLQ"));
+        sharedDLQStrategy.setExpiration(10000);
+
+        PolicyEntry defaultPolicy = new PolicyEntry();
+        defaultPolicy.setDeadLetterStrategy(sharedDLQStrategy);
+        defaultPolicy.setExpireMessagesPeriod(2000);
+        defaultPolicy.setUseCache(false);
+
+        pMap.put(new ActiveMQQueue(">"), defaultPolicy);
+        broker.setDestinationPolicy(pMap);
+        broker.setPersistenceAdapter(levelDBStore);
+        if (purge) {
+            broker.setDeleteAllMessagesOnStartup(true);
+        }
+
+        return broker;
+    }
+
+    private void restartBroker(BrokerService broker) throws Exception {
+        broker.stop();
+        broker.waitUntilStopped();
+        broker = createBrokerWithDLQ(false);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    private void verifyMessageIsRecovered(final Queue dlqQueue) throws 
Exception, JMSException {
+        Connection connection = createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        QueueBrowser browser = session.createBrowser(dlqQueue);
+        Enumeration<?> elements = browser.getEnumeration();
+        assertTrue(elements.hasMoreElements());
+        Message browsed = (Message) elements.nextElement();
+        assertNotNull("Recover message after broker restarts", browsed);
+    }
+
+    private void sendMessage(Destination destination) throws Exception {
+        Connection connection = createConnection();
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+        producer.send(destination, session.createTextMessage("DLQ message"), 
DeliveryMode.PERSISTENT, 4, 1000);
+        connection.stop();
+        LOG.info("### Send message that will expire.");
+    }
+
+    private Connection createConnection() throws Exception {
+        ActiveMQConnectionFactory factory = new 
ActiveMQConnectionFactory(broker.getVmConnectorURI());
+        return factory.createConnection();
+    }
+
+    private Destination createDlqDestination() {
+        return new ActiveMQQueue("ActiveMQ.DLQ");
+    }
+
+    private void verifyIsDlq(Queue dlqQ) throws Exception {
+        final QueueViewMBean queueViewMBean = 
getProxyToQueue(dlqQ.getQueueName());
+        assertTrue("is dlq", queueViewMBean.isDLQ());
+    }
+
+    private QueueViewMBean getProxyToQueue(String name) throws 
MalformedObjectNameException, JMSException {
+        ObjectName queueViewMBeanName = new ObjectName(
+            
"org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName="
 + name);
+        QueueViewMBean proxy = (QueueViewMBean) 
broker.getManagementContext().newProxyInstance(
+            queueViewMBeanName, QueueViewMBean.class, true);
+        return proxy;
+    }
+}

Reply via email to