Repository: activemq Updated Branches: refs/heads/master 01f56d0ca -> 4a821186a
https://issues.apache.org/jira/browse/AMQ-5578 - preallocation could ocurr after a restart over an existing journal file! - fix and test Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4a821186 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4a821186 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4a821186 Branch: refs/heads/master Commit: 4a821186a4c8e7296637438fee932365d73b936e Parents: 01f56d0 Author: gtully <[email protected]> Authored: Mon Apr 20 16:01:50 2015 +0100 Committer: gtully <[email protected]> Committed: Mon Apr 20 16:04:01 2015 +0100 ---------------------------------------------------------------------- .../activemq/store/kahadb/MessageDatabase.java | 1 + .../kahadb/disk/journal/DataFileAppender.java | 7 +++++-- .../JournalCorruptionEofIndexRecoveryTest.java | 17 +++++++++++++++++ .../disk/journal/PreallocationJournalTest.java | 8 ++++++-- 4 files changed, 29 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 32b7170..41c9aba 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -610,6 +610,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe redoCounter++; } catch (IOException failedRecovery) { if (isIgnoreMissingJournalfiles()) { + LOG.debug("Failed to recover data at position:" + recoveryPosition, failedRecovery); // track this dud location journal.corruptRecoveryLocation(recoveryPosition); } else { http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java index fbb276a..0ce647a 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java @@ -314,8 +314,11 @@ class DataFileAppender implements FileAppender { } dataFile = wb.dataFile; file = dataFile.openRandomAccessFile(); - // pre allocate on first open - journal.preallocateEntireJournalDataFile(file); + // pre allocate on first open of new file (length==0) + // note dataFile.length cannot be used because it is updated in enqueue + if (file.length() == 0l) { + journal.preallocateEntireJournalDataFile(file); + } } Journal.WriteCommand write = wb.writes.getHead(); http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java index 948b543..bb56e7d 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java @@ -116,6 +116,9 @@ public class JournalCorruptionEofIndexRecoveryTest { adapter.setCheckForCorruptJournalFiles(true); adapter.setIgnoreMissingJournalfiles(true); + adapter.setPreallocationStrategy("zeros"); + adapter.setPreallocationScope("entire_journal"); + } @After @@ -186,6 +189,20 @@ public class JournalCorruptionEofIndexRecoveryTest { } + @Test + public void testRecoverIndex() throws Exception { + startBroker(); + + final int numToSend = 4; + produceMessagesToConsumeMultipleDataFiles(numToSend); + + // force journal replay by whacking the index + restartBroker(false, true); + + assertEquals("Drain", numToSend, drainQueue(numToSend)); + + } + private void corruptBatchCheckSumSplash(int id) throws Exception{ Collection<DataFile> files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); http://git-wip-us.apache.org/repos/asf/activemq/blob/4a821186/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java ---------------------------------------------------------------------- diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java index 9c21a56..d4095f3 100644 --- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java @@ -19,6 +19,8 @@ package org.apache.activemq.store.kahadb.disk.journal; import org.apache.activemq.store.kahadb.KahaDBStore; import org.apache.activemq.util.Wait; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; import java.io.FileInputStream; @@ -36,6 +38,8 @@ import static org.junit.Assert.assertTrue; */ public class PreallocationJournalTest { + private static final Logger LOG = LoggerFactory.getLogger(PreallocationJournalTest.class); + @Test public void testSparseFilePreallocation() throws Exception { executeTest("sparse_file"); @@ -76,6 +80,7 @@ public class PreallocationJournalTest { assertTrue("file size as expected", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { + LOG.info ("file size:" + journalLog + ", chan.size " + channel.size() + ", jfileSize.length: " + journalLog.length()); return Journal.DEFAULT_MAX_FILE_LENGTH == channel.size(); } })); @@ -87,8 +92,7 @@ public class PreallocationJournalTest { buff.position(0); assertEquals(0x00, buff.get()); - System.out.println("File size: " + channel.size()); - + LOG.info("File size: " + channel.size()); store.stop(); }
