http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
index faeac34..d21349c 100644
--- 
a/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
+++ 
b/flink-runtime-web/web-dashboard/web/partials/jobs/job.plan.node.checkpoints.job.html
@@ -50,6 +50,10 @@ limitations under the License.
         <p><strong>Average:</strong><span> {{ 
jobCheckpointStats['size']['avg'] | humanizeBytes }}</span></p>
       </td>
     </tr>
+    <tr ng-if="jobCheckpointStats['external-path']">
+      <td colspan="4"><strong>Latest Checkpoint Path:</strong> {{ 
jobCheckpointStats['external-path'] }}
+      </td>
+    </tr>
   </tbody>
 </table>
 <div ng-if="!showHistory &amp;&amp; jobCheckpointStats &amp;&amp; 
jobCheckpointStats['history'].length &gt; 0"><a ng-click="toggleHistory()" 
class="btn btn-default"><strong>Show history</strong> ({{ 
jobCheckpointStats['history'].length }}) <i class="fa 
fa-chevron-down"></i></a></div>

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e95afe0..ab4bde7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,17 +18,20 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.JobStatusListener;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
@@ -43,7 +46,6 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.Future;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -70,7 +72,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class CheckpointCoordinator {
 
-       protected static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCoordinator.class);
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCoordinator.class);
 
        /** The number of recent checkpoints whose IDs are remembered */
        private static final int NUM_GHOST_CHECKPOINT_IDS = 16;
@@ -106,9 +108,9 @@ public class CheckpointCoordinator {
         * accessing this don't block the job manager actor and run 
asynchronously. */
        private final CompletedCheckpointStore completedCheckpointStore;
 
-       /** Store for savepoints. */
-       private final SavepointStore savepointStore;
-       
+       /** Default directory for persistent checkpoints; <code>null</code> if 
none configured. */
+       private final String checkpointDirectory;
+
        /** A list of recent checkpoint IDs, to identify late messages (vs 
invalid ones) */
        private final ArrayDeque<Long> recentPendingCheckpoints;
 
@@ -157,6 +159,9 @@ public class CheckpointCoordinator {
        /** Helper for tracking checkpoint statistics  */
        private final CheckpointStatsTracker statsTracker;
 
+       /** Default properties for checkpoints triggered via {@link #triggerCheckpoint(long)}. */
+       private final CheckpointProperties checkpointProperties;
+
        // 
--------------------------------------------------------------------------------------------
 
        public CheckpointCoordinator(
@@ -165,12 +170,13 @@ public class CheckpointCoordinator {
                        long checkpointTimeout,
                        long minPauseBetweenCheckpoints,
                        int maxConcurrentCheckpointAttempts,
+                       ExternalizedCheckpointSettings externalizeSettings,
                        ExecutionVertex[] tasksToTrigger,
                        ExecutionVertex[] tasksToWaitFor,
                        ExecutionVertex[] tasksToCommitTo,
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
-                       SavepointStore savepointStore,
+                       String checkpointDirectory,
                        CheckpointStatsTracker statsTracker) {
 
                // sanity checks
@@ -179,6 +185,12 @@ public class CheckpointCoordinator {
                checkArgument(minPauseBetweenCheckpoints >= 0, 
"minPauseBetweenCheckpoints must be >= 0");
                checkArgument(maxConcurrentCheckpointAttempts >= 1, 
"maxConcurrentCheckpointAttempts must be >= 1");
 
+               if (externalizeSettings.externalizeCheckpoints() && 
checkpointDirectory == null) {
+                       throw new IllegalStateException("CheckpointConfig says 
to persist periodic " +
+                                       "checkpoints, but no checkpoint 
directory has been configured. You can " +
+                                       "configure configure one via key '" + 
ConfigConstants.CHECKPOINTS_DIRECTORY_KEY + "'.");
+               }
+
                // it does not make sense to schedule checkpoints more often 
then the desired
                // time between checkpoints
                if (baseInterval < minPauseBetweenCheckpoints) {
@@ -196,12 +208,19 @@ public class CheckpointCoordinator {
                this.pendingCheckpoints = new LinkedHashMap<>();
                this.checkpointIdCounter = checkNotNull(checkpointIDCounter);
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
-               this.savepointStore = checkNotNull(savepointStore);
+               this.checkpointDirectory = checkpointDirectory;
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.statsTracker = checkNotNull(statsTracker);
 
                this.timer = new Timer("Checkpoint Timer", true);
 
+               if (externalizeSettings.externalizeCheckpoints()) {
+                       LOG.info("Persisting periodic checkpoints externally at 
{}.", checkpointDirectory);
+                       checkpointProperties = 
CheckpointProperties.forExternalizedCheckpoint(externalizeSettings.deleteOnCancellation());
+               } else {
+                       checkpointProperties = 
CheckpointProperties.forStandardCheckpoint();
+               }
+
                try {
                        // Make sure the checkpoint ID enumerator is running. 
Possibly
                        // issues a blocking call to ZooKeeper.
@@ -219,33 +238,9 @@ public class CheckpointCoordinator {
         * Shuts down the checkpoint coordinator.
         *
         * <p>After this method has been called, the coordinator does not accept
-        * and further messages and cannot trigger any further checkpoints. All
-        * checkpoint state is discarded.
-        */
-       public void shutdown() throws Exception {
-               shutdown(true);
-       }
-
-       /**
-        * Suspends the checkpoint coordinator.
-        *
-        * <p>After this method has been called, the coordinator does not accept
         * and further messages and cannot trigger any further checkpoints.
-        *
-        * <p>The difference to shutdown is that checkpoint state in the store
-        * and counter is kept around if possible to recover later.
         */
-       public void suspend() throws Exception {
-               shutdown(false);
-       }
-
-       /**
-        * Shuts down the checkpoint coordinator.
-        *
-        * @param shutdownStoreAndCounter Depending on this flag the checkpoint
-        * state services are shut down or suspended.
-        */
-       private void shutdown(boolean shutdownStoreAndCounter) throws Exception 
{
+       public void shutdown(JobStatus jobStatus) throws Exception {
                synchronized (lock) {
                        if (!shutdown) {
                                shutdown = true;
@@ -263,13 +258,8 @@ public class CheckpointCoordinator {
                                }
                                pendingCheckpoints.clear();
 
-                               if (shutdownStoreAndCounter) {
-                                       completedCheckpointStore.shutdown();
-                                       checkpointIdCounter.shutdown();
-                               } else {
-                                       completedCheckpointStore.suspend();
-                                       checkpointIdCounter.suspend();
-                               }
+                               completedCheckpointStore.shutdown(jobStatus);
+                               checkpointIdCounter.shutdown(jobStatus);
                        }
                }
        }
@@ -282,29 +272,49 @@ public class CheckpointCoordinator {
        //  Handling checkpoints and messages
        // 
--------------------------------------------------------------------------------------------
 
-       public Future<String> triggerSavepoint(long timestamp) throws Exception 
{
-               CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardSavepoint());
+       /**
+        * Triggers a savepoint with the given savepoint directory as a target.
+        *
+        * @param timestamp The timestamp for the savepoint.
+        * @param targetDirectory Target directory for the savepoint.
+        * @return A future to the completed checkpoint
+        * @throws NullPointerException  If no savepoint target directory has
+        *                               been specified (the target directory
+        *                               must not be <code>null</code>)
+        * @throws Exception             Failures during triggering are 
forwarded
+        */
+       public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, 
String targetDirectory) throws Exception {
+               checkNotNull(targetDirectory, "Savepoint target directory");
+
+               CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+               CheckpointTriggerResult result = triggerCheckpoint(timestamp, 
props, targetDirectory);
 
                if (result.isSuccess()) {
-                       PendingSavepoint savepoint = (PendingSavepoint) 
result.getPendingCheckpoint();
-                       return savepoint.getCompletionFuture();
-               }
-               else {
-                       return Futures.failed(new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message()));
+                       return 
result.getPendingCheckpoint().getCompletionFuture();
+               } else {
+                       Throwable cause = new Exception("Failed to trigger 
savepoint: " + result.getFailureReason().message());
+                       Future<CompletedCheckpoint> failed = 
FlinkCompletableFuture.completedExceptionally(cause);
+                       return failed;
                }
        }
 
        /**
-        * Triggers a new checkpoint and uses the given timestamp as the 
checkpoint
+        * Triggers a new standard checkpoint and uses the given timestamp as 
the checkpoint
         * timestamp.
         *
         * @param timestamp The timestamp for the checkpoint.
+        * @return <code>true</code> if triggering the checkpoint succeeded.
         */
        public boolean triggerCheckpoint(long timestamp) throws Exception {
-               return triggerCheckpoint(timestamp, 
CheckpointProperties.forStandardCheckpoint()).isSuccess();
+               return triggerCheckpoint(timestamp, checkpointProperties, 
checkpointDirectory).isSuccess();
        }
 
-       CheckpointTriggerResult triggerCheckpoint(long timestamp, 
CheckpointProperties props) throws Exception {
+       CheckpointTriggerResult triggerCheckpoint(long timestamp, 
CheckpointProperties props, String targetDirectory) throws Exception {
+               // Sanity check
+               if (props.externalizeCheckpoint() && targetDirectory == null) {
+                       throw new IllegalStateException("No target directory 
specified to persist checkpoint to.");
+               }
+
                // make some eager pre-checks
                synchronized (lock) {
                        // abort if the coordinator has been shutdown in the 
meantime
@@ -315,7 +325,7 @@ public class CheckpointCoordinator {
                        // validate whether the checkpoint can be triggered, 
with respect to the limit of
                        // concurrent checkpoints, and the minimum time between 
checkpoints.
                        // these checks are not relevant for savepoints
-                       if (!props.isSavepoint()) {
+                       if (!props.forceCheckpoint()) {
                                // sanity check: there should never be more 
than one trigger request queued
                                if (triggerRequestQueued) {
                                        LOG.warn("Trying to trigger another 
checkpoint while one was queued already");
@@ -402,9 +412,13 @@ public class CheckpointCoordinator {
                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.EXCEPTION);
                        }
 
-                       final PendingCheckpoint checkpoint = 
props.isSavepoint() ?
-                                       new PendingSavepoint(job, checkpointID, 
timestamp, ackTasks, savepointStore) :
-                                       new PendingCheckpoint(job, 
checkpointID, timestamp, ackTasks);
+                       final PendingCheckpoint checkpoint = new 
PendingCheckpoint(
+                                       job,
+                                       checkpointID,
+                                       timestamp,
+                                       ackTasks,
+                                       props,
+                                       targetDirectory);
 
                        // schedule the timer that will clean up the expired 
checkpoints
                        TimerTask canceller = new TimerTask() {
@@ -439,7 +453,7 @@ public class CheckpointCoordinator {
                                        if (shutdown) {
                                                return new 
CheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);
                                        }
-                                       else if (!props.isSavepoint()) {
+                                       else if (!props.forceCheckpoint()) {
                                                if (triggerRequestQueued) {
                                                        LOG.warn("Trying to 
trigger another checkpoint while one was queued already");
                                                        return new 
CheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);
@@ -566,7 +580,7 @@ public class CheckpointCoordinator {
                                }
                                if (!haveMoreRecentPending && 
!triggerRequestQueued) {
                                        LOG.info("Triggering new checkpoint 
because of discarded checkpoint " + checkpointId);
-                                       
triggerCheckpoint(System.currentTimeMillis());
+                                       
triggerCheckpoint(System.currentTimeMillis(), checkpoint.getProps(), 
checkpoint.getTargetDirectory());
                                } else if (!haveMoreRecentPending) {
                                        LOG.info("Promoting queued checkpoint 
request because of discarded checkpoint " + checkpointId);
                                        triggerQueuedRequests();

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
index 76af4be..48cec7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointIDCounter.java
@@ -18,29 +18,27 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
 /**
  * A checkpoint ID counter.
  */
 public interface CheckpointIDCounter {
 
        /**
-        * Starts the {@link CheckpointIDCounter} service.
+        * Starts the {@link CheckpointIDCounter} service.
         */
        void start() throws Exception;
 
        /**
-        * Shuts the {@link CheckpointIDCounter} service down and frees all 
created
-        * resources.
-        */
-       void shutdown() throws Exception;
-
-       /**
-        * Suspends the counter.
+        * Shuts the {@link CheckpointIDCounter} service down.
+        *
+        * <p>The job status is forwarded and used to decide whether state 
should
+        * actually be discarded or kept.
         *
-        * <p>If the implementation allows recovery, the counter state needs to 
be
-        * kept. Otherwise, this acts as shutdown.
+        * @param jobStatus Job state on shut down
         */
-       void suspend() throws Exception;
+       void shutdown(JobStatus jobStatus) throws Exception;
 
        /**
         * Atomically increments the current checkpoint ID.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
index 7ea645a..e4856cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointProperties.java
@@ -18,44 +18,252 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import java.io.Serializable;
+
 /**
  * The configuration of a checkpoint, such as whether
  * <ul>
- *     <li>The checkpoint is a savepoint</li>
- *     <li>The checkpoint must be full, or may be incremental</li>
- *     <li>The checkpoint format must be the common (cross backend) format, or 
may be state-backend specific</li>
+ *     <li>The checkpoint should be persisted</li>
+ *     <li>The checkpoint must be full, or may be incremental (not yet 
implemented)</li>
+ *     <li>The checkpoint format must be the common (cross backend) format,
+ *     or may be state-backend specific (not yet implemented)</li>
+ *     <li>When the checkpoint should be garbage collected</li>
  * </ul>
  */
-public class CheckpointProperties {
+public class CheckpointProperties implements Serializable {
+
+       private static final long serialVersionUID = -8835900655844879469L;
+
+       private final boolean forced;
+
+       private final boolean externalize;
 
-       private final boolean isSavepoint;
+       private final boolean discardSubsumed;
+       private final boolean discardFinished;
+       private final boolean discardCancelled;
+       private final boolean discardFailed;
+       private final boolean discardSuspended;
 
-       private CheckpointProperties(boolean isSavepoint) {
-               this.isSavepoint = isSavepoint;
+       CheckpointProperties(
+                       boolean forced,
+                       boolean externalize,
+                       boolean discardSubsumed,
+                       boolean discardFinished,
+                       boolean discardCancelled,
+                       boolean discardFailed,
+                       boolean discardSuspended) {
+
+               this.forced = forced;
+               this.externalize = externalize;
+               this.discardSubsumed = discardSubsumed;
+               this.discardFinished = discardFinished;
+               this.discardCancelled = discardCancelled;
+               this.discardFailed = discardFailed;
+               this.discardSuspended = discardSuspended;
+
+               // Reject a checkpoint that is not persisted externally but has any discard flag disabled: it would need manual clean up
+               if (!externalize && !(discardSubsumed && discardFinished && 
discardCancelled
+                               && discardFailed && discardSuspended)) {
+                       throw new IllegalStateException("CheckpointProperties 
say to *not* persist the " +
+                                       "checkpoint, but the checkpoint 
requires manual cleanup.");
+               }
        }
 
        // 
------------------------------------------------------------------------
 
-       public boolean isSavepoint() {
-               return isSavepoint;
+       /**
+        * Returns whether the checkpoint should be forced.
+        *
+        * <p>Forced checkpoints ignore the configured maximum number of 
concurrent
+        * checkpoints and minimum time between checkpoints. Furthermore, they 
are
+        * not subsumed by more recent checkpoints as long as they are pending.
+        *
+        * @return <code>true</code> if the checkpoint should be forced;
+        * <code>false</code> otherwise.
+        *
+        * @see CheckpointCoordinator
+        * @see PendingCheckpoint
+        */
+       public boolean forceCheckpoint() {
+               return forced;
+       }
+
+       /**
+        * Returns whether the checkpoint should be persisted externally.
+        *
+        * @return <code>true</code> if the checkpoint should be persisted
+        * externally; <code>false</code> otherwise.
+        *
+        * @see PendingCheckpoint
+        */
+       public boolean externalizeCheckpoint() {
+               return externalize;
        }
 
        // 
------------------------------------------------------------------------
+       // Garbage collection behaviour
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Returns whether the checkpoint should be discarded when it is 
subsumed.
+        *
+        * <p>A checkpoint is subsumed when the maximum number of retained
+        * checkpoints is reached and a more recent checkpoint completes.
+        *
+        * @return <code>true</code> if the checkpoint should be discarded when 
it
+        * is subsumed; <code>false</code> otherwise.
+        *
+        * @see CompletedCheckpointStore
+        */
+       public boolean discardOnSubsumed() {
+               return discardSubsumed;
+       }
+
+       /**
+        * Returns whether the checkpoint should be discarded when the owning 
job
+        * reaches the {@link JobStatus#FINISHED} state.
+        *
+        * @return <code>true</code> if the checkpoint should be discarded when 
the
+        * owning job reaches the {@link JobStatus#FINISHED} state; 
<code>false</code>
+        * otherwise.
+        *
+        * @see CompletedCheckpointStore
+        */
+       public boolean discardOnJobFinished() {
+               return discardFinished;
+       }
+
+       /**
+        * Returns whether the checkpoint should be discarded when the owning 
job
+        * reaches the {@link JobStatus#CANCELED} state.
+        *
+        * @return <code>true</code> if the checkpoint should be discarded when 
the
+        * owning job reaches the {@link JobStatus#CANCELED} state; 
<code>false</code>
+        * otherwise.
+        *
+        * @see CompletedCheckpointStore
+        */
+       public boolean discardOnJobCancelled() {
+               return discardCancelled;
+       }
+
+       /**
+        * Returns whether the checkpoint should be discarded when the owning 
job
+        * reaches the {@link JobStatus#FAILED} state.
+        *
+        * @return <code>true</code> if the checkpoint should be discarded when 
the
+        * owning job reaches the {@link JobStatus#FAILED} state; 
<code>false</code>
+        * otherwise.
+        *
+        * @see CompletedCheckpointStore
+        */
+       public boolean discardOnJobFailed() {
+               return discardFailed;
+       }
+
+       /**
+        * Returns whether the checkpoint should be discarded when the owning 
job
+        * reaches the {@link JobStatus#SUSPENDED} state.
+        *
+        * @return <code>true</code> if the checkpoint should be discarded when 
the
+        * owning job reaches the {@link JobStatus#SUSPENDED} state; 
<code>false</code>
+        * otherwise.
+        *
+        * @see CompletedCheckpointStore
+        */
+       public boolean discardOnJobSuspended() {
+               return discardSuspended;
+       }
+
+       // 
------------------------------------------------------------------------
+
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               CheckpointProperties that = (CheckpointProperties) o;
+               return forced == that.forced &&
+                               externalize == that.externalize &&
+                               discardSubsumed == that.discardSubsumed &&
+                               discardFinished == that.discardFinished &&
+                               discardCancelled == that.discardCancelled &&
+                               discardFailed == that.discardFailed &&
+                               discardSuspended == that.discardSuspended;
+       }
+
+       @Override
+       public int hashCode() {
+               int result = (forced ? 1 : 0);
+               result = 31 * result + (externalize ? 1 : 0);
+               result = 31 * result + (discardSubsumed ? 1 : 0);
+               result = 31 * result + (discardFinished ? 1 : 0);
+               result = 31 * result + (discardCancelled ? 1 : 0);
+               result = 31 * result + (discardFailed ? 1 : 0);
+               result = 31 * result + (discardSuspended ? 1 : 0);
+               return result;
+       }
 
        @Override
        public String toString() {
-               return "CheckpointProperties {" +
-                               "isSavepoint=" + isSavepoint +
+               return "CheckpointProperties{" +
+                               "forced=" + forced +
+                               ", externalize=" + externalizeCheckpoint() +
+                               ", discardSubsumed=" + discardSubsumed +
+                               ", discardFinished=" + discardFinished +
+                               ", discardCancelled=" + discardCancelled +
+                               ", discardFailed=" + discardFailed +
+                               ", discardSuspended=" + discardSuspended +
                                '}';
        }
 
        // 
------------------------------------------------------------------------
 
+       /**
+        * Creates the checkpoint properties for a (manually triggered) 
savepoint.
+        *
+        * <p>Savepoints are forced and persisted externally. They have to be
+        * garbage collected manually.
+        *
+        * @return Checkpoint properties for a (manually triggered) savepoint.
+        */
        public static CheckpointProperties forStandardSavepoint() {
-               return new CheckpointProperties(true);
+               return new CheckpointProperties(true, true, false, false, 
false, false, false);
        }
 
+       /**
+        * Creates the checkpoint properties for a regular checkpoint.
+        *
+        * <p>Regular checkpoints are not forced and not persisted externally. 
They
+        * are garbage collected automatically.
+        *
+        * @return Checkpoint properties for a regular checkpoint.
+        */
        public static CheckpointProperties forStandardCheckpoint() {
-               return new CheckpointProperties(false);
+               return new CheckpointProperties(false, false, true, true, true, 
true, true);
+       }
+
+       /**
+        * Creates the checkpoint properties for an external checkpoint.
+        *
+        * <p>External checkpoints are not forced, but persisted externally. 
They
+        * are garbage collected automatically, except when the owning job
+        * terminates in state {@link JobStatus#FAILED}. The user is required to
+        * configure the clean up behaviour on job cancellation.
+        *
+        * @param deleteOnCancellation Flag indicating whether to discard on 
cancellation.
+        *
+        * @return Checkpoint properties for an external checkpoint.
+        */
+       public static CheckpointProperties forExternalizedCheckpoint(boolean 
deleteOnCancellation) {
+               return new CheckpointProperties(false, true, true, true, 
deleteOnCancellation, false, true);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 0d279f1..e135272 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -19,11 +19,15 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.Map;
 import java.util.Objects;
 
@@ -34,7 +38,9 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * A successful checkpoint describes a checkpoint after all required tasks 
acknowledged it (with their state)
  * and that is considered completed.
  */
-public class CompletedCheckpoint implements StateObject {
+public class CompletedCheckpoint implements Serializable {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(CompletedCheckpoint.class);
 
        private static final long serialVersionUID = -8360248179615702014L;
 
@@ -51,9 +57,11 @@ public class CompletedCheckpoint implements StateObject {
        /** States of the different task groups belonging to this checkpoint */
        private final Map<JobVertexID, TaskState> taskStates;
 
-       /** Flag to indicate whether the completed checkpoint data should be 
deleted when this
-        * handle to the checkpoint is disposed */
-       private final boolean deleteStateWhenDisposed;
+       /** Properties for this checkpoint. */
+       private final CheckpointProperties props;
+
+       /** External path if persisted checkpoint; <code>null</code> otherwise. 
*/
+       private final String externalPath;
 
        // 
------------------------------------------------------------------------
 
@@ -63,7 +71,8 @@ public class CompletedCheckpoint implements StateObject {
                        long timestamp,
                        long completionTimestamp,
                        Map<JobVertexID, TaskState> taskStates,
-                       boolean deleteStateWhenDisposed) {
+                       CheckpointProperties props,
+                       String externalPath) {
 
                checkArgument(checkpointID >= 0);
                checkArgument(timestamp >= 0);
@@ -74,7 +83,13 @@ public class CompletedCheckpoint implements StateObject {
                this.timestamp = timestamp;
                this.duration = completionTimestamp - timestamp;
                this.taskStates = checkNotNull(taskStates);
-               this.deleteStateWhenDisposed = deleteStateWhenDisposed;
+               this.props = checkNotNull(props);
+               this.externalPath = externalPath;
+
+               if (props.externalizeCheckpoint() && externalPath == null) {
+                       throw new NullPointerException("Checkpoint properties 
say that the checkpoint " +
+                                       "should have been persisted, but 
missing external path.");
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -95,19 +110,50 @@ public class CompletedCheckpoint implements StateObject {
                return duration;
        }
 
-       @Override
-       public void discardState() throws Exception {
-               if (deleteStateWhenDisposed) {
+       public CheckpointProperties getProperties() {
+               return props;
+       }
 
-                       try {
-                               
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
-                       } finally {
-                               taskStates.clear();
+       public boolean subsume() throws Exception {
+               if (props.discardOnSubsumed()) {
+                       discard();
+                       return true;
+               }
+
+               return false;
+       }
+
+       public boolean discard(JobStatus jobStatus) throws Exception {
+               if (jobStatus == JobStatus.FINISHED && 
props.discardOnJobFinished() ||
+                               jobStatus == JobStatus.CANCELED && 
props.discardOnJobCancelled() ||
+                               jobStatus == JobStatus.FAILED && 
props.discardOnJobFailed() ||
+                               jobStatus == JobStatus.SUSPENDED && 
props.discardOnJobSuspended()) {
+
+                       discard();
+                       return true;
+               } else {
+                       if (externalPath != null) {
+                               LOG.info("Persistent checkpoint with ID {} at 
'{}' not discarded.",
+                                               checkpointID,
+                                               externalPath);
                        }
+
+                       return false;
+               }
+       }
+
+       private void discard() throws Exception {
+               try {
+                       if (externalPath != null) {
+                               SavepointStore.removeSavepoint(externalPath);
+                       }
+
+                       
StateUtil.bestEffortDiscardAllStateObjects(taskStates.values());
+               } finally {
+                       taskStates.clear();
                }
        }
 
-       @Override
        public long getStateSize() throws IOException {
                long result = 0L;
 
@@ -126,6 +172,10 @@ public class CompletedCheckpoint implements StateObject {
                return taskStates.get(jobVertexID);
        }
 
+       public String getExternalPath() {
+               return externalPath;
+       }
+
        // 
--------------------------------------------------------------------------------------------
 
        @Override
@@ -153,4 +203,5 @@ public class CompletedCheckpoint implements StateObject {
        public String toString() {
                return String.format("Checkpoint %d @ %d for %s", checkpointID, 
timestamp, job);
        }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index c52fc25..d2c0f6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
 import java.util.List;
 
 /**
@@ -38,7 +40,7 @@ public interface CompletedCheckpointStore {
         *
         * <p>Only a bounded number of checkpoints is kept. When exceeding the 
maximum number of
         * retained checkpoints, the oldest one will be discarded via {@link
-        * CompletedCheckpoint#discard(ClassLoader)}.
+        * CompletedCheckpoint#discard()}.
         */
        void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception;
 
@@ -49,17 +51,14 @@ public interface CompletedCheckpointStore {
        CompletedCheckpoint getLatestCheckpoint() throws Exception;
 
        /**
-        * Shuts down the store and discards all checkpoint instances.
-        */
-       void shutdown() throws Exception;
-
-       /**
-        * Suspends the store.
+        * Shuts down the store.
+        *
+        * <p>The job status is forwarded and used to decide whether state 
should
+        * actually be discarded or kept.
         *
-        * <p>If the implementation allows recovery, checkpoint state needs to 
be
-        * kept around. Otherwise, this should act like shutdown.
+        * @param jobStatus Job state on shut down
         */
-       void suspend() throws Exception;
+       void shutdown(JobStatus jobStatus) throws Exception;
 
        /**
         * Returns all {@link CompletedCheckpoint} instances.

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 2ca9d69..983f1d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -19,6 +19,11 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
+import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -28,6 +33,8 @@ import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -46,6 +53,8 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class PendingCheckpoint {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointCoordinator.class);
+
        private final Object lock = new Object();
 
        private final JobID jobId;
@@ -58,7 +67,17 @@ public class PendingCheckpoint {
 
        private final Map<ExecutionAttemptID, ExecutionVertex> 
notYetAcknowledgedTasks;
 
-       private final boolean disposeWhenSubsumed;
+       /**
+        * The checkpoint properties. If the checkpoint should be persisted
+        * externally, it happens in {@link #finalizeCheckpoint()}.
+        */
+       private final CheckpointProperties props;
+
+       /** Target directory to potentially persist checkpoint to; 
<code>null</code> if none configured. */
+       private final String targetDirectory;
+
+       /** The promise to fulfill once the checkpoint has been completed. */
+       private final FlinkCompletableFuture<CompletedCheckpoint> 
onCompletionPromise = new FlinkCompletableFuture<>();
 
        private int numAcknowledgedTasks;
 
@@ -70,23 +89,21 @@ public class PendingCheckpoint {
                        JobID jobId,
                        long checkpointId,
                        long checkpointTimestamp,
-                       Map<ExecutionAttemptID, ExecutionVertex> 
verticesToConfirm) {
-               this(jobId, checkpointId, checkpointTimestamp, 
verticesToConfirm, true);
-       }
-
-       PendingCheckpoint(
-                       JobID jobId,
-                       long checkpointId,
-                       long checkpointTimestamp,
                        Map<ExecutionAttemptID, ExecutionVertex> 
verticesToConfirm,
-                       boolean disposeWhenSubsumed)
-       {
+                       CheckpointProperties props,
+                       String targetDirectory) {
                this.jobId = checkNotNull(jobId);
                this.checkpointId = checkpointId;
                this.checkpointTimestamp = checkpointTimestamp;
                this.notYetAcknowledgedTasks = checkNotNull(verticesToConfirm);
-               this.disposeWhenSubsumed = disposeWhenSubsumed;
                this.taskStates = new HashMap<>();
+               this.props = checkNotNull(props);
+               this.targetDirectory = targetDirectory;
+
+               // Sanity check
+               if (props.externalizeCheckpoint() && targetDirectory == null) {
+                       throw new NullPointerException("No target directory 
specified to persist checkpoint to.");
+               }
 
                checkArgument(verticesToConfirm.size() > 0,
                                "Checkpoint needs at least one vertex that 
commits the checkpoint");
@@ -137,33 +154,71 @@ public class PendingCheckpoint {
         * @return True if the checkpoint can be subsumed, false otherwise.
         */
        public boolean canBeSubsumed() {
-               return true;
+               // If the checkpoint is forced, it cannot be subsumed.
+               return !props.forceCheckpoint();
+       }
+
+       CheckpointProperties getProps() {
+               return props;
+       }
+
+       String getTargetDirectory() {
+               return targetDirectory;
        }
 
        // 
------------------------------------------------------------------------
        //  Progress and Completion
        // 
------------------------------------------------------------------------
 
+       /**
+        * Returns the completion future.
+        *
+        * @return A future to the completed checkpoint
+        */
+       public Future<CompletedCheckpoint> getCompletionFuture() {
+               return onCompletionPromise;
+       }
+
        public CompletedCheckpoint finalizeCheckpoint() throws Exception {
                synchronized (lock) {
-                       if (discarded) {
-                               throw new IllegalStateException("pending 
checkpoint is discarded");
-                       }
-                       if (notYetAcknowledgedTasks.isEmpty()) {
-                               CompletedCheckpoint completed =  new 
CompletedCheckpoint(
-                                       jobId,
-                                       checkpointId,
-                                       checkpointTimestamp,
-                                       System.currentTimeMillis(),
-                                       new HashMap<>(taskStates),
-                                       disposeWhenSubsumed);
-
-                               dispose(false);
-
-                               return completed;
-                       }
-                       else {
-                               throw new IllegalStateException("Cannot 
complete checkpoint while not all tasks are acknowledged");
+                       try {
+                               if (discarded) {
+                                       throw new 
IllegalStateException("pending checkpoint is discarded");
+                               }
+                               if (notYetAcknowledgedTasks.isEmpty()) {
+                                       // Persist if required
+                                       String externalPath = null;
+                                       if (props.externalizeCheckpoint()) {
+                                               try {
+                                                       Savepoint savepoint = 
new SavepointV1(checkpointId, taskStates.values());
+                                                       externalPath = 
SavepointStore.storeSavepoint(
+                                                                       
targetDirectory,
+                                                                       
savepoint);
+                                               } catch (Throwable t) {
+                                                       LOG.error("Failed to 
persist checkpoints " + checkpointId + ".", t);
+                                               }
+                                       }
+
+                                       CompletedCheckpoint completed = new 
CompletedCheckpoint(
+                                                       jobId,
+                                                       checkpointId,
+                                                       checkpointTimestamp,
+                                                       
System.currentTimeMillis(),
+                                                       new 
HashMap<>(taskStates),
+                                                       props,
+                                                       externalPath);
+
+                                       onCompletionPromise.complete(completed);
+
+                                       dispose(false);
+
+                                       return completed;
+                               } else {
+                                       throw new IllegalStateException("Cannot 
complete checkpoint while not all tasks are acknowledged");
+                               }
+                       } catch (Throwable t) {
+                               onCompletionPromise.completeExceptionally(t);
+                               throw t;
                        }
                }
        }
@@ -180,7 +235,6 @@ public class PendingCheckpoint {
                        ExecutionVertex vertex = 
notYetAcknowledgedTasks.remove(attemptID);
 
                        if (vertex != null) {
-
                                if (checkpointStateHandles != null) {
                                        List<KeyGroupsStateHandle> 
keyGroupsState = checkpointStateHandles.getKeyGroupsStateHandle();
                                        ChainedStateHandle<StreamStateHandle> 
nonPartitionedState =
@@ -256,18 +310,36 @@ public class PendingCheckpoint {
         * Aborts a checkpoint because it expired (took too long).
         */
        public void abortExpired() throws Exception {
-               dispose(true);
+               try {
+                       onCompletionPromise.completeExceptionally(new 
Exception("Checkpoint expired before completing"));
+               } finally {
+                       dispose(true);
+               }
        }
 
        /**
         * Aborts the pending checkpoint because a newer completed checkpoint 
subsumed it.
         */
        public void abortSubsumed() throws Exception {
-               dispose(true);
+               try {
+                       if (props.forceCheckpoint()) {
+                               onCompletionPromise.completeExceptionally(new 
Exception("Bug: forced checkpoints must never be subsumed"));
+
+                               throw new IllegalStateException("Bug: forced 
checkpoints must never be subsumed");
+                       } else {
+                               onCompletionPromise.completeExceptionally(new 
Exception("Checkpoints has been subsumed"));
+                       }
+               } finally {
+                       dispose(true);
+               }
        }
 
        public void abortDeclined() throws Exception {
-               dispose(true);
+               try {
+                       onCompletionPromise.completeExceptionally(new 
Exception("Checkpoint was declined (tasks not ready)"));
+               } finally {
+                       dispose(true);
+               }
        }
 
        /**
@@ -275,10 +347,14 @@ public class PendingCheckpoint {
         * @param cause The error's exception.
         */
        public void abortError(Throwable cause) throws Exception {
-               dispose(true);
+               try {
+                       onCompletionPromise.completeExceptionally(new 
Exception("Checkpoint failed: " + cause.getMessage(), cause));
+               } finally {
+                       dispose(true);
+               }
        }
 
-       protected void dispose(boolean releaseState) throws Exception {
+       private void dispose(boolean releaseState) throws Exception {
                synchronized (lock) {
                        try {
                                discarded = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
deleted file mode 100644
index 0bb6a91..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingSavepoint.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.savepoint.Savepoint;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.checkpoint.savepoint.SavepointV1;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.executiongraph.ExecutionVertex;
-import org.apache.flink.util.ExceptionUtils;
-import org.slf4j.Logger;
-import scala.concurrent.Future;
-import scala.concurrent.Promise;
-
-import java.util.Map;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A pending savepoint is like a pending checkpoint, but it additionally 
performs some
- * actions upon completion, like notifying the triggerer.
- */
-public class PendingSavepoint extends PendingCheckpoint {
-
-       private static final Logger LOG = CheckpointCoordinator.LOG;
-
-       private final SavepointStore store;
-
-       /** The promise to fulfill once the savepoint is complete */
-       private final Promise<String> onCompletionPromise;
-       
-       // 
--------------------------------------------------------------------------------------------
-
-       public PendingSavepoint(
-                       JobID jobId,
-                       long checkpointId,
-                       long checkpointTimestamp,
-                       Map<ExecutionAttemptID, ExecutionVertex> 
verticesToConfirm,
-                       SavepointStore store)
-       {
-               super(jobId, checkpointId, checkpointTimestamp, 
verticesToConfirm, false);
-
-               this.store = checkNotNull(store);
-               this.onCompletionPromise = new 
scala.concurrent.impl.Promise.DefaultPromise<>();
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-       //  Savepoint completion
-       // 
--------------------------------------------------------------------------------------------
-
-       public Future<String> getCompletionFuture() {
-               return onCompletionPromise.future();
-       }
-       
-       @Override
-       public CompletedCheckpoint finalizeCheckpoint() throws Exception {
-               // finalize checkpoint (this also disposes this pending 
checkpoint)
-               CompletedCheckpoint completedCheckpoint = 
super.finalizeCheckpoint();
-
-               // now store the checkpoint externally as a savepoint
-               try {
-                       Savepoint savepoint = new SavepointV1(
-                                       completedCheckpoint.getCheckpointID(),
-                                       
completedCheckpoint.getTaskStates().values());
-                       
-                       String path = store.storeSavepoint(savepoint);
-                       onCompletionPromise.success(path);
-               }
-               catch (Throwable t) {
-                       LOG.warn("Failed to store savepoint.", t);
-                       onCompletionPromise.failure(t);
-
-                       ExceptionUtils.rethrow(t, "Failed to store savepoint.");
-               }
-
-               return completedCheckpoint;
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Cancellation / Disposal
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean canBeSubsumed() {
-               return false;
-       }
-
-       @Override
-       public void abortSubsumed() throws Exception {
-               try {
-                       Exception e = new Exception("Bug: Savepoints must never 
be subsumed");
-                       onCompletionPromise.failure(e);
-                       throw e;
-               }
-               finally {
-                       dispose(true);
-               }
-       }
-
-       @Override
-       public void abortExpired() throws Exception {
-               try {
-                       LOG.info("Savepoint with checkpoint ID " + 
getCheckpointId() + " expired before completing.");
-                       onCompletionPromise.failure(new Exception("Savepoint 
expired before completing"));
-               }
-               finally {
-                       dispose(true);
-               }
-       }
-
-       @Override
-       public void abortDeclined() throws Exception {
-               try {
-                       LOG.info("Savepoint with checkpoint ID " + 
getCheckpointId() + " was declined (tasks not ready).");
-                       onCompletionPromise.failure(new Exception("Savepoint 
was declined (tasks not ready)"));
-               }
-               finally {
-                       dispose(true);
-               }
-       }
-
-       @Override
-       public void abortError(Throwable cause) throws Exception {
-               try {
-                       LOG.info("Savepoint with checkpoint ID " + 
getCheckpointId() + " failed due to an error", cause);
-                       onCompletionPromise.failure(
-                                       new Exception("Savepoint could not be 
completed: " + cause.getMessage(), cause));
-               }
-               finally {
-                       dispose(true);
-               }
-       }
-
-       // 
--------------------------------------------------------------------------------------------
-
-       @Override
-       public String toString() {
-               return String.format("Pending Savepoint %d @ %d - confirmed=%d, 
pending=%d",
-                               getCheckpointId(), getCheckpointTimestamp(),
-                               getNumberOfAcknowledgedTasks(), 
getNumberOfNonAcknowledgedTasks());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
index 84cbe19..e4ed996 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointIDCounter.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 
 import java.util.concurrent.atomic.AtomicLong;
@@ -35,10 +36,7 @@ public class StandaloneCheckpointIDCounter implements 
CheckpointIDCounter {
        public void start() throws Exception {}
 
        @Override
-       public void shutdown() throws Exception {}
-
-       @Override
-       public void suspend() throws Exception {}
+       public void shutdown(JobStatus jobStatus) throws Exception {}
 
        @Override
        public long getAndIncrement() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
index aecb51e..a9624fb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCheckpointRecoveryFactory.java
@@ -41,7 +41,7 @@ public class StandaloneCheckpointRecoveryFactory implements 
CheckpointRecoveryFa
                        throws Exception {
 
                return new 
StandaloneCompletedCheckpointStore(CheckpointRecoveryFactory
-                               .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, 
userClassLoader);
+                               .NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index 5e03988..082bca9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.StateUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -32,6 +34,8 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  */
 public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(StandaloneCompletedCheckpointStore.class);
+
        /** The maximum number of checkpoints to retain (at least 1). */
        private final int maxNumberOfCheckpointsToRetain;
 
@@ -44,16 +48,10 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
         * @param maxNumberOfCheckpointsToRetain The maximum number of 
checkpoints to retain (at
         *                                       least 1). Adding more 
checkpoints than this results
         *                                       in older checkpoints being 
discarded.
-        * @param userClassLoader                The user class loader used to 
discard checkpoints
         */
-       public StandaloneCompletedCheckpointStore(
-                       int maxNumberOfCheckpointsToRetain,
-                       ClassLoader userClassLoader) {
-
+       public StandaloneCompletedCheckpointStore(int 
maxNumberOfCheckpointsToRetain) {
                checkArgument(maxNumberOfCheckpointsToRetain >= 1, "Must retain 
at least one checkpoint.");
-
                this.maxNumberOfCheckpointsToRetain = 
maxNumberOfCheckpointsToRetain;
-
                this.checkpoints = new 
ArrayDeque<>(maxNumberOfCheckpointsToRetain + 1);
        }
 
@@ -64,9 +62,9 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
 
        @Override
        public void addCheckpoint(CompletedCheckpoint checkpoint) throws 
Exception {
-               checkpoints.addLast(checkpoint);
+               checkpoints.add(checkpoint);
                if (checkpoints.size() > maxNumberOfCheckpointsToRetain) {
-                       checkpoints.removeFirst().discardState();
+                       checkpoints.remove().subsume();
                }
        }
 
@@ -86,17 +84,16 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
        }
 
        @Override
-       public void shutdown() throws Exception {
+       public void shutdown(JobStatus jobStatus) throws Exception {
                try {
-                       StateUtil.bestEffortDiscardAllStateObjects(checkpoints);
+                       LOG.info("Shutting down");
+
+                       for (CompletedCheckpoint checkpoint : checkpoints) {
+                               checkpoint.discard(jobStatus);
+                       }
                } finally {
                        checkpoints.clear();
                }
        }
 
-       @Override
-       public void suspend() throws Exception {
-               // Do a regular shutdown, because we can't recovery anything
-               shutdown();
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
index 0bceb8b..80e79b2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointIDCounter.java
@@ -23,6 +23,7 @@ import 
org.apache.curator.framework.recipes.shared.SharedCount;
 import org.apache.curator.framework.recipes.shared.VersionedValue;
 import org.apache.curator.framework.state.ConnectionState;
 import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,30 +92,17 @@ public class ZooKeeperCheckpointIDCounter implements 
CheckpointIDCounter {
        }
 
        @Override
-       public void shutdown() throws Exception {
+       public void shutdown(JobStatus jobStatus) throws Exception {
                synchronized (startStopLock) {
                        if (isStarted) {
                                LOG.info("Shutting down.");
                                sharedCount.close();
                                
client.getConnectionStateListenable().removeListener(connStateListener);
 
-                               LOG.info("Removing {} from ZooKeeper", 
counterPath);
-                               
client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
-
-                               isStarted = false;
-                       }
-               }
-       }
-
-       @Override
-       public void suspend() throws Exception {
-               synchronized (startStopLock) {
-                       if (isStarted) {
-                               LOG.info("Suspending.");
-                               sharedCount.close();
-                               
client.getConnectionStateListenable().removeListener(connStateListener);
-
-                               // Don't remove any state
+                               if (jobStatus.isGloballyTerminalState()) {
+                                       LOG.info("Removing {} from ZooKeeper", 
counterPath);
+                                       
client.delete().deletingChildrenIfNeeded().inBackground().forPath(counterPath);
+                               }
 
                                isStarted = false;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
index 55a0bbb..f47012d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCheckpointRecoveryFactory.java
@@ -55,7 +55,7 @@ public class ZooKeeperCheckpointRecoveryFactory implements 
CheckpointRecoveryFac
                        throws Exception {
 
                return ZooKeeperUtils.createCompletedCheckpoints(client, 
config, jobId,
-                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN, 
userClassLoader);
+                               NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index b826d9f..4f67921 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -24,9 +24,10 @@ import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.curator.framework.api.CuratorEventType;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
-import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -35,6 +36,7 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.ConcurrentModificationException;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -75,9 +77,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        /** The maximum number of checkpoints to retain (at least 1). */
        private final int maxNumberOfCheckpointsToRetain;
 
-       /** User class loader for discarding {@link CompletedCheckpoint} 
instances. */
-       private final ClassLoader userClassLoader;
-
        /** Local completed checkpoints. */
        private final 
ArrayDeque<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String>> 
checkpointStateHandles;
 
@@ -88,7 +87,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         *                                       least 1). Adding more 
checkpoints than this results
         *                                       in older checkpoints being 
discarded. On recovery,
         *                                       we will only start with a 
single checkpoint.
-        * @param userClassLoader                The user class loader used to 
discard checkpoints
         * @param client                         The Curator ZooKeeper client
         * @param checkpointsPath                The ZooKeeper path for the 
checkpoints (needs to
         *                                       start with a '/')
@@ -98,7 +96,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         */
        public ZooKeeperCompletedCheckpointStore(
                        int maxNumberOfCheckpointsToRetain,
-                       ClassLoader userClassLoader,
                        CuratorFramework client,
                        String checkpointsPath,
                        RetrievableStateStorageHelper<CompletedCheckpoint> 
stateStorage) throws Exception {
@@ -107,7 +104,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                checkNotNull(stateStorage, "State storage");
 
                this.maxNumberOfCheckpointsToRetain = 
maxNumberOfCheckpointsToRetain;
-               this.userClassLoader = checkNotNull(userClassLoader, "User 
class loader");
 
                checkNotNull(client, "Curator client");
                checkNotNull(checkpointsPath, "Checkpoints path");
@@ -172,7 +168,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                        for (int i = 0; i < numberOfInitialCheckpoints - 1; 
i++) {
                                try {
-                                       
removeFromZooKeeperAndDiscardCheckpoint(initialCheckpoints.get(i));
+                                       
removeSubsumed(initialCheckpoints.get(i));
                                }
                                catch (Exception e) {
                                        LOG.error("Failed to discard 
checkpoint", e);
@@ -200,7 +196,7 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
 
                // Everything worked, let's remove a previous checkpoint if 
necessary.
                if (checkpointStateHandles.size() > 
maxNumberOfCheckpointsToRetain) {
-                       
removeFromZooKeeperAndDiscardCheckpoint(checkpointStateHandles.removeFirst());
+                       removeSubsumed(checkpointStateHandles.removeFirst());
                }
 
                LOG.debug("Added {} to {}.", checkpoint, path);
@@ -233,68 +229,90 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
        }
 
        @Override
-       public void shutdown() throws Exception {
-               LOG.info("Shutting down");
+       public void shutdown(JobStatus jobStatus) throws Exception {
+               if (jobStatus.isGloballyTerminalState()) {
+                       LOG.info("Shutting down");
 
-               for (Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String> checkpoint : checkpointStateHandles) {
-                       try {
-                               
removeFromZooKeeperAndDiscardCheckpoint(checkpoint);
-                       }
-                       catch (Exception e) {
-                               LOG.error("Failed to discard checkpoint.", e);
+                       for 
(Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> checkpoint : 
checkpointStateHandles) {
+                               try {
+                                       removeShutdown(checkpoint, jobStatus);
+                               } catch (Exception e) {
+                                       LOG.error("Failed to discard 
checkpoint.", e);
+                               }
                        }
+
+                       checkpointStateHandles.clear();
+
+                       String path = "/" + client.getNamespace();
+
+                       LOG.info("Removing {} from ZooKeeper", path);
+                       
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+               } else {
+                       LOG.info("Suspending");
+
+                       // Clear the local handles, but don't remove any state
+                       checkpointStateHandles.clear();
                }
+       }
 
-               checkpointStateHandles.clear();
+       // 
------------------------------------------------------------------------
 
-               String path = "/" + client.getNamespace();
+       private void removeSubsumed(final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) 
throws Exception {
+               Callable<Void> action = new Callable<Void>() {
+                       @Override
+                       public Void call() throws Exception {
+                               stateHandleAndPath.f0.retrieveState().subsume();
+                               return null;
+                       }
+               };
 
-               LOG.info("Removing {} from ZooKeeper", path);
-               
ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
+               remove(stateHandleAndPath, action);
        }
 
-       @Override
-       public void suspend() throws Exception {
-               LOG.info("Suspending");
+       private void removeShutdown(
+                       final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+                       final JobStatus jobStatus) throws Exception {
 
-               // Clear the local handles, but don't remove any state
-               checkpointStateHandles.clear();
+               Callable<Void> action = new Callable<Void>() {
+                       @Override
+                       public Void call() throws Exception {
+                               CompletedCheckpoint checkpoint = 
stateHandleAndPath.f0.retrieveState();
+                               checkpoint.discard(jobStatus);
+                               return null;
+                       }
+               };
+
+               remove(stateHandleAndPath, action);
        }
 
        /**
         * Removes the state handle from ZooKeeper, discards the checkpoints, 
and the state handle.
         */
-       private void removeFromZooKeeperAndDiscardCheckpoint(
-                       final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath) 
throws Exception {
+       private void remove(
+                       final 
Tuple2<RetrievableStateHandle<CompletedCheckpoint>, String> stateHandleAndPath,
+                       final Callable<Void> action) throws Exception {
 
-               final BackgroundCallback callback = new BackgroundCallback() {
+               BackgroundCallback callback = new BackgroundCallback() {
                        @Override
                        public void processResult(CuratorFramework client, 
CuratorEvent event) throws Exception {
                                try {
                                        if (event.getType() == 
CuratorEventType.DELETE) {
                                                if (event.getResultCode() == 0) 
{
-                                                       // The checkpoint
                                                        try {
-                                                               
CompletedCheckpoint checkpoint = stateHandleAndPath.f0.retrieveState();
-                                                               
checkpoint.discardState();
-                                                               // Discard the 
checkpoint
-                                                               
LOG.debug("Discarded " + checkpoint);
+                                                               action.call();
                                                        } finally {
                                                                // Discard the 
state handle
                                                                
stateHandleAndPath.f0.discardState();
                                                        }
-                                               }
-                                               else {
+                                               } else {
                                                        throw new 
IllegalStateException("Unexpected result code " +
                                                                        
event.getResultCode() + " in '" + event + "' callback.");
                                                }
-                                       }
-                                       else {
+                                       } else {
                                                throw new 
IllegalStateException("Unexpected event type " +
                                                                event.getType() 
+ " in '" + event + "' callback.");
                                        }
-                               }
-                               catch (Exception e) {
+                               } catch (Exception e) {
                                        LOG.error("Failed to discard 
checkpoint.", e);
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
deleted file mode 100644
index 49f51be..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStore.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.core.fs.FSDataOutputStream;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A file system based {@link SavepointStore}.
- *
- * <p>Stored savepoints have the following format:
- * <pre>
- * MagicNumber SavepointVersion Savepoint
- *   - MagicNumber => int
- *   - SavepointVersion => int (returned by Savepoint#getVersion())
- *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
- * </pre>
- */
-public class FsSavepointStore implements SavepointStore {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(FsSavepointStore.class);
-
-       /** Magic number for sanity checks against stored savepoints. */
-       int MAGIC_NUMBER = 0x4960672d;
-
-       /** Root path for savepoints. */
-       private final Path rootPath;
-
-       /** Prefix for savepoint files. */
-       private final String prefix;
-
-       /** File system to use for file access. */
-       private final FileSystem fileSystem;
-
-       /**
-        * Creates a new file system based {@link SavepointStore}.
-        *
-        * @param rootPath Root path for savepoints
-        * @param prefix   Prefix for savepoint files
-        * @throws IOException On failure to access root path
-        */
-       FsSavepointStore(String rootPath, String prefix) throws IOException {
-               this.rootPath = new Path(checkNotNull(rootPath, "Root path"));
-               this.prefix = checkNotNull(prefix, "Prefix");
-
-               this.fileSystem = FileSystem.get(this.rootPath.toUri());
-       }
-
-       @Override
-       public <T extends Savepoint> String storeSavepoint(T savepoint) throws 
IOException {
-               Preconditions.checkNotNull(savepoint, "Savepoint");
-
-               Exception latestException = null;
-               Path path = null;
-               FSDataOutputStream fdos = null;
-
-               // Try to create a FS output stream
-               for (int attempt = 0; attempt < 10; attempt++) {
-                       path = new Path(rootPath, 
FileUtils.getRandomFilename(prefix));
-                       try {
-                               fdos = fileSystem.create(path, false);
-                               break;
-                       } catch (Exception e) {
-                               latestException = e;
-                       }
-               }
-
-               if (fdos == null) {
-                       throw new IOException("Failed to create file output 
stream at " + path, latestException);
-               }
-
-               boolean success = false;
-               try (DataOutputStream dos = new DataOutputStream(fdos)) {
-                       // Write header
-                       dos.writeInt(MAGIC_NUMBER);
-                       dos.writeInt(savepoint.getVersion());
-
-                       // Write savepoint
-                       SavepointSerializer<T> serializer = 
SavepointSerializers.getSerializer(savepoint);
-                       serializer.serialize(savepoint, dos);
-                       success = true;
-               } finally {
-                       if (!success && fileSystem.exists(path)) {
-                               if (!fileSystem.delete(path, true)) {
-                                       LOG.warn("Failed to delete file " + 
path + " after failed write.");
-                               }
-                       }
-               }
-
-               return path.toString();
-       }
-
-       @Override
-       public Savepoint loadSavepoint(String path) throws IOException {
-               Preconditions.checkNotNull(path, "Path");
-
-               try (DataInputStream dis = new 
DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
-                       int magicNumber = dis.readInt();
-
-                       if (magicNumber == MAGIC_NUMBER) {
-                               int version = dis.readInt();
-
-                               SavepointSerializer<?> serializer = 
SavepointSerializers.getSerializer(version);
-                               return serializer.deserialize(dis);
-                       } else {
-                               throw new RuntimeException("Unexpected magic 
number. This is most likely " +
-                                               "caused by trying to load a 
Flink 1.0 savepoint. You cannot load a " +
-                                               "savepoint triggered by Flink 
1.0 with this version of Flink. If it is " +
-                                               "_not_ a Flink 1.0 savepoint, 
this error indicates that the specified " +
-                                               "file is not a proper savepoint 
or the file has been corrupted.");
-                       }
-               }
-       }
-
-       @Override
-       public void disposeSavepoint(String path) throws Exception {
-               Preconditions.checkNotNull(path, "Path");
-
-               try {
-                       Savepoint savepoint = loadSavepoint(path);
-                       LOG.info("Disposing savepoint: " + path);
-                       savepoint.dispose();
-
-                       Path filePath = new Path(path);
-
-                       if (fileSystem.exists(filePath)) {
-                               if (!fileSystem.delete(filePath, true)) {
-                                       throw new IOException("Failed to delete 
" + filePath + ".");
-                               }
-                       } else {
-                               throw new IllegalArgumentException("Invalid 
path '" + filePath.toUri() + "'.");
-                       }
-               } catch (Throwable t) {
-                       throw new IOException("Failed to dispose savepoint " + 
path + ".", t);
-               }
-       }
-
-       @Override
-       public void shutdown() throws Exception {
-               // Nothing to do, because the savepoint life-cycle is 
independent of
-               // the cluster life-cycle.
-       }
-
-       private FSDataInputStream createFsInputStream(Path path) throws 
IOException {
-               if (fileSystem.exists(path)) {
-                       return fileSystem.open(path);
-               } else {
-                       throw new IllegalArgumentException("Invalid path '" + 
path.toUri() + "'.");
-               }
-       }
-
-       /**
-        * Returns the savepoint root path.
-        *
-        * @return Savepoint root path
-        */
-       Path getRootPath() {
-               return rootPath;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
deleted file mode 100644
index 2cf8f31..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/HeapSavepointStore.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.util.Preconditions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Heap-backed savepoint store.
- *
- * <p>The life-cycle of savepoints is bound to the life-cycle of the cluster.
- */
-public class HeapSavepointStore implements SavepointStore {
-
-       private static final Logger LOG = 
LoggerFactory.getLogger(HeapSavepointStore.class);
-
-       private final Object shutDownLock = new Object();
-
-       /** Stored savepoints. */
-       private final Map<String, Savepoint> savepoints = new HashMap<>(1);
-
-       /** ID counter to identify savepoints. */
-       private final AtomicInteger currentId = new AtomicInteger();
-
-       /** Flag indicating whether state store has been shut down. */
-       private boolean shutDown;
-
-       /** Shut down hook. */
-       private final Thread shutdownHook;
-
-       /**
-        * Creates a heap-backed savepoint store.
-        *
-        * <p>Savepoint are discarded on {@link #shutdown()}.
-        */
-       public HeapSavepointStore() {
-               this.shutdownHook = new Thread(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       shutdown();
-                               } catch (Throwable t) {
-                                       LOG.warn("Failure during shut down 
hook.", t);
-                               }
-                       }
-               });
-
-               try {
-                       Runtime.getRuntime().addShutdownHook(shutdownHook);
-               } catch (IllegalStateException ignored) {
-                       // JVM is already shutting down
-               } catch (Throwable t) {
-                       LOG.warn("Failed to register shutdown hook.");
-               }
-       }
-
-       @Override
-       public <T extends Savepoint> String storeSavepoint(T savepoint) throws 
IOException {
-               Preconditions.checkNotNull(savepoint, "Savepoint");
-
-               synchronized (shutDownLock) {
-                       if (shutDown) {
-                               throw new IllegalStateException("Shut down");
-                       } else {
-                               String path = "jobmanager://savepoints/" + 
currentId.incrementAndGet();
-                               savepoints.put(path, savepoint);
-                               return path;
-                       }
-               }
-       }
-
-       @Override
-       public Savepoint loadSavepoint(String path) throws IOException {
-               Preconditions.checkNotNull(path, "Path");
-
-               Savepoint savepoint;
-               synchronized (shutDownLock) {
-                       savepoint = savepoints.get(path);
-               }
-
-               if (savepoint != null) {
-                       return savepoint;
-               } else {
-                       throw new IllegalArgumentException("Invalid path '" + 
path + "'.");
-               }
-       }
-
-       @Override
-       public void disposeSavepoint(String path) throws Exception {
-               Preconditions.checkNotNull(path, "Path");
-
-               Savepoint savepoint;
-               synchronized (shutDownLock) {
-                       savepoint = savepoints.remove(path);
-               }
-
-               if (savepoint != null) {
-                       savepoint.dispose();
-               } else {
-                       throw new IllegalArgumentException("Invalid path '" + 
path + "'.");
-               }
-       }
-
-       @Override
-       public void shutdown() throws Exception {
-               synchronized (shutDownLock) {
-                       // This is problematic as the user code class loader is 
not
-                       // available at this point.
-                       for (Savepoint savepoint : savepoints.values()) {
-                               try {
-                                       savepoint.dispose();
-                               } catch (Throwable t) {
-                                       LOG.warn("Failed to dispose savepoint " 
+ savepoint.getCheckpointId(), t);
-                               }
-                       }
-
-                       savepoints.clear();
-
-                       // Remove shutdown hook to prevent resource leaks, 
unless this is
-                       // invoked by the shutdown hook itself.
-                       if (shutdownHook != null && shutdownHook != 
Thread.currentThread()) {
-                               try {
-                                       
Runtime.getRuntime().removeShutdownHook(shutdownHook);
-                               } catch (IllegalStateException ignored) {
-                                       // Race, JVM is in shutdown already, we 
can safely ignore this
-                               } catch (Throwable t) {
-                                       LOG.warn("Failed to unregister shut 
down hook.");
-                               }
-                       }
-
-                       shutDown = true;
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
index 47917b4..845008d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoader.java
@@ -19,16 +19,18 @@
 package org.apache.flink.runtime.checkpoint.savepoint;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
 /**
- * The SavepointLoader is a utility to load and verify a Savepoint, and to 
create a checkpoint from it. 
+ * The SavepointLoader is a utility to load and verify a Savepoint, and to 
create a checkpoint from it.
  */
 public class SavepointLoader {
 
@@ -39,7 +41,6 @@ public class SavepointLoader {
         *
         * @param jobId          The JobID of the job to load the savepoint for.
         * @param tasks          Tasks that will possibly be reset
-        * @param savepointStore The store that holds the savepoint.
         * @param savepointPath  The path of the savepoint to rollback to
         *
         * @throws IllegalStateException If mismatch between program and 
savepoint state
@@ -48,13 +49,12 @@ public class SavepointLoader {
        public static CompletedCheckpoint loadAndValidateSavepoint(
                        JobID jobId,
                        Map<JobVertexID, ExecutionJobVertex> tasks,
-                       SavepointStore savepointStore,
-                       String savepointPath) throws Exception {
+                       String savepointPath) throws IOException {
 
                // (1) load the savepoint
-               Savepoint savepoint = 
savepointStore.loadSavepoint(savepointPath);
+               Savepoint savepoint = 
SavepointStore.loadSavepoint(savepointPath);
                final Map<JobVertexID, TaskState> taskStates = new 
HashMap<>(savepoint.getTaskStates().size());
-               
+
                // (2) validate it (parallelism, etc)
                for (TaskState taskState : savepoint.getTaskStates()) {
                        ExecutionJobVertex executionJobVertex = 
tasks.get(taskState.getJobVertexID());
@@ -66,12 +66,12 @@ public class SavepointLoader {
                                else {
                                        String msg = String.format("Failed to 
rollback to savepoint %s. " +
                                                                        "Max 
parallelism mismatch between savepoint state and new program. " +
-                                                                       "Cannot 
map operator %s with parallelism %d to new program with " +
+                                                                       "Cannot 
map operator %s with max parallelism %d to new program with " +
                                                                        
"parallelism %d. This indicates that the program has been changed " +
                                                                        "in a 
non-compatible way after the savepoint.",
                                                        savepoint,
                                                        
taskState.getJobVertexID(),
-                                                       
taskState.getParallelism(),
+                                                       
taskState.getMaxParallelism(),
                                                        
executionJobVertex.getParallelism());
 
                                        throw new IllegalStateException(msg);
@@ -87,8 +87,9 @@ public class SavepointLoader {
                }
 
                // (3) convert to checkpoint so the system can fall back to it
-               return new CompletedCheckpoint(jobId, 
savepoint.getCheckpointId(), 0L, 0L, taskStates, false);
-       } 
+               CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
+               return new CompletedCheckpoint(jobId, 
savepoint.getCheckpointId(), 0L, 0L, taskStates, props, savepointPath);
+       }
 
        // 
------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
index 211209c..6a55b33 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointSerializer.java
@@ -25,7 +25,7 @@ import java.io.IOException;
 /**
  * Serializer for {@link Savepoint} instances.
  *
- * <p>This serializer is used to read/write a savepoint via {@link 
FsSavepointStore}.
+ * <p>This serializer is used to read/write a savepoint via {@link 
SavepointStore}.
  *
  * <p>Version-specific serializers are accessed via the {@link 
SavepointSerializers} helper.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/fd410d9f/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
index 68b88d2..4b65418 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointStore.java
@@ -18,23 +18,105 @@
 
 package org.apache.flink.runtime.checkpoint.savepoint;
 
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * Savepoint store used to persist {@link Savepoint} instances.
+ * A file system based savepoint store.
  *
- * <p>The main implementation is the {@link FsSavepointStore}. We also have the
- * {@link HeapSavepointStore} for historical reasons (introduced in Flink 1.0).
+ * <p>Stored savepoints have the following format:
+ * <pre>
+ * MagicNumber SavepointVersion Savepoint
+ *   - MagicNumber => int
+ *   - SavepointVersion => int (returned by Savepoint#getVersion())
+ *   - Savepoint => bytes (serialized via version-specific SavepointSerializer)
+ * </pre>
  */
-public interface SavepointStore {
+public class SavepointStore {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(SavepointStore.class);
+
+       /** Magic number for sanity checks against stored savepoints. */
+       private static final int MAGIC_NUMBER = 0x4960672d;
+
+       /** Prefix for savepoint files. */
+       private static final String prefix = "savepoint-";
 
        /**
         * Stores the savepoint.
         *
+        * @param targetDirectory Target directory to store savepoint in
         * @param savepoint Savepoint to be stored
         * @param <T>       Savepoint type
         * @return Path of stored savepoint
         * @throws Exception Failures during store are forwarded
         */
-       <T extends Savepoint> String storeSavepoint(T savepoint) throws 
Exception;
+       public static <T extends Savepoint> String storeSavepoint(
+                       String targetDirectory,
+                       T savepoint) throws IOException {
+
+               checkNotNull(targetDirectory, "Target directory");
+               checkNotNull(savepoint, "Savepoint");
+
+               Exception latestException = null;
+               Path path = null;
+               FSDataOutputStream fdos = null;
+
+               FileSystem fs = null;
+
+               // Try to create a FS output stream
+               for (int attempt = 0; attempt < 10; attempt++) {
+                       path = new Path(targetDirectory, 
FileUtils.getRandomFilename(prefix));
+
+                       if (fs == null) {
+                               fs = FileSystem.get(path.toUri());
+                       }
+
+                       try {
+                               fdos = fs.create(path, false);
+                               break;
+                       } catch (Exception e) {
+                               latestException = e;
+                       }
+               }
+
+               if (fdos == null) {
+                       throw new IOException("Failed to create file output 
stream at " + path, latestException);
+               }
+
+               boolean success = false;
+               try (DataOutputStream dos = new DataOutputStream(fdos)) {
+                       // Write header
+                       dos.writeInt(MAGIC_NUMBER);
+                       dos.writeInt(savepoint.getVersion());
+
+                       // Write savepoint
+                       SavepointSerializer<T> serializer = 
SavepointSerializers.getSerializer(savepoint);
+                       serializer.serialize(savepoint, dos);
+                       success = true;
+               } finally {
+                       if (!success && fs.exists(path)) {
+                               if (!fs.delete(path, true)) {
+                                       LOG.warn("Failed to delete file {} 
after failed write.", path);
+                               }
+                       }
+               }
+
+               return path.toString();
+       }
 
        /**
         * Loads the savepoint at the specified path.
@@ -43,24 +125,62 @@ public interface SavepointStore {
         * @return The loaded savepoint
         * @throws Exception Failures during load are forwared
         */
-       Savepoint loadSavepoint(String path) throws Exception;
+       public static Savepoint loadSavepoint(String path) throws IOException {
+               Preconditions.checkNotNull(path, "Path");
 
-       /**
-        * Disposes the savepoint at the specified path.
-        *
-        * @param path        Path of savepoint to dispose
-        * @throws Exception Failures during diposal are forwarded
-        */
-       void disposeSavepoint(String path) throws Exception;
+               try (DataInputStream dis = new 
DataInputViewStreamWrapper(createFsInputStream(new Path(path)))) {
+                       int magicNumber = dis.readInt();
+
+                       if (magicNumber == MAGIC_NUMBER) {
+                               int version = dis.readInt();
+
+                               SavepointSerializer<?> serializer = 
SavepointSerializers.getSerializer(version);
+                               return serializer.deserialize(dis);
+                       } else {
+                               throw new RuntimeException("Unexpected magic 
number. This is most likely " +
+                                               "caused by trying to load a 
Flink 1.0 savepoint. You cannot load a " +
+                                               "savepoint triggered by Flink 
1.0 with this version of Flink. If it is " +
+                                               "_not_ a Flink 1.0 savepoint, 
this error indicates that the specified " +
+                                               "file is not a proper savepoint 
or the file has been corrupted.");
+                       }
+               }
+       }
 
        /**
-        * Shut downs the savepoint store.
+        * Removes the savepoint meta data w/o loading and disposing it.
         *
-        * <p>Only necessary for implementations where the savepoint life-cycle 
is
-        * bound to the cluster life-cycle.
-        *
-        * @throws Exception Failures during shut down are forwarded
+        * @param path Path of savepoint to remove
+        * @throws Exception Failures during disposal are forwarded
         */
-       void shutdown() throws Exception;
+       public static void removeSavepoint(String path) throws IOException {
+               Preconditions.checkNotNull(path, "Path");
+
+               try {
+                       LOG.info("Removing savepoint: {}.", path);
+
+                       Path filePath = new Path(path);
+                       FileSystem fs = FileSystem.get(filePath.toUri());
+
+                       if (fs.exists(filePath)) {
+                               if (!fs.delete(filePath, true)) {
+                                       throw new IOException("Failed to delete 
" + filePath + ".");
+                               }
+                       } else {
+                               throw new IllegalArgumentException("Invalid 
path '" + filePath.toUri() + "'.");
+                       }
+               } catch (Throwable t) {
+                       throw new IOException("Failed to dispose savepoint " + 
path + ".", t);
+               }
+       }
+
+       private static FSDataInputStream createFsInputStream(Path path) throws 
IOException {
+               FileSystem fs = FileSystem.get(path.toUri());
+
+               if (fs.exists(path)) {
+                       return fs.open(path);
+               } else {
+                       throw new IllegalArgumentException("Invalid path '" + 
path.toUri() + "'.");
+               }
+       }
 
 }

Reply via email to