This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.19 by this push:
     new bd916bd89e7 [FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when checkpoint notification is delayed (#27157)
bd916bd89e7 is described below

commit bd916bd89e739504f63debb4d155a709f3b0569b
Author: Zakelly <[email protected]>
AuthorDate: Fri Oct 31 17:22:43 2025 +0800

    [FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when checkpoint notification is delayed (#27157)
---
 .../snapshot/RocksDBSnapshotStrategyBase.java      | 90 ++++++++++++++++++++--
 .../snapshot/RocksIncrementalSnapshotStrategy.java | 16 ++--
 .../RocksIncrementalSnapshotStrategyTest.java      | 35 +++++++++
 3 files changed, 127 insertions(+), 14 deletions(-)

diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index 9cb73d01dc9..366b2438179 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.SortedMap;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
@@ -392,22 +393,94 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
     }
 
     protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
-            new PreviousSnapshot(Collections.emptyList());
+            new PreviousSnapshot(null, -1L);
 
     /** Previous snapshot with uploaded sst files. */
     protected static class PreviousSnapshot {
 
         @Nonnull private final Map<String, StreamStateHandle> confirmedSstFiles;
 
-        protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath> confirmedSstFiles) {
+        /**
+         * Constructor of PreviousSnapshot. Given a map of uploaded sst files in previous
+         * checkpoints, prune the sst files which have been re-uploaded in the following
+         * checkpoints. The prune logic is used to resolve the mismatch between TM and JM due to
+         * notification delay. For example, without pruning:
+         *
+         * <ul>
+         *   <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
+         *   <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads it as yyy.sst because
+         *       CP 1 wasn't yet confirmed.
+         *   <li>3) TM gets a confirmation of checkpoint 1.
+         *   <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 - removing xxx.sst.
+         *   <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as xxx.sst in checkpoint 1,
+         *       but it was deleted in (4) by JM.
+         * </ul>
+         *
+         * @param currentUploadedSstFiles the uploaded sst files, keyed by checkpoint id.
+         * @param lastCompletedCheckpoint the last completed checkpoint id.
+         */
+        protected PreviousSnapshot(
+                @Nullable SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
+                long lastCompletedCheckpoint) {
             this.confirmedSstFiles =
-                    confirmedSstFiles != null
-                            ? confirmedSstFiles.stream()
+                    currentUploadedSstFiles != null
+                            ? pruneFirstCheckpointSstFiles(
+                                    currentUploadedSstFiles, lastCompletedCheckpoint)
+                            : Collections.emptyMap();
+        }
+
+        /**
+         * The last completed checkpoint's uploaded sst files are all included, then for each
+         * following checkpoint, if an sst file has been re-uploaded, remove it from the first
+         * checkpoint's sst files. Entries older than the last completed checkpoint are ignored.
+         *
+         * @param currentUploadedSstFiles the uploaded sst files, keyed by checkpoint id.
+         * @param lastCompletedCheckpoint the last completed checkpoint id.
+         */
+        private static Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
+                @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles,
+                long lastCompletedCheckpoint) {
+            Map<String, StreamStateHandle> prunedSstFiles = null;
+            int removedCount = 0;
+            for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
+                    currentUploadedSstFiles.tailMap(lastCompletedCheckpoint).entrySet()) {
+                // Iterate checkpoints in ascending order, starting at the last completed one.
+                if (entry.getKey() == lastCompletedCheckpoint) {
+                    // The last completed checkpoint's uploaded sst files are all included.
+                    prunedSstFiles =
+                            entry.getValue().stream()
                                     .collect(
                                             Collectors.toMap(
                                                     HandleAndLocalPath::getLocalPath,
-                                                    HandleAndLocalPath::getHandle))
-                            : Collections.emptyMap();
+                                                    HandleAndLocalPath::getHandle));
+                } else if (prunedSstFiles == null) {
+                    // The last completed checkpoint has no recorded sst files, so there is no
+                    // base to prune from; an empty base is returned below.
+                    break;
+                } else if (!prunedSstFiles.isEmpty()) {
+                    // Prune sst files which have been re-uploaded in the following checkpoints.
+                    for (HandleAndLocalPath handleAndLocalPath : entry.getValue()) {
+                        if (!(handleAndLocalPath.getHandle()
+                                instanceof PlaceholderStreamStateHandle)) {
+                            // If it's not a placeholder handle, it means the sst file has been
+                            // re-uploaded in the following checkpoint.
+                            if (prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
+                                removedCount++;
+                            }
+                        }
+                    }
+                }
+            }
+            if (removedCount > 0 && LOG.isTraceEnabled()) {
+                LOG.trace(
+                        "Removed {} re-uploaded sst files from base file set for incremental "
+                                + "checkpoint. Base checkpoint id: {}",
+                        removedCount,
+                        lastCompletedCheckpoint);
+            }
+            return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
+                    ? Collections.unmodifiableMap(prunedSstFiles)
+                    : Collections.emptyMap();
         }
 
         protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -425,5 +498,10 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
                 return Optional.empty();
             }
         }
+
+        @Override
+        public String toString() {
+            return "PreviousSnapshot{" + "confirmedSstFiles=" + confirmedSstFiles + '}';
+        }
     }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 436a0f2ec1c..404d671cc38 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -195,30 +195,29 @@ public class RocksIncrementalSnapshotStrategy<K>
             long checkpointId, @Nonnull List<StateMetaInfoSnapshot> stateMetaInfoSnapshots) {
 
         final long lastCompletedCheckpoint;
-        final Collection<HandleAndLocalPath> confirmedSstFiles;
+        final SortedMap<Long, Collection<HandleAndLocalPath>> currentUploadedSstFiles;
 
         // use the last completed checkpoint as the comparison base.
         synchronized (uploadedSstFiles) {
             lastCompletedCheckpoint = lastCompletedCheckpointId;
-            confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
-            LOG.trace(
-                    "Use confirmed SST files for checkpoint {}: {}",
-                    checkpointId,
-                    confirmedSstFiles);
+            currentUploadedSstFiles =
+                    new TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
         }
+        PreviousSnapshot previousSnapshot =
+                new PreviousSnapshot(currentUploadedSstFiles, lastCompletedCheckpoint);
         LOG.trace(
                 "Taking incremental snapshot for checkpoint {}. Snapshot is based on last completed checkpoint {} "
                         + "assuming the following (shared) confirmed files as base: {}.",
                 checkpointId,
                 lastCompletedCheckpoint,
-                confirmedSstFiles);
+                previousSnapshot);
 
         // snapshot meta data to save
         for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
                 kvStateInformation.entrySet()) {
             stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
         }
-        return new PreviousSnapshot(confirmedSstFiles);
+        return previousSnapshot;
     }
 
     /**
@@ -343,6 +342,7 @@ public class RocksIncrementalSnapshotStrategy<K>
                 List<Path> miscFilePaths = new ArrayList<>(files.length);
 
                 createUploadFilePaths(files, sstFiles, sstFilePaths, miscFilePaths);
+                LOG.debug("Will re-use {} SST files. {}", sstFiles.size(), sstFiles);
 
                 final CheckpointedStateScope stateScope =
                         sharingFilesStrategy == SnapshotType.SharingFilesStrategy.NO_SHARING
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index c4cb7058421..899708d12d5 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -96,6 +96,41 @@ class RocksIncrementalSnapshotStrategyTest {
         }
     }
 
+    @Test
+    void testCheckpointIsIncrementalWithLateNotification() throws Exception {
+
+        try (CloseableRegistry closeableRegistry = new CloseableRegistry();
+                RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy =
+                        createSnapshotStrategy()) {
+            FsCheckpointStreamFactory checkpointStreamFactory = createFsCheckpointStreamFactory();
+
+            // make a checkpoint with id 1
+            snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
+
+            // make a checkpoint with id 2
+            snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory, closeableRegistry);
+
+            // notify completion of checkpoint 1 late (after checkpoint 2 was taken)
+            checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+            // make a checkpoint with id 3, based on checkpoint 1
+            IncrementalRemoteKeyedStateHandle incrementalRemoteKeyedStateHandle3 =
+                    snapshot(
+                            3L,
+                            checkpointSnapshotStrategy,
+                            checkpointStreamFactory,
+                            closeableRegistry);
+
+            // notify completion of checkpoint 2 late (after checkpoint 3 was taken)
+            checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+            // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd checkpoint re-uploaded the 1st
+            // one's sst files, so nothing can be reused: this is effectively a full checkpoint.
+            assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
+                    .isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
+        }
+    }
+
     public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
             throws IOException, RocksDBException {
 

Reply via email to