This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 937573b1907 [FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent
937573b1907 is described below
commit 937573b1907e41c0c05f0191ea91f438a5a369e1
Author: Omid Hemmati <[email protected]>
AuthorDate: Tue Apr 28 01:18:47 2026 -0400
[FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent
---
.../runtime/checkpoint/CheckpointCoordinator.java | 7 ++-
.../scheduler/DefaultExecutionGraphFactory.java | 4 +-
.../CheckpointCoordinatorRestoringTest.java | 64 +++++++++++++++++++++-
3 files changed, 70 insertions(+), 5 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index bff3baa54c0..2a86a566b1f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1720,9 +1720,12 @@ public class CheckpointCoordinator {
*
* @param tasks Set of job vertices to restore. State for these vertices
is restored via {@link
* Execution#setInitialState(JobManagerTaskRestore)}.
+ * @param allowNonRestoredState Allow checkpoint state that cannot be
mapped to any job vertex
+ * in tasks.
* @return True, if a checkpoint was found and its state was restored,
false otherwise.
*/
- public boolean restoreInitialCheckpointIfPresent(final
Set<ExecutionJobVertex> tasks)
+ public boolean restoreInitialCheckpointIfPresent(
+ final Set<ExecutionJobVertex> tasks, final boolean
allowNonRestoredState)
throws Exception {
final OptionalLong restoredCheckpointId =
restoreLatestCheckpointedStateInternal(
@@ -1730,7 +1733,7 @@ public class CheckpointCoordinator {
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
false, // initial checkpoints exist only on JobManager
failover. ok if not
// present.
- false,
+ allowNonRestoredState,
true); // JobManager failover means JobGraphs match
exactly.
return restoredCheckpointId.isPresent();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 7d84eebdc25..1c1c7b5c0b1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -19,6 +19,7 @@
package org.apache.flink.runtime.scheduler;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateRecoveryOptions;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -186,7 +187,8 @@ public class DefaultExecutionGraphFactory implements
ExecutionGraphFactory {
if (checkpointCoordinator != null) {
// check whether we find a valid checkpoint
if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
- new
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+ new HashSet<>(newExecutionGraph.getAllVertices().values()),
+
configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))) {
// check whether we can restore from a savepoint
tryRestoreExecutionGraphFromSavepoint(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 4d9ba0ecb5e..1768e1391b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -1113,7 +1113,7 @@ class CheckpointCoordinatorRestoringTest {
.build(graph);
ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
- coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
+ coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex),
false);
TaskStateSnapshot restoredState =
vertex.getTaskVertices()[0]
.getCurrentExecutionAttempt()
@@ -1159,12 +1159,72 @@ class CheckpointCoordinatorRestoringTest {
})
.build(graph);
restoreCoordinator.restoreInitialCheckpointIfPresent(
- new HashSet<>(graph.getAllVertices().values()));
+ new HashSet<>(graph.getAllVertices().values()), false);
assertThat(checked.get())
.as("The finished states should be checked when job is
restored on startup")
.isTrue();
}
+ @Test
+ void testRestoreInitialCheckpointAllowsNonRestoredStateWhenTrue() throws
Exception {
+ Tuple2<CheckpointCoordinator, ExecutionJobVertex> fixture =
buildNonRestoredStateFixture();
+
+ assertThat(
+ fixture.f0.restoreInitialCheckpointIfPresent(
+ Collections.singleton(fixture.f1), true))
+ .isTrue();
+ }
+
+ @Test
+ void testRestoreInitialCheckpointRejectsNonRestoredStateWhenFalse() throws
Exception {
+ Tuple2<CheckpointCoordinator, ExecutionJobVertex> fixture =
buildNonRestoredStateFixture();
+
+ assertThatThrownBy(
+ () ->
+ fixture.f0.restoreInitialCheckpointIfPresent(
+ Collections.singleton(fixture.f1),
false))
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessageContaining("There is no operator for the state");
+ }
+
+ private Tuple2<CheckpointCoordinator, ExecutionJobVertex>
buildNonRestoredStateFixture()
+ throws Exception {
+ final JobVertexID jobVertexID = new JobVertexID();
+ ExecutionGraph graph =
+ new
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+ .addJobVertex(jobVertexID, 1, 1)
+ .build(EXECUTOR_RESOURCE.getExecutor());
+
+ OperatorID orphanedOperatorID = new OperatorID();
+ Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+ operatorStates.put(
+ orphanedOperatorID, new OperatorState(null, null,
orphanedOperatorID, 1, 1));
+
+ CompletedCheckpoint completedCheckpoint =
+ new CompletedCheckpoint(
+ graph.getJobID(),
+ 1,
+ System.currentTimeMillis(),
+ System.currentTimeMillis() + 3000,
+ operatorStates,
+ Collections.emptyList(),
+ CheckpointProperties.forCheckpoint(
+
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+ new TestCompletedCheckpointStorageLocation(),
+ null);
+
+ CompletedCheckpointStore completedCheckpointStore = new
EmbeddedCompletedCheckpointStore();
+ completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
+ completedCheckpoint, new CheckpointsCleaner(), () -> {});
+
+ CheckpointCoordinator coord =
+ new CheckpointCoordinatorBuilder()
+ .setCompletedCheckpointStore(completedCheckpointStore)
+ .build(graph);
+
+ return Tuple2.of(coord, graph.getJobVertex(jobVertexID));
+ }
+
@Test
void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
final JobVertexID jobVertexID = new JobVertexID();