Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 70e2b92f1 -> f1642b424
https://issues.apache.org/jira/browse/AMQ-6356

Fixing store size calculation on KahaUpdateMessageCommand processing so
that the size won't increase inadvertently if the existing location of
the command in the journal is the same as the new location

(cherry picked from commit a5050a8bc5b5f45852269867dd9bf46b9381912d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f1642b42
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f1642b42
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f1642b42

Branch: refs/heads/activemq-5.13.x
Commit: f1642b4244b0bbbdeaaec6fe4808c9e54835fbf7
Parents: 70e2b92
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Jul 14 09:21:23 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Thu Jul 14 09:22:59 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  11 +-
 .../store/kahadb/MessageDatabaseSizeTest.java   | 147 +++++++++++++++++++
 2 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f1642b42/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 75f1966..8dac54e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1452,10 +1452,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             );
             sd.locationIndex.put(tx, location, id);
             incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
-            // on first update previous is original location, on recovery/replay it may be the updated location
-            if(previousKeys != null && !previousKeys.location.equals(location)) {
-                sd.locationIndex.remove(tx, previousKeys.location);
+
+            if (previousKeys != null) {
+                //Remove the existing from the size
                 decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
+
+                // on first update previous is original location, on recovery/replay it may be the updated location
+                if(!previousKeys.location.equals(location)) {
+                    sd.locationIndex.remove(tx, previousKeys.location);
+                }
             }
             metadata.lastUpdate = location;
         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f1642b42/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
new file mode 100644
index 0000000..357dc5f
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageDatabaseSizeTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
+
+    @Rule
+    public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
+    private final String payload = new String(new byte[1024]);
+
+    private BrokerService broker = null;
+    private final ActiveMQQueue destination = new ActiveMQQueue("Test");
+    private KahaDBPersistenceAdapter adapter;
+
+    protected void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+        adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    @Before
+    public void start() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Test that when only updating the index and not rewriting the message to the journal
+     * that the size doesn't change
+     *
+     * This was broken before AMQ-6356
+     */
+    @Test
+    public void testUpdateMessageSameLocation() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        MessageId messageId = new MessageId("111:222:333");
+        ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
+        messageStore.updateMessage(textMessage);
+
+        Location location = findMessageLocation(messageId.toString(), store.convert(destination));
+        long existingSize = messageStore.getMessageSize();
+
+        //Process the update command for the index and verify the size doesn't change
+        KahaUpdateMessageCommand updateMessageCommand = (KahaUpdateMessageCommand) store.load(location);
+        store.process(updateMessageCommand, location);
+        assertEquals(existingSize, messageStore.getMessageSize());
+    }
+
+    /**
+     * Test that when updating an existing message to a different location in the
+     * journal that the index size doesn't change
+     */
+    @Test
+    public void testUpdateMessageDifferentLocation() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        ActiveMQTextMessage textMessage = getMessage(new MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), textMessage);
+        messageStore.updateMessage(textMessage);
+
+        //Update again and make sure the size is the same
+        long existingSize = messageStore.getMessageSize();
+        messageStore.updateMessage(textMessage);
+        assertEquals(existingSize, messageStore.getMessageSize());
+    }
+
+    private ActiveMQTextMessage getMessage(final MessageId messageId) throws Exception {
+        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setMessageId(messageId);
+        textMessage.setText(payload);
+
+        return textMessage;
+    }
+
+    private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
+        final KahaDBStore store = adapter.getStore();
+        return store.pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
+            @Override
+            public Location execute(Transaction tx) throws IOException {
+                StoredDestination sd = store.getStoredDestination(destination, tx);
+                Long sequence = sd.messageIdIndex.get(tx, key);
+                if (sequence == null) {
+                    return null;
+                }
+                return sd.orderIndex.get(tx, sequence).location;
+            }
+        });
+    }
+
+}