Repository: activemq
Updated Branches:
  refs/heads/master f56ea45e5 -> efc9a8d57


Fix for https://issues.apache.org/jira/browse/AMQ-5689 Queue dispatching hangs 
when there are redelivered messages that dont match current consumers 
selectors, refactored out the pendingDispatchList in Queue implementation


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/efc9a8d5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/efc9a8d5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/efc9a8d5

Branch: refs/heads/master
Commit: efc9a8d5782143b8c1e86750d34ce74af5a2940a
Parents: f56ea45
Author: Christian Posta <christian.po...@gmail.com>
Authored: Thu Mar 26 15:48:34 2015 -0700
Committer: Christian Posta <christian.po...@gmail.com>
Committed: Fri Mar 27 15:19:56 2015 -0700

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  69 +++-----
 .../cursors/QueueDispatchPendingList.java       | 174 +++++++++++++++++++
 .../apache/activemq/JmsQueueSelectorTest.java   |  52 ++++++
 .../apache/activemq/JmsTopicSelectorTest.java   |   7 +-
 4 files changed, 252 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index c3de541..3438dcc 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -49,12 +49,7 @@ import javax.jms.ResourceAllocationException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
-import org.apache.activemq.broker.region.cursors.OrderedPendingList;
-import org.apache.activemq.broker.region.cursors.PendingList;
-import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
-import org.apache.activemq.broker.region.cursors.PrioritizedPendingList;
-import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
-import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
+import org.apache.activemq.broker.region.cursors.*;
 import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory;
 import org.apache.activemq.broker.region.group.MessageGroupMap;
 import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
@@ -109,8 +104,7 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
     private final PendingList pagedInMessages = new OrderedPendingList();
     // Messages that are paged in but have not yet been targeted at a 
subscription
     private final ReentrantReadWriteLock pagedInPendingDispatchLock = new 
ReentrantReadWriteLock();
-    protected PendingList pagedInPendingDispatch = new OrderedPendingList();
-    protected PendingList redeliveredWaitingDispatch = new 
OrderedPendingList();
+    protected QueueDispatchPendingList dispatchPendingList = new 
QueueDispatchPendingList();
     private MessageGroupMap messageGroupOwners;
     private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy();
     private MessageGroupMapFactory messageGroupMapFactory = new 
CachedMessageGroupMapFactory();
@@ -343,14 +337,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
     @Override
     public void setPrioritizedMessages(boolean prioritizedMessages) {
         super.setPrioritizedMessages(prioritizedMessages);
-
-        if (prioritizedMessages && this.pagedInPendingDispatch instanceof 
OrderedPendingList) {
-            pagedInPendingDispatch = new PrioritizedPendingList();
-            redeliveredWaitingDispatch = new PrioritizedPendingList();
-        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
-            pagedInPendingDispatch = new OrderedPendingList();
-            redeliveredWaitingDispatch = new OrderedPendingList();
-        }
+        dispatchPendingList.setPrioritizedMessages(prioritizedMessages);
     }
 
     @Override
@@ -583,7 +570,7 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
                         }
                     }
                     if (!qmr.isDropped()) {
-                        redeliveredWaitingDispatch.addMessageLast(qmr);
+                        dispatchPendingList.addMessageForRedelivery(qmr);
                     }
                 }
                 if (sub instanceof QueueBrowserSubscription) {
@@ -591,7 +578,7 @@ public class Queue extends BaseDestination implements Task, 
UsageListener, Index
                     browserDispatches.remove(sub);
                 }
                 // AMQ-5107: don't resend if the broker is shutting down
-                if (!redeliveredWaitingDispatch.isEmpty() && (! 
this.brokerService.isStopping())) {
+                if (dispatchPendingList.hasRedeliveries() && (! 
this.brokerService.isStopping())) {
                     doDispatch(new OrderedPendingList());
                 }
             } finally {
@@ -1118,8 +1105,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
                 pageInMessages(!memoryUsage.isFull(110));
             };
 
-            doBrowseList(browseList, max, redeliveredWaitingDispatch, 
pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch");
-            doBrowseList(browseList, max, pagedInPendingDispatch, 
pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch");
+            doBrowseList(browseList, max, dispatchPendingList, 
pagedInPendingDispatchLock, connectionContext, 
"redeliveredWaitingDispatch+pagedInPendingDispatch");
             doBrowseList(browseList, max, pagedInMessages, 
pagedInMessagesLock, connectionContext, "pagedInMessages");
 
             // we need a store iterator to walk messages on disk, independent 
of the cursor which is tracking
@@ -1581,7 +1567,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
 
             pagedInPendingDispatchLock.readLock().lock();
             try {
-                pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
+                pageInMoreMessages |= !dispatchPendingList.isEmpty();
             } finally {
                 pagedInPendingDispatchLock.readLock().unlock();
             }
@@ -1593,7 +1579,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             // then we do a dispatch.
             boolean hasBrowsers = browserDispatches.size() > 0;
 
-            if (pageInMoreMessages || hasBrowsers || 
!redeliveredWaitingDispatch.isEmpty()) {
+            if (pageInMoreMessages || hasBrowsers || 
!dispatchPendingList.hasRedeliveries()) {
                 try {
                     pageInMessages(hasBrowsers);
                 } catch (Throwable e) {
@@ -1710,7 +1696,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         removeMessage(c, null, r);
         pagedInPendingDispatchLock.writeLock().lock();
         try {
-            pagedInPendingDispatch.remove(r);
+            dispatchPendingList.remove(r);
         } finally {
             pagedInPendingDispatchLock.writeLock().unlock();
         }
@@ -1857,13 +1843,13 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         PendingList newlyPaged = doPageInForDispatch(force, processExpired);
         pagedInPendingDispatchLock.writeLock().lock();
         try {
-            if (pagedInPendingDispatch.isEmpty()) {
-                pagedInPendingDispatch.addAll(newlyPaged);
+            if (dispatchPendingList.isEmpty()) {
+                dispatchPendingList.addAll(newlyPaged);
 
             } else {
                 for (MessageReference qmr : newlyPaged) {
-                    if (!pagedInPendingDispatch.contains(qmr)) {
-                        pagedInPendingDispatch.addMessageLast(qmr);
+                    if (!dispatchPendingList.contains(qmr)) {
+                        dispatchPendingList.addMessageLast(qmr);
                     }
                 }
             }
@@ -1880,7 +1866,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
         int pagedInPendingSize = 0;
         pagedInPendingDispatchLock.readLock().lock();
         try {
-            pagedInPendingSize = pagedInPendingDispatch.size();
+            pagedInPendingSize = dispatchPendingList.size();
         } finally {
             pagedInPendingDispatchLock.readLock().unlock();
         }
@@ -1973,27 +1959,16 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
 
         pagedInPendingDispatchLock.writeLock().lock();
         try {
-            if (!redeliveredWaitingDispatch.isEmpty()) {
-                // Try first to dispatch redelivered messages to keep an
-                // proper order
-                redeliveredWaitingDispatch = 
doActualDispatch(redeliveredWaitingDispatch);
-            }
-            if (redeliveredWaitingDispatch.isEmpty()) {
-                if (!pagedInPendingDispatch.isEmpty()) {
-                    // Next dispatch anything that had not been
-                    // dispatched before.
-                    pagedInPendingDispatch = 
doActualDispatch(pagedInPendingDispatch);
-                }
-            }
+            doActualDispatch(dispatchPendingList);
             // and now see if we can dispatch the new stuff.. and append to 
the pending
             // list anything that does not actually get dispatched.
             if (list != null && !list.isEmpty()) {
-                if (redeliveredWaitingDispatch.isEmpty() && 
pagedInPendingDispatch.isEmpty()) {
-                    pagedInPendingDispatch.addAll(doActualDispatch(list));
+                if (dispatchPendingList.isEmpty()) {
+                    dispatchPendingList.addAll(doActualDispatch(list));
                 } else {
                     for (MessageReference qmr : list) {
-                        if (!pagedInPendingDispatch.contains(qmr)) {
-                            pagedInPendingDispatch.addMessageLast(qmr);
+                        if (!dispatchPendingList.contains(qmr)) {
+                            dispatchPendingList.addMessageLast(qmr);
                         }
                     }
                     doWakeUp = true;
@@ -2192,10 +2167,10 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
 
         pagedInPendingDispatchLock.writeLock().lock();
         try {
-            for (MessageReference ref : pagedInPendingDispatch) {
+            for (MessageReference ref : dispatchPendingList) {
                 if (messageId.equals(ref.getMessageId())) {
                     message = (QueueMessageReference)ref;
-                    pagedInPendingDispatch.remove(ref);
+                    dispatchPendingList.remove(ref);
                     break;
                 }
             }
@@ -2245,7 +2220,7 @@ public class Queue extends BaseDestination implements 
Task, UsageListener, Index
             throw new JMSException("Slave broker out of sync with master - 
Message: "
                     + messageDispatchNotification.getMessageId() + " on "
                     + messageDispatchNotification.getDestination() + " does 
not exist among pending("
-                    + pagedInPendingDispatch.size() + ") for subscription: "
+                    + dispatchPendingList.size() + ") for subscription: "
                     + messageDispatchNotification.getConsumerId());
         }
         return message;

http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
new file mode 100644
index 0000000..8c6032b
--- /dev/null
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.region.cursors;
+
+import org.apache.activemq.broker.region.MessageReference;
+import org.apache.activemq.broker.region.QueueMessageReference;
+import org.apache.activemq.command.MessageId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * An abstraction that keeps the correct order of messages that need to be 
dispatched
+ * to consumers, but also hides the fact that there might be redelivered 
messages that
+ * should be dispatched ahead of any other paged in messages.
+ *
+ * Direct usage of this class is recommended as you can control when 
redeliveries need
+ * to be added vs regular pending messages (the next set of messages that can 
be dispatched)
+ *
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
+ */
+public class QueueDispatchPendingList implements PendingList {
+
+    private PendingList pagedInPendingDispatch = new OrderedPendingList();
+    private PendingList redeliveredWaitingDispatch = new OrderedPendingList();
+
+
+    @Override
+    public boolean isEmpty() {
+        return pagedInPendingDispatch.isEmpty() && 
redeliveredWaitingDispatch.isEmpty();
+    }
+
+    @Override
+    public void clear() {
+        pagedInPendingDispatch.clear();
+        redeliveredWaitingDispatch.clear();
+    }
+
+    /**
+     * Messages added are added directly to the pagedInPendingDispatch set of 
messages. If
+     * you're trying to add a message that is marked redelivered add it using 
addMessageForRedelivery()
+     * method
+     * @param message
+     *      The MessageReference that is to be added to this list.
+     *
+     * @return
+     */
+    @Override
+    public PendingNode addMessageFirst(MessageReference message) {
+        return pagedInPendingDispatch.addMessageFirst(message);
+    }
+
+    /**
+     * Messages added are added directly to the pagedInPendingDispatch set of 
messages. If
+     * you're trying to add a message that is marked redelivered add it using 
addMessageForRedelivery()
+     * method
+     * @param message
+     *      The MessageReference that is to be added to this list.
+     *
+     * @return
+     */
+    @Override
+    public PendingNode addMessageLast(MessageReference message) {
+        return pagedInPendingDispatch.addMessageLast(message);
+    }
+
+    @Override
+    public PendingNode remove(MessageReference message) {
+        if (pagedInPendingDispatch.contains(message)) {
+            return pagedInPendingDispatch.remove(message);
+        }else if (redeliveredWaitingDispatch.contains(message)) {
+            return redeliveredWaitingDispatch.remove(message);
+        }
+        return null;
+    }
+
+    @Override
+    public int size() {
+        return pagedInPendingDispatch.size() + 
redeliveredWaitingDispatch.size();
+    }
+
+    @Override
+    public Iterator<MessageReference> iterator() {
+        return new Iterator<MessageReference>() {
+
+            Iterator<MessageReference> redeliveries = 
redeliveredWaitingDispatch.iterator();
+            Iterator<MessageReference> pendingDispatch = 
pagedInPendingDispatch.iterator();
+            Iterator<MessageReference> current = redeliveries;
+
+
+            @Override
+            public boolean hasNext() {
+                if (!redeliveries.hasNext() && (current == redeliveries)) {
+                    current = pendingDispatch;
+                }
+                return current.hasNext();
+            }
+
+            @Override
+            public MessageReference next() {
+                return current.next();
+            }
+
+            @Override
+            public void remove() {
+                current.remove();
+            }
+        };
+    }
+
+    @Override
+    public boolean contains(MessageReference message) {
+        return pagedInPendingDispatch.contains(message) || 
redeliveredWaitingDispatch.contains(message);
+    }
+
+    @Override
+    public Collection<MessageReference> values() {
+        List<MessageReference> messageReferences = new 
ArrayList<MessageReference>();
+        Iterator<MessageReference> iterator = iterator();
+        while (iterator.hasNext()) {
+            messageReferences.add(iterator.next());
+        }
+        return messageReferences;
+    }
+
+    @Override
+    public void addAll(PendingList pendingList) {
+        pagedInPendingDispatch.addAll(pendingList);
+    }
+
+    @Override
+    public MessageReference get(MessageId messageId) {
+        MessageReference rc = pagedInPendingDispatch.get(messageId);
+        if (rc == null) {
+            return redeliveredWaitingDispatch.get(messageId);
+        }
+        return rc;
+    }
+
+    public void setPrioritizedMessages(boolean prioritizedMessages) {
+        if (prioritizedMessages && this.pagedInPendingDispatch instanceof 
OrderedPendingList) {
+            pagedInPendingDispatch = new PrioritizedPendingList();
+            redeliveredWaitingDispatch = new PrioritizedPendingList();
+        } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) {
+            pagedInPendingDispatch = new OrderedPendingList();
+            redeliveredWaitingDispatch = new OrderedPendingList();
+        }
+    }
+
+    public void addMessageForRedelivery(QueueMessageReference qmr) {
+        redeliveredWaitingDispatch.addMessageLast(qmr);
+    }
+
+    public boolean hasRedeliveries(){
+        return !redeliveredWaitingDispatch.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
index 449edda..63e9856 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.activemq;
 
+import javax.jms.*;
+
 /**
  * 
  */
@@ -25,4 +27,54 @@ public class JmsQueueSelectorTest extends 
JmsTopicSelectorTest {
         topic = false;
         super.setUp();
     }
+
+    public void testRedeliveryWithSelectors() throws Exception {
+        consumer = createConsumer("");
+
+        // send a message that would go to this consumer, but not to the next 
consumer we open
+        TextMessage message = session.createTextMessage("1");
+        message.setIntProperty("id", 1);
+        message.setJMSType("b");
+        message.setStringProperty("stringProperty", "b");
+        message.setLongProperty("longProperty", 1);
+        message.setBooleanProperty("booleanProperty", true);
+        producer.send(message);
+
+        // don't consume any messages.. close the consumer so that messages 
that had
+        // been dispatched get marked as delivered, and queued for redelivery
+        consumer.close();
+
+        // send a message that will match the selector for the next consumer
+        message = session.createTextMessage("1");
+        message.setIntProperty("id", 1);
+        message.setJMSType("a");
+        message.setStringProperty("stringProperty", "a");
+        message.setLongProperty("longProperty", 1);
+        message.setBooleanProperty("booleanProperty", true);
+        producer.send(message);
+
+        consumer = createConsumer("stringProperty = 'a' and longProperty = 1 
and booleanProperty = true");
+
+        // now we, should only receive 1 message, not two
+        int remaining = 2;
+
+        javax.jms.Message recievedMsg = null;
+
+        while (true) {
+            recievedMsg = consumer.receive(1000);
+            if (recievedMsg == null) {
+                break;
+            }
+            String text = ((TextMessage)recievedMsg).getText();
+            if (!text.equals("1") && !text.equals("3")) {
+                fail("unexpected message: " + text);
+            }
+            remaining--;
+        }
+
+        assertEquals(1, remaining);
+        consumer.close();
+        consumeMessages(remaining);
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
index e6a2503..d81af94 100755
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java
@@ -156,11 +156,12 @@ public class JmsTopicSelectorTest extends TestSupport {
 
             remaining--;
         }
-        assertEquals(remaining, 0);
+        assertEquals(0, remaining);
         consumer.close();
         consumeMessages(remaining);
     }
 
+
     public void testPropertySelector() throws Exception {
         int remaining = 5;
         Message message = null;
@@ -177,7 +178,7 @@ public class JmsTopicSelectorTest extends TestSupport {
             }
             remaining--;
         }
-        assertEquals(remaining, 3);
+        assertEquals(3, remaining);
         consumer.close();
         consumeMessages(remaining);
 
@@ -199,7 +200,7 @@ public class JmsTopicSelectorTest extends TestSupport {
             }
             remaining--;
         }
-        assertEquals(remaining, 2);
+        assertEquals(2, remaining);
         consumer.close();
         consumeMessages(remaining);
 

Reply via email to