This is an automated email from the ASF dual-hosted git repository.

gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 937573b1907 [FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent
937573b1907 is described below

commit 937573b1907e41c0c05f0191ea91f438a5a369e1
Author: Omid Hemmati <[email protected]>
AuthorDate: Tue Apr 28 01:18:47 2026 -0400

    [FLINK-39513][checkpointing] Parameterize allowNonRestoredState in restoreInitialCheckpointIfPresent
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  7 ++-
 .../scheduler/DefaultExecutionGraphFactory.java    |  4 +-
 .../CheckpointCoordinatorRestoringTest.java        | 64 +++++++++++++++++++++-
 3 files changed, 70 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index bff3baa54c0..2a86a566b1f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1720,9 +1720,12 @@ public class CheckpointCoordinator {
      *
      * @param tasks Set of job vertices to restore. State for these vertices 
is restored via {@link
      *     Execution#setInitialState(JobManagerTaskRestore)}.
+     * @param allowNonRestoredState Allow checkpoint state that cannot be 
mapped to any job vertex
+     *     in tasks.
      * @return True, if a checkpoint was found and its state was restored, 
false otherwise.
      */
-    public boolean restoreInitialCheckpointIfPresent(final 
Set<ExecutionJobVertex> tasks)
+    public boolean restoreInitialCheckpointIfPresent(
+            final Set<ExecutionJobVertex> tasks, final boolean 
allowNonRestoredState)
             throws Exception {
         final OptionalLong restoredCheckpointId =
                 restoreLatestCheckpointedStateInternal(
@@ -1730,7 +1733,7 @@ public class CheckpointCoordinator {
                         
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                         false, // initial checkpoints exist only on JobManager 
failover. ok if not
                         // present.
-                        false,
+                        allowNonRestoredState,
                         true); // JobManager failover means JobGraphs match 
exactly.
 
         return restoredCheckpointId.isPresent();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
index 7d84eebdc25..1c1c7b5c0b1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.StateRecoveryOptions;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
@@ -186,7 +187,8 @@ public class DefaultExecutionGraphFactory implements 
ExecutionGraphFactory {
         if (checkpointCoordinator != null) {
             // check whether we find a valid checkpoint
             if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
-                    new 
HashSet<>(newExecutionGraph.getAllVertices().values()))) {
+                    new HashSet<>(newExecutionGraph.getAllVertices().values()),
+                    
configuration.get(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE))) {
 
                 // check whether we can restore from a savepoint
                 tryRestoreExecutionGraphFromSavepoint(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 4d9ba0ecb5e..1768e1391b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -1113,7 +1113,7 @@ class CheckpointCoordinatorRestoringTest {
                         .build(graph);
 
         ExecutionJobVertex vertex = graph.getJobVertex(jobVertexID);
-        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex));
+        coord.restoreInitialCheckpointIfPresent(Collections.singleton(vertex), 
false);
         TaskStateSnapshot restoredState =
                 vertex.getTaskVertices()[0]
                         .getCurrentExecutionAttempt()
@@ -1159,12 +1159,72 @@ class CheckpointCoordinatorRestoringTest {
                                         })
                         .build(graph);
         restoreCoordinator.restoreInitialCheckpointIfPresent(
-                new HashSet<>(graph.getAllVertices().values()));
+                new HashSet<>(graph.getAllVertices().values()), false);
         assertThat(checked.get())
                 .as("The finished states should be checked when job is 
restored on startup")
                 .isTrue();
     }
 
+    @Test
+    void testRestoreInitialCheckpointAllowsNonRestoredStateWhenTrue() throws 
Exception {
+        Tuple2<CheckpointCoordinator, ExecutionJobVertex> fixture = 
buildNonRestoredStateFixture();
+
+        assertThat(
+                        fixture.f0.restoreInitialCheckpointIfPresent(
+                                Collections.singleton(fixture.f1), true))
+                .isTrue();
+    }
+
+    @Test
+    void testRestoreInitialCheckpointRejectsNonRestoredStateWhenFalse() throws 
Exception {
+        Tuple2<CheckpointCoordinator, ExecutionJobVertex> fixture = 
buildNonRestoredStateFixture();
+
+        assertThatThrownBy(
+                        () ->
+                                fixture.f0.restoreInitialCheckpointIfPresent(
+                                        Collections.singleton(fixture.f1), 
false))
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("There is no operator for the state");
+    }
+
+    private Tuple2<CheckpointCoordinator, ExecutionJobVertex> 
buildNonRestoredStateFixture()
+            throws Exception {
+        final JobVertexID jobVertexID = new JobVertexID();
+        ExecutionGraph graph =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                        .addJobVertex(jobVertexID, 1, 1)
+                        .build(EXECUTOR_RESOURCE.getExecutor());
+
+        OperatorID orphanedOperatorID = new OperatorID();
+        Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+        operatorStates.put(
+                orphanedOperatorID, new OperatorState(null, null, 
orphanedOperatorID, 1, 1));
+
+        CompletedCheckpoint completedCheckpoint =
+                new CompletedCheckpoint(
+                        graph.getJobID(),
+                        1,
+                        System.currentTimeMillis(),
+                        System.currentTimeMillis() + 3000,
+                        operatorStates,
+                        Collections.emptyList(),
+                        CheckpointProperties.forCheckpoint(
+                                
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
+                        new TestCompletedCheckpointStorageLocation(),
+                        null);
+
+        CompletedCheckpointStore completedCheckpointStore = new 
EmbeddedCompletedCheckpointStore();
+        completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
+                completedCheckpoint, new CheckpointsCleaner(), () -> {});
+
+        CheckpointCoordinator coord =
+                new CheckpointCoordinatorBuilder()
+                        .setCompletedCheckpointStore(completedCheckpointStore)
+                        .build(graph);
+
+        return Tuple2.of(coord, graph.getJobVertex(jobVertexID));
+    }
+
     @Test
     void testJobGraphModificationsAreCheckedForSavepoint() throws Exception {
         final JobVertexID jobVertexID = new JobVertexID();

Reply via email to