Repository: activemq Updated Branches: refs/heads/master f56ea45e5 -> efc9a8d57
Fix for https://issues.apache.org/jira/browse/AMQ-5689 Queue dispatching hangs when there are redelivered messages that dont match current consumers selectors, refactored out the pendingDispatchList in Queue implementation Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/efc9a8d5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/efc9a8d5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/efc9a8d5 Branch: refs/heads/master Commit: efc9a8d5782143b8c1e86750d34ce74af5a2940a Parents: f56ea45 Author: Christian Posta <christian.po...@gmail.com> Authored: Thu Mar 26 15:48:34 2015 -0700 Committer: Christian Posta <christian.po...@gmail.com> Committed: Fri Mar 27 15:19:56 2015 -0700 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 69 +++----- .../cursors/QueueDispatchPendingList.java | 174 +++++++++++++++++++ .../apache/activemq/JmsQueueSelectorTest.java | 52 ++++++ .../apache/activemq/JmsTopicSelectorTest.java | 7 +- 4 files changed, 252 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index c3de541..3438dcc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -49,12 +49,7 @@ import javax.jms.ResourceAllocationException; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.cursors.OrderedPendingList; -import org.apache.activemq.broker.region.cursors.PendingList; -import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.broker.region.cursors.PrioritizedPendingList; -import org.apache.activemq.broker.region.cursors.StoreQueueCursor; -import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; +import org.apache.activemq.broker.region.cursors.*; import org.apache.activemq.broker.region.group.CachedMessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; @@ -109,8 +104,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index private final PendingList pagedInMessages = new OrderedPendingList(); // Messages that are paged in but have not yet been targeted at a subscription private final ReentrantReadWriteLock pagedInPendingDispatchLock = new ReentrantReadWriteLock(); - protected PendingList pagedInPendingDispatch = new OrderedPendingList(); - protected PendingList redeliveredWaitingDispatch = new OrderedPendingList(); + protected QueueDispatchPendingList dispatchPendingList = new QueueDispatchPendingList(); private MessageGroupMap messageGroupOwners; private DispatchPolicy dispatchPolicy = new RoundRobinDispatchPolicy(); private MessageGroupMapFactory messageGroupMapFactory = new CachedMessageGroupMapFactory(); @@ -343,14 +337,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index @Override public void setPrioritizedMessages(boolean prioritizedMessages) { super.setPrioritizedMessages(prioritizedMessages); - - if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { - pagedInPendingDispatch = new PrioritizedPendingList(); - redeliveredWaitingDispatch = new PrioritizedPendingList(); - } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { - pagedInPendingDispatch = new OrderedPendingList(); - redeliveredWaitingDispatch = new OrderedPendingList(); - } + dispatchPendingList.setPrioritizedMessages(prioritizedMessages); } @Override @@ -583,7 +570,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index } } if (!qmr.isDropped()) { - redeliveredWaitingDispatch.addMessageLast(qmr); + dispatchPendingList.addMessageForRedelivery(qmr); } } if (sub instanceof QueueBrowserSubscription) { @@ -591,7 +578,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index browserDispatches.remove(sub); } // AMQ-5107: don't resend if the broker is shutting down - if (!redeliveredWaitingDispatch.isEmpty() && (! this.brokerService.isStopping())) { + if (dispatchPendingList.hasRedeliveries() && (! this.brokerService.isStopping())) { doDispatch(new OrderedPendingList()); } } finally { @@ -1118,8 +1105,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pageInMessages(!memoryUsage.isFull(110)); }; - doBrowseList(browseList, max, redeliveredWaitingDispatch, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch"); - doBrowseList(browseList, max, pagedInPendingDispatch, pagedInPendingDispatchLock, connectionContext, "pagedInPendingDispatch"); + doBrowseList(browseList, max, dispatchPendingList, pagedInPendingDispatchLock, connectionContext, "redeliveredWaitingDispatch+pagedInPendingDispatch"); doBrowseList(browseList, max, pagedInMessages, pagedInMessagesLock, connectionContext, "pagedInMessages"); // we need a store iterator to walk messages on disk, independent of the cursor which is tracking @@ -1581,7 +1567,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.readLock().lock(); try { - pageInMoreMessages |= !pagedInPendingDispatch.isEmpty(); + pageInMoreMessages |= !dispatchPendingList.isEmpty(); } finally { pagedInPendingDispatchLock.readLock().unlock(); } @@ -1593,7 +1579,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // then we do a dispatch. boolean hasBrowsers = browserDispatches.size() > 0; - if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) { + if (pageInMoreMessages || hasBrowsers || !dispatchPendingList.hasRedeliveries()) { try { pageInMessages(hasBrowsers); } catch (Throwable e) { @@ -1710,7 +1696,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index removeMessage(c, null, r); pagedInPendingDispatchLock.writeLock().lock(); try { - pagedInPendingDispatch.remove(r); + dispatchPendingList.remove(r); } finally { pagedInPendingDispatchLock.writeLock().unlock(); } @@ -1857,13 +1843,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index PendingList newlyPaged = doPageInForDispatch(force, processExpired); pagedInPendingDispatchLock.writeLock().lock(); try { - if (pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(newlyPaged); + if (dispatchPendingList.isEmpty()) { + dispatchPendingList.addAll(newlyPaged); } else { for (MessageReference qmr : newlyPaged) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.addMessageLast(qmr); + if (!dispatchPendingList.contains(qmr)) { + dispatchPendingList.addMessageLast(qmr); } } } @@ -1880,7 +1866,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index int pagedInPendingSize = 0; pagedInPendingDispatchLock.readLock().lock(); try { - pagedInPendingSize = pagedInPendingDispatch.size(); + pagedInPendingSize = dispatchPendingList.size(); } finally { pagedInPendingDispatchLock.readLock().unlock(); } @@ -1973,27 +1959,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.writeLock().lock(); try { - if (!redeliveredWaitingDispatch.isEmpty()) { - // Try first to dispatch redelivered messages to keep an - // proper order - redeliveredWaitingDispatch = doActualDispatch(redeliveredWaitingDispatch); - } - if (redeliveredWaitingDispatch.isEmpty()) { - if (!pagedInPendingDispatch.isEmpty()) { - // Next dispatch anything that had not been - // dispatched before. - pagedInPendingDispatch = doActualDispatch(pagedInPendingDispatch); - } - } + doActualDispatch(dispatchPendingList); // and now see if we can dispatch the new stuff.. and append to the pending // list anything that does not actually get dispatched. if (list != null && !list.isEmpty()) { - if (redeliveredWaitingDispatch.isEmpty() && pagedInPendingDispatch.isEmpty()) { - pagedInPendingDispatch.addAll(doActualDispatch(list)); + if (dispatchPendingList.isEmpty()) { + dispatchPendingList.addAll(doActualDispatch(list)); } else { for (MessageReference qmr : list) { - if (!pagedInPendingDispatch.contains(qmr)) { - pagedInPendingDispatch.addMessageLast(qmr); + if (!dispatchPendingList.contains(qmr)) { + dispatchPendingList.addMessageLast(qmr); } } doWakeUp = true; @@ -2192,10 +2167,10 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index pagedInPendingDispatchLock.writeLock().lock(); try { - for (MessageReference ref : pagedInPendingDispatch) { + for (MessageReference ref : dispatchPendingList) { if (messageId.equals(ref.getMessageId())) { message = (QueueMessageReference)ref; - pagedInPendingDispatch.remove(ref); + dispatchPendingList.remove(ref); break; } } @@ -2245,7 +2220,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index throw new JMSException("Slave broker out of sync with master - Message: " + messageDispatchNotification.getMessageId() + " on " + messageDispatchNotification.getDestination() + " does not exist among pending(" - + pagedInPendingDispatch.size() + ") for subscription: " + + dispatchPendingList.size() + ") for subscription: " + messageDispatchNotification.getConsumerId()); } return message; http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java new file mode 100644 index 0000000..8c6032b --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/QueueDispatchPendingList.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.cursors; + +import org.apache.activemq.broker.region.MessageReference; +import org.apache.activemq.broker.region.QueueMessageReference; +import org.apache.activemq.command.MessageId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; + +/** + * An abstraction that keeps the correct order of messages that need to be dispatched + * to consumers, but also hides the fact that there might be redelivered messages that + * should be dispatched ahead of any other paged in messages. + * + * Direct usage of this class is recommended as you can control when redeliveries need + * to be added vs regular pending messages (the next set of messages that can be dispatched) + * + * Created by ceposta + * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. + */ +public class QueueDispatchPendingList implements PendingList { + + private PendingList pagedInPendingDispatch = new OrderedPendingList(); + private PendingList redeliveredWaitingDispatch = new OrderedPendingList(); + + + @Override + public boolean isEmpty() { + return pagedInPendingDispatch.isEmpty() && redeliveredWaitingDispatch.isEmpty(); + } + + @Override + public void clear() { + pagedInPendingDispatch.clear(); + redeliveredWaitingDispatch.clear(); + } + + /** + * Messages added are added directly to the pagedInPendingDispatch set of messages. If + * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() + * method + * @param message + * The MessageReference that is to be added to this list. + * + * @return + */ + @Override + public PendingNode addMessageFirst(MessageReference message) { + return pagedInPendingDispatch.addMessageFirst(message); + } + + /** + * Messages added are added directly to the pagedInPendingDispatch set of messages. If + * you're trying to add a message that is marked redelivered add it using addMessageForRedelivery() + * method + * @param message + * The MessageReference that is to be added to this list. + * + * @return + */ + @Override + public PendingNode addMessageLast(MessageReference message) { + return pagedInPendingDispatch.addMessageLast(message); + } + + @Override + public PendingNode remove(MessageReference message) { + if (pagedInPendingDispatch.contains(message)) { + return pagedInPendingDispatch.remove(message); + }else if (redeliveredWaitingDispatch.contains(message)) { + return redeliveredWaitingDispatch.remove(message); + } + return null; + } + + @Override + public int size() { + return pagedInPendingDispatch.size() + redeliveredWaitingDispatch.size(); + } + + @Override + public Iterator<MessageReference> iterator() { + return new Iterator<MessageReference>() { + + Iterator<MessageReference> redeliveries = redeliveredWaitingDispatch.iterator(); + Iterator<MessageReference> pendingDispatch = pagedInPendingDispatch.iterator(); + Iterator<MessageReference> current = redeliveries; + + + @Override + public boolean hasNext() { + if (!redeliveries.hasNext() && (current == redeliveries)) { + current = pendingDispatch; + } + return current.hasNext(); + } + + @Override + public MessageReference next() { + return current.next(); + } + + @Override + public void remove() { + current.remove(); + } + }; + } + + @Override + public boolean contains(MessageReference message) { + return pagedInPendingDispatch.contains(message) || redeliveredWaitingDispatch.contains(message); + } + + @Override + public Collection<MessageReference> values() { + List<MessageReference> messageReferences = new ArrayList<MessageReference>(); + Iterator<MessageReference> iterator = iterator(); + while (iterator.hasNext()) { + messageReferences.add(iterator.next()); + } + return messageReferences; + } + + @Override + public void addAll(PendingList pendingList) { + pagedInPendingDispatch.addAll(pendingList); + } + + @Override + public MessageReference get(MessageId messageId) { + MessageReference rc = pagedInPendingDispatch.get(messageId); + if (rc == null) { + return redeliveredWaitingDispatch.get(messageId); + } + return rc; + } + + public void setPrioritizedMessages(boolean prioritizedMessages) { + if (prioritizedMessages && this.pagedInPendingDispatch instanceof OrderedPendingList) { + pagedInPendingDispatch = new PrioritizedPendingList(); + redeliveredWaitingDispatch = new PrioritizedPendingList(); + } else if(pagedInPendingDispatch instanceof PrioritizedPendingList) { + pagedInPendingDispatch = new OrderedPendingList(); + redeliveredWaitingDispatch = new OrderedPendingList(); + } + } + + public void addMessageForRedelivery(QueueMessageReference qmr) { + redeliveredWaitingDispatch.addMessageLast(qmr); + } + + public boolean hasRedeliveries(){ + return !redeliveredWaitingDispatch.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java index 449edda..63e9856 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsQueueSelectorTest.java @@ -17,6 +17,8 @@ package org.apache.activemq; +import javax.jms.*; + /** * */ @@ -25,4 +27,54 @@ public class JmsQueueSelectorTest extends JmsTopicSelectorTest { topic = false; super.setUp(); } + + public void testRedeliveryWithSelectors() throws Exception { + consumer = createConsumer(""); + + // send a message that would go to this consumer, but not to the next consumer we open + TextMessage message = session.createTextMessage("1"); + message.setIntProperty("id", 1); + message.setJMSType("b"); + message.setStringProperty("stringProperty", "b"); + message.setLongProperty("longProperty", 1); + message.setBooleanProperty("booleanProperty", true); + producer.send(message); + + // don't consume any messages.. close the consumer so that messages that had + // been dispatched get marked as delivered, and queued for redelivery + consumer.close(); + + // send a message that will match the selector for the next consumer + message = session.createTextMessage("1"); + message.setIntProperty("id", 1); + message.setJMSType("a"); + message.setStringProperty("stringProperty", "a"); + message.setLongProperty("longProperty", 1); + message.setBooleanProperty("booleanProperty", true); + producer.send(message); + + consumer = createConsumer("stringProperty = 'a' and longProperty = 1 and booleanProperty = true"); + + // now we, should only receive 1 message, not two + int remaining = 2; + + javax.jms.Message recievedMsg = null; + + while (true) { + recievedMsg = consumer.receive(1000); + if (recievedMsg == null) { + break; + } + String text = ((TextMessage)recievedMsg).getText(); + if (!text.equals("1") && !text.equals("3")) { + fail("unexpected message: " + text); + } + remaining--; + } + + assertEquals(1, remaining); + consumer.close(); + consumeMessages(remaining); + + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/efc9a8d5/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java index e6a2503..d81af94 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JmsTopicSelectorTest.java @@ -156,11 +156,12 @@ public class JmsTopicSelectorTest extends TestSupport { remaining--; } - assertEquals(remaining, 0); + assertEquals(0, remaining); consumer.close(); consumeMessages(remaining); } + public void testPropertySelector() throws Exception { int remaining = 5; Message message = null; @@ -177,7 +178,7 @@ public class JmsTopicSelectorTest extends TestSupport { } remaining--; } - assertEquals(remaining, 3); + assertEquals(3, remaining); consumer.close(); consumeMessages(remaining); @@ -199,7 +200,7 @@ public class JmsTopicSelectorTest extends TestSupport { } remaining--; } - assertEquals(remaining, 2); + assertEquals(2, remaining); consumer.close(); consumeMessages(remaining);