This is an automated email from the ASF dual-hosted git repository.
roman pushed a commit to branch release-1.19
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.19 by this push:
new bd916bd89e7 [FLINK-38574][checkpoint] Avoid reusing re-uploaded sst
files when checkpoint notification is delayed (#27157)
bd916bd89e7 is described below
commit bd916bd89e739504f63debb4d155a709f3b0569b
Author: Zakelly <[email protected]>
AuthorDate: Fri Oct 31 17:22:43 2025 +0800
[FLINK-38574][checkpoint] Avoid reusing re-uploaded sst files when
checkpoint notification is delayed (#27157)
---
.../snapshot/RocksDBSnapshotStrategyBase.java | 90 ++++++++++++++++++++--
.../snapshot/RocksIncrementalSnapshotStrategy.java | 16 ++--
.../RocksIncrementalSnapshotStrategyTest.java | 35 +++++++++
3 files changed, 127 insertions(+), 14 deletions(-)
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index 9cb73d01dc9..366b2438179 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.SortedMap;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -392,22 +393,94 @@ public abstract class RocksDBSnapshotStrategyBase<K, R
extends SnapshotResources
}
protected static final PreviousSnapshot EMPTY_PREVIOUS_SNAPSHOT =
- new PreviousSnapshot(Collections.emptyList());
+ new PreviousSnapshot(null, -1L);
/** Previous snapshot with uploaded sst files. */
protected static class PreviousSnapshot {
@Nonnull private final Map<String, StreamStateHandle>
confirmedSstFiles;
- protected PreviousSnapshot(@Nullable Collection<HandleAndLocalPath>
confirmedSstFiles) {
+ /**
+ * Constructor of PreviousSnapshot. Given a map of uploaded sst files
in previous
+ * checkpoints, prune the sst files which have been re-uploaded in the
following
+ * checkpoints. The prune logic is used to resolve the mismatch
between TM and JM due to
+ * notification delay. Following steps for example:
+ *
+ * <ul>
+ * <li>1) checkpoint 1 uses file 00001.SST uploaded as xxx.sst.
+ * <li>2) checkpoint 2 uses the same file 00001.SST but re-uploads
it as yyy.sst because
+ * CP 1 wasn't yet confirmed.
+ * <li>3) TM gets a confirmation of checkpoint 1.
+ * <li>4) JM completes checkpoint 2 and subsumes checkpoint 1 -
removing xxx.sst.
+ * <li>5) checkpoint 3 tries to re-use file 00001.SST uploaded as
xxx.sst in checkpoint 1,
+ * but it was deleted in (4) by JM.
+ * </ul>
+ *
+ * @param currentUploadedSstFiles the sst files uploaded in previous
checkpoints.
+ * @param lastCompletedCheckpoint the last completed checkpoint id.
+ */
+ protected PreviousSnapshot(
+ @Nullable SortedMap<Long, Collection<HandleAndLocalPath>>
currentUploadedSstFiles,
+ long lastCompletedCheckpoint) {
this.confirmedSstFiles =
- confirmedSstFiles != null
- ? confirmedSstFiles.stream()
+ currentUploadedSstFiles != null
+ ? pruneFirstCheckpointSstFiles(
+ currentUploadedSstFiles,
lastCompletedCheckpoint)
+ : Collections.emptyMap();
+ }
+
+ /**
+ * The last completed checkpoint's uploaded sst files are all
included, then for each
+ * following checkpoint, if a sst file has been re-uploaded, remove it
from the first
+ * checkpoint's sst files.
+ *
+ * @param currentUploadedSstFiles the sst files uploaded in the last
completed checkpoint and the following checkpoints.
+ * @param lastCompletedCheckpoint the last completed checkpoint id.
+ */
+ private Map<String, StreamStateHandle> pruneFirstCheckpointSstFiles(
+ @Nonnull SortedMap<Long, Collection<HandleAndLocalPath>>
currentUploadedSstFiles,
+ long lastCompletedCheckpoint) {
+ Map<String, StreamStateHandle> prunedSstFiles = null;
+ int removedCount = 0;
+ for (Map.Entry<Long, Collection<HandleAndLocalPath>> entry :
+ currentUploadedSstFiles.entrySet()) {
+ // Iterate checkpoints in ascending order of checkpoint id.
+ if (entry.getKey() == lastCompletedCheckpoint) {
+ // The first checkpoint's uploaded sst files are all
included.
+ prunedSstFiles =
+ entry.getValue().stream()
.collect(
Collectors.toMap(
HandleAndLocalPath::getLocalPath,
-
HandleAndLocalPath::getHandle))
- : Collections.emptyMap();
+
HandleAndLocalPath::getHandle));
+ } else if (prunedSstFiles == null) {
+ // The last completed checkpoint's uploaded sst files do
not exist.
+ // So we skip the pruning process.
+ break;
+ } else if (!prunedSstFiles.isEmpty()) {
+ // Prune sst files which have been re-uploaded in the
following checkpoints.
+ for (HandleAndLocalPath handleAndLocalPath :
entry.getValue()) {
+ if (!(handleAndLocalPath.getHandle()
+ instanceof PlaceholderStreamStateHandle)) {
+ // If it's not a placeholder handle, it means the
sst file has been
+ // re-uploaded in the following checkpoint.
+ if
(prunedSstFiles.remove(handleAndLocalPath.getLocalPath()) != null) {
+ removedCount++;
+ }
+ }
+ }
+ }
+ }
+ if (removedCount > 0 && LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Removed {} re-uploaded sst files from base file set
for incremental "
+ + "checkpoint. Base checkpoint id: {}",
+ removedCount,
+ currentUploadedSstFiles.firstKey());
+ }
+ return (prunedSstFiles != null && !prunedSstFiles.isEmpty())
+ ? Collections.unmodifiableMap(prunedSstFiles)
+ : Collections.emptyMap();
}
protected Optional<StreamStateHandle> getUploaded(String filename) {
@@ -425,5 +498,10 @@ public abstract class RocksDBSnapshotStrategyBase<K, R
extends SnapshotResources
return Optional.empty();
}
}
+
+ @Override
+ public String toString() {
+ return "PreviousSnapshot{" + "confirmedSstFiles=" +
confirmedSstFiles + '}';
+ }
}
}
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
index 436a0f2ec1c..404d671cc38 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategy.java
@@ -195,30 +195,29 @@ public class RocksIncrementalSnapshotStrategy<K>
long checkpointId, @Nonnull List<StateMetaInfoSnapshot>
stateMetaInfoSnapshots) {
final long lastCompletedCheckpoint;
- final Collection<HandleAndLocalPath> confirmedSstFiles;
+ final SortedMap<Long, Collection<HandleAndLocalPath>>
currentUploadedSstFiles;
// use the last completed checkpoint as the comparison base.
synchronized (uploadedSstFiles) {
lastCompletedCheckpoint = lastCompletedCheckpointId;
- confirmedSstFiles = uploadedSstFiles.get(lastCompletedCheckpoint);
- LOG.trace(
- "Use confirmed SST files for checkpoint {}: {}",
- checkpointId,
- confirmedSstFiles);
+ currentUploadedSstFiles =
+ new
TreeMap<>(uploadedSstFiles.tailMap(lastCompletedCheckpoint));
}
+ PreviousSnapshot previousSnapshot =
+ new PreviousSnapshot(currentUploadedSstFiles,
lastCompletedCheckpoint);
LOG.trace(
"Taking incremental snapshot for checkpoint {}. Snapshot is
based on last completed checkpoint {} "
+ "assuming the following (shared) confirmed files as
base: {}.",
checkpointId,
lastCompletedCheckpoint,
- confirmedSstFiles);
+ previousSnapshot);
// snapshot meta data to save
for (Map.Entry<String, RocksDbKvStateInfo> stateMetaInfoEntry :
kvStateInformation.entrySet()) {
stateMetaInfoSnapshots.add(stateMetaInfoEntry.getValue().metaInfo.snapshot());
}
- return new PreviousSnapshot(confirmedSstFiles);
+ return previousSnapshot;
}
/**
@@ -343,6 +342,7 @@ public class RocksIncrementalSnapshotStrategy<K>
List<Path> miscFilePaths = new ArrayList<>(files.length);
createUploadFilePaths(files, sstFiles, sstFilePaths,
miscFilePaths);
+ LOG.info("Will re-use {} SST files. {}", sstFiles.size(),
sstFiles);
final CheckpointedStateScope stateScope =
sharingFilesStrategy ==
SnapshotType.SharingFilesStrategy.NO_SHARING
diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
index c4cb7058421..899708d12d5 100644
---
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
+++
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/snapshot/RocksIncrementalSnapshotStrategyTest.java
@@ -96,6 +96,41 @@ class RocksIncrementalSnapshotStrategyTest {
}
}
+ @Test
+ void testCheckpointIsIncrementalWithLateNotification() throws Exception {
+
+ try (CloseableRegistry closeableRegistry = new CloseableRegistry();
+ RocksIncrementalSnapshotStrategy<?> checkpointSnapshotStrategy
=
+ createSnapshotStrategy()) {
+ FsCheckpointStreamFactory checkpointStreamFactory =
createFsCheckpointStreamFactory();
+
+ // make a checkpoint with id 1
+ snapshot(1L, checkpointSnapshotStrategy, checkpointStreamFactory,
closeableRegistry);
+
+ // make a checkpoint with id 2
+ snapshot(2L, checkpointSnapshotStrategy, checkpointStreamFactory,
closeableRegistry);
+
+ // Late notify checkpoint with id 1
+ checkpointSnapshotStrategy.notifyCheckpointComplete(1L);
+
+ // make checkpoint with id 3, based on checkpoint 1
+ IncrementalRemoteKeyedStateHandle
incrementalRemoteKeyedStateHandle3 =
+ snapshot(
+ 3L,
+ checkpointSnapshotStrategy,
+ checkpointStreamFactory,
+ closeableRegistry);
+
+ // Late notify checkpoint with id 2
+ checkpointSnapshotStrategy.notifyCheckpointComplete(2L);
+
+ // 3rd checkpoint is based on 1st checkpoint, BUT the 2nd
checkpoint re-uploaded the 1st
+ // one, so it should be based on nothing, thus this is effectively
a full checkpoint.
+ assertThat(incrementalRemoteKeyedStateHandle3.getStateSize())
+
.isEqualTo(incrementalRemoteKeyedStateHandle3.getCheckpointedSize());
+ }
+ }
+
public RocksIncrementalSnapshotStrategy<?> createSnapshotStrategy()
throws IOException, RocksDBException {