Repository: activemq
Updated Branches:
  refs/heads/master 82dec402a -> 81062fde8


AMQ-7082 - Make sure that the recovery will only mark pages as free if they were created in a previous execution


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0d343389
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0d343389
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0d343389

Branch: refs/heads/master
Commit: 0d34338919edee863bd71693ee30999d9d9d6ce9
Parents: 82dec40
Author: Alan Protasio <alanp...@gmail.com>
Authored: Tue Nov 6 01:13:18 2018 -0800
Committer: Alan Protasio <alanp...@gmail.com>
Committed: Tue Nov 6 04:48:22 2018 -0800

----------------------------------------------------------------------
 .../store/kahadb/disk/page/PageFile.java     |  16 ++-
 .../store/kahadb/disk/page/PageFileTest.java | 108 +++++++++++++++++--
 2 files changed, 113 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/0d343389/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
index fe79a2d..7456dfa 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/page/PageFile.java
@@ -425,19 +425,19 @@ public class PageFile {
             getFreeFile().delete();
             startWriter();
             if (needsFreePageRecovery) {
-                asyncFreePageRecovery();
+                asyncFreePageRecovery(nextFreePageId.get());
             }
         } else {
             throw new IllegalStateException("Cannot load the page file when it is already loaded.");
         }
     }
 
-    private void asyncFreePageRecovery() {
+    private void asyncFreePageRecovery(final long lastRecoveryPage) {
         Thread thread = new Thread("KahaDB Index Free Page Recovery") {
             @Override
             public void run() {
                 try {
-                    recoverFreePages();
+                    recoverFreePages(lastRecoveryPage);
                 } catch (Throwable e) {
                     if (loaded.get()) {
                         LOG.warn("Error recovering index free page list", e);
@@ -450,7 +450,7 @@ public class PageFile {
         thread.start();
     }
 
-    private void recoverFreePages() throws Exception {
+    private void recoverFreePages(final long lastRecoveryPage) throws Exception {
         LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
         SequenceSet newFreePages = new SequenceSet();
         // need new pageFile instance to get unshared readFile
@@ -459,6 +459,11 @@ public class PageFile {
         try {
             for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
                 Page page = i.next();
+
+                if (page.getPageId() >= lastRecoveryPage) {
+                    break;
+                }
+
+
                 if (page.getType() == Page.PAGE_FREE_TYPE) {
                     newFreePages.add(page.getPageId());
                 }
@@ -817,6 +822,9 @@ public class PageFile {
         return toOffset(nextFreePageId.get());
     }
 
+    public boolean isFreePage(long pageId) {
+        return freeList.contains(pageId);
+    }
     /**
      * @return the number of pages allocated in the PageFile
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/0d343389/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
index 3a5cefd..db1ecf3 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/page/PageFileTest.java
@@ -16,6 +16,12 @@
  */
 package org.apache.activemq.store.kahadb.disk.page;
 
+import junit.framework.TestCase;
+import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -23,13 +29,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.HashSet;
-
-import org.apache.activemq.store.kahadb.disk.util.StringMarshaller;
-
-import junit.framework.TestCase;
-import org.apache.activemq.util.Wait;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.util.LinkedList;
+import java.util.List;
 
 @SuppressWarnings("rawtypes")
 public class PageFileTest extends TestCase {
@@ -261,4 +262,97 @@ public class PageFileTest extends TestCase {
 
         assertEquals(pf.getFreePageCount(), 10);
     }
+
+    public void testBackgroundRecoveryIsThreadSafe() throws Exception {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnableRecoveryFile(false);
+        pf.load();
+
+        Transaction tx = pf.tx();
+        tx.allocate(100000);
+        tx.commit();
+        LOG.info("Number of free pages:" + pf.getFreePageCount());
+        pf.flush();
+
+        //Simulate an unclean shutdown
+        final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+        pf2.setEnableRecoveryFile(false);
+        pf2.load();
+
+        Transaction tx2 = pf2.tx();
+        tx2.allocate(100000);
+        tx2.commit();
+        LOG.info("Number of free pages:" + pf2.getFreePageCount());
+
+        List<Transaction> transactions = new LinkedList<>();
+
+        Thread.sleep(500);
+        LOG.info("Creating Transactions");
+        for (int i = 0; i < 20; i++) {
+            Transaction txConcurrent = pf2.tx();
+            Page page = txConcurrent.allocate();
+            String t = "page:" + i;
+            page.set(t);
+            txConcurrent.store(page, StringMarshaller.INSTANCE, false);
+            txConcurrent.commit();
+            transactions.add(txConcurrent);
+            Thread.sleep(50);
+        }
+
+        assertTrue("We have 199980 free pages", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                pf2.flush();
+                long freePages = pf2.getFreePageCount();
+                LOG.info("free page count: " + freePages);
+                return freePages == 199980;
+            }
+        }, 12000000));
+
+        for (Transaction txConcurrent2: transactions) {
+            for (Page page : txConcurrent2) {
+                assertFalse(pf2.isFreePage(page.pageId));
+            }
+        }
+
+    }
+
+    public void testBackgroundWillNotMarkEaslyPagesAsFree() throws Exception {
+
+        PageFile pf = new PageFile(new File("target/test-data"), getName());
+        pf.delete();
+        pf.setEnableRecoveryFile(false);
+        pf.load();
+
+        Transaction tx = pf.tx();
+        tx.allocate(100000);
+        tx.commit();
+        LOG.info("Number of free pages:" + pf.getFreePageCount());
+        pf.flush();
+
+        //Simulate an unclean shutdown
+        final PageFile pf2 = new PageFile(new File("target/test-data"), getName());
+        pf2.setEnableRecoveryFile(false);
+        pf2.load();
+
+        Transaction tx2 = pf2.tx();
+        tx2.allocate(200);
+        tx2.commit();
+        LOG.info("Number of free pages:" + pf2.getFreePageCount());
+
+        Transaction tx3 = pf2.tx();
+        tx3.allocate(100);
+
+        assertTrue("We have 10 free pages", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                pf2.flush();
+                long freePages = pf2.getFreePageCount();
+                LOG.info("free page count: " + freePages);
+                return freePages == 100100;
+            }
+        }, 12000000));
+    }
 }