[AMQ-6847] limit the retry loop to one iteration over all pending messages
so that new additions are not replayed, avoiding duplicates

(cherry picked from commit 03b19b9da4d50c3bb8985f930e93596c7d994d26)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/185160c0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/185160c0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/185160c0

Branch: refs/heads/activemq-5.14.x
Commit: 185160c0daf955bf7ef43a9440cf5bc9672bd461
Parents: e62705a
Author: gtully <gary.tu...@gmail.com>
Authored: Wed Nov 1 11:26:36 2017 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Tue Dec 19 07:19:19 2017 -0500

----------------------------------------------------------------------
 .../java/org/apache/activemq/broker/region/Queue.java     | 10 +++++++---
 .../java/org/apache/activemq/broker/jmx/MBeanTest.java    | 10 +++++++---
 2 files changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/185160c0/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 78010e9..1fe4f58 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -1443,8 +1443,8 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             try {
                 messages.rollback(m.getMessageId());
                 if (isDLQ()) {
-                    DeadLetterStrategy stratagy = getDeadLetterStrategy();
-                    stratagy.rollback(m.getMessage());
+                    DeadLetterStrategy strategy = getDeadLetterStrategy();
+                    strategy.rollback(m.getMessage());
                 }
             } finally {
                 messagesLock.writeLock().unlock();
@@ -1523,6 +1523,9 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             throw new Exception("Retry of message is only possible on Dead 
Letter Queues!");
         }
         int restoredCounter = 0;
+        // ensure we deal with a snapshot to avoid potential duplicates in the 
event of messages
+        // getting immediate dlq'ed
+        long numberOfRetryAttemptsToCheckAllMessagesOnce = 
this.destinationStatistics.getMessages().getCount();
         Set<MessageReference> set = new LinkedHashSet<MessageReference>();
         do {
             doPageIn(true);
@@ -1534,6 +1537,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             }
             List<MessageReference> list = new ArrayList<MessageReference>(set);
             for (MessageReference ref : list) {
+                numberOfRetryAttemptsToCheckAllMessagesOnce--;
                 if (ref.getMessage().getOriginalDestination() != null) {
 
                     moveMessageTo(context, (QueueMessageReference)ref, 
ref.getMessage().getOriginalDestination());
@@ -1543,7 +1547,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                     }
                 }
             }
-        } while (set.size() < 
this.destinationStatistics.getMessages().getCount() && set.size() < 
maximumMessages);
+        } while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() 
< this.destinationStatistics.getMessages().getCount());
         return restoredCounter;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/185160c0/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index e79f007..e0a2ee0 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
             }});
 
 
+        ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + 
":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + 
getDestinationString());
+        QueueViewMBean queue = 
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, 
QueueViewMBean.class, true);
+
         ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + 
":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + 
SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME );
         QueueViewMBean dlq = 
MBeanServerInvocationHandler.newProxyInstance(mbeanServer, 
dlqQueueViewMBeanName, QueueViewMBean.class, true);
 
-        assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() {
+        assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
+                LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + 
queue.getQueueSize());
                 return MESSAGE_COUNT == dlq.getQueueSize();
             }
         }));
 
         dlq.retryMessages();
 
-        assertTrue("messagees on dlq after retry", Wait.waitFor(new 
Wait.Condition() {
+        assertTrue("messages on dlq after retry", Wait.waitFor(new 
Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
-                LOG.info("Dlq size: " + dlq.getQueueSize());
+                LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + 
queue.getQueueSize());
                 return MESSAGE_COUNT == dlq.getQueueSize();
             }
         }));

Reply via email to