Repository: activemq
Updated Branches:
  refs/heads/activemq-5.13.x 70e2b92f1 -> f1642b424


https://issues.apache.org/jira/browse/AMQ-6356

Fix the store size calculation when processing a KahaUpdateMessageCommand so
that the size does not inadvertently increase when the command's existing
location in the journal is the same as its new location.

(cherry picked from commit a5050a8bc5b5f45852269867dd9bf46b9381912d)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f1642b42
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f1642b42
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f1642b42

Branch: refs/heads/activemq-5.13.x
Commit: f1642b4244b0bbbdeaaec6fe4808c9e54835fbf7
Parents: 70e2b92
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Thu Jul 14 09:21:23 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Thu Jul 14 09:22:59 2016 -0400

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  11 +-
 .../store/kahadb/MessageDatabaseSizeTest.java   | 147 +++++++++++++++++++
 2 files changed, 155 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f1642b42/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 75f1966..8dac54e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -1452,10 +1452,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
             );
             sd.locationIndex.put(tx, location, id);
             incrementAndAddSizeToStoreStat(command.getDestination(), 
location.getSize());
-            // on first update previous is original location, on 
recovery/replay it may be the updated location
-            if(previousKeys != null && 
!previousKeys.location.equals(location)) {
-                sd.locationIndex.remove(tx, previousKeys.location);
+
+            if (previousKeys != null) {
+                //Remove the existing from the size
                 decrementAndSubSizeToStoreStat(command.getDestination(), 
previousKeys.location.getSize());
+
+                // on first update previous is original location, on 
recovery/replay it may be the updated location
+                if(!previousKeys.location.equals(location)) {
+                    sd.locationIndex.remove(tx, previousKeys.location);
+                }
             }
             metadata.lastUpdate = location;
         } else {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f1642b42/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
new file mode 100644
index 0000000..357dc5f
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/MessageDatabaseSizeTest.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.store.MessageStore;
+import org.apache.activemq.store.kahadb.MessageDatabase.StoredDestination;
+import org.apache.activemq.store.kahadb.data.KahaDestination;
+import org.apache.activemq.store.kahadb.data.KahaUpdateMessageCommand;
+import org.apache.activemq.store.kahadb.disk.journal.Location;
+import org.apache.activemq.store.kahadb.disk.page.Transaction;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MessageDatabaseSizeTest {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MessageDatabaseSizeTest.class);
+
+    @Rule
+    public TemporaryFolder dataDir = new TemporaryFolder(new File("target"));
+    private final String payload = new String(new byte[1024]);
+
+    private BrokerService broker = null;
+    private final ActiveMQQueue destination = new ActiveMQQueue("Test");
+    private KahaDBPersistenceAdapter adapter;
+
+    protected void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(true);
+        broker.setUseJmx(true);
+        broker.setDataDirectory(dataDir.getRoot().getAbsolutePath());
+        adapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        broker.start();
+        LOG.info("Starting broker..");
+    }
+
+    @Before
+    public void start() throws Exception {
+        startBroker();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Test that when only updating the index and not rewriting the message to 
the journal
+     * that the size doesn't change
+     *
+     * This was broken before AMQ-6356
+     */
+    @Test
+    public void testUpdateMessageSameLocation() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        MessageId messageId = new MessageId("111:222:333");
+        ActiveMQTextMessage textMessage = getMessage(new 
MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size 
consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), 
textMessage);
+        messageStore.updateMessage(textMessage);
+
+        Location location = findMessageLocation(messageId.toString(), 
store.convert(destination));
+        long existingSize = messageStore.getMessageSize();
+
+        //Process the update command for the index and verify the size doesn't 
change
+        KahaUpdateMessageCommand updateMessageCommand = 
(KahaUpdateMessageCommand) store.load(location);
+        store.process(updateMessageCommand, location);
+        assertEquals(existingSize, messageStore.getMessageSize());
+    }
+
+    /**
+     * Test that when updating an existing message to a different location in 
the
+     * journal that the index size doesn't change
+     */
+    @Test
+    public void testUpdateMessageDifferentLocation() throws Exception {
+        final KahaDBStore store = adapter.getStore();
+        ActiveMQTextMessage textMessage = getMessage(new 
MessageId("111:222:333"));
+
+        //Add a single message and update once so we can compare the size 
consistently
+        MessageStore messageStore = store.createQueueMessageStore(destination);
+        messageStore.addMessage(broker.getAdminConnectionContext(), 
textMessage);
+        messageStore.updateMessage(textMessage);
+
+        //Update again and make sure the size is the same
+        long existingSize = messageStore.getMessageSize();
+        messageStore.updateMessage(textMessage);
+        assertEquals(existingSize, messageStore.getMessageSize());
+    }
+
+    private ActiveMQTextMessage getMessage(final MessageId messageId) throws 
Exception {
+        ActiveMQTextMessage textMessage = new ActiveMQTextMessage();
+        textMessage.setMessageId(messageId);
+        textMessage.setText(payload);
+
+        return textMessage;
+    }
+
+    private Location findMessageLocation(final String key, final 
KahaDestination destination) throws IOException {
+        final KahaDBStore store = adapter.getStore();
+        return store.pageFile.tx().execute(new 
Transaction.CallableClosure<Location, IOException>() {
+            @Override
+            public Location execute(Transaction tx) throws IOException {
+                StoredDestination sd = store.getStoredDestination(destination, 
tx);
+                Long sequence = sd.messageIdIndex.get(tx, key);
+                if (sequence == null) {
+                    return null;
+                }
+                return sd.orderIndex.get(tx, sequence).location;
+            }
+        });
+    }
+
+}

Reply via email to