This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f64261c  [FLINK-22483][runtime] Remove 
CompletedCheckpointStore#recover() method and change contract of 
CheckpointRecoveryFactory#createCompletedCheckpointStore, so that newly 
constructed CheckpointStore is already recovered.
f64261c is described below

commit f64261c91b195ecdcd99975b51de540db89a3f48
Author: David Moravek <d...@apache.org>
AuthorDate: Thu Aug 5 16:52:47 2021 +0200

    [FLINK-22483][runtime] Remove CompletedCheckpointStore#recover() method and 
change contract of CheckpointRecoveryFactory#createCompletedCheckpointStore, so 
that newly constructed CheckpointStore is already recovered.
    
    It's enough to recover the CompletedCheckpointStore only once, right 
after JobMasterRunner gains leadership. This also ensures that we'll fetch 
checkpoints from the external store in a "jobmaster-future-thread", without 
potentially blocking RPC threads.
    
    This closes #16652.
---
 .../KubernetesCheckpointRecoveryFactory.java       |   2 +-
 .../flink/kubernetes/utils/KubernetesUtils.java    |   3 +
 .../runtime/checkpoint/CheckpointCoordinator.java  |   9 +-
 .../checkpoint/CheckpointRecoveryFactory.java      |   6 +-
 .../checkpoint/CompletedCheckpointStore.java       |  10 +-
 ...ctivatedCheckpointCompletedCheckpointStore.java |   5 -
 .../DefaultCompletedCheckpointStore.java           | 163 ++++-------------
 .../DefaultCompletedCheckpointStoreUtils.java      | 117 +++++++++++++
 .../EmbeddedCompletedCheckpointStore.java          |  50 ++++--
 .../PerJobCheckpointRecoveryFactory.java           |  55 ++++--
 .../StandaloneCheckpointRecoveryFactory.java       |   2 +-
 .../StandaloneCompletedCheckpointStore.java        |   5 -
 .../ZooKeeperCheckpointRecoveryFactory.java        |   2 +-
 .../EmbeddedHaServicesWithLeadershipControl.java   |  29 ++-
 .../flink/runtime/minicluster/MiniCluster.java     |   2 +-
 .../flink/runtime/scheduler/SchedulerUtils.java    |   2 +-
 .../apache/flink/runtime/util/ZooKeeperUtils.java  |   4 +
 .../CheckpointCoordinatorFailureTest.java          |   5 -
 .../CheckpointCoordinatorRestoringTest.java        | 194 +++++++++++++--------
 .../checkpoint/CheckpointCoordinatorTest.java      |  33 ++--
 .../CheckpointCoordinatorTestingUtils.java         |   9 +
 .../checkpoint/CompletedCheckpointStoreTest.java   |  20 +--
 .../DefaultCompletedCheckpointStoreTest.java       |  61 ++++---
 .../DefaultCompletedCheckpointStoreUtilsTest.java  | 138 +++++++++++++++
 .../checkpoint/PerJobCheckpointRecoveryTest.java   |  58 ++++++
 .../StandaloneCompletedCheckpointStoreTest.java    |  10 +-
 .../TestingCheckpointRecoveryFactory.java          |   2 +-
 .../TestingCompletedCheckpointStore.java           |   3 -
 .../ZooKeeperCompletedCheckpointStoreITCase.java   |  41 ++---
 ...oKeeperCompletedCheckpointStoreMockitoTest.java |   8 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java     | 133 +++-----------
 .../dispatcher/DispatcherFailoverITCase.java       |  18 +-
 .../jobmanager/DefaultJobGraphStoreTest.java       |   2 +-
 .../runtime/jobmaster/JobExecutionITCase.java      |   2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java     |  11 +-
 .../LeaderChangeClusterComponentsTest.java         |   2 +-
 .../TestingMiniClusterConfiguration.java           |  17 +-
 .../persistence/TestingStateHandleStore.java       |   2 +-
 .../test/checkpointing/CheckpointStoreITCase.java  | 144 ++++++---------
 .../NotifyCheckpointAbortedITCase.java             |  26 +--
 .../test/checkpointing/RegionFailoverITCase.java   |  26 +--
 .../flink/test/checkpointing/SavepointITCase.java  |  28 +--
 .../ZooKeeperLeaderElectionITCase.java             |   2 +-
 43 files changed, 809 insertions(+), 652 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
index 10ff0f7..f8a661f 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesCheckpointRecoveryFactory.java
@@ -70,7 +70,7 @@ public class KubernetesCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
     }
 
     @Override
-    public CompletedCheckpointStore createCheckpointStore(
+    public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobID, int maxNumberOfCheckpointsToRetain, ClassLoader 
userClassLoader)
             throws Exception {
 
diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
index d077c8e..ec662d0 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/utils/KubernetesUtils.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.kubernetes.kubeclient.resources.KubernetesPod;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
+import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
@@ -299,6 +300,8 @@ public class KubernetesUtils {
                 maxNumberOfCheckpointsToRetain,
                 stateHandleStore,
                 KubernetesCheckpointStoreUtil.INSTANCE,
+                
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                        stateHandleStore, 
KubernetesCheckpointStoreUtil.INSTANCE),
                 executor);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 958bcc9..3b8dd1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -1521,16 +1521,11 @@ public class CheckpointCoordinator {
             }
 
             // We create a new shared state registry object, so that all 
pending async disposal
-            // requests from previous
-            // runs will go against the old object (were they can do no harm).
-            // This must happen under the checkpoint lock.
+            // requests from previous runs will go against the old object 
(were they can do no
+            // harm). This must happen under the checkpoint lock.
             sharedStateRegistry.close();
             sharedStateRegistry = sharedStateRegistryFactory.create(executor);
 
-            // Recover the checkpoints, TODO this could be done only when 
there is a new leader, not
-            // on each recovery
-            completedCheckpointStore.recover();
-
             // Now, we re-register all (shared) states from the checkpoint 
store with the new
             // registry
             for (CompletedCheckpoint completedCheckpoint :
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
index bf6aa9f..88c2a56 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRecoveryFactory.java
@@ -24,14 +24,16 @@ import org.apache.flink.api.common.JobID;
 public interface CheckpointRecoveryFactory {
 
     /**
-     * Creates a {@link CompletedCheckpointStore} instance for a job.
+     * Creates a RECOVERED {@link CompletedCheckpointStore} instance for a 
job. In this context,
+     * RECOVERED means, that if we already have completed checkpoints from 
previous runs, we should
+     * use them as the initial state.
      *
      * @param jobId Job ID to recover checkpoints for
      * @param maxNumberOfCheckpointsToRetain Maximum number of checkpoints to 
retain
      * @param userClassLoader User code class loader of the job
      * @return {@link CompletedCheckpointStore} instance for the job
      */
-    CompletedCheckpointStore createCheckpointStore(
+    CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader 
userClassLoader)
             throws Exception;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 8a73f4b..412b148 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -33,14 +33,6 @@ public interface CompletedCheckpointStore {
     Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);
 
     /**
-     * Recover available {@link CompletedCheckpoint} instances.
-     *
-     * <p>After a call to this method, {@link #getLatestCheckpoint(boolean)} 
returns the latest
-     * available checkpoint.
-     */
-    void recover() throws Exception;
-
-    /**
      * Adds a {@link CompletedCheckpoint} instance to the list of completed 
checkpoints.
      *
      * <p>Only a bounded number of checkpoints is kept. When exceeding the 
maximum number of
@@ -107,7 +99,7 @@ public interface CompletedCheckpointStore {
      * or kept.
      *
      * @param jobStatus Job state on shut down
-     * @param checkpointsCleaner that will cleanup copmpleted checkpoints if 
needed
+     * @param checkpointsCleaner that will cleanup completed checkpoints if 
needed
      */
     void shutdown(JobStatus jobStatus, CheckpointsCleaner checkpointsCleaner) 
throws Exception;
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
index f0ef3cf..daee48e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DeactivatedCheckpointCompletedCheckpointStore.java
@@ -31,11 +31,6 @@ public enum DeactivatedCheckpointCompletedCheckpointStore 
implements CompletedCh
     INSTANCE;
 
     @Override
-    public void recover() throws Exception {
-        throw unsupportedOperationException();
-    }
-
-    @Override
     public void addCheckpoint(
             CompletedCheckpoint checkpoint,
             CheckpointsCleaner checkpointsCleaner,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
index 8bc3206..d505124 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStore.java
@@ -19,24 +19,20 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
 import org.apache.flink.runtime.persistence.ResourceVersion;
 import org.apache.flink.runtime.persistence.StateHandleStore;
-import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Comparator;
+import java.util.Collection;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -61,9 +57,6 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
     private static final Logger LOG =
             LoggerFactory.getLogger(DefaultCompletedCheckpointStore.class);
 
-    private static final 
Comparator<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>
-            STRING_COMPARATOR = Comparator.comparing(o -> o.f1);
-
     /** Completed checkpoints state handle store. */
     private final StateHandleStore<CompletedCheckpoint, R> 
checkpointStateHandleStore;
 
@@ -81,6 +74,9 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
 
     private final CheckpointStoreUtil completedCheckpointStoreUtil;
 
+    /** False if store has been shutdown. */
+    private final AtomicBoolean running = new AtomicBoolean(true);
+
     /**
      * Creates a {@link DefaultCompletedCheckpointStore} instance.
      *
@@ -95,18 +91,14 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
             int maxNumberOfCheckpointsToRetain,
             StateHandleStore<CompletedCheckpoint, R> stateHandleStore,
             CheckpointStoreUtil completedCheckpointStoreUtil,
+            Collection<CompletedCheckpoint> completedCheckpoints,
             Executor executor) {
-
         checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain at 
least one checkpoint.");
-
         this.maxNumberOfCheckpointsToRetain = maxNumberOfCheckpointsToRetain;
-
         this.checkpointStateHandleStore = checkNotNull(stateHandleStore);
-
         this.completedCheckpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
-
+        this.completedCheckpoints.addAll(completedCheckpoints);
         this.ioExecutor = checkNotNull(executor);
-
         this.completedCheckpointStoreUtil = 
checkNotNull(completedCheckpointStoreUtil);
     }
 
@@ -116,54 +108,12 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
     }
 
     /**
-     * Recover all the valid checkpoints from state handle store. All the 
successfully recovered
-     * checkpoints will be added to {@link #completedCheckpoints} sorted by 
checkpoint id.
-     */
-    @Override
-    public void recover() throws Exception {
-        LOG.info("Recovering checkpoints from {}.", 
checkpointStateHandleStore);
-
-        // Get all there is first
-        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints =
-                checkpointStateHandleStore.getAllAndLock();
-
-        initialCheckpoints.sort(STRING_COMPARATOR);
-
-        final int numberOfInitialCheckpoints = initialCheckpoints.size();
-
-        LOG.info(
-                "Found {} checkpoints in {}.",
-                numberOfInitialCheckpoints,
-                checkpointStateHandleStore);
-        if (haveAllDownloaded(initialCheckpoints)) {
-            LOG.info(
-                    "All {} checkpoints found are already downloaded.", 
numberOfInitialCheckpoints);
-            return;
-        }
-
-        final List<CompletedCheckpoint> retrievedCheckpoints =
-                new ArrayList<>(numberOfInitialCheckpoints);
-        LOG.info("Trying to fetch {} checkpoints from storage.", 
numberOfInitialCheckpoints);
-
-        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle :
-                initialCheckpoints) {
-            retrievedCheckpoints.add(
-                    
checkNotNull(retrieveCompletedCheckpoint(checkpointStateHandle)));
-        }
-
-        // Clear local handles in order to prevent duplicates on recovery. The 
local handles should
-        // reflect the state handle store contents.
-        completedCheckpoints.clear();
-        completedCheckpoints.addAll(retrievedCheckpoints);
-    }
-
-    /**
      * Synchronously writes the new checkpoints to state handle store and 
asynchronously removes
      * older ones.
      *
      * @param checkpoint Completed checkpoint to add.
      * @throws PossibleInconsistentStateException if adding the checkpoint 
failed and leaving the
-     *     system in an possibly inconsistent state, i.e. it's uncertain 
whether the checkpoint
+     *     system in a possibly inconsistent state, i.e. it's uncertain 
whether the checkpoint
      *     metadata was fully written to the underlying systems or not.
      */
     @Override
@@ -172,13 +122,13 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup)
             throws Exception {
-
+        Preconditions.checkState(running.get(), "Checkpoint store has already 
been shutdown.");
         checkNotNull(checkpoint, "Checkpoint");
 
         final String path =
                 
completedCheckpointStoreUtil.checkpointIDToName(checkpoint.getCheckpointID());
 
-        // Now add the new one. If it fails, we don't want to loose existing 
data.
+        // Now add the new one. If it fails, we don't want to lose existing 
data.
         checkpointStateHandleStore.addAndLock(path, checkpoint);
 
         completedCheckpoints.addLast(checkpoint);
@@ -214,30 +164,28 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
     @Override
     public void shutdown(JobStatus jobStatus, CheckpointsCleaner 
checkpointsCleaner)
             throws Exception {
-        if (jobStatus.isGloballyTerminalState()) {
-            LOG.info("Shutting down");
-
-            for (CompletedCheckpoint checkpoint : completedCheckpoints) {
-                try {
-                    tryRemoveCompletedCheckpoint(
-                            checkpoint,
-                            checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
-                            checkpointsCleaner,
-                            () -> {});
-                } catch (Exception e) {
-                    LOG.warn("Fail to remove checkpoint during shutdown.", e);
+        if (running.compareAndSet(true, false)) {
+            if (jobStatus.isGloballyTerminalState()) {
+                LOG.info("Shutting down");
+                for (CompletedCheckpoint checkpoint : completedCheckpoints) {
+                    try {
+                        tryRemoveCompletedCheckpoint(
+                                checkpoint,
+                                
checkpoint.shouldBeDiscardedOnShutdown(jobStatus),
+                                checkpointsCleaner,
+                                () -> {});
+                    } catch (Exception e) {
+                        LOG.warn("Fail to remove checkpoint during shutdown.", 
e);
+                    }
                 }
+                completedCheckpoints.clear();
+                checkpointStateHandleStore.clearEntries();
+            } else {
+                LOG.info("Suspending");
+                // Clear the local handles, but don't remove any state
+                completedCheckpoints.clear();
+                checkpointStateHandleStore.releaseAll();
             }
-
-            completedCheckpoints.clear();
-            checkpointStateHandleStore.clearEntries();
-        } else {
-            LOG.info("Suspending");
-
-            // Clear the local handles, but don't remove any state
-            completedCheckpoints.clear();
-
-            checkpointStateHandleStore.releaseAll();
         }
     }
 
@@ -257,25 +205,6 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
         }
     }
 
-    private boolean haveAllDownloaded(
-            List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> 
checkpointPointers) {
-        if (completedCheckpoints.size() != checkpointPointers.size()) {
-            return false;
-        }
-        Set<Long> localIds =
-                completedCheckpoints.stream()
-                        .map(CompletedCheckpoint::getCheckpointID)
-                        .collect(Collectors.toSet());
-        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
initialCheckpoint :
-                checkpointPointers) {
-            if (!localIds.contains(
-                    
completedCheckpointStoreUtil.nameToCheckpointID(initialCheckpoint.f1))) {
-                return false;
-            }
-        }
-        return true;
-    }
-
     /**
      * Tries to remove the checkpoint identified by the given checkpoint id.
      *
@@ -286,34 +215,4 @@ public class DefaultCompletedCheckpointStore<R extends 
ResourceVersion<R>>
         return checkpointStateHandleStore.releaseAndTryRemove(
                 completedCheckpointStoreUtil.checkpointIDToName(checkpointId));
     }
-
-    private CompletedCheckpoint retrieveCompletedCheckpoint(
-            Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
stateHandle)
-            throws FlinkException {
-        long checkpointId = 
completedCheckpointStoreUtil.nameToCheckpointID(stateHandle.f1);
-
-        LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
-
-        try {
-            return stateHandle.f0.retrieveState();
-        } catch (ClassNotFoundException cnfe) {
-            throw new FlinkException(
-                    "Could not retrieve checkpoint "
-                            + checkpointId
-                            + " from state handle under "
-                            + stateHandle.f1
-                            + ". This indicates that you are trying to recover 
from state written by an "
-                            + "older Flink version which is not compatible. 
Try cleaning the state handle store.",
-                    cnfe);
-        } catch (IOException ioe) {
-            throw new FlinkException(
-                    "Could not retrieve checkpoint "
-                            + checkpointId
-                            + " from state handle under "
-                            + stateHandle.f1
-                            + ". This indicates that the retrieved state 
handle is broken. Try cleaning the "
-                            + "state handle store.",
-                    ioe);
-        }
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.java
new file mode 100644
index 0000000..0c9d547
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtils.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.persistence.ResourceVersion;
+import org.apache.flink.runtime.persistence.StateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.util.FlinkException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Helper methods related to {@link DefaultCompletedCheckpointStore}. */
+public class DefaultCompletedCheckpointStoreUtils {
+
+    private static final Logger LOG =
+            
LoggerFactory.getLogger(DefaultCompletedCheckpointStoreUtils.class);
+
+    private DefaultCompletedCheckpointStoreUtils() {
+        // No-op.
+    }
+
+    /**
+     * Fetch all {@link CompletedCheckpoint completed checkpoints} from an 
{@link StateHandleStore
+     * external store}. This method is intended for retrieving an initial 
state of {@link
+     * DefaultCompletedCheckpointStore}.
+     *
+     * @param checkpointStateHandleStore Completed checkpoints in external 
store.
+     * @param completedCheckpointStoreUtil Utilities for completed checkpoint 
store.
+     * @param <R> Type of {@link ResourceVersion}
+     * @return Immutable collection of {@link CompletedCheckpoint completed 
checkpoints}.
+     * @throws Exception If we're not able to fetch checkpoints for some 
reason.
+     */
+    public static <R extends ResourceVersion<R>>
+            Collection<CompletedCheckpoint> retrieveCompletedCheckpoints(
+                    StateHandleStore<CompletedCheckpoint, R> 
checkpointStateHandleStore,
+                    CheckpointStoreUtil completedCheckpointStoreUtil)
+                    throws Exception {
+
+        LOG.info("Recovering checkpoints from {}.", 
checkpointStateHandleStore);
+
+        // Get all there is first.
+        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints =
+                checkpointStateHandleStore.getAllAndLock();
+
+        // Sort checkpoints by name.
+        initialCheckpoints.sort(Comparator.comparing(o -> o.f1));
+
+        final int numberOfInitialCheckpoints = initialCheckpoints.size();
+
+        LOG.info(
+                "Found {} checkpoints in {}.",
+                numberOfInitialCheckpoints,
+                checkpointStateHandleStore);
+        final List<CompletedCheckpoint> retrievedCheckpoints =
+                new ArrayList<>(numberOfInitialCheckpoints);
+        LOG.info("Trying to fetch {} checkpoints from storage.", 
numberOfInitialCheckpoints);
+
+        for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
checkpointStateHandle :
+                initialCheckpoints) {
+            retrievedCheckpoints.add(
+                    checkNotNull(
+                            retrieveCompletedCheckpoint(
+                                    completedCheckpointStoreUtil, 
checkpointStateHandle)));
+        }
+        return Collections.unmodifiableList(retrievedCheckpoints);
+    }
+
+    private static CompletedCheckpoint retrieveCompletedCheckpoint(
+            CheckpointStoreUtil completedCheckpointStoreUtil,
+            Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
stateHandle)
+            throws FlinkException {
+        final long checkpointId = 
completedCheckpointStoreUtil.nameToCheckpointID(stateHandle.f1);
+        LOG.info("Trying to retrieve checkpoint {}.", checkpointId);
+        try {
+            return stateHandle.f0.retrieveState();
+        } catch (ClassNotFoundException exception) {
+            throw new FlinkException(
+                    String.format(
+                            "Could not retrieve checkpoint %d from state 
handle under %s. This indicates that you are trying to recover from state 
written by an older Flink version which is not compatible. Try cleaning the 
state handle store.",
+                            checkpointId, stateHandle.f1),
+                    exception);
+        } catch (IOException exception) {
+            throw new FlinkException(
+                    String.format(
+                            "Could not retrieve checkpoint %d from state 
handle under %s. This indicates that the retrieved state handle is broken. Try 
cleaning the state handle store.",
+                            checkpointId, stateHandle.f1),
+                    exception);
+        }
+    }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
index d55857c..2c5a748 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/EmbeddedCompletedCheckpointStore.java
@@ -25,17 +25,22 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
 
-/**
- * An embedded in-memory checkpoint store, which supports shutdown and 
suspend. You can use this to
- * test HA as long as the factory always returns the same store instance.
- */
+/** An embedded in-memory checkpoint store, which supports shutdown and 
suspend. */
 public class EmbeddedCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
+    private static void throwAlreadyShutdownException(JobStatus status) {
+        throw new IllegalStateException(
+                String.format("Store has been already shutdown with %s.", 
status));
+    }
+
     private final ArrayDeque<CompletedCheckpoint> checkpoints = new 
ArrayDeque<>(2);
 
-    private final Collection<CompletedCheckpoint> suspended = new 
ArrayDeque<>(2);
+    private final AtomicReference<JobStatus> shutdownStatus = new 
AtomicReference<>();
 
     private final int maxRetainedCheckpoints;
 
@@ -43,15 +48,15 @@ public class EmbeddedCompletedCheckpointStore implements 
CompletedCheckpointStor
         this(1);
     }
 
-    EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) {
-        Preconditions.checkArgument(maxRetainedCheckpoints > 0);
-        this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+    public EmbeddedCompletedCheckpointStore(int maxRetainedCheckpoints) {
+        this(maxRetainedCheckpoints, Collections.emptyList());
     }
 
-    @Override
-    public void recover() {
-        checkpoints.addAll(suspended);
-        suspended.clear();
+    public EmbeddedCompletedCheckpointStore(
+            int maxRetainedCheckpoints, Collection<CompletedCheckpoint> 
initialCheckpoints) {
+        Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+        this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+        this.checkpoints.addAll(initialCheckpoints);
     }
 
     @Override
@@ -60,8 +65,10 @@ public class EmbeddedCompletedCheckpointStore implements 
CompletedCheckpointStor
             CheckpointsCleaner checkpointsCleaner,
             Runnable postCleanup)
             throws Exception {
+        if (shutdownStatus.get() != null) {
+            throwAlreadyShutdownException(shutdownStatus.get());
+        }
         checkpoints.addLast(checkpoint);
-
         CheckpointSubsumeHelper.subsume(
                 checkpoints, maxRetainedCheckpoints, 
CompletedCheckpoint::discardOnSubsume);
     }
@@ -75,13 +82,13 @@ public class EmbeddedCompletedCheckpointStore implements 
CompletedCheckpointStor
     @Override
     public void shutdown(JobStatus jobStatus, CheckpointsCleaner 
checkpointsCleaner)
             throws Exception {
-        if (jobStatus.isGloballyTerminalState()) {
-            checkpoints.clear();
-            suspended.clear();
+        if (shutdownStatus.compareAndSet(null, jobStatus)) {
+            if (jobStatus.isGloballyTerminalState()) {
+                // We are done with this store. We should leave no checkpoints for recovery.
+                checkpoints.clear();
+            }
         } else {
-            suspended.clear();
-            suspended.addAll(checkpoints);
-            checkpoints.clear();
+            throwAlreadyShutdownException(shutdownStatus.get());
         }
     }
 
@@ -104,4 +111,9 @@ public class EmbeddedCompletedCheckpointStore implements 
CompletedCheckpointStor
     public boolean requiresExternalizedCheckpoints() {
         return false;
     }
+
+    @VisibleForTesting
+    public Optional<JobStatus> getShutdownStatus() {
+        return Optional.ofNullable(shutdownStatus.get());
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
index 3d178b3..5b91bb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryFactory.java
@@ -21,46 +21,63 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Function;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.BiFunction;
+import java.util.function.IntFunction;
 import java.util.function.Supplier;
 
 /**
  * Simple {@link CheckpointRecoveryFactory} which creates and keeps separate {@link
  * CompletedCheckpointStore} and {@link CheckpointIDCounter} for each {@link JobID}.
  */
-public class PerJobCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
-    private final Function<Integer, CompletedCheckpointStore> 
completedCheckpointStorePerJobFactory;
+public class PerJobCheckpointRecoveryFactory<T extends 
CompletedCheckpointStore>
+        implements CheckpointRecoveryFactory {
+
+    @VisibleForTesting
+    public static <T extends CompletedCheckpointStore>
+            CheckpointRecoveryFactory 
withoutCheckpointStoreRecovery(IntFunction<T> storeFn) {
+        return new PerJobCheckpointRecoveryFactory<>(
+                (maxCheckpoints, previous) -> {
+                    if (previous != null) {
+                        throw new UnsupportedOperationException(
+                                "Checkpoint store recovery is not supported.");
+                    }
+                    return storeFn.apply(maxCheckpoints);
+                });
+    }
+
+    private final BiFunction<Integer, T, T> 
completedCheckpointStorePerJobFactory;
     private final Supplier<CheckpointIDCounter> 
checkpointIDCounterPerJobFactory;
-    private final Map<JobID, CompletedCheckpointStore> store;
-    private final Map<JobID, CheckpointIDCounter> counter;
+    private final ConcurrentMap<JobID, T> store;
+    private final ConcurrentMap<JobID, CheckpointIDCounter> counter;
+
+    public PerJobCheckpointRecoveryFactory(
+            BiFunction<Integer, T, T> completedCheckpointStorePerJobFactory) {
+        this(completedCheckpointStorePerJobFactory, 
StandaloneCheckpointIDCounter::new);
+    }
 
     public PerJobCheckpointRecoveryFactory(
-            Function<Integer, CompletedCheckpointStore> 
completedCheckpointStorePerJobFactory,
+            BiFunction<Integer, T, T> completedCheckpointStorePerJobFactory,
             Supplier<CheckpointIDCounter> checkpointIDCounterPerJobFactory) {
         this.completedCheckpointStorePerJobFactory = 
completedCheckpointStorePerJobFactory;
         this.checkpointIDCounterPerJobFactory = 
checkpointIDCounterPerJobFactory;
-        this.store = new HashMap<>();
-        this.counter = new HashMap<>();
+        this.store = new ConcurrentHashMap<>();
+        this.counter = new ConcurrentHashMap<>();
     }
 
     @Override
-    public CompletedCheckpointStore createCheckpointStore(
+    public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader 
userClassLoader) {
-        return store.computeIfAbsent(
+        return store.compute(
                 jobId,
-                jId -> 
completedCheckpointStorePerJobFactory.apply(maxNumberOfCheckpointsToRetain));
+                (key, previous) ->
+                        completedCheckpointStorePerJobFactory.apply(
+                                maxNumberOfCheckpointsToRetain, previous));
     }
 
     @Override
     public CheckpointIDCounter createCheckpointIDCounter(JobID jobId) {
         return counter.computeIfAbsent(jobId, jId -> 
checkpointIDCounterPerJobFactory.get());
     }
-
-    @VisibleForTesting
-    public static CheckpointRecoveryFactory useSameServicesForAllJobs(
-            CompletedCheckpointStore store, CheckpointIDCounter counter) {
-        return new PerJobCheckpointRecoveryFactory(n -> store, () -> counter);
-    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index ac08e79..c323256 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -25,7 +25,7 @@ import 
org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFactory {
 
     @Override
-    public CompletedCheckpointStore createCheckpointStore(
+    public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader 
userClassLoader)
             throws Exception {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 7723ff8..dce9531 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -57,11 +57,6 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
     }
 
     @Override
-    public void recover() throws Exception {
-        // Nothing to do
-    }
-
-    @Override
     public void addCheckpoint(
             CompletedCheckpoint checkpoint,
             CheckpointsCleaner checkpointsCleaner,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 379d6a3..6d665947 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -46,7 +46,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
     }
 
     @Override
-    public CompletedCheckpointStore createCheckpointStore(
+    public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader 
userClassLoader)
             throws Exception {
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
index 236594e..540fbf9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/nonha/embedded/EmbeddedHaServicesWithLeadershipControl.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -30,14 +29,30 @@ import java.util.concurrent.Executor;
 /** {@link EmbeddedHaServices} extension to expose leadership granting and revoking. */
 public class EmbeddedHaServicesWithLeadershipControl extends EmbeddedHaServices
         implements HaLeadershipControl {
-    private final CheckpointRecoveryFactory testingCheckpointRecoveryFactory;
+
+    private final CheckpointRecoveryFactory checkpointRecoveryFactory;
 
     public EmbeddedHaServicesWithLeadershipControl(Executor executor) {
+        this(
+                executor,
+                new 
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
+                        (maxCheckpoints, previous) -> {
+                            if (previous != null) {
+                                if (!previous.getShutdownStatus().isPresent()) 
{
+                                    throw new IllegalStateException(
+                                            "Completed checkpoint store from 
previous run has not yet shutdown.");
+                                }
+                                return new EmbeddedCompletedCheckpointStore(
+                                        maxCheckpoints, 
previous.getAllCheckpoints());
+                            }
+                            return new 
EmbeddedCompletedCheckpointStore(maxCheckpoints);
+                        }));
+    }
+
+    public EmbeddedHaServicesWithLeadershipControl(
+            Executor executor, CheckpointRecoveryFactory 
checkpointRecoveryFactory) {
         super(executor);
-        this.testingCheckpointRecoveryFactory =
-                new PerJobCheckpointRecoveryFactory(
-                        n -> new EmbeddedCompletedCheckpointStore(),
-                        StandaloneCheckpointIDCounter::new);
+        this.checkpointRecoveryFactory = checkpointRecoveryFactory;
     }
 
     @Override
@@ -82,7 +97,7 @@ public class EmbeddedHaServicesWithLeadershipControl extends 
EmbeddedHaServices
     public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
         synchronized (lock) {
             checkNotShutdown();
-            return testingCheckpointRecoveryFactory;
+            return checkpointRecoveryFactory;
         }
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 51b6bc2..6041a02 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -503,7 +503,7 @@ public class MiniCluster implements AutoCloseableAsync {
                 return 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
                         configuration, executor);
             default:
-                throw new IllegalConfigurationException("Unkown HA Services " 
+ haServices);
+                throw new IllegalConfigurationException("Unknown HA Services " 
+ haServices);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
index ea31bb2..481baa3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerUtils.java
@@ -88,7 +88,7 @@ public final class SchedulerUtils {
                     
CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
         }
 
-        return recoveryFactory.createCheckpointStore(
+        return recoveryFactory.createRecoveredCompletedCheckpointStore(
                 jobId, maxNumberOfCheckpointsToRetain, classLoader);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index d9d5b18..01e9774 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore;
+import 
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStoreUtils;
 import 
org.apache.flink.runtime.checkpoint.DefaultLastStateConnectionStateListener;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointStoreUtil;
@@ -439,6 +440,9 @@ public class ZooKeeperUtils {
                         maxNumberOfCheckpointsToRetain,
                         completedCheckpointStateHandleStore,
                         ZooKeeperCheckpointStoreUtil.INSTANCE,
+                        
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                                completedCheckpointStateHandleStore,
+                                ZooKeeperCheckpointStoreUtil.INSTANCE),
                         executor);
 
         LOG.info(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 6445fe9..2424100 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -255,11 +255,6 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
         }
 
         @Override
-        public void recover() throws Exception {
-            throw new UnsupportedOperationException("Not implemented.");
-        }
-
-        @Override
         public void addCheckpoint(
                 CompletedCheckpoint checkpoint,
                 CheckpointsCleaner checkpointsCleaner,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
index 8adb1c8..c67b4d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
@@ -65,6 +65,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -94,6 +95,7 @@ import static org.mockito.Mockito.verify;
 /** Tests for restoring checkpoint. */
 @SuppressWarnings("checkstyle:EmptyLineSeparator")
 public class CheckpointCoordinatorRestoringTest extends TestLogger {
+
     private static final String TASK_MANAGER_LOCATION_INFO = "Unknown 
location";
 
     private enum TestScaleType {
@@ -102,6 +104,70 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
         SAME_PARALLELISM;
     }
 
+    private static void acknowledgeCheckpoint(
+            CheckpointCoordinator coordinator,
+            ExecutionGraph executionGraph,
+            ExecutionJobVertex jobVertex,
+            long checkpointId)
+            throws Exception {
+        final List<KeyGroupRange> partitions =
+                StateAssignmentOperation.createKeyGroupPartitions(
+                        jobVertex.getMaxParallelism(), 
jobVertex.getParallelism());
+        for (int partitionIdx = 0; partitionIdx < partitions.size(); 
partitionIdx++) {
+            TaskStateSnapshot subtaskState =
+                    mockSubtaskState(
+                            jobVertex.getJobVertexId(), partitionIdx, 
partitions.get(partitionIdx));
+            final AcknowledgeCheckpoint acknowledgeCheckpoint =
+                    new AcknowledgeCheckpoint(
+                            executionGraph.getJobID(),
+                            jobVertex
+                                    .getTaskVertices()[partitionIdx]
+                                    .getCurrentExecutionAttempt()
+                                    .getAttemptId(),
+                            checkpointId,
+                            new CheckpointMetrics(),
+                            subtaskState);
+            coordinator.receiveAcknowledgeMessage(
+                    acknowledgeCheckpoint, TASK_MANAGER_LOCATION_INFO);
+        }
+    }
+
+    private static ExecutionGraph createExecutionGraph(List<TestingVertex> 
vertices)
+            throws Exception {
+        final 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder builder =
+                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder();
+        for (TestingVertex vertex : vertices) {
+            builder.addJobVertex(
+                    vertex.getId(), vertex.getParallelism(), 
vertex.getMaxParallelism());
+        }
+        return builder.build();
+    }
+
+    private static class TestingVertex {
+
+        private final JobVertexID id;
+        private final int parallelism;
+        private final int maxParallelism;
+
+        private TestingVertex(JobVertexID id, int parallelism, int 
maxParallelism) {
+            this.id = id;
+            this.parallelism = parallelism;
+            this.maxParallelism = maxParallelism;
+        }
+
+        public JobVertexID getId() {
+            return id;
+        }
+
+        public int getParallelism() {
+            return parallelism;
+        }
+
+        public int getMaxParallelism() {
+            return maxParallelism;
+        }
+    }
+
     private ManuallyTriggeredScheduledExecutor 
manuallyTriggeredScheduledExecutor;
 
     @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
@@ -116,99 +182,80 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
     /**
      * Tests that the checkpointed partitioned and non-partitioned state is assigned properly to the
      * {@link Execution} upon recovery.
-     *
-     * @throws Exception
      */
     @Test
     public void testRestoreLatestCheckpointedState() throws Exception {
-        final JobVertexID jobVertexID1 = new JobVertexID();
-        final JobVertexID jobVertexID2 = new JobVertexID();
-        int parallelism1 = 3;
-        int parallelism2 = 2;
-        int maxParallelism1 = 42;
-        int maxParallelism2 = 13;
-
-        final ExecutionGraph graph =
-                new 
CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
-                        .addJobVertex(jobVertexID1, parallelism1, 
maxParallelism1)
-                        .addJobVertex(jobVertexID2, parallelism2, 
maxParallelism2)
-                        .build();
-
-        final ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
-        final ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
+        final List<TestingVertex> vertices =
+                Arrays.asList(
+                        new TestingVertex(new JobVertexID(), 3, 42),
+                        new TestingVertex(new JobVertexID(), 2, 13));
+        testRestoreLatestCheckpointedState(
+                vertices,
+                
testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(vertices));
+    }
 
-        CompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore();
+    private Collection<CompletedCheckpoint>
+            testSuccessfulCheckpointsArePersistedToCompletedCheckpointStore(
+                    List<TestingVertex> vertices) throws Exception {
+        final ExecutionGraph executionGraph = createExecutionGraph(vertices);
+        final EmbeddedCompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore();
 
         // set up the coordinator and validate the initial state
-        CheckpointCoordinator coord =
+        final CheckpointCoordinator coordinator =
                 new CheckpointCoordinatorBuilder()
-                        .setExecutionGraph(graph)
-                        .setCompletedCheckpointStore(store)
+                        .setExecutionGraph(executionGraph)
                         .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCompletedCheckpointStore(store)
                         .build();
 
         // trigger the checkpoint
-        coord.triggerCheckpoint(false);
+        coordinator.triggerCheckpoint(false);
         manuallyTriggeredScheduledExecutor.triggerAll();
 
-        assertEquals(1, coord.getPendingCheckpoints().size());
-        long checkpointId = 
Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
-
-        List<KeyGroupRange> keyGroupPartitions1 =
-                
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
-        List<KeyGroupRange> keyGroupPartitions2 =
-                
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
-
-        for (int index = 0; index < jobVertex1.getParallelism(); index++) {
-            TaskStateSnapshot subtaskState =
-                    mockSubtaskState(jobVertexID1, index, 
keyGroupPartitions1.get(index));
+        // we should have a single pending checkpoint
+        assertEquals(1, coordinator.getPendingCheckpoints().size());
+        final long checkpointId =
+                
Iterables.getOnlyElement(coordinator.getPendingCheckpoints().keySet());
 
-            AcknowledgeCheckpoint acknowledgeCheckpoint =
-                    new AcknowledgeCheckpoint(
-                            graph.getJobID(),
-                            jobVertex1
-                                    .getTaskVertices()[index]
-                                    .getCurrentExecutionAttempt()
-                                    .getAttemptId(),
-                            checkpointId,
-                            new CheckpointMetrics(),
-                            subtaskState);
-
-            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
+        // acknowledge checkpoints from all vertex partitions
+        for (TestingVertex vertex : vertices) {
+            final ExecutionJobVertex executionVertex =
+                    
Objects.requireNonNull(executionGraph.getJobVertex(vertex.getId()));
+            acknowledgeCheckpoint(coordinator, executionGraph, 
executionVertex, checkpointId);
         }
 
-        for (int index = 0; index < jobVertex2.getParallelism(); index++) {
-            TaskStateSnapshot subtaskState =
-                    mockSubtaskState(jobVertexID2, index, 
keyGroupPartitions2.get(index));
-
-            AcknowledgeCheckpoint acknowledgeCheckpoint =
-                    new AcknowledgeCheckpoint(
-                            graph.getJobID(),
-                            jobVertex2
-                                    .getTaskVertices()[index]
-                                    .getCurrentExecutionAttempt()
-                                    .getAttemptId(),
-                            checkpointId,
-                            new CheckpointMetrics(),
-                            subtaskState);
-
-            coord.receiveAcknowledgeMessage(acknowledgeCheckpoint, 
TASK_MANAGER_LOCATION_INFO);
-        }
-
-        List<CompletedCheckpoint> completedCheckpoints = 
coord.getSuccessfulCheckpoints();
-
+        final List<CompletedCheckpoint> completedCheckpoints =
+                coordinator.getSuccessfulCheckpoints();
         assertEquals(1, completedCheckpoints.size());
 
         // shutdown the store
         store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
 
-        // restore the store
-        Set<ExecutionJobVertex> tasks = new HashSet<>();
+        return store.getAllCheckpoints();
+    }
 
-        tasks.add(jobVertex1);
-        tasks.add(jobVertex2);
+    private void testRestoreLatestCheckpointedState(
+            List<TestingVertex> vertices, Collection<CompletedCheckpoint> 
completedCheckpoints)
+            throws Exception {
+        final ExecutionGraph executionGraph = createExecutionGraph(vertices);
+        final EmbeddedCompletedCheckpointStore store =
+                new EmbeddedCompletedCheckpointStore(
+                        completedCheckpoints.size(), completedCheckpoints);
 
-        assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false));
+        // set up the coordinator and validate the initial state
+        final CheckpointCoordinator coordinator =
+                new CheckpointCoordinatorBuilder()
+                        .setExecutionGraph(executionGraph)
+                        .setTimer(manuallyTriggeredScheduledExecutor)
+                        .setCompletedCheckpointStore(store)
+                        .build();
+
+        final Set<ExecutionJobVertex> executionVertices =
+                vertices.stream()
+                        .map(TestingVertex::getId)
+                        .map(executionGraph::getJobVertex)
+                        .collect(Collectors.toSet());
+        
assertTrue(coordinator.restoreLatestCheckpointedStateToAll(executionVertices, 
false));
 
         // validate that all shared states are registered again after the recovery.
         for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
@@ -221,8 +268,9 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
         }
 
         // verify the restored state
-        verifyStateRestore(jobVertexID1, jobVertex1, keyGroupPartitions1);
-        verifyStateRestore(jobVertexID2, jobVertex2, keyGroupPartitions2);
+        for (ExecutionJobVertex executionVertex : executionVertices) {
+            verifyStateRestore(executionVertex);
+        }
     }
 
     @Test
@@ -393,8 +441,6 @@ public class CheckpointCoordinatorRestoringTest extends 
TestLogger {
     /**
      * Tests the checkpoint restoration with changing parallelism of job vertex with partitioned
      * state.
-     *
-     * @throws Exception
      */
     private void 
testRestoreLatestCheckpointedStateWithChangingParallelism(boolean scaleOut)
             throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index def269f..933069a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2265,7 +2265,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
     }
 
     @Test
-    public void testMaxConcurrentAttempsWithSubsumption() throws Exception {
+    public void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
         final int maxConcurrentAttempts = 2;
         JobVertexID jobVertexID1 = new JobVertexID();
 
@@ -2801,15 +2801,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
         ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
 
-        EmbeddedCompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore(10);
-
+        final EmbeddedCompletedCheckpointStore store = new 
EmbeddedCompletedCheckpointStore(10);
         final List<SharedStateRegistry> createdSharedStateRegistries = new 
ArrayList<>(2);
 
         // set up the coordinator and validate the initial state
-        CheckpointCoordinator checkpointCoordinator =
+        final CheckpointCoordinatorBuilder coordinatorBuilder =
                 new CheckpointCoordinatorBuilder()
                         .setExecutionGraph(graph)
-                        .setCompletedCheckpointStore(store)
                         .setTimer(manuallyTriggeredScheduledExecutor)
                         .setSharedStateRegistryFactory(
                                 deleteExecutor -> {
@@ -2817,8 +2815,9 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                             new 
SharedStateRegistry(deleteExecutor);
                                     createdSharedStateRegistries.add(instance);
                                     return instance;
-                                })
-                        .build();
+                                });
+        final CheckpointCoordinator coordinator =
+                coordinatorBuilder.setCompletedCheckpointStore(store).build();
 
         final int numCheckpoints = 3;
 
@@ -2827,11 +2826,10 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
         for (int i = 0; i < numCheckpoints; ++i) {
             performIncrementalCheckpoint(
-                    graph.getJobID(), checkpointCoordinator, jobVertex1, 
keyGroupPartitions1, i);
+                    graph.getJobID(), coordinator, jobVertex1, 
keyGroupPartitions1, i);
         }
 
-        List<CompletedCheckpoint> completedCheckpoints =
-                checkpointCoordinator.getSuccessfulCheckpoints();
+        List<CompletedCheckpoint> completedCheckpoints = 
coordinator.getSuccessfulCheckpoints();
         assertEquals(numCheckpoints, completedCheckpoints.size());
 
         int sharedHandleCount = 0;
@@ -2901,7 +2899,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
         // restore the store
         Set<ExecutionJobVertex> tasks = new HashSet<>();
         tasks.add(jobVertex1);
-        
assertTrue(checkpointCoordinator.restoreLatestCheckpointedStateToAll(tasks, 
false));
+
+        assertEquals(JobStatus.SUSPENDED, 
store.getShutdownStatus().orElse(null));
+        final EmbeddedCompletedCheckpointStore secondStore =
+                new EmbeddedCompletedCheckpointStore(10, 
store.getAllCheckpoints());
+        final CheckpointCoordinator secondCoordinator =
+                
coordinatorBuilder.setCompletedCheckpointStore(secondStore).build();
+        
assertTrue(secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false));
 
         // validate that all shared states are registered again after the recovery.
         cp = 0;
@@ -2919,7 +2923,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                         // check that all are registered with the new registry
                         verify(keyedStateHandle, verificationMode)
-                                
.registerSharedStates(createdSharedStateRegistries.get(1));
+                                .registerSharedStates(
+                                        
Iterables.getLast(createdSharedStateRegistries));
                     }
                 }
             }
@@ -2927,7 +2932,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
         }
 
         // discard CP1
-        store.removeOldestCheckpoint();
+        secondStore.removeOldestCheckpoint();
 
         // we expect that all shared state from CP0 is no longer referenced 
and discarded. CP2 is
         // still live and also
@@ -2945,7 +2950,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
         }
 
         // discard CP2
-        store.removeOldestCheckpoint();
+        secondStore.removeOldestCheckpoint();
 
         // we expect all shared state was discarded now, because all CPs are
         for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
index 38f9d2c..b990147 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTestingUtils.java
@@ -218,6 +218,15 @@ public class CheckpointCoordinatorTestingUtils {
         return new Tuple2<>(allSerializedValuesConcatenated, offsets);
     }
 
+    public static void verifyStateRestore(ExecutionJobVertex 
executionJobVertex) throws Exception {
+        verifyStateRestore(
+                executionJobVertex.getJobVertexId(),
+                executionJobVertex,
+                StateAssignmentOperation.createKeyGroupPartitions(
+                        executionJobVertex.getMaxParallelism(),
+                        executionJobVertex.getParallelism()));
+    }
+
     public static void verifyStateRestore(
             JobVertexID jobVertexID,
             ExecutionJobVertex executionJobVertex,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index e68c889..cab096e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -45,12 +45,12 @@ import static org.junit.Assert.assertTrue;
 public abstract class CompletedCheckpointStoreTest extends TestLogger {
 
     /** Creates the {@link CompletedCheckpointStore} implementation to be 
tested. */
-    protected abstract CompletedCheckpointStore createCompletedCheckpoints(
+    protected abstract CompletedCheckpointStore 
createRecoveredCompletedCheckpointStore(
             int maxNumberOfCheckpointsToRetain, Executor executor) throws 
Exception;
 
-    protected CompletedCheckpointStore createCompletedCheckpoints(
+    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             int maxNumberOfCheckpointsToRetain) throws Exception {
-        return createCompletedCheckpoints(
+        return createRecoveredCompletedCheckpointStore(
                 maxNumberOfCheckpointsToRetain, Executors.directExecutor());
     }
 
@@ -59,14 +59,14 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
     /** Tests that at least one checkpoint needs to be retained. */
     @Test(expected = Exception.class)
     public void testExceptionOnNoRetainedCheckpoints() throws Exception {
-        createCompletedCheckpoints(0);
+        createRecoveredCompletedCheckpointStore(0);
     }
 
     /** Tests adding and getting a checkpoint. */
     @Test
     public void testAddAndGetLatestCheckpoint() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(4);
 
         // Empty state
         assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints());
@@ -95,7 +95,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
     @Test
     public void testAddCheckpointMoreThanMaxRetained() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(1);
 
         TestCompletedCheckpoint[] expected =
                 new TestCompletedCheckpoint[] {
@@ -131,7 +131,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
      */
     @Test
     public void testEmptyState() throws Exception {
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(1);
 
         assertNull(checkpoints.getLatestCheckpoint(false));
         assertEquals(0, checkpoints.getAllCheckpoints().size());
@@ -142,7 +142,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
     @Test
     public void testGetAllCheckpoints() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(4);
 
         TestCompletedCheckpoint[] expected =
                 new TestCompletedCheckpoint[] {
@@ -169,7 +169,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
     @Test
     public void testDiscardAllCheckpoints() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(4);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(4);
 
         TestCompletedCheckpoint[] expected =
                 new TestCompletedCheckpoint[] {
@@ -201,7 +201,7 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
     @Test
     public void testAcquireLatestCompletedCheckpointId() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(1);
         assertEquals(0, checkpoints.getLatestCheckpointId());
 
         checkpoints.addCheckpoint(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
index bd0333c..8782aa4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.testutils.FlinkMatchers;
+import org.apache.flink.runtime.persistence.StateHandleStore;
 import 
org.apache.flink.runtime.persistence.TestingRetrievableStateStorageHelper;
 import org.apache.flink.runtime.persistence.TestingStateHandleStore;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -54,6 +55,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link DefaultCompletedCheckpointStore}. */
@@ -69,7 +71,7 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
 
     @Before
     public void setup() {
-        builder = TestingStateHandleStore.builder();
+        builder = TestingStateHandleStore.newBuilder();
         checkpointStorageHelper = new TestingRetrievableStateStorageHelper<>();
         executorService = Executors.newFixedThreadPool(2, new 
ExecutorThreadFactory("IO-Executor"));
     }
@@ -132,7 +134,8 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
 
     /**
      * We have three completed checkpoints(1, 2, 3) in the state handle store. 
We expect that {@link
-     * DefaultCompletedCheckpointStore#recover()} should recover the sorted 
checkpoints by name.
+     * 
DefaultCompletedCheckpointStoreUtils#retrieveCompletedCheckpoints(StateHandleStore,
+     * CheckpointStoreUtil)} should recover the sorted checkpoints by name.
      */
     @Test
     public void testRecoverSortedCheckpoints() throws Exception {
@@ -140,9 +143,6 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
                 builder.setGetAllSupplier(() -> createStateHandles(3)).build();
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
-
-        completedCheckpointStore.recover();
-
         final List<CompletedCheckpoint> recoveredCompletedCheckpoint =
                 completedCheckpointStore.getAllCheckpoints();
         assertThat(recoveredCompletedCheckpoint.size(), is(3));
@@ -155,7 +155,7 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
 
     /** We got an {@link IOException} when retrieving checkpoint 2. It should 
NOT be skipped. */
     @Test
-    public void testCorruptDataInStateHandleStoreShouldBeSkipped() throws 
Exception {
+    public void testCorruptDataInStateHandleStoreShouldNotBeSkipped() throws 
Exception {
         final long corruptCkpId = 2L;
         checkpointStorageHelper.setRetrieveStateFunction(
                 state -> {
@@ -167,11 +167,8 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
 
         final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
                 builder.setGetAllSupplier(() -> createStateHandles(3)).build();
-        final CompletedCheckpointStore completedCheckpointStore =
-                createCompletedCheckpointStore(stateHandleStore);
-
         try {
-            completedCheckpointStore.recover();
+            createCompletedCheckpointStore(stateHandleStore);
         } catch (Exception e) {
             if (ExceptionUtils.findThrowable(e, 
IOException.class).isPresent()) {
                 return;
@@ -196,7 +193,6 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
         assertThat(completedCheckpointStore.getAllCheckpoints().size(), 
is(num));
         
assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(),
 is(1L));
 
@@ -229,7 +225,6 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
         assertThat(completedCheckpointStore.getAllCheckpoints().size(), 
is(num));
         
assertThat(completedCheckpointStore.getAllCheckpoints().get(0).getCheckpointID(),
 is(1L));
 
@@ -266,7 +261,6 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
         assertThat(completedCheckpointStore.getAllCheckpoints().size(), 
is(num));
 
         completedCheckpointStore.shutdown(JobStatus.CANCELED, new 
CheckpointsCleaner());
@@ -294,7 +288,6 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         final CompletedCheckpointStore completedCheckpointStore =
                 createCompletedCheckpointStore(stateHandleStore);
 
-        completedCheckpointStore.recover();
         assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(3));
 
         completedCheckpointStore.shutdown(JobStatus.CANCELLING, new 
CheckpointsCleaner());
@@ -310,6 +303,26 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
         assertThat(completedCheckpointStore.getAllCheckpoints().size(), is(0));
     }
 
+    @Test
+    public void testShutdownFailsAnyFutureCallsToAddCheckpoint() throws 
Exception {
+        final CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner();
+        for (JobStatus status : JobStatus.values()) {
+            final CompletedCheckpointStore completedCheckpointStore =
+                    createCompletedCheckpointStore(builder.build());
+            completedCheckpointStore.shutdown(status, checkpointsCleaner);
+            assertThrows(
+                    IllegalStateException.class,
+                    () ->
+                            completedCheckpointStore.addCheckpoint(
+                                    
CompletedCheckpointStoreTest.createCheckpoint(
+                                            0L, new SharedStateRegistry()),
+                                    checkpointsCleaner,
+                                    () -> {
+                                        // No-op.
+                                    }));
+        }
+    }
+
     private List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> 
createStateHandles(
             int num) {
         final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> stateHandles =
@@ -325,16 +338,16 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
     }
 
     private CompletedCheckpointStore createCompletedCheckpointStore(
-            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) {
+            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore) 
throws Exception {
         return createCompletedCheckpointStore(stateHandleStore, 1);
     }
 
     private CompletedCheckpointStore createCompletedCheckpointStore(
-            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int 
toRetain) {
-        return new DefaultCompletedCheckpointStore<>(
-                toRetain,
-                stateHandleStore,
+            TestingStateHandleStore<CompletedCheckpoint> stateHandleStore, int 
toRetain)
+            throws Exception {
+        final CheckpointStoreUtil checkpointStoreUtil =
                 new CheckpointStoreUtil() {
+
                     @Override
                     public String checkpointIDToName(long checkpointId) {
                         return String.valueOf(checkpointId);
@@ -342,9 +355,15 @@ public class DefaultCompletedCheckpointStoreTest extends 
TestLogger {
 
                     @Override
                     public long nameToCheckpointID(String name) {
-                        return Long.valueOf(name);
+                        return Long.parseLong(name);
                     }
-                },
+                };
+        return new DefaultCompletedCheckpointStore<>(
+                toRetain,
+                stateHandleStore,
+                checkpointStoreUtil,
+                
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                        stateHandleStore, checkpointStoreUtil),
                 executorService);
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
new file mode 100644
index 0000000..984f105
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/DefaultCompletedCheckpointStoreUtilsTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.persistence.TestingStateHandleStore;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+/** Tests related to {@link DefaultCompletedCheckpointStoreUtils}. */
+public class DefaultCompletedCheckpointStoreUtilsTest extends TestLogger {
+
+    private static CompletedCheckpoint createCompletedCheckpoint(long 
checkpointId) {
+        return new CompletedCheckpoint(
+                new JobID(),
+                checkpointId,
+                0,
+                1,
+                new HashMap<>(),
+                Collections.emptyList(),
+                
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                new TestCompletedCheckpointStorageLocation());
+    }
+
+    private static class FailingRetrievableStateHandle<T extends Serializable>
+            implements RetrievableStateHandle<T> {
+        // Must be long: Java serialization ignores a serialVersionUID of any other type.
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public T retrieveState() throws IOException, ClassNotFoundException {
+            throw new IOException("Test exception.");
+        }
+
+        @Override
+        public void discardState() throws Exception {
+            // No-op: this handle holds no state of its own.
+        }
+
+        @Override
+        public long getStateSize() {
+            return 0;
+        }
+    }
+
+    private static class SimpleCheckpointStoreUtil implements 
CheckpointStoreUtil {
+
+        @Override
+        public String checkpointIDToName(long checkpointId) {
+            return "checkpoint-" + checkpointId;
+        }
+
+        @Override
+        public long nameToCheckpointID(String name) {
+            return Long.parseLong(name.split("-")[1]);
+        }
+    }
+
+    @Test
+    public void testRetrievedCheckpointsAreOrderedChronologically() throws 
Exception {
+        final TestingRetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelper =
+                new TestingRetrievableStateStorageHelper<>();
+        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> handles =
+                new ArrayList<>();
+        
handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(0L)), 
"checkpoint-0"));
+        
handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(1L)), 
"checkpoint-1"));
+        
handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(2L)), 
"checkpoint-2"));
+        Collections.shuffle(handles);
+        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
+                TestingStateHandleStore.<CompletedCheckpoint>newBuilder()
+                        .setGetAllSupplier(() -> handles)
+                        .build();
+        final Collection<CompletedCheckpoint> completedCheckpoints =
+                
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                        stateHandleStore, new SimpleCheckpointStoreUtil());
+        // Make sure checkpoints are ordered from earliest to latest.
+        assertEquals(
+                Arrays.asList(0L, 1L, 2L),
+                completedCheckpoints.stream()
+                        .map(CompletedCheckpoint::getCheckpointID)
+                        .collect(Collectors.toList()));
+    }
+
+    @Test
+    public void 
testRetrievingCheckpointsFailsIfRetrievalOfAnyCheckpointFails() throws 
Exception {
+        final TestingRetrievableStateStorageHelper<CompletedCheckpoint> 
storageHelper =
+                new TestingRetrievableStateStorageHelper<>();
+        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> handles =
+                new ArrayList<>();
+        
handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(0L)), 
"checkpoint-0"));
+        handles.add(Tuple2.of(new FailingRetrievableStateHandle<>(), 
"checkpoint-1"));
+        
handles.add(Tuple2.of(storageHelper.store(createCompletedCheckpoint(2L)), 
"checkpoint-2"));
+        Collections.shuffle(handles);
+        final TestingStateHandleStore<CompletedCheckpoint> stateHandleStore =
+                TestingStateHandleStore.<CompletedCheckpoint>newBuilder()
+                        .setGetAllSupplier(() -> handles)
+                        .build();
+        assertThrows(
+                FlinkException.class,
+                () ->
+                        
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                                stateHandleStore, new 
SimpleCheckpointStoreUtil()));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
new file mode 100644
index 0000000..6b65d00
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PerJobCheckpointRecoveryTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+
+/** Tests related to {@link PerJobCheckpointRecoveryFactory}. */
+public class PerJobCheckpointRecoveryTest extends TestLogger {
+
+    @Test
+    public void testFactoryWithoutCheckpointStoreRecovery() throws Exception {
+        final TestingCompletedCheckpointStore store =
+                new TestingCompletedCheckpointStore(new CompletableFuture<>());
+        final CheckpointRecoveryFactory factory =
+                PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                        maxCheckpoints -> store);
+        final ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
+
+        final JobID firstJobId = new JobID();
+        assertSame(
+                store, 
factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader));
+        assertThrows(
+                UnsupportedOperationException.class,
+                () -> 
factory.createRecoveredCompletedCheckpointStore(firstJobId, 1, classLoader));
+
+        final JobID secondJobId = new JobID();
+        assertSame(
+                store,
+                factory.createRecoveredCompletedCheckpointStore(secondJobId, 
1, classLoader));
+        assertThrows(
+                UnsupportedOperationException.class,
+                () -> 
factory.createRecoveredCompletedCheckpointStore(secondJobId, 1, classLoader));
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
index 4f4e2a7..218b023 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java
@@ -41,9 +41,8 @@ import static org.junit.Assert.assertTrue;
 public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointStoreTest {
 
     @Override
-    protected CompletedCheckpointStore createCompletedCheckpoints(
+    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             int maxNumberOfCheckpointsToRetain, Executor executor) throws 
Exception {
-
         return new 
StandaloneCompletedCheckpointStore(maxNumberOfCheckpointsToRetain);
     }
 
@@ -51,7 +50,7 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
     @Test
     public void testShutdownDiscardsCheckpoints() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore store = createCompletedCheckpoints(1);
+        CompletedCheckpointStore store = 
createRecoveredCompletedCheckpointStore(1);
         TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
         Collection<OperatorState> operatorStates = 
checkpoint.getOperatorStates().values();
 
@@ -72,7 +71,7 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
     @Test
     public void testSuspendDiscardsCheckpoints() throws Exception {
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore store = createCompletedCheckpoints(1);
+        CompletedCheckpointStore store = 
createRecoveredCompletedCheckpointStore(1);
         TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
         Collection<OperatorState> taskStates = 
checkpoint.getOperatorStates().values();
 
@@ -95,7 +94,8 @@ public class StandaloneCompletedCheckpointStoreTest extends 
CompletedCheckpointS
 
         final int numCheckpointsToRetain = 1;
         CompletedCheckpointStore store =
-                createCompletedCheckpoints(numCheckpointsToRetain, 
Executors.directExecutor());
+                createRecoveredCompletedCheckpointStore(
+                        numCheckpointsToRetain, Executors.directExecutor());
 
         CountDownLatch discardAttempted = new CountDownLatch(1);
         for (long i = 0; i < numCheckpointsToRetain + 1; ++i) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
index cda32d6..f4e9256 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCheckpointRecoveryFactory.java
@@ -32,7 +32,7 @@ public class TestingCheckpointRecoveryFactory implements 
CheckpointRecoveryFacto
     }
 
     @Override
-    public CompletedCheckpointStore createCheckpointStore(
+    public CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             JobID jobId, int maxNumberOfCheckpointsToRetain, ClassLoader 
userClassLoader) {
         return store;
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
index 9e95141..629dad1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java
@@ -33,9 +33,6 @@ public final class TestingCompletedCheckpointStore implements 
CompletedCheckpoin
     }
 
     @Override
-    public void recover() {}
-
-    @Override
     public void addCheckpoint(
             CompletedCheckpoint checkpoint,
             CheckpointsCleaner checkpointsCleaner,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index ddb8ce4..d909174 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -66,9 +66,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends 
CompletedCheckpoint
 
     @AfterClass
     public static void tearDown() throws Exception {
-        if (ZOOKEEPER != null) {
-            ZOOKEEPER.shutdown();
-        }
+        ZOOKEEPER.shutdown();
     }
 
     @Before
@@ -77,18 +75,19 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
     }
 
     @Override
-    protected CompletedCheckpointStore createCompletedCheckpoints(
+    protected CompletedCheckpointStore createRecoveredCompletedCheckpointStore(
             int maxNumberOfCheckpointsToRetain, Executor executor) throws 
Exception {
         final ZooKeeperStateHandleStore<CompletedCheckpoint> 
checkpointsInZooKeeper =
                 ZooKeeperUtils.createZooKeeperStateHandleStore(
                         ZOOKEEPER.getClient(),
                         CHECKPOINT_PATH,
                         new TestingRetrievableStateStorageHelper<>());
-
         return new DefaultCompletedCheckpointStore<>(
                 maxNumberOfCheckpointsToRetain,
                 checkpointsInZooKeeper,
                 checkpointStoreUtil,
+                
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                        checkpointsInZooKeeper, checkpointStoreUtil),
                 executor);
     }
 
@@ -103,7 +102,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
     public void testRecover() throws Exception {
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpoints = createCompletedCheckpoints(3);
+        CompletedCheckpointStore checkpoints = 
createRecoveredCompletedCheckpointStore(3);
 
         TestCompletedCheckpoint[] expected =
                 new TestCompletedCheckpoint[] {
@@ -128,7 +127,6 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         // Recover
         sharedStateRegistry.close();
         sharedStateRegistry = new SharedStateRegistry();
-        checkpoints.recover();
 
         assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
         assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -157,7 +155,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         CuratorFramework client = ZOOKEEPER.getClient();
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore store = createCompletedCheckpoints(1);
+        CompletedCheckpointStore store = 
createRecoveredCompletedCheckpointStore(1);
         TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
 
         store.addCheckpoint(checkpoint, new CheckpointsCleaner(), () -> {});
@@ -179,9 +177,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                                                 
checkpoint.getCheckpointID())));
 
         sharedStateRegistry.close();
-        store.recover();
 
-        assertEquals(0, store.getNumberOfRetainedCheckpoints());
+        assertEquals(
+                0, 
createRecoveredCompletedCheckpointStore(1).getNumberOfRetainedCheckpoints());
     }
 
     /**
@@ -193,7 +191,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         CuratorFramework client = ZOOKEEPER.getClient();
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore store = createCompletedCheckpoints(1);
+        CompletedCheckpointStore store = 
createRecoveredCompletedCheckpointStore(1);
         TestCompletedCheckpoint checkpoint = createCheckpoint(0, 
sharedStateRegistry);
 
         store.addCheckpoint(checkpoint, new CheckpointsCleaner(), () -> {});
@@ -219,7 +217,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
 
         // Recover again
         sharedStateRegistry.close();
-        store.recover();
+        store = createRecoveredCompletedCheckpointStore(1);
 
         CompletedCheckpoint recovered = store.getLatestCheckpoint(false);
         assertEquals(checkpoint, recovered);
@@ -234,7 +232,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
     public void testLatestCheckpointRecovery() throws Exception {
         final int numCheckpoints = 3;
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        CompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(numCheckpoints);
+        CompletedCheckpointStore checkpointStore =
+                createRecoveredCompletedCheckpointStore(numCheckpoints);
         List<CompletedCheckpoint> checkpoints = new 
ArrayList<>(numCheckpoints);
 
         checkpoints.add(createCheckpoint(9, sharedStateRegistry));
@@ -246,10 +245,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         }
 
         sharedStateRegistry.close();
-        checkpointStore.recover();
-
-        CompletedCheckpoint latestCheckpoint = 
checkpointStore.getLatestCheckpoint(false);
 
+        final CompletedCheckpoint latestCheckpoint =
+                
createRecoveredCompletedCheckpointStore(numCheckpoints).getLatestCheckpoint(false);
         assertEquals(checkpoints.get(checkpoints.size() - 1), 
latestCheckpoint);
     }
 
@@ -264,10 +262,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         final int numberOfCheckpoints = 1;
         final long waitingTimeout = 50L;
 
-        CompletedCheckpointStore zkCheckpointStore1 =
-                createCompletedCheckpoints(numberOfCheckpoints);
-        CompletedCheckpointStore zkCheckpointStore2 =
-                createCompletedCheckpoints(numberOfCheckpoints);
+        final CompletedCheckpointStore zkCheckpointStore1 =
+                createRecoveredCompletedCheckpointStore(numberOfCheckpoints);
 
         SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
 
@@ -279,7 +275,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         // recover the checkpoint by a different checkpoint store
         sharedStateRegistry.close();
         sharedStateRegistry = new SharedStateRegistry();
-        zkCheckpointStore2.recover();
+        final CompletedCheckpointStore zkCheckpointStore2 =
+                createRecoveredCompletedCheckpointStore(numberOfCheckpoints);
 
         CompletedCheckpoint recoveredCheckpoint = 
zkCheckpointStore2.getLatestCheckpoint(false);
         assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint);
@@ -339,7 +336,7 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
         final int maxCheckpointsToRetain = 1;
         ManuallyTriggeredScheduledExecutor executor = new 
ManuallyTriggeredScheduledExecutor();
         CompletedCheckpointStore checkpointStore =
-                createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+                
createRecoveredCompletedCheckpointStore(maxCheckpointsToRetain, executor);
 
         int nbCheckpointsToInject = 3;
         for (int i = 1; i <= nbCheckpointsToInject; i++) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
index 2196473..222f82b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java
@@ -170,10 +170,10 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
                         numCheckpointsToRetain,
                         zooKeeperStateHandleStoreMock,
                         zooKeeperCheckpointStoreUtil,
+                        
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                                zooKeeperStateHandleStoreMock, 
zooKeeperCheckpointStoreUtil),
                         Executors.directExecutor());
 
-        zooKeeperCompletedCheckpointStore.recover();
-
         CompletedCheckpoint latestCompletedCheckpoint =
                 zooKeeperCompletedCheckpointStore.getLatestCheckpoint(false);
 
@@ -306,10 +306,10 @@ public class ZooKeeperCompletedCheckpointStoreMockitoTest 
extends TestLogger {
                         numCheckpointsToRetain,
                         zooKeeperStateHandleStoreMock,
                         zooKeeperCheckpointStoreUtil,
+                        
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                                zooKeeperStateHandleStoreMock, 
zooKeeperCheckpointStoreUtil),
                         Executors.directExecutor());
 
-        zooKeeperCompletedCheckpointStore.recover();
-
         CompletedCheckpoint latestCompletedCheckpoint =
                 zooKeeperCompletedCheckpointStore.getLatestCheckpoint(true);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index ea0a3fc..1001b52 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.testutils.FlinkMatchers;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
@@ -30,10 +31,8 @@ import 
org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLo
 import org.apache.flink.runtime.util.ZooKeeperUtils;
 import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
-import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.Executors;
-import org.apache.flink.util.function.TriConsumer;
 
 import 
org.apache.flink.shaded.curator4.org.apache.curator.framework.CuratorFramework;
 
@@ -48,14 +47,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.function.Function;
-import java.util.stream.IntStream;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
-import static 
org.apache.flink.runtime.checkpoint.CompletedCheckpointStoreTest.createCheckpoint;
-import static org.apache.flink.util.ExceptionUtils.findThrowable;
-import static org.apache.flink.util.ExceptionUtils.rethrow;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertThrows;
 
 /** Tests for {@link DefaultCompletedCheckpointStore} with {@link 
ZooKeeperStateHandleStore}. */
 public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger {
@@ -74,102 +70,14 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
         assertEquals(checkpointId, 
zooKeeperCheckpointStoreUtil.nameToCheckpointID(path));
     }
 
-    @Test(expected = ExpectedTestException.class)
-    public void testRecoverFailsIfDownloadFails() throws Exception {
-        testDownloadInternal(
-                (store, checkpointsInZk, sharedStateRegistry) -> {
-                    try {
-                        checkpointsInZk.add(
-                                createHandle(
-                                        1,
-                                        id -> {
-                                            throw new ExpectedTestException();
-                                        }));
-                        store.recover();
-                    } catch (Exception exception) {
-                        findThrowable(exception, ExpectedTestException.class)
-                                .ifPresent(ExceptionUtils::rethrow);
-                        rethrow(exception);
-                    }
-                });
-    }
-
     @Test
-    public void testNoDownloadIfCheckpointsNotChanged() throws Exception {
-        testDownloadInternal(
-                (store, checkpointsInZk, sharedStateRegistry) -> {
-                    try {
-                        checkpointsInZk.add(
-                                createHandle(
-                                        1,
-                                        id -> {
-                                            throw new AssertionError(
-                                                    "retrieveState was 
attempted for checkpoint "
-                                                            + id);
-                                        }));
-                        store.addCheckpoint(
-                                createCheckpoint(1, sharedStateRegistry),
-                                new CheckpointsCleaner(),
-                                () -> {
-                                    /*no op*/
-                                });
-                        store.recover(); // will fail in case of attempt to 
retrieve state
-                    } catch (Exception exception) {
-                        throw new RuntimeException(exception);
-                    }
-                });
-    }
-
-    @Test
-    public void testDownloadIfCheckpointsChanged() throws Exception {
-        testDownloadInternal(
-                (store, checkpointsInZk, sharedStateRegistry) -> {
-                    try {
-                        int lastInZk = 10;
-                        IntStream.range(0, lastInZk + 1)
-                                .forEach(
-                                        i ->
-                                                checkpointsInZk.add(
-                                                        createHandle(
-                                                                i,
-                                                                id ->
-                                                                        
createCheckpoint(
-                                                                               
 id,
-                                                                               
 sharedStateRegistry))));
-                        store.addCheckpoint(
-                                createCheckpoint(1, sharedStateRegistry),
-                                new CheckpointsCleaner(),
-                                () -> {
-                                    /*no op*/
-                                });
-                        store.addCheckpoint(
-                                createCheckpoint(5, sharedStateRegistry),
-                                new CheckpointsCleaner(),
-                                () -> {
-                                    /*no op*/
-                                });
-                        store.recover();
-                        assertEquals(lastInZk, 
store.getLatestCheckpoint(false).getCheckpointID());
-                    } catch (Exception exception) {
-                        throw new RuntimeException(exception);
-                    }
-                });
-    }
-
-    private void testDownloadInternal(
-            TriConsumer<
-                            CompletedCheckpointStore,
-                            
List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>>,
-                            SharedStateRegistry>
-                    test)
-            throws Exception {
-        SharedStateRegistry sharedStateRegistry = new SharedStateRegistry();
-        Configuration configuration = new Configuration();
+    public void testRecoverFailsIfDownloadFails() {
+        final Configuration configuration = new Configuration();
         configuration.setString(
                 HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zooKeeperResource.getConnectString());
-        List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> 
checkpointsInZk =
+        final List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> checkpointsInZk =
                 new ArrayList<>();
-        ZooKeeperStateHandleStore<CompletedCheckpoint> checkpointsInZooKeeper =
+        final ZooKeeperStateHandleStore<CompletedCheckpoint> 
checkpointsInZooKeeper =
                 new ZooKeeperStateHandleStore<CompletedCheckpoint>(
                         ZooKeeperUtils.startCuratorFramework(configuration),
                         new TestingRetrievableStateStorageHelper<>()) {
@@ -180,18 +88,19 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                     }
                 };
 
-        CompletedCheckpointStore store =
-                new DefaultCompletedCheckpointStore<>(
-                        10,
-                        checkpointsInZooKeeper,
-                        zooKeeperCheckpointStoreUtil,
-                        Executors.directExecutor());
-        try {
-            test.accept(store, checkpointsInZk, sharedStateRegistry);
-        } finally {
-            store.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());
-            sharedStateRegistry.close();
-        }
+        checkpointsInZk.add(
+                createHandle(
+                        1,
+                        id -> {
+                            throw new ExpectedTestException();
+                        }));
+        final Exception exception =
+                assertThrows(
+                        Exception.class,
+                        () ->
+                                
DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints(
+                                        checkpointsInZooKeeper, 
zooKeeperCheckpointStoreUtil));
+        assertThat(exception, 
FlinkMatchers.containsCause(ExpectedTestException.class));
     }
 
     private Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> 
createHandle(
@@ -269,11 +178,11 @@ public class ZooKeeperCompletedCheckpointStoreTest 
extends TestLogger {
         final ZooKeeperStateHandleStore<CompletedCheckpoint> 
checkpointsInZooKeeper =
                 ZooKeeperUtils.createZooKeeperStateHandleStore(
                         client, "/checkpoints", new 
TestingRetrievableStateStorageHelper<>());
-
         return new DefaultCompletedCheckpointStore<>(
                 1,
                 checkpointsInZooKeeper,
                 zooKeeperCheckpointStoreUtil,
+                Collections.emptyList(),
                 Executors.directExecutor());
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
index 1b92af0..f4377a7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherFailoverITCase.java
@@ -19,11 +19,9 @@ package org.apache.flink.runtime.dispatcher;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneRunningJobsRegistry;
@@ -58,6 +56,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /** An integration test for various fail-over scenarios of the {@link 
Dispatcher} component. */
@@ -68,11 +67,18 @@ public class DispatcherFailoverITCase extends 
AbstractDispatcherTest {
     @Before
     public void setUp() throws Exception {
         super.setUp();
-        final CompletedCheckpointStore completedCheckpointStore =
-                new EmbeddedCompletedCheckpointStore();
         haServices.setCheckpointRecoveryFactory(
-                PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
-                        completedCheckpointStore, new 
StandaloneCheckpointIDCounter()));
+                new 
PerJobCheckpointRecoveryFactory<EmbeddedCompletedCheckpointStore>(
+                        (maxCheckpoints, previous) -> {
+                            if (previous != null) {
+                                // First job attempt failed before cleaning up 
the checkpoint store.
+                                
assertFalse(previous.getShutdownStatus().isPresent());
+                                
assertFalse(previous.getAllCheckpoints().isEmpty());
+                                return new EmbeddedCompletedCheckpointStore(
+                                        maxCheckpoints, 
previous.getAllCheckpoints());
+                            }
+                            return new 
EmbeddedCompletedCheckpointStore(maxCheckpoints);
+                        }));
     }
 
     @After
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
index 8438312..568e937 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java
@@ -68,7 +68,7 @@ public class DefaultJobGraphStoreTest extends TestLogger {
 
     @Before
     public void setup() {
-        builder = TestingStateHandleStore.builder();
+        builder = TestingStateHandleStore.newBuilder();
         testingJobGraphStoreWatcher = new TestingJobGraphStoreWatcher();
         testingJobGraphListener = new TestingJobGraphListener();
         jobGraphStorageHelper = new TestingRetrievableStateStorageHelper<>();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
index 6586fae..469f290 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobExecutionITCase.java
@@ -51,7 +51,7 @@ public class JobExecutionITCase extends TestLogger {
         final JobGraph jobGraph = createJobGraph(parallelism);
 
         final TestingMiniClusterConfiguration miniClusterConfiguration =
-                new TestingMiniClusterConfiguration.Builder()
+                TestingMiniClusterConfiguration.newBuilder()
                         .setNumSlotsPerTaskManager(numSlotsPerTaskExecutor)
                         .setNumTaskManagers(numTaskExecutors)
                         .setLocalCommunication(true)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 3f30d51..455e338 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -41,7 +41,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -159,7 +159,6 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static 
org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.containsInAnyOrder;
@@ -798,8 +797,8 @@ public class JobMasterTest extends TestLogger {
         final StandaloneCompletedCheckpointStore completedCheckpointStore =
                 new StandaloneCompletedCheckpointStore(1);
         final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
-                useSameServicesForAllJobs(
-                        completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+                PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                        maxCheckpoints -> completedCheckpointStore);
         
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 
         final JobMaster jobMaster =
@@ -873,8 +872,8 @@ public class JobMasterTest extends TestLogger {
         completedCheckpointStore.addCheckpoint(
                 completedCheckpoint, new CheckpointsCleaner(), () -> {});
         final CheckpointRecoveryFactory testingCheckpointRecoveryFactory =
-                useSameServicesForAllJobs(
-                        completedCheckpointStore, new 
StandaloneCheckpointIDCounter());
+                PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                        maxCheckpoints -> completedCheckpointStore);
         
haServices.setCheckpointRecoveryFactory(testingCheckpointRecoveryFactory);
 
         final JobMaster jobMaster = new JobMasterBuilder(jobGraph, 
rpcService).createJobMaster();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
index e8673e4..e8903b3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/LeaderChangeClusterComponentsTest.java
@@ -81,7 +81,7 @@ public class LeaderChangeClusterComponentsTest extends 
TestLogger {
 
         miniCluster =
                 new TestingMiniCluster(
-                        new TestingMiniClusterConfiguration.Builder()
+                        TestingMiniClusterConfiguration.newBuilder()
                                 .setNumTaskManagers(NUM_TMS)
                                 .setNumSlotsPerTaskManager(SLOTS_PER_TM)
                                 .build(),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
index ac7c46b..a9b4569 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/TestingMiniClusterConfiguration.java
@@ -30,6 +30,15 @@ import static 
org.apache.flink.runtime.minicluster.RpcServiceSharing.SHARED;
 /** Configuration for the {@link TestingMiniCluster}. */
 public class TestingMiniClusterConfiguration extends MiniClusterConfiguration {
 
+    /**
+     * Create a new {@link Builder builder} for {@link 
TestingMiniClusterConfiguration}.
+     *
+     * @return New builder instance.
+     */
+    public static Builder newBuilder() {
+        return new Builder();
+    }
+
     private final int numberDispatcherResourceManagerComponents;
 
     private final boolean localCommunication;
@@ -70,8 +79,12 @@ public class TestingMiniClusterConfiguration extends 
MiniClusterConfiguration {
 
         @Nullable private String commonBindAddress = null;
 
-        public Builder setConfiguration(Configuration configuration1) {
-            this.configuration = Preconditions.checkNotNull(configuration1);
+        private Builder() {
+            // No-op.
+        }
+
+        public Builder setConfiguration(Configuration configuration) {
+            this.configuration = Preconditions.checkNotNull(configuration);
             return this;
         }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java
index 10c055e..7ce92b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/persistence/TestingStateHandleStore.java
@@ -141,7 +141,7 @@ public class TestingStateHandleStore<T extends Serializable>
         releaseAllHandlesRunnable.run();
     }
 
-    public static <T extends Serializable> Builder<T> builder() {
+    public static <T extends Serializable> Builder<T> newBuilder() {
         return new Builder<>();
     }
 
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
index 7a93108..be7b45a 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/CheckpointStoreITCase.java
@@ -19,18 +19,15 @@ package org.apache.flink.test.checkpointing;
 
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.configuration.ClusterOptions;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointIDCounter;
-import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
+import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -38,22 +35,18 @@ import 
org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.test.util.MiniClusterWithClientResource;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.Executors;
 import org.apache.flink.util.function.SerializableSupplier;
 
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Test;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart;
-import static 
org.apache.flink.configuration.JobManagerOptions.SchedulerType.Adaptive;
 import static org.apache.flink.util.Preconditions.checkState;
-import static org.junit.Assume.assumeFalse;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Test that failure on recovery leads to job restart if configured, so that 
transient recovery
@@ -63,7 +56,9 @@ public class CheckpointStoreITCase extends TestLogger {
 
     private static final Configuration CONFIGURATION =
             new Configuration()
-                    .set(HighAvailabilityOptions.HA_MODE, 
TestingHAFactory.class.getName());
+                    .set(
+                            HighAvailabilityOptions.HA_MODE,
+                            
BlockingHighAvailabilityServiceFactory.class.getName());
 
     @ClassRule
     public static final MiniClusterWithClientResource CLUSTER =
@@ -73,33 +68,41 @@ public class CheckpointStoreITCase extends TestLogger {
                             .build());
 
     @Before
-    public void init() {
-        FailingStore.reset();
+    public void setUp() {
+        BlockingHighAvailabilityServiceFactory.reset();
         FailingMapper.reset();
     }
 
     @Test
-    public void testRestartOnRecoveryFailure() throws Exception {
-        assumeFalse(
-                // TODO: remove after FLINK-22483
-                "Adaptive scheduler doesn't retry after failures on recovery",
-                ClusterOptions.getSchedulerType(CONFIGURATION) == Adaptive);
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+    public void 
testJobClientRemainsResponsiveDuringCompletedCheckpointStoreRecovery()
+            throws Exception {
+        final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.enableCheckpointing(10);
         env.setRestartStrategy(fixedDelayRestart(2 /* failure on processing + 
on recovery */, 0));
-        env.addSource(emitUntil(() -> FailingStore.recovered && 
FailingMapper.failedAndProcessed))
+        env.addSource(emitUntil(() -> FailingMapper.failedAndProcessed))
                 .map(new FailingMapper())
                 .addSink(new DiscardingSink<>());
-        env.execute();
+        final JobClient jobClient = env.executeAsync();
 
-        checkState(FailingStore.recovered && FailingMapper.failedAndProcessed);
+        
BlockingHighAvailabilityServiceFactory.fetchRemoteCheckpointsStart.await();
+        for (int i = 0; i < 10; i++) {
+            final JobStatus jobStatus = jobClient.getJobStatus().get();
+            assertEquals(JobStatus.INITIALIZING, jobStatus);
+        }
+        
BlockingHighAvailabilityServiceFactory.fetchRemoteCheckpointsFinished.countDown();
+
+        // Await for job to finish.
+        jobClient.getJobExecutionResult().get();
+
+        checkState(FailingMapper.failedAndProcessed);
     }
 
     private static class FailingMapper implements MapFunction<Integer, 
Integer> {
+
         private static volatile boolean failed = false;
         private static volatile boolean failedAndProcessed = false;
 
-        public static void reset() {
+        static void reset() {
             failed = false;
             failedAndProcessed = false;
         }
@@ -116,78 +119,43 @@ public class CheckpointStoreITCase extends TestLogger {
         }
     }
 
-    /** TestingHAFactory. */
-    public static class TestingHAFactory implements 
HighAvailabilityServicesFactory {
-
-        @Override
-        public HighAvailabilityServices createHAServices(
-                Configuration configuration, Executor executor) {
-            return new EmbeddedHaServices(Executors.directExecutor()) {
-
-                @Override
-                public CheckpointRecoveryFactory 
getCheckpointRecoveryFactory() {
-                    return new TestingCheckpointRecoveryFactory(
-                            new FailingStore(),
-                            new TestingCheckpointIDCounter(new 
CompletableFuture<>()));
-                }
-            };
-        }
-    }
+    /**
+     * Testing implementation of {@link HighAvailabilityServicesFactory} that lets us inject custom
+     * {@link HighAvailabilityServices}.
+     */
+    public static class BlockingHighAvailabilityServiceFactory
+            implements HighAvailabilityServicesFactory {
 
-    private static class FailingStore implements CompletedCheckpointStore {
-        private static volatile boolean started = false;
-        private static volatile boolean failed = false;
-        private static volatile boolean recovered = false;
+        private static volatile CountDownLatch fetchRemoteCheckpointsStart = 
new CountDownLatch(1);
+        private static volatile CountDownLatch fetchRemoteCheckpointsFinished =
+                new CountDownLatch(1);
 
-        public static void reset() {
-            started = failed = recovered = false;
+        static void reset() {
+            fetchRemoteCheckpointsStart = new CountDownLatch(1);
+            fetchRemoteCheckpointsFinished = new CountDownLatch(1);
         }
 
         @Override
-        public void recover() throws Exception {
-            if (!started) {
-                started = true;
-            } else if (!failed) {
-                failed = true;
-                throw new RuntimeException();
-            } else if (!recovered) {
-                recovered = true;
-            }
-        }
-
-        @Override
-        public void addCheckpoint(
-                CompletedCheckpoint checkpoint,
-                CheckpointsCleaner checkpointsCleaner,
-                Runnable postCleanup) {}
-
-        @Override
-        public void shutdown(JobStatus jobStatus, CheckpointsCleaner 
checkpointsCleaner)
-                throws Exception {}
-
-        @Override
-        public List<CompletedCheckpoint> getAllCheckpoints() {
-            return Collections.emptyList();
-        }
-
-        @Override
-        public int getNumberOfRetainedCheckpoints() {
-            return 0;
-        }
-
-        @Override
-        public int getMaxNumberOfRetainedCheckpoints() {
-            return 1;
-        }
-
-        @Override
-        public boolean requiresExternalizedCheckpoints() {
-            return false;
+        public HighAvailabilityServices createHAServices(
+                Configuration configuration, Executor executor) {
+            final CheckpointRecoveryFactory checkpointRecoveryFactory =
+                    
PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                            maxCheckpoints -> {
+                                fetchRemoteCheckpointsStart.countDown();
+                                try {
+                                    fetchRemoteCheckpointsFinished.await();
+                                } catch (InterruptedException e) {
+                                    Thread.currentThread().interrupt();
+                                }
+                                return new 
StandaloneCompletedCheckpointStore(maxCheckpoints);
+                            });
+            return new EmbeddedHaServicesWithLeadershipControl(executor, 
checkpointRecoveryFactory);
         }
     }
 
     private SourceFunction<Integer> emitUntil(SerializableSupplier<Boolean> 
until) {
         return new SourceFunction<Integer>() {
+
             private volatile boolean running = true;
 
             @Override
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
index b367042..f32ee15 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/NotifyCheckpointAbortedITCase.java
@@ -39,12 +39,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.state.BackendBuildingException;
@@ -417,20 +416,6 @@ public class NotifyCheckpointAbortedITCase extends 
TestLogger {
         }
     }
 
-    private static class TestingHaServices extends EmbeddedHaServices {
-        private final CheckpointRecoveryFactory checkpointRecoveryFactory;
-
-        TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, 
Executor executor) {
-            super(executor);
-            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-        }
-
-        @Override
-        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
-            return checkpointRecoveryFactory;
-        }
-    }
-
     /** An extension of {@link StandaloneCompletedCheckpointStore}. */
     private static class TestingCompletedCheckpointStore
             extends StandaloneCompletedCheckpointStore {
@@ -471,11 +456,10 @@ public class NotifyCheckpointAbortedITCase extends 
TestLogger {
         @Override
         public HighAvailabilityServices createHAServices(
                 Configuration configuration, Executor executor) {
-            return new TestingHaServices(
-                    PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
-                            new TestingCompletedCheckpointStore(),
-                            new StandaloneCheckpointIDCounter()),
-                    executor);
+            final CheckpointRecoveryFactory checkpointRecoveryFactory =
+                    
PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                            maxCheckpoints -> new 
TestingCompletedCheckpointStore());
+            return new EmbeddedHaServicesWithLeadershipControl(executor, 
checkpointRecoveryFactory);
         }
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
index 6cd7ada..cce91ed 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RegionFailoverITCase.java
@@ -35,11 +35,10 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -440,20 +439,6 @@ public class RegionFailoverITCase extends TestLogger {
         private static final long serialVersionUID = 1L;
     }
 
-    private static class TestingHaServices extends EmbeddedHaServices {
-        private final CheckpointRecoveryFactory checkpointRecoveryFactory;
-
-        TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, 
Executor executor) {
-            super(executor);
-            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-        }
-
-        @Override
-        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
-            return checkpointRecoveryFactory;
-        }
-    }
-
     /**
      * An extension of {@link StandaloneCompletedCheckpointStore} which would record information of
      * last completed checkpoint id and the number of completed checkpoints.
@@ -486,11 +471,10 @@ public class RegionFailoverITCase extends TestLogger {
         @Override
         public HighAvailabilityServices createHAServices(
                 Configuration configuration, Executor executor) {
-            return new TestingHaServices(
-                    PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
-                            new TestingCompletedCheckpointStore(),
-                            new StandaloneCheckpointIDCounter()),
-                    executor);
+            final CheckpointRecoveryFactory checkpointRecoveryFactory =
+                    
PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                            maxCheckpoints -> new 
TestingCompletedCheckpointStore());
+            return new EmbeddedHaServicesWithLeadershipControl(executor, 
checkpointRecoveryFactory);
         }
     }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index fa491dc..85c3f21 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -45,13 +45,12 @@ import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.PerJobCheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.HighAvailabilityServicesFactory;
-import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
+import 
org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServicesWithLeadershipControl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -763,20 +762,6 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
-    private static class TestingHaServices extends EmbeddedHaServices {
-        private final CheckpointRecoveryFactory checkpointRecoveryFactory;
-
-        TestingHaServices(CheckpointRecoveryFactory checkpointRecoveryFactory, 
Executor executor) {
-            super(executor);
-            this.checkpointRecoveryFactory = checkpointRecoveryFactory;
-        }
-
-        @Override
-        public CheckpointRecoveryFactory getCheckpointRecoveryFactory() {
-            return checkpointRecoveryFactory;
-        }
-    }
-
     /**
      * A factory for HA services used to inject {@link
      * FailingSyncSavepointCompletedCheckpointStore}.
@@ -784,12 +769,11 @@ public class SavepointITCase extends TestLogger {
     public static class FailingSyncSavepointHAFactory implements 
HighAvailabilityServicesFactory {
         @Override
         public HighAvailabilityServices createHAServices(
-                Configuration configuration, Executor executor) throws 
Exception {
-            return new TestingHaServices(
-                    PerJobCheckpointRecoveryFactory.useSameServicesForAllJobs(
-                            new FailingSyncSavepointCompletedCheckpointStore(),
-                            new StandaloneCheckpointIDCounter()),
-                    executor);
+                Configuration configuration, Executor executor) {
+            final CheckpointRecoveryFactory checkpointRecoveryFactory =
+                    
PerJobCheckpointRecoveryFactory.withoutCheckpointStoreRecovery(
+                            maxCheckpoints -> new 
FailingSyncSavepointCompletedCheckpointStore());
+            return new EmbeddedHaServicesWithLeadershipControl(executor, 
checkpointRecoveryFactory);
         }
     }
 
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
index 5e842bc..a412498 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.java
@@ -98,7 +98,7 @@ public class ZooKeeperLeaderElectionITCase extends TestLogger {
         configuration.setLong(ClusterOptions.REFUSED_REGISTRATION_DELAY, 50L);
 
         final TestingMiniClusterConfiguration miniClusterConfiguration =
-                new TestingMiniClusterConfiguration.Builder()
+                TestingMiniClusterConfiguration.newBuilder()
                         .setConfiguration(configuration)
                         
.setNumberDispatcherResourceManagerComponents(numDispatchers)
                         .setNumTaskManagers(numTMs)

Reply via email to