This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c73e763e [kv] TabletServer must handle inconsistency or broken snapshot scenarios. (#1482)
2c73e763e is described below

commit 2c73e763e50e05f8cec901577ae2697b358d994b
Author: Yang Wang <[email protected]>
AuthorDate: Mon Sep 8 19:04:23 2025 +0800

    [kv] TabletServer must handle inconsistency or broken snapshot scenarios. (#1482)
---
 .../server/kv/snapshot/DefaultSnapshotContext.java |   6 ++
 .../fluss/server/kv/snapshot/SnapshotContext.java  |  13 +++
 .../org/apache/fluss/server/replica/Replica.java   |  18 +++-
 .../TestingCompletedKvSnapshotCommitter.java       |  18 ++++
 .../apache/fluss/server/replica/ReplicaTest.java   | 100 +++++++++++++++++++++
 .../fluss/server/replica/ReplicaTestBase.java      |   8 ++
 6 files changed, 162 insertions(+), 1 deletion(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
index 4fafac676..a6894851b 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/DefaultSnapshotContext.java
@@ -155,4 +155,10 @@ public class DefaultSnapshotContext implements 
SnapshotContext {
     public int maxFetchLogSizeInRecoverKv() {
         return maxFetchLogSizeInRecoverKv;
     }
+
+    @Override
+    public void handleSnapshotBroken(CompletedSnapshot snapshot) throws 
Exception {
+        completedSnapshotHandleStore.remove(snapshot.getTableBucket(), 
snapshot.getSnapshotID());
+        snapshot.discardAsync(asyncOperationsThreadPool);
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
index 191c19e6b..8f528bc47 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/kv/snapshot/SnapshotContext.java
@@ -64,6 +64,19 @@ public interface SnapshotContext {
     FunctionWithException<TableBucket, CompletedSnapshot, Exception>
             getLatestCompletedSnapshotProvider();
 
+    /**
+     * Handles broken snapshots.
+     *
+     * <p>In the current implementation, broken snapshots may already have 
occurred in production
+     * environments due to issues such as 
https://github.com/apache/fluss/issues/1304. While we must
+     * prevent inconsistent or broken snapshots from being committed, we also 
need to provide
+     * mechanisms to help the server recover from such snapshots rather than 
failing permanently.
+     *
+     * @param snapshot The broken snapshot to handle
+     * @throws Exception if recovery handling fails
+     */
+    void handleSnapshotBroken(CompletedSnapshot snapshot) throws Exception;
+
     /**
      * Get the max fetch size for fetching log to apply kv during recovering 
kv. The kv may apply
      * log during recovering.
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index ddae557c0..e86a29b1d 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -176,6 +176,7 @@ public final class Replica {
     private final ReadWriteLock leaderIsrUpdateLock = new 
ReentrantReadWriteLock();
     private final Clock clock;
 
+    private static final int INIT_KV_TABLET_MAX_RETRY_TIMES = 5;
     /**
      * storing the remote follower replicas' state, used to update leader's 
highWatermark and
      * replica ISR.
@@ -543,7 +544,15 @@ public final class Replica {
         }
 
         // init kv tablet and get the snapshot it uses to init if have any
-        Optional<CompletedSnapshot> snapshotUsed = initKvTablet();
+        Optional<CompletedSnapshot> snapshotUsed = Optional.empty();
+        for (int i = 1; i <= INIT_KV_TABLET_MAX_RETRY_TIMES; i++) {
+            try {
+                snapshotUsed = initKvTablet();
+                break;
+            } catch (Exception e) {
+                LOG.warn("Fail to init kv tablet, retrying for {} times", i, 
e);
+            }
+        }
         // start periodic kv snapshot
         startPeriodicKvSnapshot(snapshotUsed.orElse(null));
     }
@@ -660,6 +669,13 @@ public final class Replica {
         try {
             kvSnapshotDataDownloader.transferAllDataToDirectory(downloadSpec, 
closeableRegistry);
         } catch (Exception e) {
+            if 
(e.getMessage().contains(CompletedSnapshot.SNAPSHOT_DATA_NOT_EXISTS_ERROR_MESSAGE))
 {
+                try {
+                    snapshotContext.handleSnapshotBroken(completedSnapshot);
+                } catch (Exception t) {
+                    LOG.error("Handle broken snapshot {} failed.", 
completedSnapshot, t);
+                }
+            }
             throw new IOException("Fail to download kv snapshot.", e);
         }
         long end = clock.milliseconds();
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
index 8ca02fd16..5f2cddf70 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/kv/snapshot/TestingCompletedKvSnapshotCommitter.java
@@ -85,4 +85,22 @@ public class TestingCompletedKvSnapshotCommitter implements 
CompletedKvSnapshotC
         }
         return -1;
     }
+
+    /**
+     * Remove a snapshot with the given snapshot ID from the store. This 
simulates the cleanup of
+     * broken snapshot (metadata from ZooKeeper exists, but data was 
corrupted).
+     */
+    public void removeSnapshot(TableBucket tableBucket, long snapshotId) {
+        Deque<CompletedSnapshot> bucketSnapshots = snapshots.get(tableBucket);
+        if (bucketSnapshots != null) {
+            // Remove the snapshot with matching ID
+            bucketSnapshots.removeIf(snapshot -> snapshot.getSnapshotID() == 
snapshotId);
+        }
+
+        Map<Long, Integer> bucketSnapshotLeaderEpochMap =
+                bucketSnapshotLeaderEpoch.get(tableBucket);
+        if (bucketSnapshotLeaderEpochMap != null) {
+            bucketSnapshotLeaderEpochMap.remove(snapshotId);
+        }
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index b0b7d02a5..526b1b350 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -439,6 +439,106 @@ final class ReplicaTest extends ReplicaTestBase {
                 .isEqualTo(latestLeaderEpoch);
     }
 
+    @Test
+    void testBrokenSnapshotRecovery(@TempDir File snapshotKvTabletDir) throws 
Exception {
+        TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 1);
+
+        // create test context with custom snapshot store
+        TestSnapshotContext testKvSnapshotContext =
+                new TestSnapshotContext(snapshotKvTabletDir.getPath());
+        ManuallyTriggeredScheduledExecutorService scheduledExecutorService =
+                testKvSnapshotContext.scheduledExecutorService;
+        TestingCompletedKvSnapshotCommitter kvSnapshotStore =
+                testKvSnapshotContext.testKvSnapshotStore;
+
+        // create a replica and make it leader
+        Replica kvReplica =
+                makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, tableBucket, 
testKvSnapshotContext);
+        makeKvReplicaAsLeader(kvReplica);
+
+        // put initial data and create first snapshot
+        KvRecordBatch kvRecords =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {1, "a"}),
+                        Tuple2.of("k2", new Object[] {2, "b"}));
+        putRecordsToLeader(kvReplica, kvRecords);
+
+        // trigger first snapshot
+        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
+
+        // put more data and create second snapshot
+        kvRecords =
+                genKvRecordBatch(
+                        Tuple2.of("k1", new Object[] {3, "c"}),
+                        Tuple2.of("k3", new Object[] {4, "d"}));
+        putRecordsToLeader(kvReplica, kvRecords);
+
+        // trigger second snapshot
+        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
+
+        // put more data and create third snapshot (this will be the broken 
one)
+        kvRecords =
+                genKvRecordBatch(
+                        Tuple2.of("k4", new Object[] {5, "e"}),
+                        Tuple2.of("k5", new Object[] {6, "f"}));
+        putRecordsToLeader(kvReplica, kvRecords);
+
+        // trigger third snapshot
+        scheduledExecutorService.triggerNonPeriodicScheduledTask();
+        CompletedSnapshot snapshot2 = 
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
+
+        // verify that snapshot2 is the latest one before we break it
+        
assertThat(kvSnapshotStore.getLatestCompletedSnapshot(tableBucket).getSnapshotID())
+                .isEqualTo(2);
+
+        // now simulate the latest snapshot (snapshot2) being broken by
+        // deleting its metadata files and unshared SST files
+        // This simulates file corruption while ZK metadata remains intact
+        snapshot2.getKvSnapshotHandle().discard();
+
+        // ZK metadata should still show snapshot2 as latest (file corruption 
hasn't been detected
+        // yet)
+        
assertThat(kvSnapshotStore.getLatestCompletedSnapshot(tableBucket).getSnapshotID())
+                .isEqualTo(2);
+
+        // make the replica follower to destroy the current kv tablet
+        makeKvReplicaAsFollower(kvReplica, 1);
+
+        // create a new replica with the same snapshot context
+        // During initialization, it will try to use snapshot2 but find it 
broken,
+        // then handle the broken snapshot and fall back to snapshot1
+        testKvSnapshotContext =
+                new TestSnapshotContext(snapshotKvTabletDir.getPath(), 
kvSnapshotStore);
+        kvReplica = makeKvReplica(DATA1_PHYSICAL_TABLE_PATH_PK, tableBucket, 
testKvSnapshotContext);
+
+        // make it leader again - this should trigger the broken snapshot 
recovery logic
+        // The system should detect that snapshot2 files are missing, clean up 
its metadata,
+        // and successfully recover using snapshot1
+        makeKvReplicaAsLeader(kvReplica, 2);
+
+        // verify that KvTablet is successfully initialized despite the broken 
snapshot
+        assertThat(kvReplica.getKvTablet()).isNotNull();
+        KvTablet kvTablet = kvReplica.getKvTablet();
+
+        // verify that the data from snapshot1 is restored (snapshot2 was 
broken and cleaned up)
+        // snapshot1 should contain: k1->3,c and k3->4,d
+        List<Tuple2<byte[], byte[]>> expectedKeyValues =
+                getKeyValuePairs(
+                        genKvRecords(
+                                Tuple2.of("k1", new Object[] {3, "c"}),
+                                Tuple2.of("k3", new Object[] {4, "d"})));
+        verifyGetKeyValues(kvTablet, expectedKeyValues);
+
+        // Verify the core functionality: KvTablet successfully initialized 
despite broken snapshot
+        // The key test is that the system can handle broken snapshots and 
recover correctly
+
+        // Verify that we successfully simulated the broken snapshot condition
+        File metadataFile = new 
File(snapshot2.getMetadataFilePath().getPath());
+        assertThat(metadataFile.exists()).isFalse();
+    }
+
     @Test
     void testRestore(@TempDir Path snapshotKvTabletDirPath) throws Exception {
         TableBucket tableBucket = new TableBucket(DATA1_TABLE_ID_PK, 1);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 55df33f6c..8f0b271bb 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -640,5 +640,13 @@ public class ReplicaTestBase {
         private void unchecked(ThrowingRunnable<?> throwingRunnable) {
             ThrowingRunnable.unchecked(throwingRunnable).run();
         }
+
+        @Override
+        public void handleSnapshotBroken(CompletedSnapshot snapshot) throws 
Exception {
+            // Remove the broken snapshot from the snapshot store (simulating 
ZK metadata removal)
+            testKvSnapshotStore.removeSnapshot(snapshot.getTableBucket(), 
snapshot.getSnapshotID());
+            // Discard the snapshot files async (similar to 
DefaultSnapshotContext implementation)
+            snapshot.discardAsync(executorService);
+        }
     }
 }

Reply via email to