[AMQ-6847] limit the retry loop to one iteration overa all pending messages such that new additions are not replayed to avoid duplicates
(cherry picked from commit 03b19b9da4d50c3bb8985f930e93596c7d994d26) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0464d532 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0464d532 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0464d532 Branch: refs/heads/activemq-5.15.x Commit: 0464d53233f95535dfaf8da6adcef9470cb52bdf Parents: eb9e50f Author: gtully <gary.tu...@gmail.com> Authored: Wed Nov 1 11:26:36 2017 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Tue Dec 19 07:18:12 2017 -0500 ---------------------------------------------------------------------- .../java/org/apache/activemq/broker/region/Queue.java | 10 +++++++--- .../java/org/apache/activemq/broker/jmx/MBeanTest.java | 10 +++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0464d532/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index ac206e3..48cbfbe 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1475,8 +1475,8 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index try { messages.rollback(m.getMessageId()); if (isDLQ()) { - DeadLetterStrategy stratagy = getDeadLetterStrategy(); - stratagy.rollback(m.getMessage()); + DeadLetterStrategy strategy = getDeadLetterStrategy(); + strategy.rollback(m.getMessage()); } } finally { messagesLock.writeLock().unlock(); @@ -1560,6 +1560,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index throw new Exception("Retry of message is only possible on Dead Letter Queues!"); } int restoredCounter = 0; + // ensure we deal with a snapshot to avoid potential duplicates in the event of messages + // getting immediate dlq'ed + long numberOfRetryAttemptsToCheckAllMessagesOnce = this.destinationStatistics.getMessages().getCount(); Set<MessageReference> set = new LinkedHashSet<MessageReference>(); do { doPageIn(true); @@ -1571,6 +1574,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } List<MessageReference> list = new ArrayList<MessageReference>(set); for (MessageReference ref : list) { + numberOfRetryAttemptsToCheckAllMessagesOnce--; if (ref.getMessage().getOriginalDestination() != null) { moveMessageTo(context, (QueueMessageReference)ref, ref.getMessage().getOriginalDestination()); @@ -1580,7 +1584,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } } - } while (set.size() < this.destinationStatistics.getMessages().getCount() && set.size() < maximumMessages); + } while (numberOfRetryAttemptsToCheckAllMessagesOnce > 0 && set.size() < this.destinationStatistics.getMessages().getCount()); return restoredCounter; } http://git-wip-us.apache.org/repos/asf/activemq/blob/0464d532/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java index ecc6894..0ccf1cb 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java @@ -206,22 +206,26 @@ public class MBeanTest extends EmbeddedBrokerTestSupport { }}); + ObjectName queueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + getDestinationString()); + QueueViewMBean queue = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, queueViewMBeanName, QueueViewMBean.class, true); + ObjectName dlqQueueViewMBeanName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME ); QueueViewMBean dlq = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, dlqQueueViewMBeanName, QueueViewMBean.class, true); - assertTrue("messagees on dlq", Wait.waitFor(new Wait.Condition() { + assertTrue("messages on dlq", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { + LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize()); return MESSAGE_COUNT == dlq.getQueueSize(); } })); dlq.retryMessages(); - assertTrue("messagees on dlq after retry", Wait.waitFor(new Wait.Condition() { + assertTrue("messages on dlq after retry", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - LOG.info("Dlq size: " + dlq.getQueueSize()); + LOG.info("Dlq size: " + dlq.getQueueSize() + ", qSize: " + queue.getQueueSize()); return MESSAGE_COUNT == dlq.getQueueSize(); } }));