This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/master by this push:
     new fa2daa2  AMQ-7129 - fix durable message size statistics with 
individual ack
fa2daa2 is described below

commit fa2daa25e9acd3f37bb1ee0d37717d2383e67a85
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
AuthorDate: Fri Jan 11 09:56:03 2019 -0500

    AMQ-7129 - fix durable message size statistics with individual ack
    
    Make sure that the pending message size for a durable sub only includes
    messages part of the ack range
---
 .../apache/activemq/store/kahadb/MessageDatabase.java   | 13 ++++++++++---
 .../store/kahadb/KahaDBDurableMessageRecoveryTest.java  | 17 +++++++++++++++++
 2 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 5d9b548..0bc6e81 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -3030,16 +3030,23 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
             SequenceSet messageSequences = sd.ackPositions.get(tx, 
subscriptionKey);
 
             if (messageSequences != null) {
-                Sequence head = messageSequences.getHead();
-                if (head != null) {
+                if (!messageSequences.isEmpty()) {
+                    final Sequence head = messageSequences.getHead();
+
                     //get an iterator over the order index starting at the 
first unacked message
                     //and go over each message to add up the size
                     Iterator<Entry<Long, MessageKeys>> iterator = 
sd.orderIndex.iterator(tx,
                             new MessageOrderCursor(head.getFirst()));
 
+                    final boolean contiguousRange = messageSequences.size() == 
1;
                     while (iterator.hasNext()) {
                         Entry<Long, MessageKeys> entry = iterator.next();
-                        locationSize += entry.getValue().location.getSize();
+                        //Verify sequence contains the key
+                        //if contiguous we just add all starting with the 
first but if not
+                        //we need to check if the id is part of the range - 
could happen if individual ack mode was used
+                        if (contiguousRange || 
messageSequences.contains(entry.getKey())) {
+                            locationSize += 
entry.getValue().location.getSize();
+                        }
                     }
                 }
             }
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
index a44e8c0..519648e 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBDurableMessageRecoveryTest.java
@@ -209,6 +209,12 @@ public class KahaDBDurableMessageRecoveryTest {
         // Verify there are 8 messages left still and restart broker
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
+
+        //Verify the pending size is less for sub1
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < 
getPendingMessageSize(topic, "clientId1", "sub2"));
+
         subscriber1.close();
         subscriber2.close();
         restartBroker(recoverIndex);
@@ -217,6 +223,11 @@ public class KahaDBDurableMessageRecoveryTest {
         assertTrue(Wait.waitFor(() -> 8 == getPendingMessageCount(topic, 
"clientId1", "sub1"), 3000, 500));
         assertTrue(Wait.waitFor(() -> 10 == getPendingMessageCount(topic, 
"clientId1", "sub2"), 3000, 500));
 
+        //Verify the pending size is less for sub1
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub2") > 0);
+        assertTrue(getPendingMessageSize(topic, "clientId1", "sub1") < 
getPendingMessageSize(topic, "clientId1", "sub2"));
+
         // Recreate subscriber and try and receive the other 8 messages
         session = getSession(ActiveMQSession.AUTO_ACKNOWLEDGE);
         subscriber1 = session.createDurableSubscriber(topic, "sub1");
@@ -347,4 +358,10 @@ public class KahaDBDurableMessageRecoveryTest {
         final TopicMessageStore store = (TopicMessageStore) 
brokerTopic.getMessageStore();
         return store.getMessageCount(clientId, subId);
     }
+
+    protected long getPendingMessageSize(ActiveMQTopic topic, String clientId, 
String subId) throws Exception {
+        final Topic brokerTopic = (Topic) broker.getDestination(topic);
+        final TopicMessageStore store = (TopicMessageStore) 
brokerTopic.getMessageStore();
+        return store.getMessageSize(clientId, subId);
+    }
 }

Reply via email to