Repository: activemq Updated Branches: refs/heads/activemq-5.12.x 961770072 -> bbb002afd
https://issues.apache.org/jira/browse/AMQ-6069 Fixed contains method in PrioritizedPendinList which was not returning correctly. This was causing messages to not be removed from the dispatchPendingList when purge was called inside a Queue leading to an eventual OOM error if enough messages were purged. This fix also improves performance of the contains method. (cherry picked from commit 8363c99b51a98eb176e6baea82fcafce3225ba2c) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bbb002af Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bbb002af Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bbb002af Branch: refs/heads/activemq-5.12.x Commit: bbb002afda43e9bc0445bf050c808f117b7ab1e8 Parents: 9617700 Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Tue Dec 1 19:33:53 2015 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Wed Dec 2 13:43:03 2015 +0000 ---------------------------------------------------------------------- .../region/cursors/PrioritizedPendingList.java | 5 ++- .../activemq/broker/region/QueuePurgeTest.java | 38 +++++++++++++++----- 2 files changed, 32 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/bbb002af/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java index 9235b2c..aa9f467 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java @@ -133,10 +133,9 @@ public class PrioritizedPendingList implements PendingList { @Override public boolean contains(MessageReference message) { - if (map.values().contains(message)) { - return true; + if (message != null) { + return this.map.containsKey(message.getMessageId()); } - return false; } http://git-wip-us.apache.org/repos/asf/activemq/blob/bbb002af/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java index a121619..85faeab 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java @@ -30,6 +30,7 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.CombinationTestSupport; import org.apache.activemq.broker.BrokerService; @@ -57,6 +58,7 @@ public class QueuePurgeTest extends CombinationTestSupport { Queue queue; MessageConsumer consumer; + @Override protected void setUp() throws Exception { setMaxTestTime(10*60*1000); // 10 mins setAutoFail(true); @@ -78,6 +80,7 @@ public class QueuePurgeTest extends CombinationTestSupport { connection.start(); } + @Override protected void tearDown() throws Exception { super.tearDown(); if (consumer != null) { @@ -90,7 +93,15 @@ public class QueuePurgeTest extends CombinationTestSupport { } public void testPurgeLargeQueue() throws Exception { - applyBrokerSpoolingPolicy(); + testPurgeLargeQueue(false); + } + + public void testPurgeLargeQueuePrioritizedMessages() throws Exception { + testPurgeLargeQueue(true); + } + + private void testPurgeLargeQueue(boolean prioritizedMessages) throws Exception { + applyBrokerSpoolingPolicy(prioritizedMessages); createProducerAndSendMessages(NUM_TO_SEND); QueueViewMBean proxy = getProxyToQueueViewMBean(); LOG.info("purging.."); @@ -127,10 +138,11 @@ public class QueuePurgeTest extends CombinationTestSupport { proxy.getQueueSize()); assertTrue("cache is disabled, temp store being used", !proxy.isCacheEnabled()); assertTrue("got expected info purge log message", gotPurgeLogMessage.get()); + assertEquals("Found messages when browsing", 0, proxy.browseMessages().size()); } - public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { - applyBrokerSpoolingPolicy(); + public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception { + applyBrokerSpoolingPolicy(false); final int expiryPeriod = 500; applyExpiryDuration(expiryPeriod); createProducerAndSendMessages(NUM_TO_SEND); @@ -140,15 +152,16 @@ public class QueuePurgeTest extends CombinationTestSupport { assertEquals("Queue size is has not changed " + proxy.getQueueSize(), NUM_TO_SEND, proxy.getQueueSize()); } - + private void applyExpiryDuration(int i) { broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i); } - private void applyBrokerSpoolingPolicy() { + private void applyBrokerSpoolingPolicy(boolean prioritizedMessages) { PolicyMap policyMap = new PolicyMap(); PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setPrioritizedMessages(prioritizedMessages); defaultEntry.setProducerFlowControl(false); PendingQueueMessageStoragePolicy pendingQueuePolicy = new FilePendingQueueMessageStoragePolicy(); defaultEntry.setPendingQueuePolicy(pendingQueuePolicy); @@ -156,9 +169,17 @@ public class QueuePurgeTest extends CombinationTestSupport { broker.setDestinationPolicy(policyMap); } - - public void testPurgeLargeQueueWithConsumer() throws Exception { - applyBrokerSpoolingPolicy(); + + public void testPurgeLargeQueueWithConsumer() throws Exception { + testPurgeLargeQueueWithConsumer(false); + } + + public void testPurgeLargeQueueWithConsumerPrioritizedMessages() throws Exception { + testPurgeLargeQueueWithConsumer(true); + } + + private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) throws Exception { + applyBrokerSpoolingPolicy(prioritizedMessages); createProducerAndSendMessages(NUM_TO_SEND); QueueViewMBean proxy = getProxyToQueueViewMBean(); createConsumer(); @@ -177,6 +198,7 @@ public class QueuePurgeTest extends CombinationTestSupport { } } while (msg != null); assertEquals("Queue size not valid", 0, proxy.getQueueSize()); + assertEquals("Found messages when browsing", 0, proxy.browseMessages().size()); } private QueueViewMBean getProxyToQueueViewMBean()