Repository: activemq Updated Branches: refs/heads/master 03a211ec0 -> 4e23adfcc
https://issues.apache.org/jira/browse/AMQ-6340 combine the lists in the correct order for later redispatch. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4e23adfc Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4e23adfc Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4e23adfc Branch: refs/heads/master Commit: 4e23adfcc981d3e13e7f1b1182b89a954160a26a Parents: 03a211e Author: Timothy Bish <[email protected]> Authored: Wed Jun 29 12:57:30 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Jun 29 12:57:30 2016 -0400 ---------------------------------------------------------------------- .../amqp/JmsTransactedMessageOrderTest.java | 6 ++---- .../broker/region/PrefetchSubscription.java | 18 ++++++++++-------- 2 files changed, 12 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java index c286497..2134759 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JmsTransactedMessageOrderTest.java @@ -69,8 +69,6 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { policyEntry.setQueue(">"); policyEntry.setStrictOrderDispatch(true); - policyEntry.setProducerFlowControl(true); - policyEntry.setMemoryLimit(1024 * 1024); policyEntries.add(policyEntry); @@ -85,7 +83,7 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { sendMessages(5); int counter = 0; - while (counter++ < 10) { + while (counter++ < 20) { LOG.info("Creating connection using prefetch of: {}", prefetch); JmsConnectionFactory cf = new JmsConnectionFactory(getAmqpURI("jms.prefetchPolicy.all=" + prefetch)); @@ -100,11 +98,11 @@ public class JmsTransactedMessageOrderTest extends JMSClientTestSupport { Message message = consumer.receive(5000); assertNotNull(message); assertTrue(message instanceof TextMessage); + LOG.info("Read message = {}", ((TextMessage) message).getText()); int sequenceID = message.getIntProperty("sequenceID"); assertEquals(0, sequenceID); - LOG.info("Read message = {}", ((TextMessage) message).getText()); session.rollback(); session.close(); connection.close(); http://git-wip-us.apache.org/repos/asf/activemq/blob/4e23adfc/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 0b2935c..74658cc 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -639,31 +640,32 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } public List<MessageReference> remove(ConnectionContext context, Destination destination, List<MessageReference> dispatched) throws Exception { - List<MessageReference> rc = new ArrayList<MessageReference>(); + LinkedList<MessageReference> redispatch = new LinkedList<MessageReference>(); synchronized(pendingLock) { super.remove(context, destination); // Here is a potential problem concerning Inflight stat: // Messages not already committed or rolled back may not be removed from dispatched list at the moment // Except if each commit or rollback callback action comes before remove of subscriber. - rc.addAll(pending.remove(context, destination)); + redispatch.addAll(pending.remove(context, destination)); if (dispatched == null) { - return rc; + return redispatch; } // Synchronized to DispatchLock if necessary if (dispatched == this.dispatched) { synchronized(dispatchLock) { - updateDestinationStats(rc, destination, dispatched); + addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); } } else { - updateDestinationStats(rc, destination, dispatched); + addReferencesAndUpdateRedispatch(redispatch, destination, dispatched); } } - return rc; + + return redispatch; } - private void updateDestinationStats(List<MessageReference> rc, Destination destination, List<MessageReference> dispatched) { + private void addReferencesAndUpdateRedispatch(LinkedList<MessageReference> redispatch, Destination destination, List<MessageReference> dispatched) { ArrayList<MessageReference> references = new ArrayList<MessageReference>(); for (MessageReference r : dispatched) { if (r.getRegionDestination() == destination) { @@ -671,7 +673,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize()); } } - rc.addAll(references); + redispatch.addAll(0, references); destination.getDestinationStatistics().getInflight().subtract(references.size()); dispatched.removeAll(references); }
