Repository: activemq Updated Branches: refs/heads/master 3dd86d04e -> ba77b9f55
https://issues.apache.org/jira/browse/AMQ-6277 - tidy up logic that determines recovery location so that we don't recover from the end of the journal in error on normal restart. This avoids spurious recovery logging Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ba77b9f5 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ba77b9f5 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ba77b9f5 Branch: refs/heads/master Commit: ba77b9f55a627846ecab63916b2667f234022d34 Parents: 3dd86d0 Author: gtully <gary.tu...@gmail.com> Authored: Tue May 3 12:47:24 2016 +0100 Committer: gtully <gary.tu...@gmail.com> Committed: Tue May 3 12:47:49 2016 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 23 +++++++----- .../activemq/store/kahadb/KahaDBTest.java | 37 ++++++++++++++++++++ 2 files changed, 51 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/ba77b9f5/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 3e754f7..4a23cbc 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -629,8 +629,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe Location ackMessageFileLocation = recoverAckMessageFileMap(); Location lastIndoubtPosition = getRecoveryPosition(); - Location recoveryPosition = minimum(producerAuditPosition, 
ackMessageFileLocation); - recoveryPosition = minimum(recoveryPosition, lastIndoubtPosition); + Location recoveryPosition = startOfRecovery(producerAuditPosition, ackMessageFileLocation); + recoveryPosition = startOfRecovery(recoveryPosition, lastIndoubtPosition); if (recoveryPosition != null) { int redoCounter = 0; @@ -711,16 +711,21 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return TransactionIdConversion.convertToLocal(tx); } - private Location minimum(Location producerAuditPosition, - Location lastIndoubtPosition) { + private Location startOfRecovery(Location x, + Location y) { Location min = null; - if (producerAuditPosition != null) { - min = producerAuditPosition; - if (lastIndoubtPosition != null && lastIndoubtPosition.compareTo(producerAuditPosition) < 0) { - min = lastIndoubtPosition; + if (x != null) { + min = x; + if (y != null) { + int compare = y.compareTo(x); + if (compare < 0) { + min = y; + } else if (compare == 0) { + min = null; // no recovery needed on a matched location + } } } else { - min = lastIndoubtPosition; + min = y; } return min; } http://git-wip-us.apache.org/repos/asf/activemq/blob/ba77b9f5/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java index 78f4c60..bd81524 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.store.kahadb; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; import javax.jms.JMSException; @@ -31,6 +32,10 @@ 
import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Level; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; /** * @author chirino @@ -193,6 +198,38 @@ public class KahaDBTest extends TestCase { broker.stop(); } + public void testNoReplayOnStopStart() throws Exception { + KahaDBStore kaha = createStore(true); + BrokerService broker = createBroker(kaha); + sendMessages(100); + broker.stop(); + broker.waitUntilStopped(); + + kaha = createStore(false); + kaha.setCheckForCorruptJournalFiles(true); + + final AtomicBoolean didSomeRecovery = new AtomicBoolean(false); + DefaultTestAppender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel() == Level.INFO && event.getRenderedMessage().contains("Recovering from the journal @")) { + didSomeRecovery.set(true); + } + } + }; + + Logger.getRootLogger().addAppender(appender); + + broker = createBroker(kaha); + + int count = receiveMessages(); + assertEquals("Expected to received all messages.", 100, count); + broker.stop(); + + Logger.getRootLogger().removeAppender(appender); + assertFalse("Did not replay any records from the journal", didSomeRecovery.get()); + } + private void assertExistsAndDelete(File file) { assertTrue(file.exists()); file.delete();