This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 02bf7d9c5 [server] Skip batch sequence validation during writer state 
rebuilding (#2188)
02bf7d9c5 is described below

commit 02bf7d9c5376bda05591c166ce440ad836657d38
Author: Liebing <[email protected]>
AuthorDate: Tue Dec 16 20:54:58 2025 +0800

    [server] Skip batch sequence validation during writer state rebuilding 
(#2188)
---
 .../org/apache/fluss/server/log/LogTablet.java     |   2 +-
 .../org/apache/fluss/server/log/LogLoaderTest.java | 372 +++++++++++++++++++++
 2 files changed, 373 insertions(+), 1 deletion(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java 
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 8a5c54cbd..a62d7c237 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1277,7 +1277,7 @@ public final class LogTablet {
         Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
         for (LogRecordBatch batch : records.batches()) {
             if (batch.hasWriterId()) {
-                updateWriterAppendInfo(writerStateManager, batch, 
loadedWriters, true);
+                updateWriterAppendInfo(writerStateManager, batch, 
loadedWriters, false);
             }
         }
         loadedWriters.values().forEach(writerStateManager::update);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
index 795b91cff..03d2ddadd 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
@@ -27,6 +27,7 @@ import org.apache.fluss.record.LogTestBase;
 import org.apache.fluss.record.MemoryLogRecords;
 import org.apache.fluss.server.exception.CorruptIndexException;
 import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.FlussPaths;
 import org.apache.fluss.utils.clock.Clock;
 import org.apache.fluss.utils.clock.ManualClock;
 import org.apache.fluss.utils.clock.SystemClock;
@@ -40,10 +41,16 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -52,6 +59,8 @@ import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
 import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
 import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static 
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static 
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -269,6 +278,355 @@ final class LogLoaderTest extends LogTestBase {
         
assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
     }
 
+    @Test
+    void testWriterSnapshotRecoveryFromDiscontinuousBatchSequence() throws 
Exception {
+        LogTablet log = createLogTablet(true);
+        long wid1 = 1L;
+
+        // Use appendAsFollower to append records with batchSequence not from 0
+        log.appendAsFollower(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {1, "a"}), 
wid1, 10, 0L));
+        log.appendAsFollower(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {2, "b"}), 
wid1, 11, 1L));
+        log.roll(Optional.empty());
+
+        log.appendAsFollower(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {3, "c"}), 
wid1, 12, 2L));
+        log.appendAsFollower(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {4, "d"}), 
wid1, 13, 3L));
+
+        // Close the log, we should now have 2 segments
+        log.close();
+        assertThat(log.logSegments().size()).isEqualTo(2);
+        assertThat(
+                        WriterStateManager.listSnapshotFiles(logDir).stream()
+                                .map(snapshotFile -> snapshotFile.offset)
+                                .sorted())
+                .containsExactly(2L, 4L);
+
+        // Delete all offset index files to trigger segment recover
+        deleteAllOffsetIndexFile(log);
+
+        log = createLogTablet(false);
+        assertThat(
+                        WriterStateManager.listSnapshotFiles(logDir).stream()
+                                .map(snapshotFile -> snapshotFile.offset)
+                                .sorted())
+                .containsExactly(2L, 4L);
+
+        
assertThat(log.writerStateManager().activeWriters().size()).isEqualTo(1);
+        
assertThat(log.writerStateManager().activeWriters().get(wid1).lastBatchSequence())
+                .isEqualTo(13);
+    }
+
+    @Test
+    void testWriterSnapshotsRecoveryAfterCleanShutdown() throws Exception {
+        LogTablet log = createLogTablet(true);
+        assertThat(log.writerStateManager().oldestSnapshotOffset()).isEmpty();
+
+        long wid1 = 1L;
+        long wid2 = 2L;
+        int seq1 = 0;
+        int seq2 = 0;
+
+        // Append some records to create multiple segments.
+        for (int i = 0; i <= 5; i++) {
+            if (i % 2 == 0) {
+                MemoryLogRecords records =
+                        genMemoryLogRecordsWithWriterId(
+                                Collections.singletonList(new Object[] {seq1, 
"a"}),
+                                wid1,
+                                seq1,
+                                0L);
+                log.appendAsLeader(records);
+                seq1++;
+            } else {
+                MemoryLogRecords records =
+                        genMemoryLogRecordsWithWriterId(
+                                Collections.singletonList(new Object[] {seq2, 
"a"}),
+                                wid2,
+                                seq2,
+                                0L);
+                log.appendAsLeader(records);
+                seq2++;
+            }
+            log.roll(Optional.empty());
+        }
+
+        // Append some records to the last segment
+        MemoryLogRecords records =
+                genMemoryLogRecordsByObject(Collections.singletonList(new 
Object[] {1, "a"}));
+        log.appendAsLeader(records);
+        log.close();
+
+        // Test writer state recovery after clean shutdown
+        log = createLogTablet(true);
+
+        List<Long> segmentOffsets =
+                log.logSegments().stream()
+                        .map(LogSegment::getBaseOffset)
+                        .collect(Collectors.toList());
+
+        // verify that the snapshot files are created
+        Set<Long> expectedSnapshotOffsets =
+                new HashSet<>(segmentOffsets.subList(1, 
segmentOffsets.size()));
+        expectedSnapshotOffsets.add(log.localLogEndOffset());
+        Set<Long> actualSnapshotOffsets =
+                WriterStateManager.listSnapshotFiles(logDir).stream()
+                        .map(snapshotFile -> snapshotFile.offset)
+                        .collect(Collectors.toSet());
+        assertThat(actualSnapshotOffsets)
+                .containsExactlyInAnyOrderElementsOf(expectedSnapshotOffsets);
+
+        // Verify that expected writers last batch sequence
+        Map<Long, Integer> actualWritersLastBatchSequence =
+                log.writerStateManager().activeWriters().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, e -> 
e.getValue().lastBatchSequence()));
+        Map<Long, Integer> expectedWritersLastBatchSequence = new HashMap<>();
+        expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+        expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+        
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+    }
+
+    @Test
+    void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
+        LogTablet log = createLogTablet(true);
+        assertThat(log.writerStateManager().oldestSnapshotOffset()).isEmpty();
+
+        long wid1 = 1L;
+        long wid2 = 2L;
+        int seq1 = 0;
+        int seq2 = 0;
+
+        // Append some records to create multiple segments,
+        // after this step, the segments should be
+        //              [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
+        // writer id    1      2      1      2      1      2
+        // seq id       0      0      1      1      2      2
+        // snapshot            1      2      3      4      5      6
+        for (int i = 0; i <= 5; i++) {
+            if (i % 2 == 0) {
+                MemoryLogRecords records =
+                        genMemoryLogRecordsWithWriterId(
+                                Collections.singletonList(new Object[] {seq1, 
"a"}),
+                                wid1,
+                                seq1,
+                                0L);
+                log.appendAsLeader(records);
+                seq1++;
+            } else {
+                MemoryLogRecords records =
+                        genMemoryLogRecordsWithWriterId(
+                                Collections.singletonList(new Object[] {seq2, 
"a"}),
+                                wid2,
+                                seq2,
+                                0L);
+                log.appendAsLeader(records);
+                seq2++;
+            }
+            log.roll(Optional.empty());
+        }
+        // Append some records to the last segment
+        MemoryLogRecords records =
+                genMemoryLogRecordsByObject(Collections.singletonList(new 
Object[] {1, "a"}));
+        log.appendAsLeader(records);
+
+        assertThat(log.logSegments().size()).isGreaterThanOrEqualTo(5);
+
+        List<Long> segmentOffsets =
+                log.logSegments().stream()
+                        .map(LogSegment::getBaseOffset)
+                        .collect(Collectors.toList());
+
+        // verify that the snapshot files are created
+        Set<Long> expectedSnapshotOffsets =
+                new HashSet<>(segmentOffsets.subList(1, 
segmentOffsets.size()));
+        Set<Long> actualSnapshotOffsets =
+                WriterStateManager.listSnapshotFiles(logDir).stream()
+                        .map(snapshotFile -> snapshotFile.offset)
+                        .collect(Collectors.toSet());
+        assertThat(actualSnapshotOffsets)
+                .containsExactlyInAnyOrderElementsOf(expectedSnapshotOffsets);
+
+        // Verify that expected writers last batch sequence
+        Map<Long, Integer> actualWritersLastBatchSequence =
+                log.writerStateManager().activeWriters().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, e -> 
e.getValue().lastBatchSequence()));
+        Map<Long, Integer> expectedWritersLastBatchSequence = new HashMap<>();
+        expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+        expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+        
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+        log.close();
+
+        //  [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
+        //                               |----> 
offsetForSegmentAfterRecoveryPoint
+        //                        |----> recoveryPoint
+        long offsetForSegmentAfterRecoveryPoint = 
segmentOffsets.get(segmentOffsets.size() - 3);
+        long recoveryPoint = segmentOffsets.get(segmentOffsets.size() - 4);
+        
assertThat(recoveryPoint).isLessThan(offsetForSegmentAfterRecoveryPoint);
+
+        // 1. Test unclean shutdown without any recovery
+        // Retain snapshots for the last 2 segments (delete snapshots before 
that)
+        long snapshotRetentionOffset = 
segmentOffsets.get(segmentOffsets.size() - 2);
+        
log.writerStateManager().deleteSnapshotsBefore(snapshotRetentionOffset);
+        log.close();
+
+        // Reopen the log with recovery point. Although we use unclean 
shutdown here,
+        // all the index files are correctly closed, so Fluss will not trigger 
recovery for any
+        // segment.
+        log = createLogTablet(false, recoveryPoint);
+
+        // Expected snapshot offsets: last 2 segment base offsets + log end 
offset
+        List<Long> lastTowSegmentOffsets =
+                segmentOffsets.subList(
+                        Math.max(0, segmentOffsets.size() - 2), 
segmentOffsets.size());
+        expectedSnapshotOffsets = new HashSet<>(lastTowSegmentOffsets);
+        expectedSnapshotOffsets.add(log.localLogEndOffset());
+
+        // Verify that expected snapshot offsets exist
+        actualSnapshotOffsets =
+                WriterStateManager.listSnapshotFiles(logDir).stream()
+                        .map(snapshotFile -> snapshotFile.offset)
+                        .collect(Collectors.toSet());
+        assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
+
+        // Verify that expected writers last batch sequence
+        actualWritersLastBatchSequence =
+                log.writerStateManager().activeWriters().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, e -> 
e.getValue().lastBatchSequence()));
+        expectedWritersLastBatchSequence = new HashMap<>();
+        expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+        expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+        
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+        log.close();
+
+        // 2. Test that unclean shutdown without segment recovery will rebuild 
writer state
+        // Delete all snapshot files
+        deleteAllWriterSnapshot(logDir);
+
+        // Reopen the log with recovery point. Although Fluss will not trigger 
recovery for any
+        // segment, the writer state should still be rebuilt.
+        log = createLogTablet(false, recoveryPoint);
+
+        actualSnapshotOffsets =
+                WriterStateManager.listSnapshotFiles(logDir).stream()
+                        .map(snapshotFile -> snapshotFile.offset)
+                        .collect(Collectors.toSet());
+        // Will rebuild writer state for all segments, but only take snapshot 
for the last 2
+        // segments and the last offset
+        expectedSnapshotOffsets = new HashSet<>();
+        expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 
2));
+        expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() - 
1));
+        expectedSnapshotOffsets.add(log.localLogEndOffset());
+        assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
+
+        // Verify that expected writers last batch sequence
+        actualWritersLastBatchSequence =
+                log.writerStateManager().activeWriters().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, e -> 
e.getValue().lastBatchSequence()));
+        expectedWritersLastBatchSequence = new HashMap<>();
+        expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+        expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+        
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+        log.close();
+
+        // 3. Test that unclean shutdown with segment recovery will rebuild 
writer state for each segment
+        // after recovery point.
+        // Delete all snapshot files and index files (to trigger segment 
recover)
+        deleteAllWriterSnapshot(logDir);
+        deleteAllOffsetIndexFile(log);
+
+        // Reopen the log with recovery point. All segments will be recovered 
since we delete all
+        // the index files
+        log = createLogTablet(false, recoveryPoint);
+
+        // Writer snapshot files should be rebuilt for each segment after 
recovery point
+        actualSnapshotOffsets =
+                WriterStateManager.listSnapshotFiles(logDir).stream()
+                        .map(snapshotFile -> snapshotFile.offset)
+                        .collect(Collectors.toSet());
+        expectedSnapshotOffsets =
+                log.logSegments(recoveryPoint, 
log.localLogEndOffset()).stream()
+                        .map(LogSegment::getBaseOffset)
+                        .collect(Collectors.toSet());
+        expectedSnapshotOffsets.add(log.localLogEndOffset());
+        assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
+
+        actualWritersLastBatchSequence =
+                log.writerStateManager().activeWriters().entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, e -> 
e.getValue().lastBatchSequence()));
+        expectedWritersLastBatchSequence = new HashMap<>();
+        expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+        expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+        
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+        log.close();
+    }
+
+    @Test
+    void testLoadingLogKeepsLargestStrayWriterStateSnapshot() throws Exception 
{
+        LogTablet log = createLogTablet(true);
+        long wid1 = 1L;
+
+        log.appendAsLeader(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {1, "a"}), 
wid1, 0, 0L));
+        log.roll(Optional.empty());
+        log.appendAsLeader(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {2, "b"}), 
wid1, 1, 0L));
+        log.roll(Optional.empty());
+
+        log.appendAsLeader(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {3, "c"}), 
wid1, 2, 0L));
+        log.appendAsLeader(
+                genMemoryLogRecordsWithWriterId(
+                        Collections.singletonList(new Object[] {4, "d"}), 
wid1, 3, 0L));
+
+        // Close the log, we should now have 3 segments
+        log.close();
+        assertThat(log.logSegments().size()).isEqualTo(3);
+        // We expect 3 snapshot files, two of which are for the first two 
segments; the last was
+        // written out during log closing.
+        assertThat(
+                        WriterStateManager.listSnapshotFiles(logDir).stream()
+                                .map(snapshotFile -> snapshotFile.offset)
+                                .sorted())
+                .containsExactly(1L, 2L, 4L);
+        // Inject a stray snapshot file within the bounds of the log at offset 
3, it should be
+        // cleaned up after loading the log
+        Path path = FlussPaths.writerSnapshotFile(logDir, 3L).toPath();
+        Files.createFile(path);
+        assertThat(
+                        WriterStateManager.listSnapshotFiles(logDir).stream()
+                                .map(snapshotFile -> snapshotFile.offset)
+                                .sorted())
+                .containsExactly(1L, 2L, 3L, 4L);
+
+        createLogTablet(false);
+        // We should clean up the stray writer state snapshot file, but keep 
the largest snapshot
+        // file (4)
+        assertThat(
+                        WriterStateManager.listSnapshotFiles(logDir).stream()
+                                .map(snapshotFile -> snapshotFile.offset)
+                                .sorted())
+                .containsExactly(1L, 2L, 4L);
+    }
+
     private LogTablet createLogTablet(boolean isCleanShutdown) throws 
Exception {
         return createLogTablet(isCleanShutdown, 0);
     }
@@ -325,4 +683,18 @@ final class LogLoaderTest extends LogTestBase {
         }
         return indexFiles;
     }
+
+    private void deleteAllOffsetIndexFile(LogTablet logTablet) throws 
IOException {
+        List<LogSegment> logSegments = logTablet.logSegments();
+        for (LogSegment segment : logSegments) {
+            segment.offsetIndex().deleteIfExists();
+        }
+    }
+
+    private void deleteAllWriterSnapshot(File logDir) throws Exception {
+        List<SnapshotFile> snapshotFiles = 
WriterStateManager.listSnapshotFiles(logDir);
+        for (SnapshotFile snapshotFile : snapshotFiles) {
+            snapshotFile.deleteIfExists();
+        }
+    }
 }

Reply via email to