This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new fb5fd483f91 [FLINK-35570][Checkpoint] Consider PlaceholderStreamStateHandle in file merging
fb5fd483f91 is described below

commit fb5fd483f91f524c81130c0fc4125d3382a7ffdc
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jun 12 12:28:58 2024 +0800

    [FLINK-35570][Checkpoint] Consider PlaceholderStreamStateHandle in file merging
    
    This fixes #24924
---
 .../filemerging/FileMergingSnapshotManager.java    |  7 ++++
 .../FileMergingSnapshotManagerBase.java            | 13 ++++++-
 .../runtime/state/CheckpointStreamFactory.java     |  9 ++---
 .../state/PlaceholderStreamStateHandle.java        |  9 ++++-
 .../runtime/scheduler/SchedulerUtilsTest.java      |  2 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java      |  5 ++-
 .../ResumeCheckpointManuallyITCase.java            | 10 +++---
 .../SnapshotFileMergingCompatibilityITCase.java    | 42 ++++++++++++++++------
 8 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index a4e066ad9c6..4a101be5674 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
@@ -361,4 +362,10 @@ public interface FileMergingSnapshotManager extends Closeable {
                     + '}';
         }
     }
+
+    static boolean isFileMergingHandle(StreamStateHandle handle) {
+        return (handle instanceof SegmentFileStateHandle)
+                || (handle instanceof PlaceholderStreamStateHandle
+                        && ((PlaceholderStreamStateHandle) 
handle).isFileMerged());
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 097e3ee7b09..cceeb27cfe1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -571,7 +571,8 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
                 if (file != null) {
                     file.advanceLastCheckpointId(checkpointId);
                 }
-            } else if (stateHandle instanceof PlaceholderStreamStateHandle) {
+            } else if (stateHandle instanceof PlaceholderStreamStateHandle
+                    && ((PlaceholderStreamStateHandle) 
stateHandle).isFileMerged()) {
                 // Since the rocksdb state backend will leverage the 
PlaceholderStreamStateHandle,
                 // the manager should recognize this.
                 LogicalFile file =
@@ -643,6 +644,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
             if (file != null) {
                 return file.getPhysicalFile().isCouldReuse();
             }
+        } else if (stateHandle instanceof PlaceholderStreamStateHandle
+                && ((PlaceholderStreamStateHandle) 
stateHandle).isFileMerged()) {
+            // Since the rocksdb state backend will leverage the 
PlaceholderStreamStateHandle,
+            // the manager should recognize this.
+            LogicalFile file =
+                    knownLogicalFiles.get(
+                            new 
LogicalFileId(stateHandle.getStreamStateHandleID().getKeyString()));
+            if (file != null) {
+                return file.getPhysicalFile().isCouldReuse();
+            }
         }
         // If a stateHandle is not of the type SegmentFileStateHandle or if 
its corresponding file
         // is not recognized by the fileMergingManager, it needs to be 
re-uploaded.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index 59129758bcf..ff770cecc71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -91,13 +91,8 @@ public interface CheckpointStreamFactory {
      * @return true if it can be reused.
      */
     default boolean couldReuseStateHandle(StreamStateHandle stateHandle) {
-
         // By default, the CheckpointStreamFactory doesn't support 
snapshot-file-merging, so the
         // SegmentFileStateHandle type of stateHandle can not be reused.
-        if (stateHandle instanceof SegmentFileStateHandle) {
-            return false;
-        }
-
-        return true;
+        return !FileMergingSnapshotManager.isFileMergingHandle(stateHandle);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index a593ab0dbe2..ccd09e5eb62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -35,10 +35,13 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
     private final PhysicalStateHandleID physicalID;
     private final long stateSize;
+    private final boolean fileMerged;
 
-    public PlaceholderStreamStateHandle(PhysicalStateHandleID physicalID, long 
stateSize) {
+    public PlaceholderStreamStateHandle(
+            PhysicalStateHandleID physicalID, long stateSize, boolean 
fileMerged) {
         this.physicalID = physicalID;
         this.stateSize = stateSize;
+        this.fileMerged = fileMerged;
     }
 
     @Override
@@ -67,4 +70,8 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
     public long getStateSize() {
         return stateSize;
     }
+
+    public boolean isFileMerged() {
+        return fileMerged;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 071476e619c..e5ce3166a88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -114,7 +114,7 @@ class SchedulerUtilsTest {
                 buildIncrementalHandle(
                         localPath,
                         new PlaceholderStreamStateHandle(
-                                handle.getStreamStateHandleID(), 
handle.getStateSize()),
+                                handle.getStreamStateHandleID(), 
handle.getStateSize(), false),
                         backendId);
         newHandle.registerSharedStates(sharedStateRegistry, 1L);
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index f6bb541cf0d..38c7076638d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDb
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import 
org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -418,7 +419,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
                 // (created from a previous checkpoint).
                 return Optional.of(
                         new PlaceholderStreamStateHandle(
-                                handle.getStreamStateHandleID(), 
handle.getStateSize()));
+                                handle.getStreamStateHandleID(),
+                                handle.getStateSize(),
+                                
FileMergingSnapshotManager.isFileMergingHandle(handle)));
             } else {
                 // Don't use any uploaded but not confirmed handles because 
they might be deleted
                 // (by TM) if the previous checkpoint failed. See FLINK-25395
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 32c1b41b3a5..9e862a8cc56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -402,8 +402,9 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
             MiniClusterWithClientResource cluster,
             RestoreMode restoreMode)
             throws Exception {
+        // complete at least two checkpoints so that the initial checkpoint 
can be subsumed
         return runJobAndGetExternalizedCheckpoint(
-                backend, externalCheckpoint, cluster, restoreMode, new 
Configuration());
+                backend, externalCheckpoint, cluster, restoreMode, new 
Configuration(), 2);
     }
 
     static String runJobAndGetExternalizedCheckpoint(
@@ -411,7 +412,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
             @Nullable String externalCheckpoint,
             MiniClusterWithClientResource cluster,
             RestoreMode restoreMode,
-            Configuration jobConfig)
+            Configuration jobConfig,
+            int consecutiveCheckpoints)
             throws Exception {
         JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, 
restoreMode, jobConfig);
         NotifyingInfiniteTupleSource.countDownLatch = new 
CountDownLatch(PARALLELISM);
@@ -420,8 +422,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         // wait until all sources have been started
         NotifyingInfiniteTupleSource.countDownLatch.await();
 
-        // complete at least two checkpoints so that the initial checkpoint 
can be subsumed
-        waitForCheckpoint(initialJobGraph.getJobID(), 
cluster.getMiniCluster(), 2);
+        waitForCheckpoint(
+                initialJobGraph.getJobID(), cluster.getMiniCluster(), 
consecutiveCheckpoints);
         cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get();
         waitUntilJobCanceled(initialJobGraph.getJobID(), 
cluster.getClusterClient());
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
index a6e9783236f..ed759673777 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -90,6 +89,9 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
             boolean fileMergingAcrossBoundary)
             throws Exception {
         final Configuration config = new Configuration();
+        // Wait for 4 checkpoints each round to subsume the original one and 
produce the
+        // PlaceholderStreamStateHandle in the final round
+        final int consecutiveCheckpoint = 4;
         config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, 
checkpointDir.toUri().toString());
         config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
         config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, 
fileMergingAcrossBoundary);
@@ -108,7 +110,12 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
         try {
             firstCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            stateBackend1, null, firstCluster, restoreMode, 
config);
+                            stateBackend1,
+                            null,
+                            firstCluster,
+                            restoreMode,
+                            config,
+                            consecutiveCheckpoint);
             assertThat(firstCheckpoint).isNotNull();
             verifyStateHandleType(firstCheckpoint, firstFileMergingSwitch);
         } finally {
@@ -130,7 +137,12 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
         try {
             secondCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            stateBackend2, firstCheckpoint, secondCluster, 
restoreMode, config);
+                            stateBackend2,
+                            firstCheckpoint,
+                            secondCluster,
+                            restoreMode,
+                            config,
+                            consecutiveCheckpoint);
             assertThat(secondCheckpoint).isNotNull();
             verifyStateHandleType(secondCheckpoint, secondFileMergingSwitch);
         } finally {
@@ -150,7 +162,12 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
         try {
             String thirdCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            stateBackend3, secondCheckpoint, thirdCluster, 
restoreMode, config);
+                            stateBackend3,
+                            secondCheckpoint,
+                            thirdCluster,
+                            restoreMode,
+                            config,
+                            consecutiveCheckpoint);
             assertThat(thirdCheckpoint).isNotNull();
             verifyStateHandleType(thirdCheckpoint, secondFileMergingSwitch);
         } finally {
@@ -167,22 +184,25 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
                 // Check keyed state handle
                 List<KeyedStateHandle> keyedStateHandles =
                         new ArrayList<>(subtaskState.getManagedKeyedState());
-                keyedStateHandles.addAll(subtaskState.getRawKeyedState());
                 for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-                    Assertions.assertInstanceOf(
-                            IncrementalRemoteKeyedStateHandle.class, 
keyedStateHandle);
+                    assertThat(keyedStateHandle)
+                            
.isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                     ((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
                             .streamSubHandles()
                             .forEach(
                                     handle -> {
-                                        Assertions.assertEquals(
-                                                fileMergingEnabled,
-                                                handle instanceof 
SegmentFileStateHandle);
+                                        if (fileMergingEnabled) {
+                                            assertThat(handle)
+                                                    
.isInstanceOf(SegmentFileStateHandle.class);
+                                        } else {
+                                            assertThat(handle)
+                                                    
.isNotInstanceOf(SegmentFileStateHandle.class);
+                                        }
                                     });
                     hasKeyedState = true;
                 }
             }
         }
-        Assertions.assertTrue(hasKeyedState);
+        assertThat(hasKeyedState).isTrue();
     }
 }

Reply via email to