Repository: activemq Updated Branches: refs/heads/master c17b7fdc7 -> 25ff5699f
https://issues.apache.org/jira/browse/AMQ-6158 Computing messageSize for a durable subscription in KahaDB now runs much faster (O(n) vs O(n^2)), which is noticeable when there are a large number of pending messages for a durable subscription. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25ff5699 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25ff5699 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25ff5699 Branch: refs/heads/master Commit: 25ff5699f1fb7c73668d5da2eacb53ad2ef14289 Parents: c17b7fd Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Thu Feb 4 15:16:11 2016 +0000 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Thu Feb 4 15:17:49 2016 +0000 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 25 ++++++-------- .../AbstractPendingMessageCursorTest.java | 6 ++++ .../cursors/KahaDBPendingMessageCursorTest.java | 36 ++++++++++++++++++++ 3 files changed, 53 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/25ff5699/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index c7e1a40..931a18b 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -2588,31 +2588,28 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } public long 
getStoredMessageSize(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException { + //grab the messages attached to this subscription SequenceSet messageSequences = sd.ackPositions.get(tx, subscriptionKey); + long locationSize = 0; if (messageSequences != null) { - Iterator<Long> sequences = messageSequences.iterator(); + Sequence head = messageSequences.getHead(); + if (head != null) { + //get an iterator over the order index starting at the first unacked message + //and go over each message to add up the size + Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx, + new MessageOrderCursor(head.getFirst())); - while (sequences.hasNext()) { - Long sequenceId = sequences.next(); - //the last item is the next marker - if (!sequences.hasNext()) { - break; - } - Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); while (iterator.hasNext()) { - Entry<Location, Long> entry = iterator.next(); - if (entry.getValue() == sequenceId - 1) { - locationSize += entry.getKey().getSize(); - break; - } - + Entry<Long, MessageKeys> entry = iterator.next(); + locationSize += entry.getValue().location.getSize(); } } } return locationSize; } + protected String key(KahaDestination destination) { return destination.getType().getNumber() + ":" + destination.getName(); } http://git-wip-us.apache.org/repos/asf/activemq/blob/25ff5699/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java index f496806..a31f402 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java +++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.region.cursors; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.net.URI; import java.util.concurrent.atomic.AtomicLong; @@ -276,6 +278,10 @@ public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStat verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); verifyStoreStats(dest, 200, publishedMessageSize.get()); + //should be equal in this case + assertEquals(dest.getDurableTopicSubs().get(subKey).getPendingMessageSize(), + dest.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize()); + //consume all messages consumeDurableTestMessages(connection, "sub1", 200, publishedMessageSize); http://git-wip-us.apache.org/repos/asf/activemq/blob/25ff5699/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java index e0921dc..8a2c287 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/KahaDBPendingMessageCursorTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.region.cursors; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; @@ -84,6 +86,10 @@ public class KahaDBPendingMessageCursorTest extends verifyPendingStats(topic, subKey, 200, publishedMessageSize.get()); verifyStoreStats(topic, 200, 
publishedMessageSize.get()); + //should be equal in this case + assertEquals(topic.getDurableTopicSubs().get(subKey).getPendingMessageSize(), + topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize()); + // stop, restart broker and publish more messages stopBroker(); this.setUpBroker(false); @@ -101,6 +107,36 @@ public class KahaDBPendingMessageCursorTest extends } + @Test(timeout=60000) + public void testMessageSizeTwoDurablesPartialConsumption() throws Exception { + AtomicLong publishedMessageSize = new AtomicLong(); + + Connection connection = new ActiveMQConnectionFactory(brokerConnectURI).createConnection(); + connection.setClientID("clientId"); + connection.start(); + + SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1"); + SubscriptionKey subKey2 = new SubscriptionKey("clientId", "sub2"); + org.apache.activemq.broker.region.Topic dest = publishTestMessagesDurable( + connection, new String[] {"sub1", "sub2"}, 200, publishedMessageSize, DeliveryMode.PERSISTENT); + + //verify the count and size - durable is offline so all 200 should be pending since none are in prefetch + verifyPendingStats(dest, subKey, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + + //consume all messages + consumeDurableTestMessages(connection, "sub1", 50, publishedMessageSize); + + //150 should be left + verifyPendingStats(dest, subKey, 150, publishedMessageSize.get()); + + //200 should be left + verifyPendingStats(dest, subKey2, 200, publishedMessageSize.get()); + verifyStoreStats(dest, 200, publishedMessageSize.get()); + + connection.close(); + } + /** * Test that the the counter restores size and works after restart and more * messages are published