http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
deleted file mode 100644
index d2bf40e..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStoreFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.IllegalConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Factory for {@link SavepointStore} instances.
- */
-public class SavepointStoreFactory {
-
-       public static final String SAVEPOINT_BACKEND_KEY = 
"savepoints.state.backend";
-       public static final String SAVEPOINT_DIRECTORY_KEY = 
"savepoints.state.backend.fs.dir";
-       public static final String DEFAULT_SAVEPOINT_BACKEND = "jobmanager";
-
-       public static final Logger LOG = 
LoggerFactory.getLogger(SavepointStoreFactory.class);
-
-       /**
-        * Creates a {@link SavepointStore} from the specified Configuration.
-        *
-        * <p>You can configure a savepoint-specific backend for the 
savepoints. If
-        * you don't configure anything, the regular checkpoint backend will be
-        * used.
-        *
-        * <p>The default and fallback backend is the job manager, which loses 
the
-        * savepoint after the job manager shuts down.
-        *
-        * @param config The configuration to parse the savepoint backend 
configuration.
-        * @return A savepoint store.
-        */
-       public static SavepointStore createFromConfig(Configuration config) 
throws Exception {
-
-               // Try a the savepoint-specific configuration first.
-               String savepointBackend = 
config.getString(SAVEPOINT_BACKEND_KEY, DEFAULT_SAVEPOINT_BACKEND);
-
-               if (savepointBackend == null) {
-                       LOG.info("No savepoint state backend configured. " +
-                                       "Using job manager savepoint state 
backend.");
-                       return createJobManagerSavepointStore();
-               } else if (savepointBackend.equals("jobmanager")) {
-                       LOG.info("Using job manager savepoint state backend.");
-                       return createJobManagerSavepointStore();
-               } else if (savepointBackend.equals("filesystem")) {
-                       String rootPath = 
config.getString(SAVEPOINT_DIRECTORY_KEY, null);
-
-                       if (rootPath == null) {
-                               throw new IllegalConfigurationException("Using 
filesystem as savepoint state backend, " +
-                                               "but did not specify directory. 
Please set the " +
-                                               "following configuration key: 
'" + SAVEPOINT_DIRECTORY_KEY +
-                                               "' (e.g. " + 
SAVEPOINT_DIRECTORY_KEY + ": hdfs:///flink/savepoints/). " +
-                                               "Falling back to job manager 
savepoint backend.");
-                       } else {
-                               LOG.info("Using filesystem savepoint backend 
(root path: {}).", rootPath);
-
-                               return createFileSystemSavepointStore(rootPath);
-                       }
-               } else {
-                       throw new IllegalConfigurationException("Unexpected 
savepoint backend " +
-                                       "configuration '" + savepointBackend + 
"'. " +
-                                       "Falling back to job manager savepoint 
state backend.");
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       // Savepoint stores
-       // 
------------------------------------------------------------------------
-
-       private static SavepointStore createJobManagerSavepointStore() {
-               return new HeapSavepointStore();
-       }
-
-       private static SavepointStore createFileSystemSavepointStore(String 
rootPath) throws IOException {
-               return new FsSavepointStore(rootPath, "savepoint-");
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
index c474311..b1d7ff2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/JobCheckpointStats.java
@@ -47,6 +47,13 @@ public interface JobCheckpointStats {
         */
        long getCount();
 
+       /**
+        * Returns the most recent external path of a checkpoint.
+        *
+        * @return External checkpoint path or <code>null</code> if none 
available.
+        */
+       String getExternalPath();
+
        // 
------------------------------------------------------------------------
        // Duration
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
index 2217fd4..db8a0e0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTracker.java
@@ -130,6 +130,7 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
 
                metrics.gauge("lastCheckpointSize", new CheckpointSizeGauge());
                metrics.gauge("lastCheckpointDuration", new 
CheckpointDurationGauge());
+               metrics.gauge("lastCheckpointExternalPath", new 
CheckpointExternalPathGauge());
        }
 
        @Override
@@ -278,6 +279,7 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
                                                // Need to clone in order to 
have a consistent snapshot.
                                                // We can safely update it 
afterwards.
                                                (List<CheckpointStats>) 
history.clone(),
+                                               
latestCompletedCheckpoint.getExternalPath(),
                                                overallCount,
                                                overallMinDuration,
                                                overallMaxDuration,
@@ -349,6 +351,7 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
                // General
                private final List<CheckpointStats> recentHistory;
                private final long count;
+               private final String externalPath;
 
                // Duration
                private final long minDuration;
@@ -362,6 +365,7 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
 
                public JobCheckpointStatsSnapshot(
                                List<CheckpointStats> recentHistory,
+                               String externalPath,
                                long count,
                                long minDuration,
                                long maxDuration,
@@ -372,6 +376,7 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
 
                        this.recentHistory = recentHistory;
                        this.count = count;
+                       this.externalPath = externalPath;
 
                        this.minDuration = minDuration;
                        this.maxDuration = maxDuration;
@@ -393,6 +398,11 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
                }
 
                @Override
+               public String getExternalPath() {
+                       return externalPath;
+               }
+
+               @Override
                public long getMinDuration() {
                        return minDuration;
                }
@@ -440,4 +450,17 @@ public class SimpleCheckpointStatsTracker implements 
CheckpointStatsTracker {
                        return latestCompletedCheckpoint == null ? -1 : 
latestCompletedCheckpoint.getDuration();
                }
        }
+
+       private class CheckpointExternalPathGauge implements Gauge<String> {
+
+               @Override
+               public String getValue() {
+                       CompletedCheckpoint checkpoint = 
latestCompletedCheckpoint;
+                       if (checkpoint != null && checkpoint.getExternalPath() 
!= null) {
+                               return checkpoint.getExternalPath();
+                       } else {
+                               return "n/a";
+                       }
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index cf98ca6..10f0e88 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
@@ -49,6 +48,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -353,12 +353,13 @@ public class ExecutionGraph {
                        long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpoints,
+                       ExternalizedCheckpointSettings externalizeSettings,
                        List<ExecutionJobVertex> verticesToTrigger,
                        List<ExecutionJobVertex> verticesToWaitFor,
                        List<ExecutionJobVertex> verticesToCommitTo,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore checkpointStore,
-                       SavepointStore savepointStore,
+                       String checkpointDir,
                        CheckpointStatsTracker statsTracker) {
 
                // simple sanity checks
@@ -389,12 +390,13 @@ public class ExecutionGraph {
                                checkpointTimeout,
                                minPauseBetweenCheckpoints,
                                maxConcurrentCheckpoints,
+                               externalizeSettings,
                                tasksToTrigger,
                                tasksToWaitFor,
                                tasksToCommitTo,
                                checkpointIDCounter,
                                checkpointStore,
-                               savepointStore,
+                               checkpointDir,
                                checkpointStatsTracker);
 
                // the periodic checkpoint scheduler is activated and 
deactivated as a result of
@@ -414,7 +416,7 @@ public class ExecutionGraph {
                }
 
                if (checkpointCoordinator != null) {
-                       checkpointCoordinator.suspend();
+                       checkpointCoordinator.shutdown(state);
                        checkpointCoordinator = null;
                        checkpointStatsTracker = null;
                }
@@ -1076,15 +1078,8 @@ public class ExecutionGraph {
                        CheckpointCoordinator coord = 
this.checkpointCoordinator;
                        this.checkpointCoordinator = null;
                        if (coord != null) {
-                               if (state.isGloballyTerminalState()) {
-                                       coord.shutdown();
-                               } else {
-                                       coord.suspend();
-                               }
+                               coord.shutdown(state);
                        }
-
-                       // We don't clean the checkpoint stats tracker, because 
we want
-                       // it to be available after the job has terminated.
                } catch (Exception e) {
                        LOG.error("Error while cleaning up after execution", e);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index 1c6eb8d..dcd6a5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -28,7 +28,6 @@ import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
@@ -40,9 +39,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
-
 import org.slf4j.Logger;
-
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
@@ -71,7 +68,6 @@ public class ExecutionGraphBuilder {
                        Executor executor,
                        ClassLoader classLoader,
                        CheckpointRecoveryFactory recoveryFactory,
-                       SavepointStore savepointStore,
                        Time timeout,
                        RestartStrategy restartStrategy,
                        MetricGroup metrics,
@@ -82,7 +78,7 @@ public class ExecutionGraphBuilder {
                final ExecutionContext executionContext = 
ExecutionContext$.MODULE$.fromExecutor(executor);
                
                return buildGraph(prior, jobGraph, jobManagerConfig, 
executionContext,
-                               classLoader, recoveryFactory, savepointStore, 
timeout, restartStrategy,
+                               classLoader, recoveryFactory, timeout, 
restartStrategy,
                                metrics, parallelismForAutoMax, log);
        }
 
@@ -98,7 +94,6 @@ public class ExecutionGraphBuilder {
                        ExecutionContext executionContext,
                        ClassLoader classLoader,
                        CheckpointRecoveryFactory recoveryFactory,
-                       SavepointStore savepointStore,
                        Time timeout,
                        RestartStrategy restartStrategy,
                        MetricGroup metrics,
@@ -183,7 +178,6 @@ public class ExecutionGraphBuilder {
                // configure the state checkpointing
                JobSnapshottingSettings snapshotSettings = 
jobGraph.getSnapshotSettings();
                if (snapshotSettings != null) {
-
                        List<ExecutionJobVertex> triggerVertices = 
                                        
idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
 
@@ -220,17 +214,22 @@ public class ExecutionGraphBuilder {
                                checkpointStatsTracker = new 
SimpleCheckpointStatsTracker(historySize, ackVertices, metrics);
                        }
 
+                       /** The default directory for externalized checkpoints. 
*/
+                       String externalizedCheckpointsDir = 
jobManagerConfig.getString(
+                                       
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY, null);
+
                        executionGraph.enableSnapshotCheckpointing(
                                        
snapshotSettings.getCheckpointInterval(),
                                        snapshotSettings.getCheckpointTimeout(),
                                        
snapshotSettings.getMinPauseBetweenCheckpoints(),
                                        
snapshotSettings.getMaxConcurrentCheckpoints(),
+                                       
snapshotSettings.getExternalizedCheckpointSettings(),
                                        triggerVertices,
                                        ackVertices,
                                        confirmVertices,
                                        checkpointIdCounter,
                                        completedCheckpoints,
-                                       savepointStore,
+                                       externalizedCheckpointsDir,
                                        checkpointStatsTracker);
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
new file mode 100644
index 0000000..779fc76
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/ExternalizedCheckpointSettings.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobgraph.tasks;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Grouped settings for externalized checkpoints.
+ */
+@Internal
+public class ExternalizedCheckpointSettings implements java.io.Serializable {
+
+       private static final ExternalizedCheckpointSettings NONE = new 
ExternalizedCheckpointSettings(false, false);
+
+       /** Flag indicating whether checkpoints should be externalized. */
+       private final boolean externalizeCheckpoints;
+
+       /** Flag indicating whether externalized checkpoints should delete on 
cancellation. */
+       private final boolean deleteOnCancellation;
+
+       private ExternalizedCheckpointSettings(boolean externalizeCheckpoints, 
boolean deleteOnCancellation) {
+               this.externalizeCheckpoints = externalizeCheckpoints;
+               this.deleteOnCancellation = deleteOnCancellation;
+       }
+
+       /**
+        * Returns <code>true</code> if checkpoints should be externalized.
+        *
+        * @return <code>true</code> if checkpoints should be externalized.
+        */
+       public boolean externalizeCheckpoints() {
+               return externalizeCheckpoints;
+       }
+
+       /**
+        * Returns <code>true</code> if externalized checkpoints should be 
deleted on cancellation.
+        *
+        * @return <code>true</code> if externalized checkpoints should be 
deleted on cancellation.
+        */
+       public boolean deleteOnCancellation() {
+               return deleteOnCancellation;
+       }
+
+       public static ExternalizedCheckpointSettings 
externalizeCheckpoints(boolean deleteOnCancellation) {
+               return new ExternalizedCheckpointSettings(true, 
deleteOnCancellation);
+       }
+
+       public static ExternalizedCheckpointSettings none() {
+               return NONE;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
index ab701b5..36a5c5d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/JobSnapshottingSettings.java
@@ -32,7 +32,6 @@ import static java.util.Objects.requireNonNull;
 public class JobSnapshottingSettings implements java.io.Serializable{
        
        private static final long serialVersionUID = -2593319571078198180L;
-
        
        private final List<JobVertexID> verticesToTrigger;
 
@@ -48,19 +47,25 @@ public class JobSnapshottingSettings implements 
java.io.Serializable{
        
        private final int maxConcurrentCheckpoints;
 
+       /** Settings for externalized checkpoints. */
+       private final ExternalizedCheckpointSettings 
externalizedCheckpointSettings;
+
        /** Path to savepoint to reset state back to (optional, can be null) */
        private String savepointPath;
        
-       public JobSnapshottingSettings(List<JobVertexID> verticesToTrigger,
-                                                                       
List<JobVertexID> verticesToAcknowledge,
-                                                                       
List<JobVertexID> verticesToConfirm,
-                                                                       long 
checkpointInterval, long checkpointTimeout,
-                                                                       long 
minPauseBetweenCheckpoints, int maxConcurrentCheckpoints)
-       {
+       public JobSnapshottingSettings(
+                       List<JobVertexID> verticesToTrigger,
+                       List<JobVertexID> verticesToAcknowledge,
+                       List<JobVertexID> verticesToConfirm,
+                       long checkpointInterval,
+                       long checkpointTimeout,
+                       long minPauseBetweenCheckpoints,
+                       int maxConcurrentCheckpoints,
+                       ExternalizedCheckpointSettings 
externalizedCheckpointSettings) {
+
                // sanity checks
                if (checkpointInterval < 1 || checkpointTimeout < 1 ||
-                               minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1)
-               {
+                               minPauseBetweenCheckpoints < 0 || 
maxConcurrentCheckpoints < 1) {
                        throw new IllegalArgumentException();
                }
                
@@ -71,6 +76,7 @@ public class JobSnapshottingSettings implements 
java.io.Serializable{
                this.checkpointTimeout = checkpointTimeout;
                this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
                this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
+               this.externalizedCheckpointSettings = 
requireNonNull(externalizedCheckpointSettings);
        }
        
        // 
--------------------------------------------------------------------------------------------
@@ -103,6 +109,10 @@ public class JobSnapshottingSettings implements 
java.io.Serializable{
                return maxConcurrentCheckpoints;
        }
 
+       public ExternalizedCheckpointSettings 
getExternalizedCheckpointSettings() {
+               return externalizedCheckpointSettings;
+       }
+
        /**
         * Sets the savepoint path.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
index e125e10..67fc397 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ZooKeeperUtils.java
@@ -218,15 +218,13 @@ public class ZooKeeperUtils {
         * @param configuration                  {@link Configuration} object
         * @param jobId                          ID of job to create the 
instance for
         * @param maxNumberOfCheckpointsToRetain The maximum number of 
checkpoints to retain
-        * @param userClassLoader                User code class loader
         * @return {@link ZooKeeperCompletedCheckpointStore} instance
         */
        public static CompletedCheckpointStore createCompletedCheckpoints(
                        CuratorFramework client,
                        Configuration configuration,
                        JobID jobId,
-                       int maxNumberOfCheckpointsToRetain,
-                       ClassLoader userClassLoader) throws Exception {
+                       int maxNumberOfCheckpointsToRetain) throws Exception {
 
                checkNotNull(configuration, "Configuration");
 
@@ -244,7 +242,6 @@ public class ZooKeeperUtils {
 
                return new ZooKeeperCompletedCheckpointStore(
                                maxNumberOfCheckpointsToRetain,
-                               userClassLoader,
                                client,
                                checkpointsPath,
                                stateStorage);

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
index 485a21e..f426254 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/clusterframework/ContaineredJobManager.scala
@@ -24,7 +24,6 @@ import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.messages._
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
@@ -69,7 +68,6 @@ abstract class ContaineredJobManager(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphs : SubmittedJobGraphStore,
     checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[FlinkMetricRegistry])
   extends JobManager(
@@ -84,7 +82,6 @@ abstract class ContaineredJobManager(
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index be820ae..450e810 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -19,8 +19,8 @@
 package org.apache.flink.runtime.jobmanager
 
 import java.io.{File, IOException}
-import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, 
UnknownHostException}
 import java.lang.management.ManagementFactory
+import java.net.{BindException, InetAddress, InetSocketAddress, ServerSocket, 
UnknownHostException}
 import java.util.UUID
 import java.util.concurrent.{ExecutorService, TimeUnit, TimeoutException}
 import javax.management.ObjectName
@@ -34,23 +34,23 @@ import org.apache.flink.api.common.time.Time
 import org.apache.flink.configuration.{ConfigConstants, Configuration, 
GlobalConfiguration, HighAvailabilityOptions}
 import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.io.InputSplitAssigner
-import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup
+import org.apache.flink.metrics.{Gauge, MetricGroup}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
 import org.apache.flink.runtime.akka.{AkkaUtils, ListeningBehaviour}
 import org.apache.flink.runtime.blob.BlobServer
 import org.apache.flink.runtime.checkpoint._
-import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStore, SavepointStoreFactory}
-import org.apache.flink.runtime.checkpoint.stats.{CheckpointStatsTracker, 
DisabledCheckpointStatsTracker, SimpleCheckpointStatsTracker}
+import org.apache.flink.runtime.checkpoint.savepoint.{SavepointLoader, 
SavepointStore}
 import org.apache.flink.runtime.client._
-import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.messages._
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceID
+import org.apache.flink.runtime.concurrent.BiFunction
+import org.apache.flink.runtime.execution.SuppressRestartsException
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
-import org.apache.flink.runtime.executiongraph.{ExecutionGraphBuilder, 
ExecutionGraph, ExecutionJobVertex, StatusListenerMessenger}
+import org.apache.flink.runtime.executiongraph.{ExecutionGraph, 
ExecutionGraphBuilder, ExecutionJobVertex, StatusListenerMessenger}
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceManager}
 import org.apache.flink.runtime.io.network.PartitionState
 import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
@@ -66,15 +66,14 @@ import 
org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, SendSta
 import org.apache.flink.runtime.messages.TaskMessages.UpdateTaskExecutionState
 import org.apache.flink.runtime.messages.accumulators.{AccumulatorMessage, 
AccumulatorResultStringsFound, AccumulatorResultsErroneous, 
AccumulatorResultsFound, RequestAccumulatorResults, 
RequestAccumulatorResultsStringified}
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
-import org.apache.flink.runtime.messages.webmonitor.InfoMessage
-import org.apache.flink.runtime.messages.webmonitor._
-import org.apache.flink.runtime.metrics.{MetricRegistryConfiguration, 
MetricRegistry => FlinkMetricRegistry}
+import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistry => 
FlinkMetricRegistry, MetricRegistryConfiguration}
 import org.apache.flink.runtime.process.ProcessReaper
-import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.query.KvStateMessage.{LookupKvStateLocation, 
NotifyKvStateRegistered, NotifyKvStateUnregistered}
-import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, 
SecurityConfiguration}
+import org.apache.flink.runtime.query.{KvStateMessage, UnknownKvStateLocation}
 import org.apache.flink.runtime.security.SecurityContext
+import org.apache.flink.runtime.security.SecurityContext.{FlinkSecuredRunner, 
SecurityConfiguration}
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
@@ -128,7 +127,6 @@ class JobManager(
     protected val leaderElectionService: LeaderElectionService,
     protected val submittedJobGraphs : SubmittedJobGraphStore,
     protected val checkpointRecoveryFactory : CheckpointRecoveryFactory,
-    protected val savepointStore: SavepointStore,
     protected val jobRecoveryTimeout: FiniteDuration,
     protected val metricsRegistry: Option[FlinkMetricRegistry])
   extends FlinkActor
@@ -178,6 +176,13 @@ class JobManager(
   val webMonitorPort : Int = flinkConfiguration.getInteger(
     ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, -1)
 
+  /** The default directory for savepoints. */
+  val defaultSavepointDir: String = 
ConfigurationUtil.getStringWithDeprecatedKeys(
+    flinkConfiguration,
+    ConfigConstants.SAVEPOINT_DIRECTORY_KEY,
+    null,
+    ConfigConstants.SAVEPOINT_FS_DIRECTORY_KEY)
+
   /** The resource manager actor responsible for allocating and managing task 
manager resources. */
   var currentResourceManager: Option[ActorRef] = None
 
@@ -242,14 +247,6 @@ class JobManager(
     }
 
     try {
-      savepointStore.shutdown()
-    } catch {
-      case e: Exception =>
-        log.error("Could not shut down savepoint store.", e)
-        throw new RuntimeException("Could not stop the  savepoint store 
store.", e)
-    }
-
-    try {
       // revoke leadership and stop leader election service
       leaderElectionService.stop()
     } catch {
@@ -695,7 +692,7 @@ class JobManager(
     case kvStateMsg : KvStateMessage =>
       handleKvStateMessage(kvStateMsg)
 
-    case TriggerSavepoint(jobId) =>
+    case TriggerSavepoint(jobId, savepointDirectory) =>
       currentJobs.get(jobId) match {
         case Some((graph, _)) =>
           val checkpointCoordinator = graph.getCheckpointCoordinator()
@@ -703,31 +700,46 @@ class JobManager(
           if (checkpointCoordinator != null) {
             // Immutable copy for the future
             val senderRef = sender()
-
-            future {
-              try {
-                // Do this async, because checkpoint coordinator operations can
-                // contain blocking calls to the state backend or ZooKeeper.
-                val savepointFuture = checkpointCoordinator.triggerSavepoint(
-                  System.currentTimeMillis())
-
-                savepointFuture.onComplete {
-                  // Success, respond with the savepoint path
-                  case scala.util.Success(savepointPath) =>
-                    senderRef ! TriggerSavepointSuccess(jobId, savepointPath)
-
-                  // Failure, respond with the cause
-                  case scala.util.Failure(t) =>
-                    senderRef ! TriggerSavepointFailure(
-                      jobId,
-                      new Exception("Failed to complete savepoint", t))
-                }(context.dispatcher)
-              } catch {
-                case e: Exception =>
-                  senderRef ! TriggerSavepointFailure(jobId, new Exception(
-                    "Failed to trigger savepoint", e))
+            try {
+              val targetDirectory : String = savepointDirectory.getOrElse(
+                
flinkConfiguration.getString(ConfigConstants.SAVEPOINT_DIRECTORY_KEY, null))
+
+              if (targetDirectory == null) {
+                throw new IllegalStateException("No savepoint directory 
configured. " +
+                  "You can either specify a directory when triggering this 
savepoint or " +
+                  "configure a cluster-wide default via key '" +
+                  ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.")
               }
-            }(context.dispatcher)
+
+              // Do this async, because checkpoint coordinator operations can
+              // contain blocking calls to the state backend or ZooKeeper.
+              val savepointFuture = checkpointCoordinator.triggerSavepoint(
+                System.currentTimeMillis(),
+                targetDirectory)
+
+              savepointFuture.handleAsync[Void](
+                new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                  override def apply(success: CompletedCheckpoint, cause: 
Throwable): Void = {
+                    if (success != null) {
+                      if (success.getExternalPath != null) {
+                        senderRef ! TriggerSavepointSuccess(jobId, 
success.getExternalPath)
+                      } else {
+                        senderRef ! TriggerSavepointFailure(
+                          jobId, new Exception("Savepoint has not been 
persisted."))
+                      }
+                    } else {
+                      senderRef ! TriggerSavepointFailure(
+                        jobId, new Exception("Failed to complete savepoint", 
cause))
+                    }
+                    null
+                  }
+                },
+                context.dispatcher)
+            } catch {
+              case e: Exception =>
+                senderRef ! TriggerSavepointFailure(jobId, new Exception(
+                  "Failed to trigger savepoint", e))
+            }
           } else {
             sender() ! TriggerSavepointFailure(jobId, new 
IllegalStateException(
               "Checkpointing disabled. You can enable it via the execution 
environment of " +
@@ -744,12 +756,15 @@ class JobManager(
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
 
-          val savepoint = savepointStore.loadSavepoint(savepointPath)
+          val savepoint = SavepointStore.loadSavepoint(savepointPath)
 
           log.debug(s"$savepoint")
 
-          // Dispose the savepoint
-          savepointStore.disposeSavepoint(savepointPath)
+          // Dispose checkpoint state
+          savepoint.dispose()
+
+          // Remove the header file
+          SavepointStore.removeSavepoint(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {
@@ -1150,7 +1165,6 @@ class JobManager(
           executionContext,
           userCodeLoader,
           checkpointRecoveryFactory,
-          savepointStore,
           Time.of(timeout.length, timeout.unit),
           restartStrategy,
           jobMetrics,
@@ -1218,7 +1232,7 @@ class JobManager(
 
                   // load the savepoint as a checkpoint into the system
                   val savepoint: CompletedCheckpoint = 
SavepointLoader.loadAndValidateSavepoint(
-                    jobId, executionGraph.getAllVertices, savepointStore, 
savepointPath)
+                    jobId, executionGraph.getAllVertices, savepointPath)
 
                   executionGraph.getCheckpointCoordinator.getCheckpointStore
                     .addCheckpoint(savepoint)
@@ -2420,7 +2434,6 @@ object JobManager {
     LeaderElectionService,
     SubmittedJobGraphStore,
     CheckpointRecoveryFactory,
-    SavepointStore,
     FiniteDuration, // timeout for job recovery
     Option[FlinkMetricRegistry]
    ) = {
@@ -2497,8 +2510,6 @@ object JobManager {
             new ZooKeeperCheckpointRecoveryFactory(client, configuration))
       }
 
-    val savepointStore = SavepointStoreFactory.createFromConfig(configuration)
-
     val jobRecoveryTimeoutStr = 
configuration.getValue(HighAvailabilityOptions.HA_JOB_DELAY)
 
     val jobRecoveryTimeout = if (jobRecoveryTimeoutStr == null || 
jobRecoveryTimeoutStr.isEmpty) {
@@ -2531,7 +2542,6 @@ object JobManager {
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricRegistry)
   }
@@ -2595,8 +2605,7 @@ object JobManager {
     leaderElectionService,
     submittedJobGraphs,
     checkpointRecoveryFactory,
-    savepointStore,
-    jobRecoveryTimeout, 
+    jobRecoveryTimeout,
     metricsRegistry) = createJobManagerComponents(
       configuration,
       None)
@@ -2622,7 +2631,6 @@ object JobManager {
       leaderElectionService,
       submittedJobGraphs,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
 
@@ -2657,7 +2665,6 @@ object JobManager {
     leaderElectionService: LeaderElectionService,
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[FlinkMetricRegistry]): Props = {
 
@@ -2674,7 +2681,6 @@ object JobManager {
       leaderElectionService,
       submittedJobGraphStore,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 5e2b547..fd45cda 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -467,8 +467,11 @@ object JobManagerMessages {
     * of triggering and acknowledging checkpoints.
     *
     * @param jobId The JobID of the job to trigger the savepoint for.
+    * @param savepointDirectory Optional target directory
     */
-  case class TriggerSavepoint(jobId: JobID) extends RequiresLeaderSessionID
+  case class TriggerSavepoint(
+      jobId: JobID,
+      savepointDirectory : Option[String] = Option.empty) extends 
RequiresLeaderSessionID
 
   /**
     * Response after a successful savepoint trigger containing the savepoint 
path.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
index 27c9dd9..2f453a3 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/LocalFlinkMiniCluster.scala
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.io.FileOutputFormat
 import org.apache.flink.configuration.{ConfigConstants, Configuration}
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import 
org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager
 import org.apache.flink.runtime.clusterframework.types.{ResourceID, 
ResourceIDRetrievable}
@@ -119,7 +118,6 @@ class LocalFlinkMiniCluster(
     leaderElectionService,
     submittedJobGraphStore,
     checkpointRecoveryFactory,
-    savepointStore,
     jobRecoveryTimeout,
     metricsRegistry) = JobManager.createJobManagerComponents(config, 
createLeaderElectionService())
 
@@ -143,7 +141,6 @@ class LocalFlinkMiniCluster(
         leaderElectionService,
         submittedJobGraphStore,
         checkpointRecoveryFactory,
-        savepointStore,
         jobRecoveryTimeout,
         metricsRegistry),
       jobManagerName)
@@ -248,7 +245,6 @@ class LocalFlinkMiniCluster(
     leaderElectionService: LeaderElectionService,
     submittedJobGraphStore: SubmittedJobGraphStore,
     checkpointRecoveryFactory: CheckpointRecoveryFactory,
-    savepointStore: SavepointStore,
     jobRecoveryTimeout: FiniteDuration,
     metricsRegistry: Option[MetricRegistry]): Props = {
 
@@ -265,7 +261,6 @@ class LocalFlinkMiniCluster(
       leaderElectionService,
       submittedJobGraphStore,
       checkpointRecoveryFactory,
-      savepointStore,
       jobRecoveryTimeout,
       metricsRegistry)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 728c7d5..7b0e819 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -23,14 +23,16 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -50,11 +52,12 @@ import 
org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.Future;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -92,7 +95,8 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointCoordinatorTest {
 
-       private static final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
+       @Rule
+       public TemporaryFolder tmpFolder = new TemporaryFolder();
 
        @Test
        public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
@@ -115,13 +119,15 @@ public class CheckpointCoordinatorTest {
                                        jid,
                                        600000,
                                        600000,
-                                       0, Integer.MAX_VALUE,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] {},
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        // nothing should be happening
@@ -135,7 +141,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -168,12 +174,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] {},
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        // nothing should be happening
@@ -187,7 +194,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -218,12 +225,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] {},
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        // nothing should be happening
@@ -237,7 +245,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -269,12 +277,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -363,7 +372,7 @@ public class CheckpointCoordinatorTest {
                        long checkpointIdNew2 = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                        assertEquals(checkpointIdNew2, checkpointIdNew);
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -395,12 +404,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -488,7 +498,7 @@ public class CheckpointCoordinatorTest {
                        coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID2, checkpoint1Id, checkpoint1.getCheckpointTimestamp()));
                        assertTrue(checkpoint1.isDiscarded());
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -515,12 +525,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new ExecutionVertex[] { vertex1, 
vertex2 },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -626,7 +637,7 @@ public class CheckpointCoordinatorTest {
                                verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
                        }
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -668,12 +679,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -766,7 +778,7 @@ public class CheckpointCoordinatorTest {
                        assertEquals(jid, sc2.getJobId());
                        assertTrue(sc2.getTaskStates().isEmpty());
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -807,12 +819,13 @@ public class CheckpointCoordinatorTest {
                                        600000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex1, 
triggerVertex2 },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2, ackVertex3 },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(10, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(10),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -893,7 +906,7 @@ public class CheckpointCoordinatorTest {
                        // send the last remaining ack for the first 
checkpoint. This should not do anything
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID3, new CheckpointMetaData(checkpointId1, 
0L)));
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -932,12 +945,13 @@ public class CheckpointCoordinatorTest {
                                        200,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        // trigger a checkpoint, partially acknowledged
@@ -968,7 +982,7 @@ public class CheckpointCoordinatorTest {
                        verify(commitVertex, times(0))
                                        
.sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), 
any(ExecutionAttemptID.class));
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1000,12 +1014,13 @@ public class CheckpointCoordinatorTest {
                                        200000,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex1, 
ackVertex2 },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        assertTrue(coord.triggerCheckpoint(timestamp));
@@ -1027,7 +1042,7 @@ public class CheckpointCoordinatorTest {
                        // unknown ack vertex
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointMetaData));
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1081,12 +1096,13 @@ public class CheckpointCoordinatorTest {
                                        200000,    // timeout is very long (200 
s)
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        
@@ -1133,7 +1149,7 @@ public class CheckpointCoordinatorTest {
                        assertTrue(numCallsSoFar == numCalls.get() ||
                                        numCallsSoFar + 1 == numCalls.get());
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1172,12 +1188,13 @@ public class CheckpointCoordinatorTest {
                                        200000,    // timeout is very long (200 
s)
                                        500,    // 500ms delay between 
checkpoints
                                        10,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { vertex1 },
                                        new ExecutionVertex[] { vertex1 },
                                        new ExecutionVertex[] { vertex1 },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        coord.startCheckpointScheduler();
@@ -1215,7 +1232,7 @@ public class CheckpointCoordinatorTest {
 
                        coord.stopCheckpointScheduler();
 
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1256,20 +1273,22 @@ public class CheckpointCoordinatorTest {
                                600000,
                                0,
                                Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               new HeapSavepointStore(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
                                new DisabledCheckpointStatsTracker());
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                // trigger the first checkpoint. this should succeed
-               Future<String> savepointFuture = 
coord.triggerSavepoint(timestamp);
-               assertFalse(savepointFuture.isCompleted());
+               String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+               Future<CompletedCheckpoint> savepointFuture = 
coord.triggerSavepoint(timestamp, savepointDir);
+               assertFalse(savepointFuture.isDone());
 
                // validate that we have a pending savepoint
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -1287,7 +1306,7 @@ public class CheckpointCoordinatorTest {
                assertFalse(pending.isDiscarded());
                assertFalse(pending.isFullyAcknowledged());
                assertFalse(pending.canBeSubsumed());
-               assertTrue(pending instanceof PendingSavepoint);
+               assertTrue(pending instanceof PendingCheckpoint);
 
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
@@ -1297,13 +1316,13 @@ public class CheckpointCoordinatorTest {
                assertEquals(1, pending.getNumberOfNonAcknowledgedTasks());
                assertFalse(pending.isDiscarded());
                assertFalse(pending.isFullyAcknowledged());
-               assertFalse(savepointFuture.isCompleted());
+               assertFalse(savepointFuture.isDone());
 
                // acknowledge the same task again (should not matter)
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID2, checkpointMetaData));
                assertFalse(pending.isDiscarded());
                assertFalse(pending.isFullyAcknowledged());
-               assertFalse(savepointFuture.isCompleted());
+               assertFalse(savepointFuture.isDone());
 
                // acknowledge the other task.
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaData));
@@ -1311,7 +1330,7 @@ public class CheckpointCoordinatorTest {
                // the checkpoint is internally converted to a successful 
checkpoint and the
                // pending checkpoint object is disposed
                assertTrue(pending.isDiscarded());
-               assertTrue(savepointFuture.isCompleted());
+               assertTrue(savepointFuture.isDone());
 
                // the now we should have a completed checkpoint
                assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1335,9 +1354,8 @@ public class CheckpointCoordinatorTest {
                // trigger another checkpoint and see that this one replaces 
the other checkpoint
                // ---------------
                final long timestampNew = timestamp + 7;
-               savepointFuture = coord.triggerSavepoint(timestampNew);
-               assertFalse(savepointFuture.isCompleted());
-
+               savepointFuture = coord.triggerSavepoint(timestampNew, 
savepointDir);
+               assertFalse(savepointFuture.isDone());
 
                long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                CheckpointMetaData checkpointMetaDataNew = new 
CheckpointMetaData(checkpointIdNew, 0L);
@@ -1352,7 +1370,7 @@ public class CheckpointCoordinatorTest {
                assertEquals(timestampNew, successNew.getTimestamp());
                assertEquals(checkpointIdNew, successNew.getCheckpointID());
                assertTrue(successNew.getTaskStates().isEmpty());
-               assertTrue(savepointFuture.isCompleted());
+               assertTrue(savepointFuture.isDone());
 
                // validate that the relevant tasks got a confirmation message
                {
@@ -1367,7 +1385,7 @@ public class CheckpointCoordinatorTest {
                        verify(vertex2, 
times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2));
                }
 
-               coord.shutdown();
+               coord.shutdown(JobStatus.FINISHED);
        }
 
        /**
@@ -1396,16 +1414,19 @@ public class CheckpointCoordinatorTest {
                                600000,
                                0,
                                Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                new ExecutionVertex[] { vertex1, vertex2 },
                                counter,
-                               new StandaloneCompletedCheckpointStore(10, cl),
-                               new HeapSavepointStore(),
+                               new StandaloneCompletedCheckpointStore(10),
+                               null,
                                new DisabledCheckpointStatsTracker());
 
+               String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
                // Trigger savepoint and checkpoint
-               Future<String> savepointFuture1 = 
coord.triggerSavepoint(timestamp);
+               Future<CompletedCheckpoint> savepointFuture1 = 
coord.triggerSavepoint(timestamp, savepointDir);
                long savepointId1 = counter.getLast();
                CheckpointMetaData checkpointMetaDataS1 = new 
CheckpointMetaData(savepointId1, 0L);
                assertEquals(1, coord.getNumberOfPendingCheckpoints());
@@ -1427,12 +1448,12 @@ public class CheckpointCoordinatorTest {
                assertEquals(1, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
 
                
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
-               assertFalse(savepointFuture1.isCompleted());
+               assertFalse(savepointFuture1.isDone());
 
                assertTrue(coord.triggerCheckpoint(timestamp + 3));
                assertEquals(2, coord.getNumberOfPendingCheckpoints());
 
-               Future<String> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4);
+               Future<CompletedCheckpoint> savepointFuture2 = 
coord.triggerSavepoint(timestamp + 4, savepointDir);
                long savepointId2 = counter.getLast();
                CheckpointMetaData checkpointMetaDataS2 = new 
CheckpointMetaData(savepointId2, 0L);
                assertEquals(3, coord.getNumberOfPendingCheckpoints());
@@ -1445,8 +1466,8 @@ public class CheckpointCoordinatorTest {
                assertEquals(2, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
                
assertFalse(coord.getPendingCheckpoints().get(savepointId1).isDiscarded());
 
-               assertFalse(savepointFuture1.isCompleted());
-               assertTrue(savepointFuture2.isCompleted());
+               assertFalse(savepointFuture1.isDone());
+               assertTrue(savepointFuture2.isDone());
 
                // Ack first savepoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, 
attemptID1, checkpointMetaDataS1));
@@ -1454,7 +1475,7 @@ public class CheckpointCoordinatorTest {
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(3, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
-               assertTrue(savepointFuture1.isCompleted());
+               assertTrue(savepointFuture1.isDone());
        }
 
        private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
@@ -1486,12 +1507,13 @@ public class CheckpointCoordinatorTest {
                                        200000,    // timeout is very long (200 
s)
                                        0L,        // no extra delay
                                        maxConcurrentAttempts,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        coord.startCheckpointScheduler();
@@ -1529,7 +1551,7 @@ public class CheckpointCoordinatorTest {
                        Thread.sleep(200);
                        assertEquals(maxConcurrentAttempts + 1, numCalls.get());
                        
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1558,12 +1580,13 @@ public class CheckpointCoordinatorTest {
                                        200000,    // timeout is very long (200 
s)
                                        0L,        // no extra delay
                                        maxConcurrentAttempts, // max two 
concurrent checkpoints
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        coord.startCheckpointScheduler();
@@ -1602,7 +1625,7 @@ public class CheckpointCoordinatorTest {
                        assertNotNull(coord.getPendingCheckpoints().get(3L));
                        assertNotNull(coord.getPendingCheckpoints().get(4L));
                        
-                       coord.shutdown();
+                       coord.shutdown(JobStatus.FINISHED);
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -1639,12 +1662,13 @@ public class CheckpointCoordinatorTest {
                                        200000,    // timeout is very long (200 
s)
                                        0L,        // no extra delay
                                        2, // max two concurrent checkpoints
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { triggerVertex },
                                        new ExecutionVertex[] { ackVertex },
                                        new ExecutionVertex[] { commitVertex },
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(2, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(2),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
                        
                        coord.startCheckpointScheduler();
@@ -1690,26 +1714,29 @@ public class CheckpointCoordinatorTest {
                                200000,
                                0L,
                                1, // max one checkpoint at a time => should 
not affect savepoints
+                               ExternalizedCheckpointSettings.none(),
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                checkpointIDCounter,
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               new HeapSavepointStore(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
                                new DisabledCheckpointStatsTracker());
 
-               List<Future<String>> savepointFutures = new ArrayList<>();
+               List<Future<CompletedCheckpoint>> savepointFutures = new 
ArrayList<>();
 
                int numSavepoints = 5;
 
+               String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
                // Trigger savepoints
                for (int i = 0; i < numSavepoints; i++) {
-                       savepointFutures.add(coord.triggerSavepoint(i));
+                       savepointFutures.add(coord.triggerSavepoint(i, 
savepointDir));
                }
 
                // After triggering multiple savepoints, all should in progress
-               for (Future<String> savepointFuture : savepointFutures) {
-                       assertFalse(savepointFuture.isCompleted());
+               for (Future<CompletedCheckpoint> savepointFuture : 
savepointFutures) {
+                       assertFalse(savepointFuture.isDone());
                }
 
                // ACK all savepoints
@@ -1719,8 +1746,8 @@ public class CheckpointCoordinatorTest {
                }
 
                // After ACKs, all should be completed
-               for (Future<String> savepointFuture : savepointFutures) {
-                       assertTrue(savepointFuture.isCompleted());
+               for (Future<CompletedCheckpoint> savepointFuture : 
savepointFutures) {
+                       assertTrue(savepointFuture.isDone());
                }
        }
 
@@ -1740,19 +1767,22 @@ public class CheckpointCoordinatorTest {
                                200000,
                                100000000L, // very long min delay => should 
not affect savepoints
                                1,
+                               ExternalizedCheckpointSettings.none(),
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new ExecutionVertex[] { vertex1 },
                                new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2, cl),
-                               new HeapSavepointStore(),
+                               new StandaloneCompletedCheckpointStore(2),
+                               null,
                                new DisabledCheckpointStatsTracker());
 
-               Future<String> savepoint0 = coord.triggerSavepoint(0);
-               assertFalse("Did not trigger savepoint", 
savepoint0.isCompleted());
+               String savepointDir = tmpFolder.newFolder().getAbsolutePath();
+
+               Future<CompletedCheckpoint> savepoint0 = 
coord.triggerSavepoint(0, savepointDir);
+               assertFalse("Did not trigger savepoint", savepoint0.isDone());
 
-               Future<String> savepoint1 = coord.triggerSavepoint(1);
-               assertFalse("Did not trigger savepoint", 
savepoint1.isCompleted());
+               Future<CompletedCheckpoint> savepoint1 = 
coord.triggerSavepoint(1, savepointDir);
+               assertFalse("Did not trigger savepoint", savepoint1.isDone());
        }
 
        // 
------------------------------------------------------------------------
@@ -1796,18 +1826,19 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
-                       600000,
-                       600000,
+                               jid,
+                               600000,
+                               600000,
                                0,
                                Integer.MAX_VALUE,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       new StandaloneCheckpointIDCounter(),
-                       new StandaloneCompletedCheckpointStore(1, cl),
-                       new HeapSavepointStore(),
-                       new DisabledCheckpointStatsTracker());
+                               ExternalizedCheckpointSettings.none(),
+                               arrayExecutionVertices,
+                               arrayExecutionVertices,
+                               arrayExecutionVertices,
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp);
@@ -1900,18 +1931,19 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
-                       600000,
-                       600000,
-                       0,
-                       Integer.MAX_VALUE,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       new StandaloneCheckpointIDCounter(),
-                       new StandaloneCompletedCheckpointStore(1, cl),
-                       new HeapSavepointStore(),
-                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               arrayExecutionVertices,
+                               arrayExecutionVertices,
+                               arrayExecutionVertices,
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp);
@@ -2014,18 +2046,19 @@ public class CheckpointCoordinatorTest {
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
-                       jid,
-                       600000,
-                       600000,
-                       0,
-                       Integer.MAX_VALUE,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       arrayExecutionVertices,
-                       new StandaloneCheckpointIDCounter(),
-                       new StandaloneCompletedCheckpointStore(1, cl),
-                       new HeapSavepointStore(),
-                       new DisabledCheckpointStatsTracker());
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               arrayExecutionVertices,
+                               arrayExecutionVertices,
+                               arrayExecutionVertices,
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp);
@@ -2141,12 +2174,13 @@ public class CheckpointCoordinatorTest {
                                600000,
                                0,
                                Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
                                arrayExecutionVertices,
                                arrayExecutionVertices,
                                arrayExecutionVertices,
                                new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(1, cl),
-                               new HeapSavepointStore(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
                                new DisabledCheckpointStatsTracker());
 
                // trigger the checkpoint
@@ -2235,34 +2269,57 @@ public class CheckpointCoordinatorTest {
                comparePartitionableState(originalPartitionableStates, 
actualPartitionableStates);
        }
 
-       // 
------------------------------------------------------------------------
-       //  Utilities
-       // 
------------------------------------------------------------------------
-
-       static void sendAckMessageToCoordinator(
-                       CheckpointCoordinator coord,
-                       long checkpointId, JobID jid,
-                       ExecutionJobVertex jobVertex,
-                       JobVertexID jobVertexID,
-                       List<KeyGroupRange> keyGroupPartitions) throws 
Exception {
+       /**
+        * Tests that the externalized checkpoint configuration is respected.
+        */
+       @Test
+       public void testExternalizedCheckpoints() throws Exception {
+               try {
+                       final JobID jid = new JobID();
+                       final long timestamp = System.currentTimeMillis();
 
-               for (int index = 0; index < jobVertex.getParallelism(); 
index++) {
-                       ChainedStateHandle<StreamStateHandle> state = 
generateStateForVertex(jobVertexID, index);
-                       List<KeyGroupsStateHandle> keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID,
-                                       keyGroupPartitions.get(index));
+                       // create some mock Execution vertices that receive the 
checkpoint trigger messages
+                       final ExecutionAttemptID attemptID1 = new 
ExecutionAttemptID();
+                       ExecutionVertex vertex1 = 
mockExecutionVertex(attemptID1);
 
-                       CheckpointStateHandles checkpointStateHandles = new 
CheckpointStateHandles(state, null, keyGroupState);
-                       AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
+                       // set up the coordinator and validate the initial state
+                       CheckpointCoordinator coord = new CheckpointCoordinator(
                                        jid,
-                                       
jobVertex.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       new CheckpointMetaData(checkpointId, 
0L),
-                                       checkpointStateHandles);
+                                       600000,
+                                       600000,
+                                       0,
+                                       Integer.MAX_VALUE,
+                                       
ExternalizedCheckpointSettings.externalizeCheckpoints(true),
+                                       new ExecutionVertex[] { vertex1 },
+                                       new ExecutionVertex[] { vertex1 },
+                                       new ExecutionVertex[] { vertex1 },
+                                       new StandaloneCheckpointIDCounter(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       "fake-directory",
+                                       new DisabledCheckpointStatsTracker());
 
-                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
+                       assertTrue(coord.triggerCheckpoint(timestamp));
+
+                       for (PendingCheckpoint checkpoint : 
coord.getPendingCheckpoints().values()) {
+                               CheckpointProperties props = 
checkpoint.getProps();
+                               CheckpointProperties expected = 
CheckpointProperties.forExternalizedCheckpoint(true);
+
+                               assertEquals(expected, props);
+                       }
+
+                       // the now we should have a completed checkpoint
+                       coord.shutdown(JobStatus.FINISHED);
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Utilities
+       // 
------------------------------------------------------------------------
+
        public static List<KeyGroupsStateHandle> generateKeyGroupState(
                        JobVertexID jobVertexID,
                        KeyGroupRange keyGroupPartition) throws IOException {
@@ -2720,4 +2777,96 @@ public class CheckpointCoordinatorTest {
                Assert.assertEquals(expected, actual);
        }
 
+       @Test
+       public void testDeclineCheckpointRespectsProperties() throws Exception {
+               final JobID jid = new JobID();
+               final long timestamp = System.currentTimeMillis();
+
+               // create some mock Execution vertices that receive the 
checkpoint trigger messages
+               final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
+               ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                               jid,
+                               600000,
+                               600000,
+                               0,
+                               Integer.MAX_VALUE,
+                               ExternalizedCheckpointSettings.none(),
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new ExecutionVertex[] { vertex1 },
+                               new StandaloneCheckpointIDCounter(),
+                               new StandaloneCompletedCheckpointStore(1),
+                               null,
+                               new DisabledCheckpointStatsTracker());
+
+               assertEquals(0, coord.getNumberOfPendingCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+               // trigger the first checkpoint. this should succeed
+               CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+               String targetDirectory = "xjasdkjakshdmmmxna";
+
+               CheckpointTriggerResult triggerResult = 
coord.triggerCheckpoint(timestamp, props, targetDirectory);
+               assertEquals(true, triggerResult.isSuccess());
+
+               // validate that we have a pending checkpoint
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+               long checkpointId = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+               PendingCheckpoint checkpoint = 
coord.getPendingCheckpoints().get(checkpointId);
+
+               assertNotNull(checkpoint);
+               assertEquals(checkpointId, checkpoint.getCheckpointId());
+               assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
+               assertEquals(jid, checkpoint.getJobId());
+               assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
+               assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
+               assertEquals(0, checkpoint.getTaskStates().size());
+               assertFalse(checkpoint.isDiscarded());
+               assertFalse(checkpoint.isFullyAcknowledged());
+               assertEquals(props, checkpoint.getProps());
+               assertEquals(targetDirectory, checkpoint.getTargetDirectory());
+
+               {
+                       // check that the vertices received the trigger 
checkpoint message
+                       TriggerCheckpoint expectedMessage1 = new 
TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
+                       verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+               }
+
+               // decline checkpoint, this should cancel the checkpoint and 
re-trigger with correct properties
+               coord.receiveDeclineMessage(new DeclineCheckpoint(jid, 
attemptID1, checkpointId, checkpoint.getCheckpointTimestamp()));
+               assertTrue(checkpoint.isDiscarded());
+
+               // validate that we have a new pending checkpoint
+               assertEquals(1, coord.getNumberOfPendingCheckpoints());
+               assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
+
+               long checkpointIdNew = 
coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
+               PendingCheckpoint checkpointNew = 
coord.getPendingCheckpoints().get(checkpointIdNew);
+
+               assertNotNull(checkpointNew);
+               assertEquals(checkpointIdNew, checkpointNew.getCheckpointId());
+               assertEquals(jid, checkpointNew.getJobId());
+               assertEquals(1, 
checkpointNew.getNumberOfNonAcknowledgedTasks());
+               assertEquals(0, checkpointNew.getNumberOfAcknowledgedTasks());
+               assertEquals(0, checkpointNew.getTaskStates().size());
+               assertFalse(checkpointNew.isDiscarded());
+               assertFalse(checkpointNew.isFullyAcknowledged());
+               assertNotEquals(checkpoint.getCheckpointId(), 
checkpointNew.getCheckpointId());
+               // Respect the properties and target directory from the initial 
trigger
+               assertEquals(props, checkpointNew.getProps());
+               assertEquals(targetDirectory, 
checkpointNew.getTargetDirectory());
+
+               // check that the vertices received the new trigger checkpoint 
message
+               {
+                       TriggerCheckpoint expectedMessage1 = new 
TriggerCheckpoint(jid, attemptID1, checkpointIdNew, 
checkpointNew.getCheckpointTimestamp());
+                       verify(vertex1, 
times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1));
+               }
+
+               coord.shutdown(JobStatus.FINISHED);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
index 49b5fe7..9ece607 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounterTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
@@ -76,7 +77,7 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                        CuratorFramework client = ZooKeeper.getClient();
                        
assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
 
-                       counter.shutdown();
+                       counter.shutdown(JobStatus.FINISHED);
                        
assertNull(client.checkExists().forPath("/checkpoint-id-counter"));
                }
 
@@ -91,7 +92,7 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                        CuratorFramework client = ZooKeeper.getClient();
                        
assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
 
-                       counter.suspend();
+                       counter.shutdown(JobStatus.SUSPENDED);
                        
assertNotNull(client.checkExists().forPath("/checkpoint-id-counter"));
                }
 
@@ -120,7 +121,7 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                        assertEquals(4, counter.getAndIncrement());
                }
                finally {
-                       counter.shutdown();
+                       counter.shutdown(JobStatus.FINISHED);
                }
        }
 
@@ -183,7 +184,7 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                                executor.shutdown();
                        }
 
-                       counter.shutdown();
+                       counter.shutdown(JobStatus.FINISHED);
                }
        }
 
@@ -200,7 +201,7 @@ public abstract class CheckpointIDCounterTest extends 
TestLogger {
                assertEquals(1337, counter.getAndIncrement());
                assertEquals(1338, counter.getAndIncrement());
 
-               counter.shutdown();
+               counter.shutdown(JobStatus.FINISHED);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
index 5772fae..c996886 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointPropertiesTest.java
@@ -23,15 +23,66 @@ import org.junit.Test;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for the default checkpoint properties.
+ */
 public class CheckpointPropertiesTest {
 
+       /**
+        * Tests the default checkpoint properties.
+        */
        @Test
        public void testCheckpointProperties() {
-               
assertFalse(CheckpointProperties.forStandardCheckpoint().isSavepoint());
+               CheckpointProperties props = 
CheckpointProperties.forStandardCheckpoint();
+
+               assertFalse(props.forceCheckpoint());
+               assertFalse(props.externalizeCheckpoint());
+               assertTrue(props.discardOnSubsumed());
+               assertTrue(props.discardOnJobFinished());
+               assertTrue(props.discardOnJobCancelled());
+               assertTrue(props.discardOnJobFailed());
+               assertTrue(props.discardOnJobSuspended());
        }
 
+       /**
+        * Tests the external checkpoints properties.
+        */
+       @Test
+       public void testPersistentCheckpointProperties() {
+               CheckpointProperties props = 
CheckpointProperties.forExternalizedCheckpoint(true);
+
+               assertFalse(props.forceCheckpoint());
+               assertTrue(props.externalizeCheckpoint());
+               assertTrue(props.discardOnSubsumed());
+               assertTrue(props.discardOnJobFinished());
+               assertTrue(props.discardOnJobCancelled());
+               assertFalse(props.discardOnJobFailed());
+               assertTrue(props.discardOnJobSuspended());
+
+               props = CheckpointProperties.forExternalizedCheckpoint(false);
+
+               assertFalse(props.forceCheckpoint());
+               assertTrue(props.externalizeCheckpoint());
+               assertTrue(props.discardOnSubsumed());
+               assertTrue(props.discardOnJobFinished());
+               assertFalse(props.discardOnJobCancelled());
+               assertFalse(props.discardOnJobFailed());
+               assertTrue(props.discardOnJobSuspended());
+       }
+
+       /**
+        * Tests the default (manually triggered) savepoint properties.
+        */
        @Test
        public void testSavepointProperties() {
-               
assertTrue(CheckpointProperties.forStandardSavepoint().isSavepoint());
+               CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+
+               assertTrue(props.forceCheckpoint());
+               assertTrue(props.externalizeCheckpoint());
+               assertFalse(props.discardOnSubsumed());
+               assertFalse(props.discardOnJobFinished());
+               assertFalse(props.discardOnJobCancelled());
+               assertFalse(props.discardOnJobFailed());
+               assertFalse(props.discardOnJobSuspended());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index c20c604..b4dcab5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.HeapSavepointStore;
 import 
org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
@@ -27,6 +26,7 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.CheckpointStateHandles;
@@ -58,8 +58,6 @@ import static org.mockito.Mockito.when;
  */
 public class CheckpointStateRestoreTest {
 
-       private static final ClassLoader cl = 
Thread.currentThread().getContextClassLoader();
-
        @Test
        public void testSetState() {
                try {
@@ -101,12 +99,13 @@ public class CheckpointStateRestoreTest {
                                        200000L,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[0],
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        // create ourselves a checkpoint with state
@@ -199,12 +198,13 @@ public class CheckpointStateRestoreTest {
                                        200000L,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[] { stateful1, 
stateful2, stateful3, stateless1, stateless2 },
                                        new ExecutionVertex[0],
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        // create ourselves a checkpoint with state
@@ -252,12 +252,13 @@ public class CheckpointStateRestoreTest {
                                        200000L,
                                        0,
                                        Integer.MAX_VALUE,
+                                       ExternalizedCheckpointSettings.none(),
                                        new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                        new ExecutionVertex[] { 
mock(ExecutionVertex.class) },
                                        new ExecutionVertex[0],
                                        new StandaloneCheckpointIDCounter(),
-                                       new 
StandaloneCompletedCheckpointStore(1, cl),
-                                       new HeapSavepointStore(),
+                                       new 
StandaloneCompletedCheckpointStore(1),
+                                       null,
                                        new DisabledCheckpointStatsTracker());
 
                        try {

Reply via email to