activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748

2015-08-08 Thread cshannon
Repository: activemq
Updated Branches:
  refs/heads/master a3c8bee1f -> a49d46e3c


https://issues.apache.org/jira/browse/AMQ-5748

Updating MemoryTopicMessageStore to decrement store statistics on cache
eviction.  Updating KahaDBMessageStoreSizeStatTest to account for the
fact that an LRU cache is used so the last 100 messages are kept in
memory.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a49d46e3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a49d46e3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a49d46e3

Branch: refs/heads/master
Commit: a49d46e3ca689af6f2cb721c457be97d654b2492
Parents: a3c8bee
Author: Christopher L. Shannon (cshannon) 
Authored: Sat Aug 8 17:55:41 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Sat Aug 8 17:59:19 2015 +

--
 .../store/memory/MemoryMessageStore.java|  16 +-
 .../store/memory/MemoryTopicMessageStore.java   |  56 ++-
 .../store/AbstractMessageStoreSizeStatTest.java | 150 ++-
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |   4 +-
 .../MultiKahaDBMessageStoreSizeStatTest.java|  12 +-
 .../memory/MemoryMessageStoreSizeStatTest.java  |  57 ++-
 6 files changed, 242 insertions(+), 53 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index 51006c2..3989646 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -57,8 +57,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
 public synchronized void addMessage(ConnectionContext context, Message 
message) throws IOException {
 synchronized (messageTable) {
 messageTable.put(message.getMessageId(), message);
-getMessageStoreStatistics().getMessageCount().increment();
-
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
+incMessageStoreStatistics(message);
 }
 message.incrementReferenceCount();
 message.getMessageId().setFutureOrSequenceLong(sequenceId++);
@@ -94,8 +93,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
 Message removed = messageTable.remove(msgId);
 if( removed !=null ) {
 removed.decrementReferenceCount();
-getMessageStoreStatistics().getMessageCount().decrement();
-
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
+decMessageStoreStatistics(removed);
 }
 if ((lastBatchId != null && lastBatchId.equals(msgId)) || 
messageTable.isEmpty()) {
 lastBatchId = null;
@@ -200,4 +198,14 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
 }
 }
 
+protected final void incMessageStoreStatistics(Message message) {
+getMessageStoreStatistics().getMessageCount().increment();
+
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
+}
+
+protected final void decMessageStoreStatistics(Message message) {
+getMessageStoreStatistics().getMessageCount().decrement();
+
getMessageStoreStatistics().getMessageSize().addSize(-message.getSize());
+}
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
index 0debfe6..142547f 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
+
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
@@ -29,36 +30,47 @@ import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageId;
 import org.apache.activemq.command.SubscriptionInfo;
 import org

[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748

2015-07-07 Thread tabish
https://issues.apache.org/jira/browse/AMQ-5748

Added a getMessageSize method to MessageStore to support retrieving the
total message size of all stored messages for a destination.  Added a
new storeMessageSize statistic to DestinationStatistics.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/785b16bf
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/785b16bf
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/785b16bf

Branch: refs/heads/master
Commit: 785b16bf9ef19180e7c9783442f4a125b44255e1
Parents: 7a68ad5
Author: Christopher L. Shannon (cshannon) 
Authored: Mon Apr 27 18:24:16 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Tue Jul 7 18:03:27 2015 +

--
 .../activemq/broker/jmx/DestinationView.java|   8 +
 .../broker/jmx/DestinationViewMBean.java|   8 +
 .../apache/activemq/broker/region/Queue.java|   1 +
 .../apache/activemq/broker/region/Topic.java|   1 +
 .../activemq/store/AbstractMessageStore.java|  21 ++
 .../org/apache/activemq/store/MessageStore.java |  12 +
 .../activemq/store/MessageStoreStatistics.java  |  81 ++
 .../activemq/store/ProxyMessageStore.java   |  11 +
 .../activemq/store/ProxyTopicMessageStore.java  |  12 +-
 .../store/memory/MemoryMessageStore.java|  43 ++-
 .../activemq/management/SizeStatisticImpl.java  |  17 ++
 .../activemq/store/jdbc/JDBCMessageStore.java   |   2 +
 .../store/journal/JournalMessageStore.java  |  15 +-
 .../activemq/store/kahadb/KahaDBStore.java  |  69 ++---
 .../activemq/store/kahadb/MessageDatabase.java  | 172 +++-
 .../activemq/store/kahadb/TempKahaDBStore.java  |  40 +--
 .../kahadb/disk/util/LocationMarshaller.java|   5 +
 .../apache/activemq/leveldb/LevelDBStore.scala  |   2 +-
 .../cursors/StoreQueueCursorOrderTest.java  |  10 +-
 .../store/AbstractMessageStoreSizeStatTest.java | 266 +++
 .../store/AbstractMessageStoreSizeTest.java |  98 +++
 .../AbstractKahaDBMessageStoreSizeTest.java | 147 ++
 .../kahadb/KahaDBMessageStoreSizeStatTest.java  |  82 ++
 .../kahadb/KahaDBMessageStoreSizeTest.java  |  46 
 .../MultiKahaDBMessageStoreSizeStatTest.java| 134 ++
 .../kahadb/MultiKahaDBMessageStoreSizeTest.java |  68 +
 .../memory/MemoryMessageStoreSizeStatTest.java  |  45 
 .../memory/MemoryMessageStoreSizeTest.java  |  45 
 .../kahadb/MessageStoreTest/version5/db-1.log   | Bin 0 -> 524288 bytes
 .../kahadb/MessageStoreTest/version5/db.data| Bin 0 -> 32768 bytes
 .../kahadb/MessageStoreTest/version5/db.redo| Bin 0 -> 32824 bytes
 .../version5/queue#3a#2f#2fTest/db-1.log| Bin 0 -> 524288 bytes
 .../version5/queue#3a#2f#2fTest/db.data | Bin 0 -> 32768 bytes
 .../version5/queue#3a#2f#2fTest/db.redo | Bin 0 -> 32824 bytes
 34 files changed, 1382 insertions(+), 79 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
index b3bf869..3e51a49 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java
@@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException;
 import javax.management.openmbean.TabularData;
 import javax.management.openmbean.TabularDataSupport;
 import javax.management.openmbean.TabularType;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
 import org.apache.activemq.broker.region.Destination;
@@ -51,6 +52,7 @@ import org.apache.activemq.command.Message;
 import org.apache.activemq.filter.BooleanExpression;
 import org.apache.activemq.filter.MessageEvaluationContext;
 import org.apache.activemq.selector.SelectorParser;
+import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.util.URISupport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -119,6 +121,12 @@ public class DestinationView implements 
DestinationViewMBean {
 return destination.getDestinationStatistics().getMessages().getCount();
 }
 
+@Override
+public long getStoreMessageSize() {
+MessageStore messageStore = destination.getMessageStore();
+return messageStore != null ? 
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0;
+}
+
 public long getMessagesCached() {
 return 
destination.getDestin

[1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748

2015-07-07 Thread tabish
Repository: activemq
Updated Branches:
  refs/heads/master 7a68ad5d9 -> 2b320ac06


http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
--
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
new file mode 100644
index 000..398b2f7
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.store.MessageStore;
+import org.apache.commons.io.FileUtils;
+
+/**
+ * This test is for AMQ-5748 to verify that {@link MessageStore} implementations 
correctly
+ * compute the size of the messages in the store.
+ *
+ *
+ */
+public class MultiKahaDBMessageStoreSizeTest extends 
AbstractKahaDBMessageStoreSizeTest {
+
+
+@Override
+protected void createStore(boolean deleteAllMessages, String directory) 
throws Exception {
+MultiKahaDBPersistenceAdapter multiStore = new 
MultiKahaDBPersistenceAdapter();
+
+store = multiStore;
+File fileDir = new File(directory);
+
+if (deleteAllMessages && fileDir.exists()) {
+FileUtils.cleanDirectory(new File(directory));
+}
+
+KahaDBPersistenceAdapter localStore = new KahaDBPersistenceAdapter();
+localStore.setJournalMaxFileLength(1024 * 512);
+localStore.setDirectory(new File(directory));
+
+FilteredKahaDBPersistenceAdapter filtered = new 
FilteredKahaDBPersistenceAdapter();
+filtered.setPersistenceAdapter(localStore);
+filtered.setPerDestination(true);
+List stores = new ArrayList<>();
+stores.add(filtered);
+
+multiStore.setFilteredPersistenceAdapters(stores);
+multiStore.setDirectory(fileDir);
+multiStore.start();
+messageStore = store.createQueueMessageStore(destination);
+messageStore.start();
+}
+
+@Override
+protected String getVersion5Dir() {
+return 
"src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5";
+}
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
--
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
new file mode 100644
index 000..755936c
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.memory;
+
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
+import org.slf4j.Logger;
+import org.sl

[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748

2015-07-07 Thread tabish
Repository: activemq
Updated Branches:
  refs/heads/master 1dcdf69f3 -> 6f457d2f5


https://issues.apache.org/jira/browse/AMQ-5748

Fixing a potential NullPointerException in MemoryMessageStore


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd28c3b0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd28c3b0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd28c3b0

Branch: refs/heads/master
Commit: bd28c3b0ba317c3f07329c083d7097c12beee5b8
Parents: 1dcdf69
Author: Christopher L. Shannon 
Authored: Tue Jul 7 18:54:06 2015 -0400
Committer: Christopher L. Shannon 
Committed: Tue Jul 7 18:54:06 2015 -0400

--
 .../activemq/store/memory/MemoryMessageStore.java   | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/bd28c3b0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
--
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
index e71dab8..51006c2 100755
--- 
a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java
@@ -53,6 +53,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
 this.messageTable = Collections.synchronizedMap(messageTable);
 }
 
+@Override
 public synchronized void addMessage(ConnectionContext context, Message 
message) throws IOException {
 synchronized (messageTable) {
 messageTable.put(message.getMessageId(), message);
@@ -74,6 +75,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
 // }
 // }
 
+@Override
 public Message getMessage(MessageId identity) throws IOException {
 return messageTable.get(identity);
 }
@@ -82,6 +84,7 @@ public class MemoryMessageStore extends AbstractMessageStore {
 // return (String)messageTable.get(identity);
 // }
 
+@Override
 public void removeMessage(ConnectionContext context, MessageAck ack) 
throws IOException {
 removeMessage(ack.getLastMessageId());
 }
@@ -91,15 +94,16 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
 Message removed = messageTable.remove(msgId);
 if( removed !=null ) {
 removed.decrementReferenceCount();
+getMessageStoreStatistics().getMessageCount().decrement();
+
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
 }
 if ((lastBatchId != null && lastBatchId.equals(msgId)) || 
messageTable.isEmpty()) {
 lastBatchId = null;
 }
-getMessageStoreStatistics().getMessageCount().decrement();
-
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
 }
 }
 
+@Override
 public void recover(MessageRecoveryListener listener) throws Exception {
 // the message table is a synchronizedMap - so just have to synchronize
 // here
@@ -115,6 +119,7 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
 }
 }
 
+@Override
 public void removeAllMessages(ConnectionContext context) throws 
IOException {
 synchronized (messageTable) {
 messageTable.clear();
@@ -129,6 +134,7 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
 }
 }
 
+@Override
 public void recoverNextMessages(int maxReturned, MessageRecoveryListener 
listener) throws Exception {
 synchronized (messageTable) {
 boolean pastLackBatch = lastBatchId == null;
@@ -151,6 +157,7 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
 }
 }
 
+@Override
 public void resetBatching() {
 lastBatchId = null;
 }
@@ -160,6 +167,7 @@ public class MemoryMessageStore extends 
AbstractMessageStore {
 lastBatchId = messageId;
 }
 
+@Override
 public void updateMessage(Message message) {
 synchronized (messageTable) {
 Message original = messageTable.get(message.getMessageId());



[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748

2015-07-08 Thread tabish
Repository: activemq
Updated Branches:
  refs/heads/master e1c707e81 -> c853bcf43


https://issues.apache.org/jira/browse/AMQ-5748

Fixing an issue that prevented old versions of KahaDB from being
upgraded to the newest version 6


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8871b0e4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8871b0e4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8871b0e4

Branch: refs/heads/master
Commit: 8871b0e496bae1396fb7323b01d7f715b4a211f7
Parents: e1c707e
Author: Christopher L. Shannon (cshannon) 
Authored: Wed Jul 8 15:50:16 2015 +
Committer: Christopher L. Shannon (cshannon) 
Committed: Wed Jul 8 16:10:11 2015 +

--
 .../activemq/store/kahadb/MessageDatabase.java  |  52 ---
 .../store/kahadb/KahaDBVersionTest.java |   8 ++-
 .../store/kahadb/KahaDBVersion5/db-1.log| Bin 0 -> 1048576 bytes
 .../store/kahadb/KahaDBVersion5/db.data | Bin 0 -> 667648 bytes
 .../store/kahadb/KahaDBVersion5/db.redo | Bin 0 -> 668944 bytes
 5 files changed, 40 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/activemq/blob/8871b0e4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
--
diff --git 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index e35619e..2fa4bb1 100644
--- 
a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ 
b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1930,7 +1930,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 
 protected class StoredDestinationMarshaller extends 
VariableMarshaller {
 
-   final MessageKeysMarshaller messageKeysMarshaller = new 
MessageKeysMarshaller();
+final MessageKeysMarshaller messageKeysMarshaller = new 
MessageKeysMarshaller();
 
 @Override
 public StoredDestination readPayload(final DataInput dataIn) throws 
IOException {
@@ -2128,6 +2128,15 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE);
 rc.messageIdIndex.load(tx);
 
+//go through and upgrade the old index if older than version 6
+if (metadata.version < 6) {
+for (Iterator> iterator = 
rc.locationIndex.iterator(tx); iterator.hasNext(); ) {
+Entry entry = iterator.next();
+// modify so it is upgraded
+rc.locationIndex.put(tx, entry.getKey(), entry.getValue());
+}
+}
+
 // If it was a topic...
 if (topic) {
 
@@ -2275,24 +2284,24 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 protected final Map storeCache =
 new ConcurrentHashMap();
 
-   /**
-* Locate the storeMessageSize counter for this KahaDestination
-* @param kahaDestination
-* @return
-*/
-   protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
-   MessageStoreStatistics storeStats = null;
-   try {
-   MessageStore messageStore = storeCache.get(kahaDestKey);
-   if (messageStore != null) {
-   storeStats = messageStore.getMessageStoreStatistics();
-   }
-   } catch (Exception e1) {
-LOG.error("Getting size counter of destination 
failed", e1);
-   }
-
-   return storeStats;
-   }
+/**
+ * Locate the storeMessageSize counter for this KahaDestination
+ * @param kahaDestination
+ * @return
+ */
+protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
+MessageStoreStatistics storeStats = null;
+try {
+MessageStore messageStore = storeCache.get(kahaDestKey);
+if (messageStore != null) {
+storeStats = messageStore.getMessageStoreStatistics();
+}
+} catch (Exception e1) {
+ LOG.error("Getting size counter of destination failed", e1);
+}
+
+return storeStats;
+}
 
 /**
  * Determine whether this Destination matches the DestinationType
@@ -2319,6 +2328,7 @@ public abstract class MessageDatabase extends 
ServiceSupport implements BrokerSe
 
 }
 
+@Override
 public Location readPayload(DataInput dataIn) throws IOException {
 Location rc = new Location();
 r