Repository: activemq
Updated Branches:
  refs/heads/activemq-5.12.x 961770072 -> bbb002afd


https://issues.apache.org/jira/browse/AMQ-6069

Fixed the contains method in PrioritizedPendingList, which was not returning
the correct result.  This was causing messages to not be removed from the
dispatchPendingList when purge was called inside a Queue, leading to an
eventual OOM error if enough messages were purged. This fix also
improves the performance of the contains method.

(cherry picked from commit 8363c99b51a98eb176e6baea82fcafce3225ba2c)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bbb002af
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bbb002af
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bbb002af

Branch: refs/heads/activemq-5.12.x
Commit: bbb002afda43e9bc0445bf050c808f117b7ab1e8
Parents: 9617700
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Tue Dec 1 19:33:53 2015 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Dec 2 13:43:03 2015 +0000

----------------------------------------------------------------------
 .../region/cursors/PrioritizedPendingList.java  |  5 ++-
 .../activemq/broker/region/QueuePurgeTest.java  | 38 +++++++++++++++-----
 2 files changed, 32 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/bbb002af/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
index 9235b2c..aa9f467 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/PrioritizedPendingList.java
@@ -133,10 +133,9 @@ public class PrioritizedPendingList implements PendingList 
{
 
     @Override
     public boolean contains(MessageReference message) {
-        if (map.values().contains(message)) {
-            return true;
+        if (message != null) {
+            return this.map.containsKey(message.getMessageId());
         }
-
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/bbb002af/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
index a121619..85faeab 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/QueuePurgeTest.java
@@ -30,6 +30,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerService;
@@ -57,6 +58,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
     Queue queue;
     MessageConsumer consumer;
 
+    @Override
     protected void setUp() throws Exception {
         setMaxTestTime(10*60*1000); // 10 mins
         setAutoFail(true);
@@ -78,6 +80,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
         connection.start();
     }
 
+    @Override
     protected void tearDown() throws Exception {
         super.tearDown();
         if (consumer != null) {
@@ -90,7 +93,15 @@ public class QueuePurgeTest extends CombinationTestSupport {
     }
 
     public void testPurgeLargeQueue() throws Exception {
-        applyBrokerSpoolingPolicy();
+        testPurgeLargeQueue(false);
+    }
+
+    public void testPurgeLargeQueuePrioritizedMessages() throws Exception {
+        testPurgeLargeQueue(true);
+    }
+
+    private void testPurgeLargeQueue(boolean prioritizedMessages) throws 
Exception {
+        applyBrokerSpoolingPolicy(prioritizedMessages);
         createProducerAndSendMessages(NUM_TO_SEND);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         LOG.info("purging..");
@@ -127,10 +138,11 @@ public class QueuePurgeTest extends 
CombinationTestSupport {
                 proxy.getQueueSize());
         assertTrue("cache is disabled, temp store being used", 
!proxy.isCacheEnabled());
         assertTrue("got expected info purge log message", 
gotPurgeLogMessage.get());
+        assertEquals("Found messages when browsing", 0, 
proxy.browseMessages().size());
     }
 
-    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {  
     
-        applyBrokerSpoolingPolicy();
+    public void testRepeatedExpiryProcessingOfLargeQueue() throws Exception {
+        applyBrokerSpoolingPolicy(false);
         final int expiryPeriod = 500;
         applyExpiryDuration(expiryPeriod);
         createProducerAndSendMessages(NUM_TO_SEND);
@@ -140,15 +152,16 @@ public class QueuePurgeTest extends 
CombinationTestSupport {
         assertEquals("Queue size is has not changed " + proxy.getQueueSize(), 
NUM_TO_SEND,
                 proxy.getQueueSize());
     }
-    
+
 
     private void applyExpiryDuration(int i) {
         
broker.getDestinationPolicy().getDefaultEntry().setExpireMessagesPeriod(i);
     }
 
-    private void applyBrokerSpoolingPolicy() {
+    private void applyBrokerSpoolingPolicy(boolean prioritizedMessages) {
         PolicyMap policyMap = new PolicyMap();
         PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPrioritizedMessages(prioritizedMessages);
         defaultEntry.setProducerFlowControl(false);
         PendingQueueMessageStoragePolicy pendingQueuePolicy = new 
FilePendingQueueMessageStoragePolicy();
         defaultEntry.setPendingQueuePolicy(pendingQueuePolicy);
@@ -156,9 +169,17 @@ public class QueuePurgeTest extends CombinationTestSupport 
{
         broker.setDestinationPolicy(policyMap);
     }
 
-    
-    public void testPurgeLargeQueueWithConsumer() throws Exception {       
-        applyBrokerSpoolingPolicy();
+
+    public void testPurgeLargeQueueWithConsumer() throws Exception {
+        testPurgeLargeQueueWithConsumer(false);
+    }
+
+    public void testPurgeLargeQueueWithConsumerPrioritizedMessages() throws 
Exception {
+        testPurgeLargeQueueWithConsumer(true);
+    }
+
+    private void testPurgeLargeQueueWithConsumer(boolean prioritizedMessages) 
throws Exception {
+        applyBrokerSpoolingPolicy(prioritizedMessages);
         createProducerAndSendMessages(NUM_TO_SEND);
         QueueViewMBean proxy = getProxyToQueueViewMBean();
         createConsumer();
@@ -177,6 +198,7 @@ public class QueuePurgeTest extends CombinationTestSupport {
             }
         } while (msg != null);
         assertEquals("Queue size not valid", 0, proxy.getQueueSize());
+        assertEquals("Found messages when browsing", 0, 
proxy.browseMessages().size());
     }
 
     private QueueViewMBean getProxyToQueueViewMBean()

Reply via email to