Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x aaa2fdd54 -> ce604fba7


https://issues.apache.org/jira/browse/AMQ-6151 - respect prioritizedMessages for pending and redelivered messages

(cherry picked from commit 5af5b59d3bf3c84098e55b6cb87631c061990666)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ce604fba
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ce604fba
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ce604fba

Branch: refs/heads/activemq-5.13.x
Commit: ce604fba78a3108123ca5c1713d01e168385d7ba
Parents: aaa2fdd
Author: gtully <gary.tu...@gmail.com>
Authored: Mon Feb 1 12:19:24 2016 +0000
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Tue Feb 2 16:20:22 2016 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    | 10 ++++
 .../cursors/QueueDispatchPendingList.java       | 11 ++++-
 .../activemq/store/MessagePriorityTest.java     | 51 ++++++++++++++++++++
 3 files changed, 70 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ce604fba/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index d447ebd..d59c1d8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2000,6 +2000,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
 
         pagedInPendingDispatchLock.writeLock().lock();
         try {
+            if (isPrioritizedMessages() && !dispatchPendingList.isEmpty() && list != null && !list.isEmpty()) {
+                // merge all to select priority order
+                for (MessageReference qmr : list) {
+                    if (!dispatchPendingList.contains(qmr)) {
+                        dispatchPendingList.addMessageLast(qmr);
+                    }
+                }
+                list = null;
+            }
+
             doActualDispatch(dispatchPendingList);
             // and now see if we can dispatch the new stuff.. and append to the pending
             // list anything that does not actually get dispatched.

http://git-wip-us.apache.org/repos/asf/activemq/blob/ce604fba/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
index cdddd4c..385e2b8 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -40,6 +40,8 @@ public class QueueDispatchPendingList implements PendingList {
 
     private PendingList pagedInPendingDispatch = new OrderedPendingList();
     private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
+    // when true use one PrioritizedPendingList for everything
+    private boolean prioritized = false;
 
 
     @Override
@@ -160,6 +162,7 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     public void setPrioritizedMessages(boolean prioritizedMessages) {
+        prioritized = prioritizedMessages;
         if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) {
             pagedInPendingDispatch = new PrioritizedPendingList();
             redeliveredWaitingDispatch = new PrioritizedPendingList();
@@ -170,10 +173,14 @@ public class QueueDispatchPendingList implements PendingList {
     }
 
     public void addMessageForRedelivery(QueueMessageReference qmr) {
-        redeliveredWaitingDispatch.addMessageLast(qmr);
+        if (prioritized) {
+            pagedInPendingDispatch.addMessageLast(qmr);
+        } else {
+            redeliveredWaitingDispatch.addMessageLast(qmr);
+        }
     }
 
     public boolean hasRedeliveries(){
-        return !redeliveredWaitingDispatch.isEmpty();
+        return prioritized ? !pagedInPendingDispatch.isEmpty() : !redeliveredWaitingDispatch.isEmpty();
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ce604fba/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
index 789e45f..e7c746c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/MessagePriorityTest.java
@@ -110,6 +110,7 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
         broker.waitUntilStarted();
 
         factory = new ActiveMQConnectionFactory("vm://priorityTest");
+        factory.setMessagePrioritySupported(true);
         ActiveMQPrefetchPolicy prefetch = new ActiveMQPrefetchPolicy();
         prefetch.setAll(prefetchVal);
         factory.setPrefetchPolicy(prefetch);
@@ -668,6 +669,56 @@ abstract public class MessagePriorityTest extends CombinationTestSupport {
         queueConsumer.close();
     }
 
+    public void testInterleaveHiNewConsumerGetsHi() throws Exception {
+        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST");
+        doTestInterleaveHiNewConsumerGetsHi(queue);
+    }
+
+    public void testInterleaveHiNewConsumerGetsHiPull() throws Exception {
+        ActiveMQQueue queue = (ActiveMQQueue) sess.createQueue("TEST?consumer.prefetchSize=0");
+        doTestInterleaveHiNewConsumerGetsHi(queue);
+    }
+
+    public void doTestInterleaveHiNewConsumerGetsHi(ActiveMQQueue queue) throws Exception {
+
+        // one hi sandwich
+        ProducerThread producerThread = new ProducerThread(queue, 3, LOW_PRI);
+        producerThread.run();
+        producerThread = new ProducerThread(queue, 1, HIGH_PRI);
+        producerThread.run();
+        producerThread = new ProducerThread(queue, 3, LOW_PRI);
+        producerThread.run();
+
+        // consume hi
+        MessageConsumer queueConsumer = sess.createConsumer(queue);
+        Message message = queueConsumer.receive(10000);
+        assertNotNull("expect #", message);
+        assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
+        queueConsumer.close();
+
+        // last hi
+        producerThread = new ProducerThread(queue, 3, LOW_PRI);
+        producerThread.run();
+        producerThread = new ProducerThread(queue, 1, HIGH_PRI);
+        producerThread.run();
+
+        // consume hi
+        queueConsumer = sess.createConsumer(queue);
+        message = queueConsumer.receive(10000);
+        assertNotNull("expect #", message);
+        assertEquals("correct priority", HIGH_PRI, message.getJMSPriority());
+        queueConsumer.close();
+
+        // consume the rest
+        queueConsumer = sess.createConsumer(queue);
+        for (int i = 0; i < 9; i++) {
+            message = queueConsumer.receive(10000);
+            assertNotNull("expect #" + i, message);
+            assertEquals("correct priority", LOW_PRI, message.getJMSPriority());
+        }
+        queueConsumer.close();
+    }
+
     public void initCombosForTestEveryXHi() {
         // the cache limits the priority ordering to available memory
         addCombinationValues("useCache", new Object[] {Boolean.FALSE, Boolean.TRUE});

Reply via email to