This is an automated email from the ASF dual-hosted git repository.

zakelly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push:
     new fb5fd483f91 [FLINK-35570][Checkpoint] Consider PlaceholderStreamStateHandle in file merging
fb5fd483f91 is described below

commit fb5fd483f91f524c81130c0fc4125d3382a7ffdc
Author: Zakelly <zakelly....@gmail.com>
AuthorDate: Wed Jun 12 12:28:58 2024 +0800

    [FLINK-35570][Checkpoint] Consider PlaceholderStreamStateHandle in file merging

    This fixes #24924
---
 .../filemerging/FileMergingSnapshotManager.java    |  7 ++++
 .../FileMergingSnapshotManagerBase.java            | 13 ++++++-
 .../runtime/state/CheckpointStreamFactory.java     |  9 ++---
 .../state/PlaceholderStreamStateHandle.java        |  9 ++++-
 .../runtime/scheduler/SchedulerUtilsTest.java      |  2 +-
 .../snapshot/RocksDBSnapshotStrategyBase.java      |  5 ++-
 .../ResumeCheckpointManuallyITCase.java            | 10 +++---
 .../SnapshotFileMergingCompatibilityITCase.java    | 42 ++++++++++++++++------
 8 files changed, 71 insertions(+), 26 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
index a4e066ad9c6..4a101be5674 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManager.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.filemerging.DirectoryStreamStateHandle;
@@ -361,4 +362,10 @@ public interface FileMergingSnapshotManager extends Closeable {
                     + '}';
         }
     }
+
+    static boolean isFileMergingHandle(StreamStateHandle handle) {
+        return (handle instanceof SegmentFileStateHandle)
+                || (handle instanceof PlaceholderStreamStateHandle
+                        && ((PlaceholderStreamStateHandle) handle).isFileMerged());
+    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
index 097e3ee7b09..cceeb27cfe1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/filemerging/FileMergingSnapshotManagerBase.java
@@ -571,7 +571,8 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
             if (file != null) {
                 file.advanceLastCheckpointId(checkpointId);
             }
-        } else if (stateHandle instanceof PlaceholderStreamStateHandle) {
+        } else if (stateHandle instanceof PlaceholderStreamStateHandle
+                && ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) {
             // Since the rocksdb state backend will leverage the PlaceholderStreamStateHandle,
             // the manager should recognize this.
             LogicalFile file =
@@ -643,6 +644,16 @@ public abstract class FileMergingSnapshotManagerBase implements FileMergingSnaps
             if (file != null) {
                 return file.getPhysicalFile().isCouldReuse();
             }
+        } else if (stateHandle instanceof PlaceholderStreamStateHandle
+                && ((PlaceholderStreamStateHandle) stateHandle).isFileMerged()) {
+            // Since the rocksdb state backend will leverage the PlaceholderStreamStateHandle,
+            // the manager should recognize this.
+            LogicalFile file =
+                    knownLogicalFiles.get(
+                            new LogicalFileId(stateHandle.getStreamStateHandleID().getKeyString()));
+            if (file != null) {
+                return file.getPhysicalFile().isCouldReuse();
+            }
         }
         // If a stateHandle is not of the type SegmentFileStateHandle or if its corresponding file
         // is not recognized by the fileMergingManager, it needs to be re-uploaded.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index 59129758bcf..ff770cecc71 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -91,13 +91,8 @@ public interface CheckpointStreamFactory {
      * @return true if it can be reused.
      */
     default boolean couldReuseStateHandle(StreamStateHandle stateHandle) {
-
         // By default, the CheckpointStreamFactory doesn't support snapshot-file-merging, so the
         // SegmentFileStateHandle type of stateHandle can not be reused.
-        if (stateHandle instanceof SegmentFileStateHandle) {
-            return false;
-        }
-
-        return true;
+        return !FileMergingSnapshotManager.isFileMergingHandle(stateHandle);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index a593ab0dbe2..ccd09e5eb62 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -35,10 +35,13 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
 
     private final PhysicalStateHandleID physicalID;
     private final long stateSize;
+    private final boolean fileMerged;
 
-    public PlaceholderStreamStateHandle(PhysicalStateHandleID physicalID, long stateSize) {
+    public PlaceholderStreamStateHandle(
+            PhysicalStateHandleID physicalID, long stateSize, boolean fileMerged) {
         this.physicalID = physicalID;
         this.stateSize = stateSize;
+        this.fileMerged = fileMerged;
     }
 
     @Override
@@ -67,4 +70,8 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
     public long getStateSize() {
         return stateSize;
     }
+
+    public boolean isFileMerged() {
+        return fileMerged;
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index 071476e619c..e5ce3166a88 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -114,7 +114,7 @@ class SchedulerUtilsTest {
                 buildIncrementalHandle(
                         localPath,
                         new PlaceholderStreamStateHandle(
-                                handle.getStreamStateHandleID(), handle.getStateSize()),
+                                handle.getStreamStateHandleID(), handle.getStateSize(), false),
                         backendId);
         newHandle.registerSharedStates(sharedStateRegistry, 1L);
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
index f6bb541cf0d..38c7076638d 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/snapshot/RocksDBSnapshotStrategyBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDb
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.CheckpointStreamWithResultProvider;
 import org.apache.flink.runtime.state.CheckpointedStateScope;
@@ -418,7 +419,9 @@ public abstract class RocksDBSnapshotStrategyBase<K, R extends SnapshotResources
                 // (created from a previous checkpoint).
                 return Optional.of(
                         new PlaceholderStreamStateHandle(
-                                handle.getStreamStateHandleID(), handle.getStateSize()));
+                                handle.getStreamStateHandleID(),
+                                handle.getStateSize(),
+                                FileMergingSnapshotManager.isFileMergingHandle(handle)));
             } else {
                 // Don't use any uploaded but not confirmed handles because they might be deleted
                 // (by TM) if the previous checkpoint failed. See FLINK-25395
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index 32c1b41b3a5..9e862a8cc56 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -402,8 +402,9 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
             MiniClusterWithClientResource cluster,
             RestoreMode restoreMode)
             throws Exception {
+        // complete at least two checkpoints so that the initial checkpoint can be subsumed
         return runJobAndGetExternalizedCheckpoint(
-                backend, externalCheckpoint, cluster, restoreMode, new Configuration());
+                backend, externalCheckpoint, cluster, restoreMode, new Configuration(), 2);
     }
 
     static String runJobAndGetExternalizedCheckpoint(
@@ -411,7 +412,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
             @Nullable String externalCheckpoint,
             MiniClusterWithClientResource cluster,
             RestoreMode restoreMode,
-            Configuration jobConfig)
+            Configuration jobConfig,
+            int consecutiveCheckpoints)
             throws Exception {
         JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, restoreMode, jobConfig);
         NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(PARALLELISM);
@@ -420,8 +422,8 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         // wait until all sources have been started
         NotifyingInfiniteTupleSource.countDownLatch.await();
 
-        // complete at least two checkpoints so that the initial checkpoint can be subsumed
-        waitForCheckpoint(initialJobGraph.getJobID(), cluster.getMiniCluster(), 2);
+        waitForCheckpoint(
+                initialJobGraph.getJobID(), cluster.getMiniCluster(), consecutiveCheckpoints);
 
         cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get();
         waitUntilJobCanceled(initialJobGraph.getJobID(), cluster.getClusterClient());
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
index a6e9783236f..ed759673777 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java
@@ -33,7 +33,6 @@ import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -90,6 +89,9 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
             boolean fileMergingAcrossBoundary)
             throws Exception {
         final Configuration config = new Configuration();
+        // Wait for 4 checkpoints each round to subsume the original one and produce the
+        // PlaceholderStreamStateHandle in the final round
+        final int consecutiveCheckpoint = 4;
         config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toUri().toString());
         config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);
         config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, fileMergingAcrossBoundary);
@@ -108,7 +110,12 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
         try {
             firstCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            stateBackend1, null, firstCluster, restoreMode, config);
+                            stateBackend1,
+                            null,
+                            firstCluster,
+                            restoreMode,
+                            config,
+                            consecutiveCheckpoint);
             assertThat(firstCheckpoint).isNotNull();
             verifyStateHandleType(firstCheckpoint, firstFileMergingSwitch);
         } finally {
@@ -130,7 +137,12 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
         try {
             secondCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            stateBackend2, firstCheckpoint, secondCluster, restoreMode, config);
+                            stateBackend2,
+                            firstCheckpoint,
+                            secondCluster,
+                            restoreMode,
+                            config,
+                            consecutiveCheckpoint);
             assertThat(secondCheckpoint).isNotNull();
             verifyStateHandleType(secondCheckpoint, secondFileMergingSwitch);
         } finally {
@@ -150,7 +162,12 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
         try {
             String thirdCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            stateBackend3, secondCheckpoint, thirdCluster, restoreMode, config);
+                            stateBackend3,
+                            secondCheckpoint,
+                            thirdCluster,
+                            restoreMode,
+                            config,
+                            consecutiveCheckpoint);
             assertThat(thirdCheckpoint).isNotNull();
             verifyStateHandleType(thirdCheckpoint, secondFileMergingSwitch);
         } finally {
@@ -167,22 +184,25 @@ public class SnapshotFileMergingCompatibilityITCase extends TestLogger {
                 // Check keyed state handle
                 List<KeyedStateHandle> keyedStateHandles =
                         new ArrayList<>(subtaskState.getManagedKeyedState());
-                keyedStateHandles.addAll(subtaskState.getRawKeyedState());
                 for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-                    Assertions.assertInstanceOf(
-                            IncrementalRemoteKeyedStateHandle.class, keyedStateHandle);
+                    assertThat(keyedStateHandle)
+                            .isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                     ((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
                             .streamSubHandles()
                             .forEach(
                                     handle -> {
-                                        Assertions.assertEquals(
-                                                fileMergingEnabled,
-                                                handle instanceof SegmentFileStateHandle);
+                                        if (fileMergingEnabled) {
+                                            assertThat(handle)
+                                                    .isInstanceOf(SegmentFileStateHandle.class);
+                                        } else {
+                                            assertThat(handle)
+                                                    .isNotInstanceOf(SegmentFileStateHandle.class);
+                                        }
                                     });
                     hasKeyedState = true;
                 }
             }
         }
-        Assertions.assertTrue(hasKeyedState);
+        assertThat(hasKeyedState).isTrue();
     }
 }
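
For illustration only (not part of the commit above): the diff gives PlaceholderStreamStateHandle a fileMerged flag and a shared FileMergingSnapshotManager.isFileMergingHandle helper, so that the default CheckpointStreamFactory#couldReuseStateHandle also refuses to reuse placeholders that stand in for file-merging segments. The following minimal Java sketch shows that classification, assuming the post-commit API from the diff and PhysicalStateHandleID's public String constructor; the handle IDs and sizes are made-up example values.

import org.apache.flink.runtime.checkpoint.filemerging.FileMergingSnapshotManager;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;

public class FileMergingPlaceholderSketch {
    public static void main(String[] args) {
        // Placeholder standing in for a previously uploaded file-merging segment.
        PlaceholderStreamStateHandle merged =
                new PlaceholderStreamStateHandle(
                        new PhysicalStateHandleID("example-merged-segment"), 1024L, true);

        // Placeholder for an ordinary (non-merged) private state file.
        PlaceholderStreamStateHandle plain =
                new PlaceholderStreamStateHandle(
                        new PhysicalStateHandleID("example-private-file"), 1024L, false);

        // Only the file-merged placeholder counts as a file-merging handle, so the default
        // CheckpointStreamFactory#couldReuseStateHandle returns false only for it.
        System.out.println(FileMergingSnapshotManager.isFileMergingHandle(merged)); // true
        System.out.println(FileMergingSnapshotManager.isFileMergingHandle(plain));  // false
    }
}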