This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 60d56e6d06dc9ef5645b1b2b6a6a1aa5d1fb85e7
Author: Hang Chen <[email protected]>
AuthorDate: Sun Jun 25 23:58:57 2023 +0800

    Fix data lost when configured multiple ledger directories (#3329)
    
    (cherry picked from commit 8a76703ee44b1f5af9eaedd68a53368dbf5855f0)
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   5 +
 .../java/org/apache/bookkeeper/bookie/Journal.java |   2 +-
 .../ldb/SingleDirectoryDbLedgerStorage.java        |   7 +-
 .../bookie/storage/ldb/DbLedgerStorageTest.java    | 183 +++++++++++++++++++++
 .../TestCompatUpgradeWithHostnameBookieId.groovy   |   2 +-
 5 files changed, 196 insertions(+), 3 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 0e7e62d04d..f691f320ee 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -1714,4 +1714,9 @@ public class Bookie extends BookieCriticalThread {
             }
         }
     }
+
+    @VisibleForTesting
+    public List<Journal> getJournals() {
+        return this.journals;
+    }
 }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index ba09dc1b2e..340d67bd33 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -229,7 +229,7 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
          * The last mark should first be max journal log id,
          * and then max log position in max journal log.
          */
-        void readLog() {
+        public void readLog() {
             byte[] buff = new byte[16];
             ByteBuffer bb = ByteBuffer.wrap(buff);
             LogMark mark = new LogMark();
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
index 23013e0139..db05913790 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java
@@ -137,6 +137,8 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
 
     private final long maxReadAheadBytesSize;
 
+    private final boolean singleLedgerDirs;
+
     public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, 
LedgerManager ledgerManager,
             LedgerDirsManager ledgerDirsManager, LedgerDirsManager 
indexDirsManager, StateManager stateManager,
             CheckpointSource checkpointSource, Checkpointer checkpointer, 
StatsLogger statsLogger,
@@ -152,6 +154,7 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
         this.writeCacheMaxSize = writeCacheSize;
         this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
         this.writeCacheBeingFlushed = new WriteCache(allocator, 
writeCacheMaxSize / 2);
+        this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
 
         this.checkpointSource = checkpointSource;
 
@@ -717,7 +720,9 @@ public class SingleDirectoryDbLedgerStorage implements 
CompactableLedgerStorage
     public void flush() throws IOException {
         Checkpoint cp = checkpointSource.newCheckpoint();
         checkpoint(cp);
-        checkpointSource.checkpointComplete(cp, true);
+        if (singleLedgerDirs) {
+            checkpointSource.checkpointComplete(cp, true);
+        }
     }
 
     @Override
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
index a39d380e6d..cb667c57df 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java
@@ -31,15 +31,20 @@ import io.netty.buffer.ByteBufUtil;
 import io.netty.buffer.Unpooled;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.bookkeeper.bookie.Bookie;
 import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
 import org.apache.bookkeeper.bookie.BookieException;
+import org.apache.bookkeeper.bookie.CheckpointSource;
+import org.apache.bookkeeper.bookie.CheckpointSourceList;
 import org.apache.bookkeeper.bookie.EntryLocation;
 import org.apache.bookkeeper.bookie.EntryLogger;
 import org.apache.bookkeeper.bookie.LedgerDirsManager;
+import org.apache.bookkeeper.bookie.LogMark;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.conf.TestBKConfiguration;
 import org.apache.bookkeeper.proto.BookieProtocol;
@@ -445,4 +450,182 @@ public class DbLedgerStorageTest {
         // and another is EntryLogManagerForEntryLogPerLedger
         assertEquals(2, ledgerDirsManager.getListeners().size());
     }
+
+    @Test
+    public void testMultiLedgerDirectoryCheckpoint() throws Exception {
+        int gcWaitTime = 1000;
+        File firstDir = new File(tmpDir, "dir1");
+        File secondDir = new File(tmpDir, "dir2");
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), 
secondDir.getCanonicalPath() });
+
+        Bookie bookie = new Bookie(conf);
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(2); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry1);
+        // write one entry to first ledger directory and flush with logMark(1, 
2),
+        // only the first ledger directory should have lastMark
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+        ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+        File firstDirMark = new File(firstDir + "/current", "lastMark");
+        File secondDirMark = new File(secondDir + "/current", "lastMark");
+
+        // LedgerStorage flush won't trigger lastMark update due to two ledger 
directories configured
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // write the second entry to second leger directory and flush with 
log(4, 5),
+        // the fist ledger directory's lastMark is (1, 2) and the second 
ledger directory's lastMark is (4, 5);
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(2); // ledger id
+        entry2.writeLong(1); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry2);
+        // write one entry to first ledger directory and flush with logMark(1, 
2),
+        // only the first ledger directory should have lastMark
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+        ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
+
+        // LedgerStorage flush won't trigger lastMark update due to two ledger 
directories configured
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // The dbLedgerStorage flush also won't trigger lastMark update due to 
two ledger directories configured.
+        bookie.getLedgerStorage().flush();
+        try {
+            readLogMark(firstDirMark);
+            readLogMark(secondDirMark);
+            fail();
+        } catch (Exception e) {
+            //
+        }
+
+        // trigger checkpoint simulate SyncThread do checkpoint.
+        CheckpointSource checkpointSource = new 
CheckpointSourceList(bookie.getJournals());
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+        CheckpointSource.Checkpoint checkpoint = 
checkpointSource.newCheckpoint();
+        checkpointSource.checkpointComplete(checkpoint, false);
+
+        try {
+            LogMark firstLogMark = readLogMark(firstDirMark);
+            LogMark secondLogMark = readLogMark(secondDirMark);
+            assertEquals(7, firstLogMark.getLogFileId());
+            assertEquals(8, firstLogMark.getLogFileOffset());
+            assertEquals(7, secondLogMark.getLogFileId());
+            assertEquals(8, secondLogMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        // test replay journal lastMark, to make sure we get the right 
LastMark position
+        bookie.getJournals().get(0).getLastLogMark().readLog();
+        LogMark logMark = 
bookie.getJournals().get(0).getLastLogMark().getCurMark();
+        assertEquals(7, logMark.getLogFileId());
+        assertEquals(8, logMark.getLogFileOffset());
+    }
+
+    private LogMark readLogMark(File file) throws IOException {
+        byte[] buff = new byte[16];
+        ByteBuffer bb = ByteBuffer.wrap(buff);
+        LogMark mark = new LogMark();
+        try (FileInputStream fis = new FileInputStream(file)) {
+            int bytesRead = fis.read(buff);
+            if (bytesRead != 16) {
+                throw new IOException("Couldn't read enough bytes from 
lastMark."
+                    + " Wanted " + 16 + ", got " + bytesRead);
+            }
+        }
+        bb.clear();
+        mark.readLogMark(bb);
+
+        return mark;
+    }
+
+    @Test
+    public void testSingleLedgerDirectoryCheckpoint() throws Exception {
+        int gcWaitTime = 1000;
+        File ledgerDir = new File(tmpDir, "dir");
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        conf.setGcWaitTime(gcWaitTime);
+        conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
+        conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
+        conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
+        conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
+
+        Bookie bookie = new Bookie(conf);
+        ByteBuf entry1 = Unpooled.buffer(1024);
+        entry1.writeLong(1); // ledger id
+        entry1.writeLong(2); // entry id
+        entry1.writeBytes("entry-1".getBytes());
+        bookie.getLedgerStorage().addEntry(entry1);
+
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
+        ((DbLedgerStorage) 
bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
+
+        File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
+        try {
+            LogMark logMark = readLogMark(ledgerDirMark);
+            assertEquals(1, logMark.getLogFileId());
+            assertEquals(2, logMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        ByteBuf entry2 = Unpooled.buffer(1024);
+        entry2.writeLong(2); // ledger id
+        entry2.writeLong(1); // entry id
+        entry2.writeBytes("entry-2".getBytes());
+
+        bookie.getLedgerStorage().addEntry(entry2);
+        // write one entry to first ledger directory and flush with logMark(1, 
2),
+        // only the first ledger directory should have lastMark
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
+
+        bookie.getLedgerStorage().flush();
+        try {
+            LogMark logMark = readLogMark(ledgerDirMark);
+            assertEquals(4, logMark.getLogFileId());
+            assertEquals(5, logMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        CheckpointSource checkpointSource = new 
CheckpointSourceList(bookie.getJournals());
+        
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
+        CheckpointSource.Checkpoint checkpoint = 
checkpointSource.newCheckpoint();
+        checkpointSource.checkpointComplete(checkpoint, false);
+
+        try {
+            LogMark firstLogMark = readLogMark(ledgerDirMark);
+            assertEquals(7, firstLogMark.getLogFileId());
+            assertEquals(8, firstLogMark.getLogFileOffset());
+        } catch (Exception e) {
+            fail();
+        }
+
+        // test replay journal lastMark, to make sure we get the right 
LastMark position
+        bookie.getJournals().get(0).getLastLogMark().readLog();
+        LogMark logMark = 
bookie.getJournals().get(0).getLastLogMark().getCurMark();
+        assertEquals(7, logMark.getLogFileId());
+        assertEquals(8, logMark.getLogFileOffset());
+    }
 }
diff --git 
a/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
 
b/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
index eb6114da57..e4a6cfb614 100644
--- 
a/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
+++ 
b/tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information

Reply via email to