This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 02bf7d9c5 [server] Skip batch sequence validation during writer state
rebuilding (#2188)
02bf7d9c5 is described below
commit 02bf7d9c5376bda05591c166ce440ad836657d38
Author: Liebing <[email protected]>
AuthorDate: Tue Dec 16 20:54:58 2025 +0800
[server] Skip batch sequence validation during writer state rebuilding
(#2188)
---
.../org/apache/fluss/server/log/LogTablet.java | 2 +-
.../org/apache/fluss/server/log/LogLoaderTest.java | 372 +++++++++++++++++++++
2 files changed, 373 insertions(+), 1 deletion(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index 8a5c54cbd..a62d7c237 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1277,7 +1277,7 @@ public final class LogTablet {
Map<Long, WriterAppendInfo> loadedWriters = new HashMap<>();
for (LogRecordBatch batch : records.batches()) {
if (batch.hasWriterId()) {
- updateWriterAppendInfo(writerStateManager, batch,
loadedWriters, true);
+ updateWriterAppendInfo(writerStateManager, batch,
loadedWriters, false);
}
}
loadedWriters.values().forEach(writerStateManager::update);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
index 795b91cff..03d2ddadd 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
@@ -27,6 +27,7 @@ import org.apache.fluss.record.LogTestBase;
import org.apache.fluss.record.MemoryLogRecords;
import org.apache.fluss.server.exception.CorruptIndexException;
import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.FlussPaths;
import org.apache.fluss.utils.clock.Clock;
import org.apache.fluss.utils.clock.ManualClock;
import org.apache.fluss.utils.clock.SystemClock;
@@ -40,10 +41,16 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@@ -52,6 +59,8 @@ import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
import static
org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
+import static
org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsWithWriterId;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -269,6 +278,355 @@ final class LogLoaderTest extends LogTestBase {
assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
}
+ @Test
+ void testWriterSnapshotRecoveryFromDiscontinuousBatchSequence() throws
Exception {
+ LogTablet log = createLogTablet(true);
+ long wid1 = 1L;
+
+ // Use appendAsFollower to append records with batchSequence not from 0
+ log.appendAsFollower(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {1, "a"}),
wid1, 10, 0L));
+ log.appendAsFollower(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {2, "b"}),
wid1, 11, 1L));
+ log.roll(Optional.empty());
+
+ log.appendAsFollower(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {3, "c"}),
wid1, 12, 2L));
+ log.appendAsFollower(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {4, "d"}),
wid1, 13, 3L));
+
+ // Close the log, we should now have 2 segments
+ log.close();
+ assertThat(log.logSegments().size()).isEqualTo(2);
+ assertThat(
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .sorted())
+ .containsExactly(2L, 4L);
+
+        // Delete all offset index files to trigger segment recovery
+ deleteAllOffsetIndexFile(log);
+
+ log = createLogTablet(false);
+ assertThat(
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .sorted())
+ .containsExactly(2L, 4L);
+
+
assertThat(log.writerStateManager().activeWriters().size()).isEqualTo(1);
+
assertThat(log.writerStateManager().activeWriters().get(wid1).lastBatchSequence())
+ .isEqualTo(13);
+ }
+
+ @Test
+ void testWriterSnapshotsRecoveryAfterCleanShutdown() throws Exception {
+ LogTablet log = createLogTablet(true);
+ assertThat(log.writerStateManager().oldestSnapshotOffset()).isEmpty();
+
+ long wid1 = 1L;
+ long wid2 = 2L;
+ int seq1 = 0;
+ int seq2 = 0;
+
+        // Append some records to create multiple segments.
+ for (int i = 0; i <= 5; i++) {
+ if (i % 2 == 0) {
+ MemoryLogRecords records =
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {seq1,
"a"}),
+ wid1,
+ seq1,
+ 0L);
+ log.appendAsLeader(records);
+ seq1++;
+ } else {
+ MemoryLogRecords records =
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {seq2,
"a"}),
+ wid2,
+ seq2,
+ 0L);
+ log.appendAsLeader(records);
+ seq2++;
+ }
+ log.roll(Optional.empty());
+ }
+
+ // Append some records to the last segment
+ MemoryLogRecords records =
+ genMemoryLogRecordsByObject(Collections.singletonList(new
Object[] {1, "a"}));
+ log.appendAsLeader(records);
+ log.close();
+
+ // Test writer state recovery after clean shutdown
+ log = createLogTablet(true);
+
+ List<Long> segmentOffsets =
+ log.logSegments().stream()
+ .map(LogSegment::getBaseOffset)
+ .collect(Collectors.toList());
+
+ // verify that the snapshot files are created
+ Set<Long> expectedSnapshotOffsets =
+ new HashSet<>(segmentOffsets.subList(1,
segmentOffsets.size()));
+ expectedSnapshotOffsets.add(log.localLogEndOffset());
+ Set<Long> actualSnapshotOffsets =
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .collect(Collectors.toSet());
+ assertThat(actualSnapshotOffsets)
+ .containsExactlyInAnyOrderElementsOf(expectedSnapshotOffsets);
+
+ // Verify that expected writers last batch sequence
+ Map<Long, Integer> actualWritersLastBatchSequence =
+ log.writerStateManager().activeWriters().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, e ->
e.getValue().lastBatchSequence()));
+ Map<Long, Integer> expectedWritersLastBatchSequence = new HashMap<>();
+ expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+ expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+ }
+
+ @Test
+ void testWriterSnapshotsRecoveryAfterUncleanShutdown() throws Exception {
+ LogTablet log = createLogTablet(true);
+ assertThat(log.writerStateManager().oldestSnapshotOffset()).isEmpty();
+
+ long wid1 = 1L;
+ long wid2 = 2L;
+ int seq1 = 0;
+ int seq2 = 0;
+
+ // Append some records to create multiple segments,
+ // after this step, the segments should be
+ // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
+ // writer id 1 2 1 2 1 2
+        // seq id    0      0      1      1      2      2
+ // snapshot 1 2 3 4 5 6
+ for (int i = 0; i <= 5; i++) {
+ if (i % 2 == 0) {
+ MemoryLogRecords records =
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {seq1,
"a"}),
+ wid1,
+ seq1,
+ 0L);
+ log.appendAsLeader(records);
+ seq1++;
+ } else {
+ MemoryLogRecords records =
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {seq2,
"a"}),
+ wid2,
+ seq2,
+ 0L);
+ log.appendAsLeader(records);
+ seq2++;
+ }
+ log.roll(Optional.empty());
+ }
+ // Append some records to the last segment
+ MemoryLogRecords records =
+ genMemoryLogRecordsByObject(Collections.singletonList(new
Object[] {1, "a"}));
+ log.appendAsLeader(records);
+
+ assertThat(log.logSegments().size()).isGreaterThanOrEqualTo(5);
+
+ List<Long> segmentOffsets =
+ log.logSegments().stream()
+ .map(LogSegment::getBaseOffset)
+ .collect(Collectors.toList());
+
+ // verify that the snapshot files are created
+ Set<Long> expectedSnapshotOffsets =
+ new HashSet<>(segmentOffsets.subList(1,
segmentOffsets.size()));
+ Set<Long> actualSnapshotOffsets =
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .collect(Collectors.toSet());
+ assertThat(actualSnapshotOffsets)
+ .containsExactlyInAnyOrderElementsOf(expectedSnapshotOffsets);
+
+ // Verify that expected writers last batch sequence
+ Map<Long, Integer> actualWritersLastBatchSequence =
+ log.writerStateManager().activeWriters().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, e ->
e.getValue().lastBatchSequence()));
+ Map<Long, Integer> expectedWritersLastBatchSequence = new HashMap<>();
+ expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+ expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+ log.close();
+
+ // [0-0], [1-1], [2-2], [3-3], [4-4], [5-5], [6-]
+ // |---->
offsetForSegmentAfterRecoveryPoint
+ // |----> recoveryPoint
+ long offsetForSegmentAfterRecoveryPoint =
segmentOffsets.get(segmentOffsets.size() - 3);
+ long recoveryPoint = segmentOffsets.get(segmentOffsets.size() - 4);
+
assertThat(recoveryPoint).isLessThan(offsetForSegmentAfterRecoveryPoint);
+
+        // 1. Test unclean shutdown without any recovery
+ // Retain snapshots for the last 2 segments (delete snapshots before
that)
+ long snapshotRetentionOffset =
segmentOffsets.get(segmentOffsets.size() - 2);
+
log.writerStateManager().deleteSnapshotsBefore(snapshotRetentionOffset);
+ log.close();
+
+ // Reopen the log with recovery point. Although we use unclean
shutdown here,
+        // all the index files are correctly closed, so Fluss will not trigger
recovery for any
+        // segment.
+ log = createLogTablet(false, recoveryPoint);
+
+ // Expected snapshot offsets: last 2 segment base offsets + log end
offset
+ List<Long> lastTowSegmentOffsets =
+ segmentOffsets.subList(
+ Math.max(0, segmentOffsets.size() - 2),
segmentOffsets.size());
+ expectedSnapshotOffsets = new HashSet<>(lastTowSegmentOffsets);
+ expectedSnapshotOffsets.add(log.localLogEndOffset());
+
+ // Verify that expected snapshot offsets exist
+ actualSnapshotOffsets =
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .collect(Collectors.toSet());
+ assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
+
+ // Verify that expected writers last batch sequence
+ actualWritersLastBatchSequence =
+ log.writerStateManager().activeWriters().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, e ->
e.getValue().lastBatchSequence()));
+ expectedWritersLastBatchSequence = new HashMap<>();
+ expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+ expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+ log.close();
+
+        // 2. Test unclean shutdown without segment recovery will rebuild
writer state
+ // Delete all snapshot files
+ deleteAllWriterSnapshot(logDir);
+
+        // Reopen the log with recovery point. Although Fluss will not trigger
recovery for any
+        // segment, the writer state should still be rebuilt.
+ log = createLogTablet(false, recoveryPoint);
+
+ actualSnapshotOffsets =
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .collect(Collectors.toSet());
+ // Will rebuild writer state for all segments, but only take snapshot
for the last 2
+ // segments and the last offset
+ expectedSnapshotOffsets = new HashSet<>();
+ expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() -
2));
+ expectedSnapshotOffsets.add(segmentOffsets.get(segmentOffsets.size() -
1));
+ expectedSnapshotOffsets.add(log.localLogEndOffset());
+ assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
+
+ // Verify that expected writers last batch sequence
+ actualWritersLastBatchSequence =
+ log.writerStateManager().activeWriters().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, e ->
e.getValue().lastBatchSequence()));
+ expectedWritersLastBatchSequence = new HashMap<>();
+ expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+ expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+ log.close();
+
+        // 3. Test unclean shutdown with segment recovery will rebuild writer
state for each segment
+        // after recovery point.
+ // Delete all snapshot files and index files (to trigger segment
recover)
+ deleteAllWriterSnapshot(logDir);
+ deleteAllOffsetIndexFile(log);
+
+ // Reopen the log with recovery point. All segments will be recovered
since we delete all
+ // the index files
+ log = createLogTablet(false, recoveryPoint);
+
+ // Writer snapshot files should be rebuilt for each segment after
recovery point
+ actualSnapshotOffsets =
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .collect(Collectors.toSet());
+ expectedSnapshotOffsets =
+ log.logSegments(recoveryPoint,
log.localLogEndOffset()).stream()
+ .map(LogSegment::getBaseOffset)
+ .collect(Collectors.toSet());
+ expectedSnapshotOffsets.add(log.localLogEndOffset());
+ assertThat(actualSnapshotOffsets).isEqualTo(expectedSnapshotOffsets);
+
+ actualWritersLastBatchSequence =
+ log.writerStateManager().activeWriters().entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, e ->
e.getValue().lastBatchSequence()));
+ expectedWritersLastBatchSequence = new HashMap<>();
+ expectedWritersLastBatchSequence.put(wid1, seq1 - 1);
+ expectedWritersLastBatchSequence.put(wid2, seq2 - 1);
+
assertThat(actualWritersLastBatchSequence).isEqualTo(expectedWritersLastBatchSequence);
+ log.close();
+ }
+
+ @Test
+ void testLoadingLogKeepsLargestStrayWriterStateSnapshot() throws Exception
{
+ LogTablet log = createLogTablet(true);
+ long wid1 = 1L;
+
+ log.appendAsLeader(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {1, "a"}),
wid1, 0, 0L));
+ log.roll(Optional.empty());
+ log.appendAsLeader(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {2, "b"}),
wid1, 1, 0L));
+ log.roll(Optional.empty());
+
+ log.appendAsLeader(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {3, "c"}),
wid1, 2, 0L));
+ log.appendAsLeader(
+ genMemoryLogRecordsWithWriterId(
+ Collections.singletonList(new Object[] {4, "d"}),
wid1, 3, 0L));
+
+ // Close the log, we should now have 3 segments
+ log.close();
+ assertThat(log.logSegments().size()).isEqualTo(3);
+        // We expect 3 snapshot files: two are for the first two
segments, and the last was
+        // written out during log closing.
+ assertThat(
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .sorted())
+ .containsExactly(1L, 2L, 4L);
+        // Inject a stray snapshot file within the bounds of the log at offset
3; it should be
+        // cleaned up after loading the log
+ Path path = FlussPaths.writerSnapshotFile(logDir, 3L).toPath();
+ Files.createFile(path);
+ assertThat(
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .sorted())
+ .containsExactly(1L, 2L, 3L, 4L);
+
+ createLogTablet(false);
+ // We should clean up the stray writer state snapshot file, but keep
the largest snapshot
+ // file (4)
+ assertThat(
+ WriterStateManager.listSnapshotFiles(logDir).stream()
+ .map(snapshotFile -> snapshotFile.offset)
+ .sorted())
+ .containsExactly(1L, 2L, 4L);
+ }
+
private LogTablet createLogTablet(boolean isCleanShutdown) throws
Exception {
return createLogTablet(isCleanShutdown, 0);
}
@@ -325,4 +683,18 @@ final class LogLoaderTest extends LogTestBase {
}
return indexFiles;
}
+
+ private void deleteAllOffsetIndexFile(LogTablet logTablet) throws
IOException {
+ List<LogSegment> logSegments = logTablet.logSegments();
+ for (LogSegment segment : logSegments) {
+ segment.offsetIndex().deleteIfExists();
+ }
+ }
+
+ private void deleteAllWriterSnapshot(File logDir) throws Exception {
+ List<SnapshotFile> snapshotFiles =
WriterStateManager.listSnapshotFiles(logDir);
+ for (SnapshotFile snapshotFile : snapshotFiles) {
+ snapshotFile.deleteIfExists();
+ }
+ }
}