activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
Repository: activemq Updated Branches: refs/heads/master a3c8bee1f -> a49d46e3c https://issues.apache.org/jira/browse/AMQ-5748 Updating MemoryTopicMessageStore to decrement store statistics on cache eviction. Updating KahaDBMessageStoreSizeStatTest to account for the fact that an LRU cache is used so the last 100 messages are kept in memory. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a49d46e3 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a49d46e3 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a49d46e3 Branch: refs/heads/master Commit: a49d46e3ca689af6f2cb721c457be97d654b2492 Parents: a3c8bee Author: Christopher L. Shannon (cshannon) Authored: Sat Aug 8 17:55:41 2015 + Committer: Christopher L. Shannon (cshannon) Committed: Sat Aug 8 17:59:19 2015 + -- .../store/memory/MemoryMessageStore.java| 16 +- .../store/memory/MemoryTopicMessageStore.java | 56 ++- .../store/AbstractMessageStoreSizeStatTest.java | 150 ++- .../kahadb/KahaDBMessageStoreSizeStatTest.java | 4 +- .../MultiKahaDBMessageStoreSizeStatTest.java| 12 +- .../memory/MemoryMessageStoreSizeStatTest.java | 57 ++- 6 files changed, 242 insertions(+), 53 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index 51006c2..3989646 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -57,8 +57,7 @@ public class MemoryMessageStore extends AbstractMessageStore { public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { synchronized 
(messageTable) { messageTable.put(message.getMessageId(), message); -getMessageStoreStatistics().getMessageCount().increment(); - getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); +incMessageStoreStatistics(message); } message.incrementReferenceCount(); message.getMessageId().setFutureOrSequenceLong(sequenceId++); @@ -94,8 +93,7 @@ public class MemoryMessageStore extends AbstractMessageStore { Message removed = messageTable.remove(msgId); if( removed !=null ) { removed.decrementReferenceCount(); -getMessageStoreStatistics().getMessageCount().decrement(); - getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize()); +decMessageStoreStatistics(removed); } if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { lastBatchId = null; @@ -200,4 +198,14 @@ public class MemoryMessageStore extends AbstractMessageStore { } } +protected final void incMessageStoreStatistics(Message message) { +getMessageStoreStatistics().getMessageCount().increment(); + getMessageStoreStatistics().getMessageSize().addSize(message.getSize()); +} + +protected final void decMessageStoreStatistics(Message message) { +getMessageStoreStatistics().getMessageCount().decrement(); + getMessageStoreStatistics().getMessageSize().addSize(-message.getSize()); +} + } http://git-wip-us.apache.org/repos/asf/activemq/blob/a49d46e3/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java index 0debfe6..142547f 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryTopicMessageStore.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import 
java.util.Map.Entry; + import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; @@ -29,36 +30,47 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; import org
[2/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
https://issues.apache.org/jira/browse/AMQ-5748 Added a getMessageSize method to MessageStore to support retrieving the total message size of all stored messages for a destination. Added a new storeMessageSize statistic to DestinationStatistics. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/785b16bf Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/785b16bf Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/785b16bf Branch: refs/heads/master Commit: 785b16bf9ef19180e7c9783442f4a125b44255e1 Parents: 7a68ad5 Author: Christopher L. Shannon (cshannon) Authored: Mon Apr 27 18:24:16 2015 + Committer: Christopher L. Shannon (cshannon) Committed: Tue Jul 7 18:03:27 2015 + -- .../activemq/broker/jmx/DestinationView.java| 8 + .../broker/jmx/DestinationViewMBean.java| 8 + .../apache/activemq/broker/region/Queue.java| 1 + .../apache/activemq/broker/region/Topic.java| 1 + .../activemq/store/AbstractMessageStore.java| 21 ++ .../org/apache/activemq/store/MessageStore.java | 12 + .../activemq/store/MessageStoreStatistics.java | 81 ++ .../activemq/store/ProxyMessageStore.java | 11 + .../activemq/store/ProxyTopicMessageStore.java | 12 +- .../store/memory/MemoryMessageStore.java| 43 ++- .../activemq/management/SizeStatisticImpl.java | 17 ++ .../activemq/store/jdbc/JDBCMessageStore.java | 2 + .../store/journal/JournalMessageStore.java | 15 +- .../activemq/store/kahadb/KahaDBStore.java | 69 ++--- .../activemq/store/kahadb/MessageDatabase.java | 172 +++- .../activemq/store/kahadb/TempKahaDBStore.java | 40 +-- .../kahadb/disk/util/LocationMarshaller.java| 5 + .../apache/activemq/leveldb/LevelDBStore.scala | 2 +- .../cursors/StoreQueueCursorOrderTest.java | 10 +- .../store/AbstractMessageStoreSizeStatTest.java | 266 +++ .../store/AbstractMessageStoreSizeTest.java | 98 +++ .../AbstractKahaDBMessageStoreSizeTest.java | 147 ++ .../kahadb/KahaDBMessageStoreSizeStatTest.java | 82 ++ 
.../kahadb/KahaDBMessageStoreSizeTest.java | 46 .../MultiKahaDBMessageStoreSizeStatTest.java| 134 ++ .../kahadb/MultiKahaDBMessageStoreSizeTest.java | 68 + .../memory/MemoryMessageStoreSizeStatTest.java | 45 .../memory/MemoryMessageStoreSizeTest.java | 45 .../kahadb/MessageStoreTest/version5/db-1.log | Bin 0 -> 524288 bytes .../kahadb/MessageStoreTest/version5/db.data| Bin 0 -> 32768 bytes .../kahadb/MessageStoreTest/version5/db.redo| Bin 0 -> 32824 bytes .../version5/queue#3a#2f#2fTest/db-1.log| Bin 0 -> 524288 bytes .../version5/queue#3a#2f#2fTest/db.data | Bin 0 -> 32768 bytes .../version5/queue#3a#2f#2fTest/db.redo | Bin 0 -> 32824 bytes 34 files changed, 1382 insertions(+), 79 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index b3bf869..3e51a49 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; + import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory; import org.apache.activemq.broker.region.Destination; @@ -51,6 +52,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.selector.SelectorParser; +import org.apache.activemq.store.MessageStore; import org.apache.activemq.util.URISupport; import org.slf4j.Logger; import 
org.slf4j.LoggerFactory; @@ -119,6 +121,12 @@ public class DestinationView implements DestinationViewMBean { return destination.getDestinationStatistics().getMessages().getCount(); } +@Override +public long getStoreMessageSize() { +MessageStore messageStore = destination.getMessageStore(); +return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0; +} + public long getMessagesCached() { return destination.getDestin
[1/3] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
Repository: activemq Updated Branches: refs/heads/master 7a68ad5d9 -> 2b320ac06 http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java -- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java new file mode 100644 index 000..398b2f7 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; + +import org.apache.activemq.store.MessageStore; +import org.apache.commons.io.FileUtils; + +/** + * This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly + * compute the size of the messages in the store. 
+ * + * + */ +public class MultiKahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest { + + +@Override +protected void createStore(boolean deleteAllMessages, String directory) throws Exception { +MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter(); + +store = multiStore; +File fileDir = new File(directory); + +if (deleteAllMessages && fileDir.exists()) { +FileUtils.cleanDirectory(new File(directory)); +} + +KahaDBPersistenceAdapter localStore = new KahaDBPersistenceAdapter(); +localStore.setJournalMaxFileLength(1024 * 512); +localStore.setDirectory(new File(directory)); + +FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); +filtered.setPersistenceAdapter(localStore); +filtered.setPerDestination(true); +List stores = new ArrayList<>(); +stores.add(filtered); + +multiStore.setFilteredPersistenceAdapters(stores); +multiStore.setDirectory(fileDir); +multiStore.start(); +messageStore = store.createQueueMessageStore(destination); +messageStore.start(); +} + +@Override +protected String getVersion5Dir() { +return "src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5"; +} + +} http://git-wip-us.apache.org/repos/asf/activemq/blob/785b16bf/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java -- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java new file mode 100644 index 000..755936c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.memory; + +import java.io.IOException; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.store.AbstractMessageStoreSizeStatTest; +import org.slf4j.Logger; +import org.sl
[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
Repository: activemq Updated Branches: refs/heads/master 1dcdf69f3 -> 6f457d2f5 https://issues.apache.org/jira/browse/AMQ-5748 Fixing a potential Null pointer exception in MemoryMessageStore Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/bd28c3b0 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/bd28c3b0 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/bd28c3b0 Branch: refs/heads/master Commit: bd28c3b0ba317c3f07329c083d7097c12beee5b8 Parents: 1dcdf69 Author: Christopher L. Shannon Authored: Tue Jul 7 18:54:06 2015 -0400 Committer: Christopher L. Shannon Committed: Tue Jul 7 18:54:06 2015 -0400 -- .../activemq/store/memory/MemoryMessageStore.java | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/bd28c3b0/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java -- diff --git a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index e71dab8..51006c2 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-broker/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -53,6 +53,7 @@ public class MemoryMessageStore extends AbstractMessageStore { this.messageTable = Collections.synchronizedMap(messageTable); } +@Override public synchronized void addMessage(ConnectionContext context, Message message) throws IOException { synchronized (messageTable) { messageTable.put(message.getMessageId(), message); @@ -74,6 +75,7 @@ public class MemoryMessageStore extends AbstractMessageStore { // } // } +@Override public Message getMessage(MessageId identity) throws IOException { return messageTable.get(identity); } @@ -82,6 +84,7 @@ public class MemoryMessageStore extends AbstractMessageStore { // 
return (String)messageTable.get(identity); // } +@Override public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException { removeMessage(ack.getLastMessageId()); } @@ -91,15 +94,16 @@ public class MemoryMessageStore extends AbstractMessageStore { Message removed = messageTable.remove(msgId); if( removed !=null ) { removed.decrementReferenceCount(); +getMessageStoreStatistics().getMessageCount().decrement(); + getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize()); } if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) { lastBatchId = null; } -getMessageStoreStatistics().getMessageCount().decrement(); - getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize()); } } +@Override public void recover(MessageRecoveryListener listener) throws Exception { // the message table is a synchronizedMap - so just have to synchronize // here @@ -115,6 +119,7 @@ public class MemoryMessageStore extends AbstractMessageStore { } } +@Override public void removeAllMessages(ConnectionContext context) throws IOException { synchronized (messageTable) { messageTable.clear(); @@ -129,6 +134,7 @@ public class MemoryMessageStore extends AbstractMessageStore { } } +@Override public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception { synchronized (messageTable) { boolean pastLackBatch = lastBatchId == null; @@ -151,6 +157,7 @@ public class MemoryMessageStore extends AbstractMessageStore { } } +@Override public void resetBatching() { lastBatchId = null; } @@ -160,6 +167,7 @@ public class MemoryMessageStore extends AbstractMessageStore { lastBatchId = messageId; } +@Override public void updateMessage(Message message) { synchronized (messageTable) { Message original = messageTable.get(message.getMessageId());
[1/2] activemq git commit: https://issues.apache.org/jira/browse/AMQ-5748
Repository: activemq Updated Branches: refs/heads/master e1c707e81 -> c853bcf43 https://issues.apache.org/jira/browse/AMQ-5748 Fixing an issue that prevented old versions of KahaDB from being upgraded to the newest version 6 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8871b0e4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8871b0e4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8871b0e4 Branch: refs/heads/master Commit: 8871b0e496bae1396fb7323b01d7f715b4a211f7 Parents: e1c707e Author: Christopher L. Shannon (cshannon) Authored: Wed Jul 8 15:50:16 2015 + Committer: Christopher L. Shannon (cshannon) Committed: Wed Jul 8 16:10:11 2015 + -- .../activemq/store/kahadb/MessageDatabase.java | 52 --- .../store/kahadb/KahaDBVersionTest.java | 8 ++- .../store/kahadb/KahaDBVersion5/db-1.log| Bin 0 -> 1048576 bytes .../store/kahadb/KahaDBVersion5/db.data | Bin 0 -> 667648 bytes .../store/kahadb/KahaDBVersion5/db.redo | Bin 0 -> 668944 bytes 5 files changed, 40 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/activemq/blob/8871b0e4/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java -- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index e35619e..2fa4bb1 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1930,7 +1930,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected class StoredDestinationMarshaller extends VariableMarshaller { - final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); +final MessageKeysMarshaller messageKeysMarshaller = new 
MessageKeysMarshaller(); @Override public StoredDestination readPayload(final DataInput dataIn) throws IOException { @@ -2128,6 +2128,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe rc.messageIdIndex.setValueMarshaller(LongMarshaller.INSTANCE); rc.messageIdIndex.load(tx); +//go through an upgrade old index if older than version 6 +if (metadata.version < 6) { +for (Iterator> iterator = rc.locationIndex.iterator(tx); iterator.hasNext(); ) { +Entry entry = iterator.next(); +// modify so it is upgraded +rc.locationIndex.put(tx, entry.getKey(), entry.getValue()); +} +} + // If it was a topic... if (topic) { @@ -2275,24 +2284,24 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe protected final Map storeCache = new ConcurrentHashMap(); - /** -* Locate the storeMessageSize counter for this KahaDestination -* @param kahaDestination -* @return -*/ - protected MessageStoreStatistics getStoreStats(String kahaDestKey) { - MessageStoreStatistics storeStats = null; - try { - MessageStore messageStore = storeCache.get(kahaDestKey); - if (messageStore != null) { - storeStats = messageStore.getMessageStoreStatistics(); - } - } catch (Exception e1) { -LOG.error("Getting size counter of destination failed", e1); - } - - return storeStats; - } +/** + * Locate the storeMessageSize counter for this KahaDestination + * @param kahaDestination + * @return + */ +protected MessageStoreStatistics getStoreStats(String kahaDestKey) { +MessageStoreStatistics storeStats = null; +try { +MessageStore messageStore = storeCache.get(kahaDestKey); +if (messageStore != null) { +storeStats = messageStore.getMessageStoreStatistics(); +} +} catch (Exception e1) { + LOG.error("Getting size counter of destination failed", e1); +} + +return storeStats; +} /** * Determine whether this Destination matches the DestinationType @@ -2319,6 +2328,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } +@Override 
public Location readPayload(DataInput dataIn) throws IOException { Location rc = new Location(); r