This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0936deaf99 [FLINK-26985][runtime] Don't discard shared state of 
restored checkpoints
c0936deaf99 is described below

commit c0936deaf99390fc727acc8633e3be22e62f4bf5
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Fri Apr 1 16:08:32 2022 +0200

    [FLINK-26985][runtime] Don't discard shared state of restored checkpoints
    
    Currently, in LEGACY restore mode, shared state of
    incremental checkpoints can be discarded regardless
    of whether they were created by this job or not.
    This invalidates the checkpoint from which the job
    was restored.
    
    The bug was introduced in FLINK-24611. Before that,
    reference count was maintained for each shared state
    entry; "initial" checkpoints did not decrement this
    count, preventing their shared state from being discarded.
    
    This change makes SharedStateRegistry:
    1. Remember the max checkpoint ID encountered during recovery
    2. Associate each shared state entry with a checkpoint ID that created it
    3. Only discard the entry if its createdByCheckpointID > 
highestRetainCheckpointID
    
    (1) is called from:
    - CheckpointCoordinator.restoreSavepoint - to cover initial restore from a 
checkpoint
    - SharedStateRegistryFactory, when building checkpoint store - to cover the 
failover case
    (see DefaultExecutionGraphFactory.createAndRestoreExecutionGraph)
    
    Adjusting only the CheckpointCoordinator path isn't sufficient:
    - job recovers from an existing checkpoint, adds it to the store
    - a new checkpoint is created - with the default restore settings
    - a failure happens, job recovers from a newer checkpoint
    - when a newer checkpoint is subsumed, its (inherited) shared state
    might be deleted
---
 .../KubernetesCheckpointRecoveryFactory.java       |   7 +-
 .../flink/kubernetes/utils/KubernetesUtils.java    |   7 +-
 .../runtime/checkpoint/CheckpointCoordinator.java  |   3 +-
 .../checkpoint/CheckpointRecoveryFactory.java      |   5 +-
 .../runtime/checkpoint/CompletedCheckpoint.java    |   7 +-
 .../EmbeddedCompletedCheckpointStore.java          |  13 +-
 .../PerJobCheckpointRecoveryFactory.java           |  12 +-
 .../StandaloneCheckpointRecoveryFactory.java       |   9 +-
 .../StandaloneCompletedCheckpointStore.java        |  17 +-
 .../ZooKeeperCheckpointRecoveryFactory.java        |   7 +-
 .../cleanup/CheckpointResourcesCleanupRunner.java  |   7 +-
 .../EmbeddedHaServicesWithLeadershipControl.java   |   9 +-
 .../apache/flink/runtime/jobgraph/RestoreMode.java |   2 +
 .../runtime/jobgraph/SavepointConfigOptions.java   |   2 +-
 .../flink/runtime/scheduler/SchedulerUtils.java    |  14 +-
 .../flink/runtime/state/SharedStateRegistry.java   |  20 +-
 .../runtime/state/SharedStateRegistryFactory.java  |   8 +-
 .../runtime/state/SharedStateRegistryImpl.java     |  28 ++-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   8 +-
 .../CheckpointCoordinatorFailureTest.java          |   3 +-
 .../CheckpointCoordinatorRestoringTest.java        |   6 +-
 .../checkpoint/CheckpointCoordinatorTest.java      | 262 +++++++++++----------
 .../checkpoint/CompletedCheckpointTest.java        |   5 +-
 .../DefaultCompletedCheckpointStoreTest.java       |   5 +-
 .../checkpoint/PerJobCheckpointRecoveryTest.java   |  13 +-
 .../TestingCheckpointRecoveryFactory.java          |   4 +-
 .../ZooKeeperCompletedCheckpointStoreITCase.java   |   4 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     |   4 +-
 .../dispatcher/DispatcherCleanupITCase.java        |  15 +-
 .../CheckpointResourcesCleanupRunnerTest.java      |   4 +-
 .../runtime/scheduler/SchedulerUtilsTest.java      |  13 +-
 .../flink/runtime/testutils/CommonTestUtils.java   |  33 ++-
 .../ResumeCheckpointManuallyITCase.java            | 119 +++++++---
 .../test/state/ChangelogCompatibilityITCase.java   |   2 +-
 .../flink/test/state/ChangelogRescalingITCase.java |   2 +-
 35 files changed, 452 insertions(+), 227 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index ea78ecbbd28..7150034bbb8 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -25,6 +25,7 @@ import org.apache.flink.kubernetes.utils.KubernetesUtils;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
 import javax.annotation.Nullable;
@@ -81,7 +82,8 @@ public class KubernetesCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
             JobID jobID,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor)
+            Executor ioExecutor,
+            RestoreMode restoreMode)
             throws Exception {
         final String configMapName = getConfigMapNameFunction.apply(jobID);
         KubernetesUtils.createConfigMapIfItDoesNotExist(kubeClient, 
configMapName, clusterId);
@@ -94,7 +96,8 @@ public class KubernetesCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
                 lockIdentity,
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
-                ioExecutor);
+                ioExecutor,
+                restoreMode);
     }
 
     @Override
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index e02d3a824e2..f8afbafde0a 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
 import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
 import org.apache.flink.runtime.jobmanager.NoOpJobGraphStoreWatcher;
@@ -296,6 +297,7 @@ public class KubernetesUtils {
      * @param lockIdentity lock identity to check the leadership
      * @param maxNumberOfCheckpointsToRetain max number of checkpoints to 
retain on state store
      *     handle
+     * @param restoreMode the mode in which the job is restoring
      * @return a {@link DefaultCompletedCheckpointStore} with {@link 
KubernetesStateHandleStore}.
      * @throws Exception when create the storage helper failed
      */
@@ -307,7 +309,8 @@ public class KubernetesUtils {
             @Nullable String lockIdentity,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor)
+            Executor ioExecutor,
+            RestoreMode restoreMode)
             throws Exception {
 
         final RetrievableStateStorageHelper<CompletedCheckpoint> stateStorage =
@@ -331,7 +334,7 @@ public class KubernetesUtils {
                 stateHandleStore,
                 KubernetesCheckpointStoreUtil.INSTANCE,
                 checkpoints,
-                sharedStateRegistryFactory.create(ioExecutor, checkpoints),
+                sharedStateRegistryFactory.create(ioExecutor, checkpoints, 
restoreMode),
                 executor);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6fd6ad19fea..2d47d79f063 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1789,7 +1789,8 @@ public class CheckpointCoordinator {
         // register shared state - even before adding the checkpoint to the 
store
         // because the latter might trigger subsumption so the ref counts must 
be up-to-date
         savepoint.registerSharedStatesAfterRestored(
-                completedCheckpointStore.getSharedStateRegistry());
+                completedCheckpointStore.getSharedStateRegistry(),
+                restoreSettings.getRestoreMode());
 
         completedCheckpointStore.addCheckpointAndSubsumeOldestOne(
                 savepoint, checkpointsCleaner, this::scheduleTriggerRequest);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index ab0b5ef8506..64c68caa8a9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
@@ -37,13 +38,15 @@ public interface CheckpointRecoveryFactory {
      * @param sharedStateRegistryFactory Simple factory to produce {@link 
SharedStateRegistry}
      *     objects.
      * @param ioExecutor Executor used to run (async) deletes.
+     * @param restoreMode the restore mode with which the job is restoring.
      * @return {@link CompletedCheckpointStore} instance for the job
      */
     CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor)
+            Executor ioExecutor,
+            RestoreMode restoreMode)
             throws Exception;
 
     /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 41c577452bb..f0270ab661a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateUtil;
@@ -207,11 +208,13 @@ public class CompletedCheckpoint implements Serializable, 
Checkpoint {
      * checkpoint is added into the store.
      *
      * @param sharedStateRegistry The registry where shared states are 
registered
+     * @param restoreMode the mode in which this checkpoint was restored
      */
-    public void registerSharedStatesAfterRestored(SharedStateRegistry 
sharedStateRegistry) {
+    public void registerSharedStatesAfterRestored(
+            SharedStateRegistry sharedStateRegistry, RestoreMode restoreMode) {
         // in claim mode we should not register any shared handles
         if (!props.isUnclaimed()) {
-            sharedStateRegistry.registerAll(operatorStates.values(), 
checkpointID);
+            sharedStateRegistry.registerAllAfterRestored(this, restoreMode);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index 1e5e47c8855..744083ca6af 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.Executors;
@@ -53,16 +54,22 @@ public class EmbeddedCompletedCheckpointStore extends 
AbstractCompleteCheckpoint
 
     @VisibleForTesting
     public EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) {
-        this(maxRetainedCheckpoints, Collections.emptyList());
+        this(
+                maxRetainedCheckpoints,
+                Collections.emptyList(),
+                /* Using the default restore mode in tests to detect any 
breaking changes early. */
+                RestoreMode.DEFAULT);
     }
 
     public EmbeddedCompletedCheckpointStore(
-            int maxRetainedCheckpoints, Collection<CompletedCheckpoint> 
initialCheckpoints) {
+            int maxRetainedCheckpoints,
+            Collection<CompletedCheckpoint> initialCheckpoints,
+            RestoreMode restoreMode) {
         this(
                 maxRetainedCheckpoints,
                 initialCheckpoints,
                 SharedStateRegistry.DEFAULT_FACTORY.create(
-                        Executors.directExecutor(), initialCheckpoints));
+                        Executors.directExecutor(), initialCheckpoints, 
restoreMode));
     }
 
     public EmbeddedCompletedCheckpointStore(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index bc18c453969..7a70f5624a5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
 import javax.annotation.Nullable;
@@ -42,7 +43,7 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
     public static <T extends CompletedCheckpointStore>
             CheckpointRecoveryFactory 
withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
         return new PerJobCheckpointRecoveryFactory<>(
-                (maxCheckpoints, previous, sharedStateRegistry, ioExecutor) -> 
{
+                (maxCheckpoints, previous, sharedStateRegistry, ioExecutor, 
restoreMode) -> {
                     if (previous != null) {
                         throw new UnsupportedOperationException(
                                 "Checkpoint store recovery is not supported.");
@@ -75,7 +76,8 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            RestoreMode restoreMode) {
         return store.compute(
                 jobId,
                 (key, previous) ->
@@ -83,7 +85,8 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
                                 maxNumberOfCheckpointsToRetain,
                                 previous,
                                 sharedStateRegistryFactory,
-                                ioExecutor));
+                                ioExecutor,
+                                restoreMode));
     }
 
     @Override
@@ -98,6 +101,7 @@ public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
                 int maxNumberOfCheckpointsToRetain,
                 @Nullable StoreType previousStore,
                 SharedStateRegistryFactory sharedStateRegistryFactory,
-                Executor ioExecutor);
+                Executor ioExecutor,
+                RestoreMode restoreMode);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index 95f9da72406..abcb704ad7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
@@ -32,11 +33,15 @@ public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor)
+            Executor ioExecutor,
+            RestoreMode restoreMode)
             throws Exception {
 
         return new StandaloneCompletedCheckpointStore(
-                maxNumberOfCheckpointsToRetain, sharedStateRegistryFactory, 
ioExecutor);
+                maxNumberOfCheckpointsToRetain,
+                sharedStateRegistryFactory,
+                ioExecutor,
+                restoreMode);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 87a6486a911..6c89dcc7127 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
@@ -56,32 +57,38 @@ public class StandaloneCompletedCheckpointStore extends 
AbstractCompleteCheckpoi
         this(
                 maxNumberOfCheckpointsToRetain,
                 SharedStateRegistry.DEFAULT_FACTORY,
-                Executors.directExecutor());
+                Executors.directExecutor(),
+                /* Using the default restore mode in tests to detect any 
breaking changes early. */
+                RestoreMode.DEFAULT);
     }
 
     /**
      * Creates {@link StandaloneCompletedCheckpointStore}.
      *
+     * @param restoreMode the mode in which the job is restoring
      * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints 
to retain (at least
      *     1). Adding more checkpoints than this results in older checkpoints 
being discarded.
      */
     public StandaloneCompletedCheckpointStore(
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            RestoreMode restoreMode) {
         this(
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
                 new ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1),
-                ioExecutor);
+                ioExecutor,
+                restoreMode);
     }
 
     private StandaloneCompletedCheckpointStore(
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             ArrayDeque<CompletedCheckpoint> checkpoints,
-            Executor ioExecutor) {
-        super(sharedStateRegistryFactory.create(ioExecutor, checkpoints));
+            Executor ioExecutor,
+            RestoreMode restoreMode) {
+        super(sharedStateRegistryFactory.create(ioExecutor, checkpoints, 
restoreMode));
         checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at 
least one checkpoint.");
         this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
         this.checkpoints = checkpoints;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 052b3405235..c522296cf89 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.util.ZooKeeperUtils;
@@ -51,7 +52,8 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor)
+            Executor ioExecutor,
+            RestoreMode restoreMode)
             throws Exception {
 
         return ZooKeeperUtils.createCompletedCheckpoints(
@@ -61,7 +63,8 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
                 maxNumberOfCheckpointsToRetain,
                 sharedStateRegistryFactory,
                 ioExecutor,
-                executor);
+                executor,
+                restoreMode);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
index c97f8644a1f..e657c8d7400 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunner.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
 import org.apache.flink.runtime.dispatcher.JobCancellationFailedException;
 import 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -145,7 +146,11 @@ public class CheckpointResourcesCleanupRunner implements 
JobManagerRunner {
                 
DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints(
                         jobManagerConfiguration, LOG),
                 sharedStateRegistryFactory,
-                cleanupExecutor);
+                cleanupExecutor,
+                // Using RestoreMode.CLAIM to be able to discard shared state, 
if any.
+                // Note that it also means that the original shared state 
might be discarded as well
+                // because the initial checkpoint might be subsumed.
+                RestoreMode.CLAIM);
     }
 
     private CheckpointIDCounter createCheckpointIDCounter() throws Exception {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index ce59d0d99e9..b9859ba0f27 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -40,13 +40,18 @@ public class EmbeddedHaServicesWithLeadershipControl 
extends EmbeddedHaServices
         this(
                 executor,
                 new 
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
-                        (maxCheckpoints, previous, stateRegistryFactory, 
ioExecutor) -> {
+                        (maxCheckpoints,
+                                previous,
+                                stateRegistryFactory,
+                                ioExecutor,
+                                restoreMode) -> {
                             List<CompletedCheckpoint> checkpoints =
                                     previous != null
                                             ? previous.getAllCheckpoints()
                                             : Collections.emptyList();
                             SharedStateRegistry stateRegistry =
-                                    stateRegistryFactory.create(ioExecutor, 
checkpoints);
+                                    stateRegistryFactory.create(
+                                            ioExecutor, checkpoints, 
restoreMode);
                             if (previous != null) {
                                 if (!previous.getShutdownStatus().isPresent()) 
{
                                     throw new IllegalStateException(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
index 10a4f2ac60d..da6c325265a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/RestoreMode.java
@@ -53,4 +53,6 @@ public enum RestoreMode implements DescribedEnum {
     public InlineElement getDescription() {
         return text(description);
     }
+
+    public static final RestoreMode DEFAULT = NO_CLAIM;
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
index e8a9dd86d1c..a38e05c5f3f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
@@ -52,7 +52,7 @@ public class SavepointConfigOptions {
     public static final ConfigOption<RestoreMode> RESTORE_MODE =
             key("execution.savepoint-restore-mode")
                     .enumType(RestoreMode.class)
-                    .defaultValue(RestoreMode.NO_CLAIM)
+                    .defaultValue(RestoreMode.DEFAULT)
                     .withDescription(
                             "Describes the mode how Flink should restore from 
the given"
                                     + " savepoint or retained checkpoint.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index 6b14801d8eb..87f2e56c6ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import org.slf4j.Logger;
@@ -55,7 +56,12 @@ public final class SchedulerUtils {
         if (DefaultExecutionGraphBuilder.isCheckpointingEnabled(jobGraph)) {
             try {
                 return createCompletedCheckpointStore(
-                        configuration, checkpointRecoveryFactory, ioExecutor, 
log, jobId);
+                        configuration,
+                        checkpointRecoveryFactory,
+                        ioExecutor,
+                        log,
+                        jobId,
+                        
jobGraph.getSavepointRestoreSettings().getRestoreMode());
             } catch (Exception e) {
                 throw new JobExecutionException(
                         jobId,
@@ -73,14 +79,16 @@ public final class SchedulerUtils {
             CheckpointRecoveryFactory recoveryFactory,
             Executor ioExecutor,
             Logger log,
-            JobID jobId)
+            JobID jobId,
+            RestoreMode restoreMode)
             throws Exception {
         return recoveryFactory.createRecoveredCompletedCheckpointStore(
                 jobId,
                 
DefaultCompletedCheckpointStoreUtils.getMaximumNumberOfRetainedCheckpoints(
                         jobManagerConfig, log),
                 SharedStateRegistry.DEFAULT_FACTORY,
-                ioExecutor);
+                ioExecutor,
+                restoreMode);
     }
 
     public static CheckpointIDCounter 
createCheckpointIDCounterIfCheckpointingIsEnabled(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index 7172bec4c24..b816f09e767 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 
 /**
  * This registry manages state that is shared across (incremental) 
checkpoints, and is responsible
@@ -32,11 +33,11 @@ public interface SharedStateRegistry extends AutoCloseable {
 
     /** A singleton object for the default implementation of a {@link 
SharedStateRegistryFactory} */
     SharedStateRegistryFactory DEFAULT_FACTORY =
-            (deleteExecutor, checkpoints) -> {
+            (deleteExecutor, checkpoints, restoreMode) -> {
                 SharedStateRegistry sharedStateRegistry =
                         new SharedStateRegistryImpl(deleteExecutor);
                 for (CompletedCheckpoint checkpoint : checkpoints) {
-                    
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+                    
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, restoreMode);
                 }
                 return sharedStateRegistry;
             };
@@ -66,10 +67,25 @@ public interface SharedStateRegistry extends AutoCloseable {
     /**
      * Register given shared states in the registry.
      *
+     * <p>NOTE: For state from checkpoints from other jobs or runs (i.e. after 
recovery), please use
+     * {@link #registerAllAfterRestored(CompletedCheckpoint, RestoreMode)}
+     *
      * @param stateHandles The shared states to register.
      * @param checkpointID which uses the states.
      */
     void registerAll(Iterable<? extends CompositeStateHandle> stateHandles, 
long checkpointID);
 
+    /**
+     * Register the shared state of a restored checkpoint, honoring the given 
restore mode.
+     *
+     * <p>After recovery from an incremental checkpoint, its state should NOT 
be discarded, even if
+     * {@link #unregisterUnusedState(long) not used} anymore (unless 
recovering in {@link
+     * RestoreMode#CLAIM CLAIM} mode).
+     *
+     * <p>This should hold for both cases: when recovering from that initial 
checkpoint; and from
+     * any subsequent checkpoint derived from it.
+     */
+    void registerAllAfterRestored(CompletedCheckpoint checkpoint, RestoreMode 
mode);
+
     void checkpointCompleted(long checkpointId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
index bbdd2fd0959..bc8118cce42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 
 import java.util.Collection;
 import java.util.concurrent.Executor;
@@ -29,10 +30,13 @@ public interface SharedStateRegistryFactory {
     /**
      * Factory method for {@link SharedStateRegistry}.
      *
-     * @param checkpoints whose shared state will be registered.
      * @param deleteExecutor executor used to run (async) deletes.
+     * @param checkpoints whose shared state will be registered.
+     * @param restoreMode the mode in which the given checkpoints were restored
      * @return a SharedStateRegistry object
      */
     SharedStateRegistry create(
-            Executor deleteExecutor, Collection<CompletedCheckpoint> 
checkpoints);
+            Executor deleteExecutor,
+            Collection<CompletedCheckpoint> checkpoints,
+            RestoreMode restoreMode);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
index b87d8646086..4dce6455277 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryImpl.java
@@ -19,6 +19,8 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.util.concurrent.Executors;
 
 import org.slf4j.Logger;
@@ -51,6 +53,9 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
     /** Executor for async state deletion */
     private final Executor asyncDisposalExecutor;
 
+    /** Checkpoint ID below which no state is discarded, inclusive. */
+    private long highestNotClaimedCheckpointID = -1L;
+
     /** Default uses direct executor to delete unreferenced state */
     public SharedStateRegistryImpl() {
         this(Executors.directExecutor());
@@ -147,7 +152,9 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
             while (it.hasNext()) {
                 SharedStateEntry entry = it.next();
                 if (entry.lastUsedCheckpointID < lowestCheckpointID) {
-                    subsumed.add(entry.stateHandle);
+                    if (entry.createdByCheckpointID > 
highestNotClaimedCheckpointID) {
+                        subsumed.add(entry.stateHandle);
+                    }
                     it.remove();
                 }
             }
@@ -174,6 +181,20 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
         }
     }
 
+    @Override
+    public void registerAllAfterRestored(CompletedCheckpoint checkpoint, 
RestoreMode mode) {
+        registerAll(checkpoint.getOperatorStates().values(), 
checkpoint.getCheckpointID());
+        // In NO_CLAIM and LEGACY restore modes, shared state of the initial 
checkpoints must be
+        // preserved. This is achieved by advancing highestNotClaimedCheckpointID 
here, and
+        // then checking entry.createdByCheckpointID against it on checkpoint 
subsumption.
+        // In CLAIM restore mode, the shared state of the initial checkpoints 
must be discarded
+        // as soon as it becomes unused - so highestNotClaimedCheckpointID 
is not updated.
+        if (mode != RestoreMode.CLAIM) {
+            highestNotClaimedCheckpointID =
+                    Math.max(highestNotClaimedCheckpointID, 
checkpoint.getCheckpointID());
+        }
+    }
+
     @Override
     public void checkpointCompleted(long checkpointId) {
         for (SharedStateEntry entry : registeredStates.values()) {
@@ -251,6 +272,8 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
         /** The shared state handle */
         StreamStateHandle stateHandle;
 
+        private final long createdByCheckpointID;
+
         private long lastUsedCheckpointID;
 
         /** Whether this entry is included into a confirmed checkpoint. */
@@ -258,6 +281,7 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
 
         SharedStateEntry(StreamStateHandle value, long checkpointID) {
             this.stateHandle = value;
+            this.createdByCheckpointID = checkpointID;
             this.lastUsedCheckpointID = checkpointID;
         }
 
@@ -266,6 +290,8 @@ public class SharedStateRegistryImpl implements 
SharedStateRegistry {
             return "SharedStateEntry{"
                     + "stateHandle="
                     + stateHandle
+                    + ", createdByCheckpointID="
+                    + createdByCheckpointID
                     + ", lastUsedCheckpointID="
                     + lastUsedCheckpointID
                     + '}';
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index c62b0c3bde0..2de67d006d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import 
org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
@@ -569,6 +570,7 @@ public class ZooKeeperUtils {
      * @param configuration {@link Configuration} object
      * @param maxNumberOfCheckpointsToRetain The maximum number of checkpoints 
to retain
      * @param executor to run ZooKeeper callbacks
+     * @param restoreMode the mode in which the job is being restored
      * @return {@link DefaultCompletedCheckpointStore} instance
      * @throws Exception if the completed checkpoint store cannot be created
      */
@@ -578,7 +580,8 @@ public class ZooKeeperUtils {
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
             Executor ioExecutor,
-            Executor executor)
+            Executor executor,
+            RestoreMode restoreMode)
             throws Exception {
 
         checkNotNull(configuration, "Configuration");
@@ -597,7 +600,8 @@ public class ZooKeeperUtils {
                         completedCheckpointStateHandleStore,
                         ZooKeeperCheckpointStoreUtil.INSTANCE,
                         completedCheckpoints,
-                        sharedStateRegistryFactory.create(ioExecutor, 
completedCheckpoints),
+                        sharedStateRegistryFactory.create(
+                                ioExecutor, completedCheckpoints, restoreMode),
                         executor);
         LOG.info(
                 "Initialized {} in '{}' with {}.",
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index d892319cbd0..edc0a6f7e34 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
@@ -286,7 +287,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
         public FailingCompletedCheckpointStore(Exception addCheckpointFailure) 
{
             super(
                     SharedStateRegistry.DEFAULT_FACTORY.create(
-                            Executors.directExecutor(), emptyList()));
+                            Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT));
             this.addCheckpointFailure = addCheckpointFailure;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 2d80c065a94..a26e7ea5059 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -241,7 +242,7 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
         final ExecutionGraph executionGraph = createExecutionGraph(vertices);
         final EmbeddedCompletedCheckpointStore store =
                 new EmbeddedCompletedCheckpointStore(
-                        completedCheckpoints.size(), completedCheckpoints);
+                        completedCheckpoints.size(), completedCheckpoints, 
RestoreMode.DEFAULT);
 
         // set up the coordinator and validate the initial state
         final CheckpointCoordinator coordinator =
@@ -780,7 +781,8 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
 
         // set up the coordinator and validate the initial state
         SharedStateRegistry sharedStateRegistry =
-                
SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), 
emptyList());
+                SharedStateRegistry.DEFAULT_FACTORY.create(
+                        Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT);
         CheckpointCoordinator coord =
                 new CheckpointCoordinatorBuilder()
                         .setCompletedCheckpointStore(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index ccc0555ec44..da1d49bf00a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
@@ -2860,161 +2861,176 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
     @Test
     public void testSharedStateRegistrationOnRestore() throws Exception {
-        JobVertexID jobVertexID1 = new JobVertexID();
+        for (RestoreMode restoreMode : RestoreMode.values()) {
+            JobVertexID jobVertexID1 = new JobVertexID();
 
-        int parallelism1 = 2;
-        int maxParallelism1 = 4;
+            int parallelism1 = 2;
+            int maxParallelism1 = 4;
 
-        ExecutionGraph graph =
-                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
-                        .addJobVertex(jobVertexID1, parallelism1, 
maxParallelism1)
-                        .build(EXECUTOR_RESOURCE.getExecutor());
+            ExecutionGraph graph =
+                    new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
+                            .addJobVertex(jobVertexID1, parallelism1, 
maxParallelism1)
+                            .build(EXECUTOR_RESOURCE.getExecutor());
 
-        ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
+            ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
 
-        List<CompletedCheckpoint> checkpoints = Collections.emptyList();
-        SharedStateRegistry firstInstance =
-                SharedStateRegistry.DEFAULT_FACTORY.create(
-                        
org.apache.flink.util.concurrent.Executors.directExecutor(), checkpoints);
-        final EmbeddedCompletedCheckpointStore store =
-                new EmbeddedCompletedCheckpointStore(10, checkpoints, 
firstInstance);
+            List<CompletedCheckpoint> checkpoints = Collections.emptyList();
+            SharedStateRegistry firstInstance =
+                    SharedStateRegistry.DEFAULT_FACTORY.create(
+                            
org.apache.flink.util.concurrent.Executors.directExecutor(),
+                            checkpoints,
+                            restoreMode);
+            final EmbeddedCompletedCheckpointStore store =
+                    new EmbeddedCompletedCheckpointStore(10, checkpoints, 
firstInstance);
 
-        // set up the coordinator and validate the initial state
-        final CheckpointCoordinatorBuilder coordinatorBuilder =
-                new 
CheckpointCoordinatorBuilder().setTimer(manuallyTriggeredScheduledExecutor);
-        final CheckpointCoordinator coordinator =
-                
coordinatorBuilder.setCompletedCheckpointStore(store).build(graph);
+            // set up the coordinator and validate the initial state
+            final CheckpointCoordinatorBuilder coordinatorBuilder =
+                    new 
CheckpointCoordinatorBuilder().setTimer(manuallyTriggeredScheduledExecutor);
+            final CheckpointCoordinator coordinator =
+                    
coordinatorBuilder.setCompletedCheckpointStore(store).build(graph);
 
-        final int numCheckpoints = 3;
+            final int numCheckpoints = 3;
 
-        List<KeyGroupRange> keyGroupPartitions1 =
-                
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
+            List<KeyGroupRange> keyGroupPartitions1 =
+                    StateAssignmentOperation.createKeyGroupPartitions(
+                            maxParallelism1, parallelism1);
 
-        for (int i = 0; i < numCheckpoints; ++i) {
-            performIncrementalCheckpoint(
-                    graph.getJobID(), coordinator, jobVertex1, 
keyGroupPartitions1, i);
-        }
+            for (int i = 0; i < numCheckpoints; ++i) {
+                performIncrementalCheckpoint(
+                        graph.getJobID(), coordinator, jobVertex1, 
keyGroupPartitions1, i);
+            }
 
-        List<CompletedCheckpoint> completedCheckpoints = 
coordinator.getSuccessfulCheckpoints();
-        assertEquals(numCheckpoints, completedCheckpoints.size());
+            List<CompletedCheckpoint> completedCheckpoints = 
coordinator.getSuccessfulCheckpoints();
+            assertEquals(numCheckpoints, completedCheckpoints.size());
 
-        int sharedHandleCount = 0;
+            int sharedHandleCount = 0;
 
-        List<Map<StateHandleID, StreamStateHandle>> sharedHandlesByCheckpoint =
-                new ArrayList<>(numCheckpoints);
+            List<Map<StateHandleID, StreamStateHandle>> 
sharedHandlesByCheckpoint =
+                    new ArrayList<>(numCheckpoints);
 
-        for (int i = 0; i < numCheckpoints; ++i) {
-            sharedHandlesByCheckpoint.add(new HashMap<>(2));
-        }
+            for (int i = 0; i < numCheckpoints; ++i) {
+                sharedHandlesByCheckpoint.add(new HashMap<>(2));
+            }
 
-        int cp = 0;
-        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-            for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
-                for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
-                    for (KeyedStateHandle keyedStateHandle : 
subtaskState.getManagedKeyedState()) {
-                        // test we are once registered with the current 
registry
-                        verify(keyedStateHandle, times(1))
-                                .registerSharedStates(
-                                        firstInstance, 
completedCheckpoint.getCheckpointID());
-                        IncrementalRemoteKeyedStateHandle 
incrementalKeyedStateHandle =
-                                (IncrementalRemoteKeyedStateHandle) 
keyedStateHandle;
-
-                        sharedHandlesByCheckpoint
-                                .get(cp)
-                                
.putAll(incrementalKeyedStateHandle.getSharedState());
-
-                        for (StreamStateHandle streamStateHandle :
-                                
incrementalKeyedStateHandle.getSharedState().values()) {
-                            assertTrue(
-                                    !(streamStateHandle instanceof 
PlaceholderStreamStateHandle));
-                            verify(streamStateHandle, never()).discardState();
-                            ++sharedHandleCount;
-                        }
+            int cp = 0;
+            for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
+                for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
+                    for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
+                        for (KeyedStateHandle keyedStateHandle :
+                                subtaskState.getManagedKeyedState()) {
+                            // test we are once registered with the current 
registry
+                            verify(keyedStateHandle, times(1))
+                                    .registerSharedStates(
+                                            firstInstance, 
completedCheckpoint.getCheckpointID());
+                            IncrementalRemoteKeyedStateHandle 
incrementalKeyedStateHandle =
+                                    (IncrementalRemoteKeyedStateHandle) 
keyedStateHandle;
+
+                            sharedHandlesByCheckpoint
+                                    .get(cp)
+                                    
.putAll(incrementalKeyedStateHandle.getSharedState());
+
+                            for (StreamStateHandle streamStateHandle :
+                                    
incrementalKeyedStateHandle.getSharedState().values()) {
+                                assertTrue(
+                                        !(streamStateHandle
+                                                instanceof 
PlaceholderStreamStateHandle));
+                                verify(streamStateHandle, 
never()).discardState();
+                                ++sharedHandleCount;
+                            }
 
-                        for (StreamStateHandle streamStateHandle :
-                                
incrementalKeyedStateHandle.getPrivateState().values()) {
-                            verify(streamStateHandle, never()).discardState();
+                            for (StreamStateHandle streamStateHandle :
+                                    
incrementalKeyedStateHandle.getPrivateState().values()) {
+                                verify(streamStateHandle, 
never()).discardState();
+                            }
+
+                            
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
+                                    .discardState();
                         }
 
-                        
verify(incrementalKeyedStateHandle.getMetaStateHandle(), never())
-                                .discardState();
+                        verify(subtaskState, never()).discardState();
                     }
-
-                    verify(subtaskState, never()).discardState();
                 }
+                ++cp;
             }
-            ++cp;
-        }
 
-        // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
-        assertEquals(10, sharedHandleCount);
+            // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
+            assertEquals(10, sharedHandleCount);
 
-        // discard CP0
-        store.removeOldestCheckpoint();
+            // discard CP0
+            store.removeOldestCheckpoint();
 
-        // we expect no shared state was discarded because the state of CP0 is 
still referenced by
-        // CP1
-        for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
-            for (StreamStateHandle streamStateHandle : cpList.values()) {
-                verify(streamStateHandle, never()).discardState();
+            // we expect no shared state was discarded because the state of 
CP0 is still referenced
+            // by
+            // CP1
+            for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
+                for (StreamStateHandle streamStateHandle : cpList.values()) {
+                    verify(streamStateHandle, never()).discardState();
+                }
             }
-        }
 
-        // shutdown the store
-        store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
-
-        // restore the store
-        Set<ExecutionJobVertex> tasks = new HashSet<>();
-        tasks.add(jobVertex1);
-
-        assertEquals(JobStatus.SUSPENDED, 
store.getShutdownStatus().orElse(null));
-        SharedStateRegistry secondInstance =
-                SharedStateRegistry.DEFAULT_FACTORY.create(
-                        
org.apache.flink.util.concurrent.Executors.directExecutor(),
-                        store.getAllCheckpoints());
-        final EmbeddedCompletedCheckpointStore secondStore =
-                new EmbeddedCompletedCheckpointStore(10, 
store.getAllCheckpoints(), secondInstance);
-        final CheckpointCoordinator secondCoordinator =
-                
coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
-        
assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
-
-        // validate that all shared states are registered again after the 
recovery.
-        cp = 0;
-        for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
-            for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
-                for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
-                    for (KeyedStateHandle keyedStateHandle : 
subtaskState.getManagedKeyedState()) {
-                        VerificationMode verificationMode;
-                        // test we are once registered with the new registry
-                        if (cp > 0) {
-                            verificationMode = times(1);
-                        } else {
-                            verificationMode = never();
-                        }
+            // shutdown the store
+            store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
+
+            // restore the store
+            Set<ExecutionJobVertex> tasks = new HashSet<>();
+            tasks.add(jobVertex1);
+
+            assertEquals(JobStatus.SUSPENDED, 
store.getShutdownStatus().orElse(null));
+            SharedStateRegistry secondInstance =
+                    SharedStateRegistry.DEFAULT_FACTORY.create(
+                            
org.apache.flink.util.concurrent.Executors.directExecutor(),
+                            store.getAllCheckpoints(),
+                            restoreMode);
+            final EmbeddedCompletedCheckpointStore secondStore =
+                    new EmbeddedCompletedCheckpointStore(
+                            10, store.getAllCheckpoints(), secondInstance);
+            final CheckpointCoordinator secondCoordinator =
+                    
coordinatorBuilder.setCompletedCheckpointStore(secondStore).build(graph);
+            
assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
+
+            // validate that all shared states are registered again after the 
recovery.
+            cp = 0;
+            for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
+                for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
+                    for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
+                        for (KeyedStateHandle keyedStateHandle :
+                                subtaskState.getManagedKeyedState()) {
+                            VerificationMode verificationMode;
+                            // test we are once registered with the new 
registry
+                            if (cp > 0) {
+                                verificationMode = times(1);
+                            } else {
+                                verificationMode = never();
+                            }
 
-                        // check that all are registered with the new registry
-                        verify(keyedStateHandle, verificationMode)
-                                .registerSharedStates(
-                                        secondInstance, 
completedCheckpoint.getCheckpointID());
+                            // check that all are registered with the new 
registry
+                            verify(keyedStateHandle, verificationMode)
+                                    .registerSharedStates(
+                                            secondInstance, 
completedCheckpoint.getCheckpointID());
+                        }
                     }
                 }
+                ++cp;
             }
-            ++cp;
-        }
 
-        // discard CP1
-        secondStore.removeOldestCheckpoint();
+            // discard CP1
+            secondStore.removeOldestCheckpoint();
 
-        // we expect that all shared state from CP0 is no longer referenced 
and discarded. CP2 is
-        // still live and also
-        // references the state from CP1, so we expect they are not discarded.
-        verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 0 ? times(1) 
: never());
+            // in CLAIM mode, we expect that all shared state from CP0 is no 
longer referenced and
+            // discarded; in the other restore modes it must be retained.
+            // CP2 is still live and also references the state from CP1, so 
we expect they are not
+            // discarded.
+            verifyDiscard(
+                    sharedHandlesByCheckpoint,
+                    cpId -> restoreMode == RestoreMode.CLAIM && cpId == 0 ? 
times(1) : never());
 
-        // discard CP2
-        secondStore.removeOldestCheckpoint();
+            // discard CP2
+            secondStore.removeOldestCheckpoint();
 
-        // still expect shared state not to be discarded because it may be 
used in later checkpoints
-        verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ? never() : 
atLeast(0));
+            // still expect shared state not to be discarded because it may be 
used in later
+            // checkpoints
+            verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ? 
never() : atLeast(0));
+        }
     }
 
     @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index b6461a9c984..a0b67cd0dfa 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
 import org.apache.flink.runtime.state.testutils.EmptyStreamStateHandle;
@@ -235,7 +236,7 @@ public class CompletedCheckpointTest {
                         null);
 
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
-        checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+        checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, 
RestoreMode.DEFAULT);
         verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
     }
 
@@ -267,7 +268,7 @@ public class CompletedCheckpointTest {
                         null);
 
         SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistryImpl();
-        checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+        checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry, 
RestoreMode.DEFAULT);
         verify(state, times(1)).registerSharedStates(sharedStateRegistry, 0L);
 
         // Subsume
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index 064e26a9b9f..22cd5dd021c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.persistence.StateHandleStore;
 import 
org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
 import org.apache.flink.runtime.persistence.TestingStateHandleStore;
@@ -397,7 +398,9 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
                 
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                         stateHandleStore, checkpointStoreUtil),
                 SharedStateRegistry.DEFAULT_FACTORY.create(
-                        
org.apache.flink.util.concurrent.Executors.directExecutor(), emptyList()),
+                        
org.apache.flink.util.concurrent.Executors.directExecutor(),
+                        emptyList(),
+                        RestoreMode.DEFAULT),
                 executorService);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
index 58a0d954318..7203edfc709 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
@@ -49,7 +50,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                         firstJobId,
                         1,
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        Executors.directExecutor()));
+                        Executors.directExecutor(),
+                        RestoreMode.DEFAULT));
         assertThrows(
                 UnsupportedOperationException.class,
                 () ->
@@ -57,7 +59,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                                 firstJobId,
                                 1,
                                 SharedStateRegistry.DEFAULT_FACTORY,
-                                Executors.directExecutor()));
+                                Executors.directExecutor(),
+                                RestoreMode.DEFAULT));
 
         final JobID secondJobId = new JobID();
         assertSame(
@@ -66,7 +69,8 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                         secondJobId,
                         1,
                         SharedStateRegistry.DEFAULT_FACTORY,
-                        Executors.directExecutor()));
+                        Executors.directExecutor(),
+                        RestoreMode.DEFAULT));
         assertThrows(
                 UnsupportedOperationException.class,
                 () ->
@@ -74,6 +78,7 @@ public class PerJobCheckpointRecoveryTest extends TestLogger {
                                 secondJobId,
                                 1,
                                 SharedStateRegistry.DEFAULT_FACTORY,
-                                Executors.directExecutor()));
+                                Executors.directExecutor(),
+                                RestoreMode.DEFAULT));
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index e164543d0ab..687196dff82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 
 import java.util.concurrent.Executor;
@@ -39,7 +40,8 @@ public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFacto
             JobID jobId,
             int maxNumberOfCheckpointsToRetain,
             SharedStateRegistryFactory sharedStateRegistryFactory,
-            Executor ioExecutor) {
+            Executor ioExecutor,
+            RestoreMode restoreMode) {
         return store;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 5cc3db53e79..d3d705d51f7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryImpl;
@@ -90,7 +91,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpoint
                 checkpointStoreUtil,
                 
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
                         checkpointsInZooKeeper, checkpointStoreUtil),
-                
SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), 
emptyList()),
+                SharedStateRegistry.DEFAULT_FACTORY.create(
+                        Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT),
                 executor);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 804d285ae28..cd8561157cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.core.testutils.FlinkMatchers;
 import 
org.apache.flink.runtime.highavailability.zookeeper.CuratorFrameworkWithUnhandledErrorListener;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.rest.util.NoOpFatalErrorHandler;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -196,7 +197,8 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                 checkpointsInZooKeeper,
                 zooKeeperCheckpointStoreUtil,
                 Collections.emptyList(),
-                
SharedStateRegistry.DEFAULT_FACTORY.create(Executors.directExecutor(), 
emptyList()),
+                SharedStateRegistry.DEFAULT_FACTORY.create(
+                        Executors.directExecutor(), emptyList(), 
RestoreMode.DEFAULT),
                 Executors.directExecutor());
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
index 7baad041258..d305011a7e9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherCleanupITCase.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedJobResul
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
 import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.JobGraphStore;
@@ -84,7 +85,11 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
         super.setUp();
         haServices.setCheckpointRecoveryFactory(
                 new 
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
-                        (maxCheckpoints, previous, sharedStateRegistryFactory, 
ioExecutor) -> {
+                        (maxCheckpoints,
+                                previous,
+                                sharedStateRegistryFactory,
+                                ioExecutor,
+                                restoreMode) -> {
                             if (previous != null) {
                                 // First job cleanup still succeeded for the
                                 // CompletedCheckpointStore because the 
JobGraph cleanup happens
@@ -95,13 +100,17 @@ public class DispatcherCleanupITCase extends 
AbstractDispatcherTest {
                                         maxCheckpoints,
                                         previous.getAllCheckpoints(),
                                         sharedStateRegistryFactory.create(
-                                                ioExecutor, 
previous.getAllCheckpoints()));
+                                                ioExecutor,
+                                                previous.getAllCheckpoints(),
+                                                restoreMode));
                             }
                             return new EmbeddedCompletedCheckpointStore(
                                     maxCheckpoints,
                                     Collections.emptyList(),
                                     sharedStateRegistryFactory.create(
-                                            ioExecutor, 
Collections.emptyList()));
+                                            ioExecutor,
+                                            Collections.emptyList(),
+                                            RestoreMode.DEFAULT));
                         }));
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
index 8a226fed8aa..cedcfc3d435 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/cleanup/CheckpointResourcesCleanupRunnerTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
 import 
org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
 import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
@@ -622,7 +623,8 @@ public class CheckpointResourcesCleanupRunnerTest {
                 JobID jobId,
                 int maxNumberOfCheckpointsToRetain,
                 SharedStateRegistryFactory sharedStateRegistryFactory,
-                Executor ioExecutor)
+                Executor ioExecutor,
+                RestoreMode restoreMode)
                 throws Exception {
             creationLatch.await();
             return completedCheckpointStore;
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
index dfc3b40da07..9e21dd42e08 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerUtilsTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -78,7 +79,8 @@ public class SchedulerUtilsTest extends TestLogger {
                         new StandaloneCheckpointRecoveryFactory(),
                         Executors.directExecutor(),
                         log,
-                        new JobID());
+                        new JobID(),
+                        RestoreMode.CLAIM);
 
         assertEquals(
                 maxNumberOfCheckpointsToRetain,
@@ -104,7 +106,8 @@ public class SchedulerUtilsTest extends TestLogger {
                         recoveryFactory,
                         Executors.directExecutor(),
                         log,
-                        new JobID());
+                        new JobID(),
+                        RestoreMode.CLAIM);
 
         SharedStateRegistry sharedStateRegistry = 
checkpointStore.getSharedStateRegistry();
 
@@ -122,12 +125,14 @@ public class SchedulerUtilsTest extends TestLogger {
                     JobID jobId,
                     int maxNumberOfCheckpointsToRetain,
                     SharedStateRegistryFactory sharedStateRegistryFactory,
-                    Executor ioExecutor) {
+                    Executor ioExecutor,
+                    RestoreMode restoreMode) {
                 List<CompletedCheckpoint> checkpoints = 
singletonList(checkpoint);
                 return new EmbeddedCompletedCheckpointStore(
                         maxNumberOfCheckpointsToRetain,
                         checkpoints,
-                        sharedStateRegistryFactory.create(ioExecutor, 
checkpoints));
+                        sharedStateRegistryFactory.create(
+                                ioExecutor, checkpoints, RestoreMode.DEFAULT));
             }
 
             @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
index 7292b54b929..d36100be22e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/CommonTestUtils.java
@@ -54,6 +54,7 @@ import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
+import static org.apache.flink.util.Preconditions.checkState;
 import static org.junit.jupiter.api.Assertions.fail;
 
 /** This class contains auxiliary methods for unit tests. */
@@ -309,18 +310,30 @@ public class CommonTestUtils {
                 });
     }
 
-    /** Wait for at least one successful checkpoint. */
-    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster)
+    /** Wait for (at least) the given number of successful checkpoints. */
+    public static void waitForCheckpoint(JobID jobID, MiniCluster miniCluster, 
int numCheckpoints)
             throws Exception, FlinkJobNotFoundException {
         waitUntilCondition(
-                () ->
-                        Optional.ofNullable(
-                                        miniCluster
-                                                .getExecutionGraph(jobID)
-                                                .get()
-                                                .getCheckpointStatsSnapshot())
-                                .filter(st -> 
st.getCounts().getNumberOfCompletedCheckpoints() > 0)
-                                .isPresent());
+                () -> {
+                    AccessExecutionGraph graph = 
miniCluster.getExecutionGraph(jobID).get();
+                    if (Optional.ofNullable(graph.getCheckpointStatsSnapshot())
+                            .filter(
+                                    st ->
+                                            
st.getCounts().getNumberOfCompletedCheckpoints()
+                                                    >= numCheckpoints)
+                            .isPresent()) {
+                        return true;
+                    } else if (graph.getState().isGloballyTerminalState()) {
+                        checkState(
+                                graph.getFailureInfo() != null,
+                                "Job terminated before taking required %s 
checkpoints: %s",
+                                numCheckpoints,
+                                graph.getState());
+                        throw graph.getFailureInfo().getException();
+                    } else {
+                        return false;
+                    }
+                });
     }
 
     /**
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
index ab5d7f9b111..884c65c731d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java
@@ -24,14 +24,15 @@ import 
org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkGenerator;
 import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
@@ -43,13 +44,14 @@ import 
org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindo
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.test.state.ManualWindowSpeedITCase;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
-import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.curator.test.TestingServer;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import javax.annotation.Nullable;
 
@@ -57,6 +59,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.getLatestCompletedCheckpointPath;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForCheckpoint;
+import static org.apache.flink.test.util.TestUtils.waitUntilJobCanceled;
 import static org.junit.Assert.assertNotNull;
 
 /**
@@ -68,26 +73,42 @@ import static org.junit.Assert.assertNotNull;
  * <p>This tests considers full and incremental checkpoints and was introduced to guard against
  * problems like FLINK-6964.
  */
+@RunWith(Parameterized.class)
 public class ResumeCheckpointManuallyITCase extends TestLogger {
 
     private static final int PARALLELISM = 2;
     private static final int NUM_TASK_MANAGERS = 2;
     private static final int SLOTS_PER_TASK_MANAGER = 2;
 
+    @Parameterized.Parameter public RestoreMode restoreMode;
+
+    @Parameterized.Parameters(name = "RestoreMode = {0}")
+    public static Object[] parameters() {
+        return RestoreMode.values();
+    }
+
     @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
 
     @Test
     public void testExternalizedIncrementalRocksDBCheckpointsStandalone() 
throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createRocksDBStateBackend(checkpointDir, 
true), false);
+                checkpointDir,
+                null,
+                createRocksDBStateBackend(checkpointDir, true),
+                false,
+                restoreMode);
     }
 
     @Test
     public void testExternalizedFullRocksDBCheckpointsStandalone() throws 
Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createRocksDBStateBackend(checkpointDir, 
false), false);
+                checkpointDir,
+                null,
+                createRocksDBStateBackend(checkpointDir, false),
+                false,
+                restoreMode);
     }
 
     @Test
@@ -95,7 +116,11 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
             throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createRocksDBStateBackend(checkpointDir, 
true), true);
+                checkpointDir,
+                null,
+                createRocksDBStateBackend(checkpointDir, true),
+                true,
+                restoreMode);
     }
 
     @Test
@@ -103,20 +128,25 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
             throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createRocksDBStateBackend(checkpointDir, 
false), true);
+                checkpointDir,
+                null,
+                createRocksDBStateBackend(checkpointDir, false),
+                true,
+                restoreMode);
     }
 
     @Test
     public void testExternalizedFSCheckpointsStandalone() throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
         testExternalizedCheckpoints(
-                checkpointDir, null, createFsStateBackend(checkpointDir), 
false);
+                checkpointDir, null, createFsStateBackend(checkpointDir), 
false, restoreMode);
     }
 
     @Test
     public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() 
throws Exception {
         final File checkpointDir = temporaryFolder.newFolder();
-        testExternalizedCheckpoints(checkpointDir, null, 
createFsStateBackend(checkpointDir), true);
+        testExternalizedCheckpoints(
+                checkpointDir, null, createFsStateBackend(checkpointDir), 
true, restoreMode);
     }
 
     @Test
@@ -129,7 +159,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     checkpointDir,
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, true),
-                    false);
+                    false,
+                    restoreMode);
         } finally {
             zkServer.stop();
         }
@@ -145,7 +176,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     checkpointDir,
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, false),
-                    false);
+                    false,
+                    restoreMode);
         } finally {
             zkServer.stop();
         }
@@ -162,7 +194,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     checkpointDir,
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, true),
-                    true);
+                    true,
+                    restoreMode);
         } finally {
             zkServer.stop();
         }
@@ -179,7 +212,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     checkpointDir,
                     zkServer.getConnectString(),
                     createRocksDBStateBackend(checkpointDir, false),
-                    true);
+                    true,
+                    restoreMode);
         } finally {
             zkServer.stop();
         }
@@ -195,7 +229,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     checkpointDir,
                     zkServer.getConnectString(),
                     createFsStateBackend(checkpointDir),
-                    false);
+                    false,
+                    restoreMode);
         } finally {
             zkServer.stop();
         }
@@ -211,7 +246,8 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
                     checkpointDir,
                     zkServer.getConnectString(),
                     createFsStateBackend(checkpointDir),
-                    true);
+                    true,
+                    restoreMode);
         } finally {
             zkServer.stop();
         }
@@ -227,8 +263,12 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
         return new RocksDBStateBackend(checkpointDir.toURI().toString(), 
incrementalCheckpointing);
     }
 
-    private void testExternalizedCheckpoints(
-            File checkpointDir, String zooKeeperQuorum, StateBackend backend, 
boolean localRecovery)
+    private static void testExternalizedCheckpoints(
+            File checkpointDir,
+            String zooKeeperQuorum,
+            StateBackend backend,
+            boolean localRecovery,
+            RestoreMode restoreMode)
             throws Exception {
 
         final Configuration config = new Configuration();
@@ -264,22 +304,28 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
 
         cluster.before();
 
-        ClusterClient<?> client = cluster.getClusterClient();
-
         try {
             // main test sequence:  start job -> eCP -> restore job -> eCP -> 
restore job
             String firstExternalCheckpoint =
-                    runJobAndGetExternalizedCheckpoint(backend, checkpointDir, 
null, client);
+                    runJobAndGetExternalizedCheckpoint(backend, null, cluster, 
restoreMode);
             assertNotNull(firstExternalCheckpoint);
 
             String secondExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend, checkpointDir, firstExternalCheckpoint, 
client);
+                            backend, firstExternalCheckpoint, cluster, 
restoreMode);
             assertNotNull(secondExternalCheckpoint);
 
             String thirdExternalCheckpoint =
                     runJobAndGetExternalizedCheckpoint(
-                            backend, checkpointDir, secondExternalCheckpoint, 
client);
+                            backend,
+                            // in CLAIM mode, the previous run is only guaranteed to preserve the
+                            // latest checkpoint; in NO_CLAIM/LEGACY, even the initial checkpoints
+                            // must remain valid
+                            restoreMode == RestoreMode.CLAIM
+                                    ? secondExternalCheckpoint
+                                    : firstExternalCheckpoint,
+                            cluster,
+                            restoreMode);
             assertNotNull(thirdExternalCheckpoint);
         } finally {
             cluster.after();
@@ -288,26 +334,32 @@ public class ResumeCheckpointManuallyITCase extends 
TestLogger {
 
     private static String runJobAndGetExternalizedCheckpoint(
             StateBackend backend,
-            File checkpointDir,
             @Nullable String externalCheckpoint,
-            ClusterClient<?> client)
+            MiniClusterWithClientResource cluster,
+            RestoreMode restoreMode)
             throws Exception {
-        JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint);
+        JobGraph initialJobGraph = getJobGraph(backend, externalCheckpoint, 
restoreMode);
         NotifyingInfiniteTupleSource.countDownLatch = new 
CountDownLatch(PARALLELISM);
-
-        client.submitJob(initialJobGraph).get();
+        cluster.getClusterClient().submitJob(initialJobGraph).get();
 
         // wait until all sources have been started
         NotifyingInfiniteTupleSource.countDownLatch.await();
 
-        TestUtils.waitUntilExternalizedCheckpointCreated(checkpointDir);
-        client.cancel(initialJobGraph.getJobID()).get();
-        TestUtils.waitUntilJobCanceled(initialJobGraph.getJobID(), client);
-
-        return 
TestUtils.getMostRecentCompletedCheckpoint(checkpointDir).getAbsolutePath();
+        // complete at least two checkpoints so that the initial checkpoint can be subsumed
+        waitForCheckpoint(initialJobGraph.getJobID(), 
cluster.getMiniCluster(), 2);
+        cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get();
+        waitUntilJobCanceled(initialJobGraph.getJobID(), 
cluster.getClusterClient());
+
+        return getLatestCompletedCheckpointPath(
+                        initialJobGraph.getJobID(), cluster.getMiniCluster())
+                .<IllegalStateException>orElseThrow(
+                        () -> {
+                            throw new IllegalStateException("Checkpoint not 
generated");
+                        });
     }
 
-    private static JobGraph getJobGraph(StateBackend backend, @Nullable String 
externalCheckpoint) {
+    private static JobGraph getJobGraph(
+            StateBackend backend, @Nullable String externalCheckpoint, 
RestoreMode restoreMode) {
         final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         env.enableCheckpointing(500);
@@ -316,6 +368,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         env.getCheckpointConfig()
                 .setExternalizedCheckpointCleanup(
                         CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+        env.setRestartStrategy(RestartStrategies.noRestart());
 
         env.addSource(new NotifyingInfiniteTupleSource(10_000))
                 .assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create())
@@ -331,7 +384,7 @@ public class ResumeCheckpointManuallyITCase extends TestLogger {
         // recover from previous iteration?
         if (externalCheckpoint != null) {
             jobGraph.setSavepointRestoreSettings(
-                    SavepointRestoreSettings.forPath(externalCheckpoint));
+                    SavepointRestoreSettings.forPath(externalCheckpoint, false, restoreMode));
         }
 
         return jobGraph;
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
index 6dc4606158f..aafa0966878 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogCompatibilityITCase.java
@@ -143,7 +143,7 @@ public class ChangelogCompatibilityITCase {
         ClusterClient<?> client = miniClusterResource.getClusterClient();
         submit(jobGraph, client);
         if (testCase.restoreSource == RestoreSource.CHECKPOINT) {
-            waitForCheckpoint(jobGraph.getJobID(), miniClusterResource.getMiniCluster());
+            waitForCheckpoint(jobGraph.getJobID(), miniClusterResource.getMiniCluster(), 1);
             client.cancel(jobGraph.getJobID()).get();
             // obtain the latest checkpoint *after* cancellation - that one won't be subsumed
             return CommonTestUtils.getLatestCompletedCheckpointPath(
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
index 0ad238b5b27..ed7b3d9e289 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/ChangelogRescalingITCase.java
@@ -326,7 +326,7 @@ public class ChangelogRescalingITCase extends TestLogger {
     }
 
     private String checkpointAndCancel(JobID jobID) throws Exception {
-        waitForCheckpoint(jobID, cluster.getMiniCluster());
+        waitForCheckpoint(jobID, cluster.getMiniCluster(), 1);
         cluster.getClusterClient().cancel(jobID).get();
         checkStatus(jobID);
         return CommonTestUtils.getLatestCompletedCheckpointPath(jobID, cluster.getMiniCluster())

Reply via email to