This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit 51adea54526e870b3967c8e49d78aa294b280eed
Author: Hang Chen <[email protected]>
AuthorDate: Mon Jun 19 19:40:50 2023 +0800

    Support skip invalid journal record in replaying journal stage (#3956)
    
    Co-authored-by: zhiyuanlei <[email protected]>
    (cherry picked from commit 5e9fdc2f81a7add645bba43f90ef630dffc1993f)
---
 .../java/org/apache/bookkeeper/bookie/Bookie.java  |   2 +-
 .../java/org/apache/bookkeeper/bookie/Journal.java |  16 ++-
 .../bookkeeper/conf/ServerConfiguration.java       |  21 +++-
 .../cli/commands/bookie/ReadJournalCommand.java    |   2 +-
 .../bookkeeper/bookie/BookieJournalTest.java       | 117 ++++++++++++++++++++-
 5 files changed, 149 insertions(+), 9 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
index 1a29ff1811..0e7e62d04d 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java
@@ -994,7 +994,7 @@ public class Bookie extends BookieCriticalThread {
                 logPosition = markedLog.getLogFileOffset();
             }
             LOG.info("Replaying journal {} from position {}", id, logPosition);
-            long scanOffset = journal.scanJournal(id, logPosition, scanner);
+            long scanOffset = journal.scanJournal(id, logPosition, scanner, 
conf.isSkipReplayJournalInvalidRecord());
             // Update LastLogMark after completely replaying journal
             // scanOffset will point to EOF position
             // After LedgerStorage flush, SyncThread should persist this to 
disk
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
index b01cc3115e..ba09dc1b2e 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java
@@ -777,13 +777,14 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
     /**
      * Scan the journal.
      *
-     * @param journalId Journal Log Id
-     * @param journalPos Offset to start scanning
-     * @param scanner Scanner to handle entries
+     * @param journalId         Journal Log Id
+     * @param journalPos        Offset to start scanning
+     * @param scanner           Scanner to handle entries
+     * @param skipInvalidRecord if true, an unreadable record ends the scan with a warning instead of an exception
      * @return scanOffset - represents the byte till which journal was read
      * @throws IOException
      */
-    public long scanJournal(long journalId, long journalPos, JournalScanner 
scanner)
+    public long scanJournal(long journalId, long journalPos, JournalScanner 
scanner, boolean skipInvalidRecord)
         throws IOException {
         JournalChannel recLog;
         if (journalPos <= 0) {
@@ -846,6 +847,13 @@ public class Journal extends BookieCriticalThread 
implements CheckpointSource {
                 }
             }
             return recLog.fc.position();
+        } catch (IOException e) {
+            if (skipInvalidRecord) {
+                LOG.warn("Failed to parse journal file, and skipInvalidRecord 
is true, skip this journal file reply");
+            } else {
+                throw e;
+            }
+            return recLog.fc.position();
         } finally {
             recLog.close();
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
index 367132bc47..18c7e32112 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java
@@ -309,6 +309,8 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     protected static final String AUTHORIZED_ROLES = "authorizedRoles";
     protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = 
"rocksDBDeleteEntriesBatchSize";
 
+    protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = 
"skipReplayJournalInvalidRecord";
+
     /**
      * Construct a default configuration object.
      */
@@ -3617,7 +3619,24 @@ public class ServerConfiguration extends 
AbstractConfiguration<ServerConfigurati
     }
 
     /**
-     * Get entry log location index delete entries batch size from RocksDB.
+     * If true, skip the rest of a journal file that fails to parse on replay.
+     * @param skipReplayJournalInvalidRecord whether to skip invalid journal records during replay
+     * @return this server configuration
+     */
+    public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean 
skipReplayJournalInvalidRecord) {
+        this.setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD,
+                Boolean.toString(skipReplayJournalInvalidRecord));
+        return this;
+    }
+
+    /**
+     * @see #setSkipReplayJournalInvalidRecord(boolean)
+     */
+    public boolean isSkipReplayJournalInvalidRecord() {
+        return this.getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false);
+    }
+
+    /**
      *
      * @return Int rocksDB delete entries batch size configured in Service 
configuration.
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
index b2c63f4991..d38f236c55 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java
@@ -211,6 +211,6 @@ public class ReadJournalCommand extends 
BookieCommand<ReadJournalCommand.ReadJou
     }
 
     private void scanJournal(Journal journal, long journalId, 
Journal.JournalScanner scanner) throws IOException {
-        journal.scanJournal(journalId, 0L, scanner);
+        journal.scanJournal(journalId, 0L, scanner, false);
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
index 04b2138031..4043cb5b62 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java
@@ -297,6 +297,52 @@ public class BookieJournalTest {
         return jc;
     }
 
+    private JournalChannel writeV4JournalWithInvalidRecord(File journalDir,
+                                                           int numEntries, 
byte[] masterKey) throws Exception {
+        long logId = System.currentTimeMillis();
+        JournalChannel jc = new JournalChannel(journalDir, logId);
+
+        moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
+
+        BufferedChannel bc = jc.getBufferedChannel();
+
+        byte[] data = new byte[1024];
+        Arrays.fill(data, (byte) 'X');
+        long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
+        for (int i = 0; i <= numEntries; i++) {
+            ByteBuf packet;
+            if (i == 0) {
+                packet = generateMetaEntry(1, masterKey);
+            } else {
+                packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * 
data.length, data);
+            }
+            lastConfirmed = i;
+            ByteBuffer lenBuff = ByteBuffer.allocate(4);
+            if (i == numEntries - 1) {
+                // simulate a flush that wrote an invalid record length (-1)
+                lenBuff.putInt(-1);
+            } else {
+                lenBuff.putInt(packet.readableBytes());
+            }
+            lenBuff.flip();
+            bc.write(Unpooled.wrappedBuffer(lenBuff));
+            bc.write(packet);
+            packet.release();
+        }
+
+        // write fence key
+        ByteBuf packet = generateFenceEntry(1);
+        ByteBuf lenBuf = Unpooled.buffer();
+        lenBuf.writeInt(packet.readableBytes());
+        //mock
+        bc.write(lenBuf);
+        bc.write(packet);
+        bc.flushAndForceWrite(false);
+        updateJournalVersion(jc, JournalChannel.V4);
+
+        return jc;
+    }
+
     static JournalChannel writeV5Journal(File journalDir, int numEntries,
                                          byte[] masterKey) throws Exception {
         return writeV5Journal(journalDir, numEntries, masterKey, false);
@@ -838,7 +884,7 @@ public class BookieJournalTest {
             assertEquals(journalIds.size(), 1);
 
             try {
-                journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, 
journalScanner);
+                journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, 
journalScanner, false);
                 fail("Should not have been able to scan the journal");
             } catch (Exception e) {
                 // Expected
@@ -848,7 +894,74 @@ public class BookieJournalTest {
         b.shutdown();
     }
 
-    private class DummyJournalScan implements Journal.JournalScanner {
+    /**
+     * Test for invalid record data during read of Journal.
+     */
+    @Test
+    public void testJournalScanInvalidRecordWithSkipFlag() throws Exception {
+        File journalDir = createTempDir("bookie", "journal");
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
+
+        File ledgerDir = createTempDir("bookie", "ledger");
+        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
+
+        try {
+            
writeV4JournalWithInvalidRecord(Bookie.getCurrentDirectory(journalDir),
+                100, "testPasswd".getBytes());
+        } catch (Exception e) {
+            fail();
+        }
+
+
+        ServerConfiguration conf = 
TestBKConfiguration.newServerConfiguration();
+        // Explicitly enable skipping of invalid journal records
+        conf.setJournalDirName(journalDir.getPath())
+                .setLedgerDirNames(new String[] { ledgerDir.getPath() })
+                .setMetadataServiceUri(null)
+                .setSkipReplayJournalInvalidRecord(true);
+
+        Journal.JournalScanner journalScanner = new DummyJournalScan();
+
+        Bookie b = new Bookie(conf);
+
+        for (Journal journal : b.journals) {
+            List<Long> journalIds = 
Journal.listJournalIds(journal.getJournalDirectory(), null);
+            assertEquals(journalIds.size(), 1);
+            try {
+                journal.scanJournal(journalIds.get(0), 0, journalScanner, 
conf.isSkipReplayJournalInvalidRecord());
+            } catch (Exception e) {
+                fail("Should pass the journal scanning because we enabled skip 
flag by default.");
+            }
+        }
+
+        b.shutdown();
+
+        // Skipping invalid journal records is disabled by default
+        conf = TestBKConfiguration.newServerConfiguration();
+        conf.setJournalDirName(journalDir.getPath())
+                .setLedgerDirNames(new String[] { ledgerDir.getPath() })
+                .setMetadataServiceUri(null);
+
+        journalScanner = new DummyJournalScan();
+
+        b = new Bookie(conf);
+
+        for (Journal journal : b.journals) {
+            List<Long> journalIds = 
Journal.listJournalIds(journal.getJournalDirectory(), null);
+            assertEquals(journalIds.size(), 1);
+            try {
+                journal.scanJournal(journalIds.get(0), 0, journalScanner, 
conf.isSkipReplayJournalInvalidRecord());
+                fail("Should fail the journal scanning because of disabled 
skip flag");
+            } catch (Exception e) {
+                // expected.
+            }
+        }
+
+        b.shutdown();
+    }
+
+
+    static class DummyJournalScan implements Journal.JournalScanner {
 
         @Override
         public void process(int journalVersion, long offset, ByteBuffer entry) 
throws IOException {

Reply via email to