This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2c73e763e [kv] TabletServer must handle inconsistent or broken
snapshot scenarios. (#1482)
2c73e763e is described below
commit 2c73e763e50e05f8cec901577ae2697b358d994b
Author: Yang Wang <[email protected]>
AuthorDate: Mon Sep 8 19:04:23 2025 +0800
[kv] TabletServer must handle inconsistent or broken snapshot scenarios.
(#1482)
---
.../server/kv/snapshot/DefaultSnapshotContext.java | 6 ++
.../fluss/server/kv/snapshot/SnapshotContext.java | 13 +++
.../org/apache/fluss/server/replica/Replica.java | 18 +++-
.../TestingCompletedKvSnapshotCommitter.java | 18 ++++
.../apache/fluss/server/replica/ReplicaTest.java | 100 +++++++++++++++++++++
.../fluss/server/replica/ReplicaTestBase.java | 8 ++
6 files changed, 162 insertions(+), 1 deletion(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
index 4fafac676..a6894851b 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
@@ -155,4 +155,10 @@ public class DefaultSnapshotContext implements
SnapshotContext {
public int maxFetchLogSizeInRecoverKv() {
return maxFetchLogSizeInRecoverKv;
}
+
+ @Override
+ public void handleSnapshotBroken(CompletedSnapshot snapshot) throws
Exception {
+ completedSnapshotHandleStore.remove(snapshot.getTableBucket(),
snapshot.getSnapshotID());
+ snapshot.discardAsync(asyncOperationsThreadPool);
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
index 191c19e6b..8f528bc47 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
@@ -64,6 +64,19 @@ public interface SnapshotContext {
FunctionWithException<TableBucket, CompletedSnapshot, Exception>
getLatestCompletedSnapshotProvider();
+ /**
+ * Handles broken snapshots.
+ *
+ * <p>In the current implementation, broken snapshots may already have
occurred in production
+ * environments due to issues such as
https://github.com/apache/fluss/issues/1304. While we must
+ * prevent inconsistent or broken snapshots from being committed, we also
need to provide
+ * mechanisms to help the server recover from such snapshots rather than
failing permanently.
+ *
+ * @param snapshot The broken snapshot to handle
+ * @throws Exception if recovery handling fails
+ */
+ void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception;
+
/**
* Get the max fetch size for fetching log to apply kv during recovering
kv. The kv may apply
* log during recovering.
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index ddae557c0..e86a29b1d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -176,6 +176,7 @@ public final class Replica {
private final ReadWriteLock leaderIsrUpdateLock = new
ReentrantReadWriteLock();
private final Clock clock;
+ private static final int INIT_KV_TABLET_MAX_RETRY_TIMES = 5;
/**
* storing the remote follower replicas' state, used to update leader's
highWatermark and
* replica ISR.
@@ -543,7 +544,15 @@ public final class Replica {
}
// init kv tablet and get the snapshot it uses to init if have any
- Optional<CompletedSnapshot> snapshotUsed = initKvTablet();
+ Optional<CompletedSnapshot> snapshotUsed = Optional.empty();
+ for (int i = 1; i <= INIT_KV_TABLET_MAX_RETRY_TIMES; i++) {
+ try {
+ snapshotUsed = initKvTablet();
+ break;
+ } catch (Exception e) {
+ LOG.warn("Fail to init kv tablet, retrying for {} times", i,
e);
+ }
+ }
// start periodic kv snapshot
startPeriodicKvSnapshot(snapshotUsed.orElse(null));
}
@@ -660,6 +669,13 @@ public final class Replica {
try {
kvSnapshotDataDownloader.transferAllDataToDirectory(downloadSpec,
closeableRegistry);
} catch (Exception e) {
+ if
(e.getMessage().contains(CompletedSnapshot.SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE))
{
+ try {
+ snapshotContext.handleSnapshotBroken(completedSnapshot);
+ } catch (Exception t) {
+ LOG.error("Handle broken snapshot {} failed.",
completedSnapshot, t);
+ }
+ }
throw new IOException("Fail to download kv snapshot.", e);
}
long end = clock.milliseconds();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
index 8ca02fd16..5f2cddf70 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
@@ -85,4 +85,22 @@ public class TestingCompletedKvSnapshotCommitter implements
CompletedKvSnapshotC
}
return -1;
}
+
+ /**
+ * Remove a snapshot with the given snapshot ID from the store. This
simulates the cleanup of
+ * broken snapshot (metadata from ZooKeeper exists, but data was
corrupted).
+ */
+ public void removeSnapshot(TableBucket tableBucket, long snapshotId) {
+ Deque<CompletedSnapshot> bucketSnapshots = snapshots.get(tableBucket);
+ if (bucketSnapshots != null) {
+ // Remove the snapshot with matching ID
+ bucketSnapshots.removeIf(snapshot -> snapshot.getSnapshotID() ==
snapshotId);
+ }
+
+ Map<Long, Integer> bucketSnapshotLeaderEpochMap =
+ bucketSnapshotLeaderEpoch.get(tableBucket);
+ if (bucketSnapshotLeaderEpochMap != null) {
+ bucketSnapshotLeaderEpochMap.remove(snapshotId);
+ }
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index b0b7d02a5..526b1b350 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -439,6 +439,106 @@ final class ReplicaTest extends ReplicaTestBase {
.isEqualTo(latestLeaderEpoch);
}
+ @Test
+ void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws
Exception {
+ TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 1);
+
+ // create test context with custom snapshot store
+ TestSnapshotContext testKvSnapshotContext =
+ new TestSnapshotContext(snapshotKvTabletDir.getPath());
+ ManuallyTriggeredScheduledExecutorService scheduledExecutorService =
+ testKvSnapshotContext.scheduledExecutorService;
+ TestingCompletedKvSnapshotCommitter kvSnapshotStore =
+ testKvSnapshotContext.testKvSnapshotStore;
+
+ // create a replica and make it leader
+ Replica kvReplica =
+ makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, tableBucket,
testKvSnapshotContext);
+ makeKvReplicaAsLeader(kvReplica);
+
+ // put initial data and create first snapshot
+ KvRecordBatch kvRecords =
+ genKvRecordBatch(
+ Tuple2.of("k1", new Object[] {1, "a"}),
+ Tuple2.of("k2", new Object[] {2, "b"}));
+ putRecordsToLeader(kvReplica, kvRecords);
+
+ // trigger first snapshot
+ scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
+
+ // put more data and create second snapshot
+ kvRecords =
+ genKvRecordBatch(
+ Tuple2.of("k1", new Object[] {3, "c"}),
+ Tuple2.of("k3", new Object[] {4, "d"}));
+ putRecordsToLeader(kvReplica, kvRecords);
+
+ // trigger second snapshot
+ scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
+
+ // put more data and create third snapshot (this will be the broken
one)
+ kvRecords =
+ genKvRecordBatch(
+ Tuple2.of("k4", new Object[] {5, "e"}),
+ Tuple2.of("k5", new Object[] {6, "f"}));
+ putRecordsToLeader(kvReplica, kvRecords);
+
+ // trigger third snapshot
+ scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ CompletedSnapshot snapshot2 =
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
+
+ // verify that snapshot2 is the latest one before we break it
+
assertThat(kvSnapshotStore.getLatestCompletedSnapshot(tableBucket).getSnapshotID())
+ .isEqualTo(2);
+
+ // now simulate the latest snapshot (snapshot2) being broken by
+ // deleting its metadata files and unshared SST files
+ // This simulates file corruption while ZK metadata remains intact
+ snapshot2.getKvSnapshotHandle().discard();
+
+ // ZK metadata should still show snapshot2 as latest (file corruption
hasn't been detected
+ // yet)
+
assertThat(kvSnapshotStore.getLatestCompletedSnapshot(tableBucket).getSnapshotID())
+ .isEqualTo(2);
+
+ // make the replica follower to destroy the current kv tablet
+ makeKvReplicaAsFollower(kvReplica, 1);
+
+ // create a new replica with the same snapshot context
+ // During initialization, it will try to use snapshot2 but find it
broken,
+ // then handle the broken snapshot and fall back to snapshot1
+ testKvSnapshotContext =
+ new TestSnapshotContext(snapshotKvTabletDir.getPath(),
kvSnapshotStore);
+ kvReplica = makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, tableBucket,
testKvSnapshotContext);
+
+ // make it leader again - this should trigger the broken snapshot
recovery logic
+ // The system should detect that snapshot2 files are missing, clean up
its metadata,
+ // and successfully recover using snapshot1
+ makeKvReplicaAsLeader(kvReplica, 2);
+
+ // verify that KvTablet is successfully initialized despite the broken
snapshot
+ assertThat(kvReplica.getKvTablet()).isNotNull();
+ KvTablet kvTablet = kvReplica.getKvTablet();
+
+ // verify that the data from snapshot1 is restored (snapshot2 was
broken and cleaned up)
+ // snapshot1 should contain: k1->3,c and k3->4,d
+ List<Tuple2<byte[], byte[]>> expectedKeyValues =
+ getKeyValuePairs(
+ genKvRecords(
+ Tuple2.of("k1", new Object[] {3, "c"}),
+ Tuple2.of("k3", new Object[] {4, "d"})));
+ verifyGetKeyValues(kvTablet, expectedKeyValues);
+
+ // Verify the core functionality: KvTablet successfully initialized
despite broken snapshot
+ // The key test is that the system can handle broken snapshots and
recover correctly
+
+ // Verify that we successfully simulated the broken snapshot condition
+ File metadataFile = new
File(snapshot2.getMetadataFilePath().getPath());
+ assertThat(metadataFile.exists()).isFalse();
+ }
+
@Test
void testRestore(@TempDir Path snapshotKvTabletDirPath) throws Exception {
TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 1);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 55df33f6c..8f0b271bb 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -640,5 +640,13 @@ public class ReplicaTestBase {
private void unchecked(ThrowingRunnable<?> throwingRunnable) {
ThrowingRunnable.unchecked(throwingRunnable).run();
}
+
+ @Override
+ public void handleSnapshotBroken(CompletedSnapshot snapshot) throws
Exception {
+ // Remove the broken snapshot from the snapshot store (simulating
ZK metadata removal)
+ testKvSnapshotStore.removeSnapshot(snapshot.getTableBucket(),
snapshot.getSnapshotID());
+ // Discard the snapshot files async (similar to
DefaultSnapshotContext implementation)
+ snapshot.discardAsync(executorService);
+ }
}
}