Repository: flink
Updated Branches:
  refs/heads/master 94d3166b4 -> 8cff17fcc


[FLINK-6964] [checkpoint] Fix externalized incremental checkpoints for 
StandaloneCompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cff17fc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cff17fc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cff17fc

Branch: refs/heads/master
Commit: 8cff17fcc9b4bca6499c26fc2a6318c759cbf251
Parents: 94d3166
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Wed Jun 21 11:22:28 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jul 14 15:52:09 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  12 +-
 .../checkpoint/CheckpointCoordinator.java       |  34 ++-
 .../savepoint/SavepointV2Serializer.java        |   7 +-
 .../state/IncrementalKeyedStateHandle.java      |  21 +-
 .../runtime/state/SharedStateRegistry.java      |  45 +++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  19 +-
 .../savepoint/CheckpointTestUtils.java          |   2 +-
 .../state/IncrementalKeyedStateHandleTest.java  |   3 +-
 .../runtime/testingUtils/TestingCluster.scala   |  49 +++-
 .../testingUtils/TestingJobManagerLike.scala    |  59 ++++-
 .../TestingJobManagerMessages.scala             |  39 +++-
 .../ExternalizedCheckpointITCase.java           | 228 +++++++++++++++++++
 12 files changed, 482 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 291973c..9d289b4 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -185,6 +185,9 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
        /** The identifier of the last completed checkpoint. */
        private long lastCompletedCheckpointId = -1;
 
+       /** Unique ID of this backend. */
+       private UUID backendUID;
+
        private static final String SST_FILE_SUFFIX = ".sst";
 
        public RocksDBKeyedStateBackend(
@@ -233,6 +236,8 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.kvStateInformation = new HashMap<>();
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();
+               this.backendUID = UUID.randomUUID();
+               LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
        }
 
        /**
@@ -926,7 +931,7 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                        }
 
                        return new IncrementalKeyedStateHandle(
-                               stateBackend.operatorIdentifier,
+                               stateBackend.backendUID,
                                stateBackend.keyGroupRange,
                                checkpointId,
                                sstFiles,
@@ -1438,6 +1443,11 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                                                }
                                        }
                                } else {
+                                       // pick up again the old backend id, so 
that we can reference existing state
+                                       stateBackend.backendUID = 
restoreStateHandle.getBackendIdentifier();
+
+                                       LOG.debug("Restoring keyed backend uid 
in operator {} from incremental snapshot to {}.",
+                                               
stateBackend.operatorIdentifier, stateBackend.backendUID);
 
                                        // create hard links in the instance 
directory
                                        if 
(!stateBackend.instanceRocksDBPath.mkdirs()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 905d132..3e36158 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
@@ -417,6 +418,36 @@ public class CheckpointCoordinator {
                return triggerCheckpoint(timestamp, checkpointProperties, 
checkpointDirectory, isPeriodic).isSuccess();
        }
 
+       /**
+        * Test method to trigger a checkpoint/savepoint.
+        *
+        * @param timestamp The timestamp for the checkpoint.
+        * @param options The checkpoint options.
+        * @return A future to the completed checkpoint
+        */
+       @VisibleForTesting
+       @Internal
+       public Future<CompletedCheckpoint> triggerCheckpoint(long timestamp, 
CheckpointOptions options) throws Exception {
+               switch (options.getCheckpointType()) {
+                       case SAVEPOINT:
+                               return triggerSavepoint(timestamp, 
options.getTargetLocation());
+
+                       case FULL_CHECKPOINT:
+                               CheckpointTriggerResult triggerResult =
+                                       triggerCheckpoint(timestamp, 
checkpointProperties, checkpointDirectory, false);
+
+                               if (triggerResult.isSuccess()) {
+                                       return 
triggerResult.getPendingCheckpoint().getCompletionFuture();
+                               } else {
+                                       Throwable cause = new Exception("Failed 
to trigger checkpoint: " + triggerResult.getFailureReason().message());
+                                       return 
FlinkCompletableFuture.completedExceptionally(cause);
+                               }
+
+                       default:
+                               throw new IllegalArgumentException("Unknown 
checkpoint type: " + options.getCheckpointType());
+               }
+       }
+
        @VisibleForTesting
        CheckpointTriggerResult triggerCheckpoint(
                        long timestamp,
@@ -1092,6 +1123,7 @@ public class CheckpointCoordinator {
                CompletedCheckpoint savepoint = 
SavepointLoader.loadAndValidateSavepoint(
                                job, tasks, savepointPath, userClassLoader, 
allowNonRestored);
 
+               
savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                completedCheckpointStore.addCheckpoint(savepoint);
                
                // Reset the checkpoint ID counter
@@ -1099,7 +1131,7 @@ public class CheckpointCoordinator {
                checkpointIdCounter.setCount(nextCheckpointId);
                
                LOG.info("Reset the checkpoint ID to {}.", nextCheckpointId);
-               
+
                return restoreLatestCheckpointedState(tasks, true, 
allowNonRestored);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index da0022c..c8d695f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -45,6 +45,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * (De)serializer for checkpoint metadata format version 2.
@@ -320,7 +321,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        dos.writeByte(INCREMENTAL_KEY_GROUPS_HANDLE);
 
                        
dos.writeLong(incrementalKeyedStateHandle.getCheckpointId());
-                       
dos.writeUTF(incrementalKeyedStateHandle.getOperatorIdentifier());
+                       
dos.writeUTF(String.valueOf(incrementalKeyedStateHandle.getBackendIdentifier()));
                        
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getStartKeyGroup());
                        
dos.writeInt(incrementalKeyedStateHandle.getKeyGroupRange().getNumberOfKeyGroups());
 
@@ -380,7 +381,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                } else if (INCREMENTAL_KEY_GROUPS_HANDLE == type) {
 
                        long checkpointId = dis.readLong();
-                       String operatorId = dis.readUTF();
+                       String backendId = dis.readUTF();
                        int startKeyGroup = dis.readInt();
                        int numKeyGroups = dis.readInt();
                        KeyGroupRange keyGroupRange =
@@ -391,7 +392,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        Map<StateHandleID, StreamStateHandle> privateStates = 
deserializeStreamStateHandleMap(dis);
 
                        return new IncrementalKeyedStateHandle(
-                               operatorId,
+                               UUID.fromString(backendId),
                                keyGroupRange,
                                checkpointId,
                                sharedStates,

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 770b5a9..0085890 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * The handle to states of an incremental snapshot.
@@ -57,9 +59,10 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        private static final long serialVersionUID = -8328808513197388231L;
 
        /**
-        * The operator instance identifier for this handle
+        * UUID to identify the backend which created this state handle. This 
is used in creating the key for the
+        * {@link SharedStateRegistry}.
         */
-       private final String operatorIdentifier;
+       private final UUID backendIdentifier;
 
        /**
         * The key-group range covered by this state handle
@@ -97,14 +100,14 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        private transient SharedStateRegistry sharedStateRegistry;
 
        public IncrementalKeyedStateHandle(
-               String operatorIdentifier,
+               UUID backendIdentifier,
                KeyGroupRange keyGroupRange,
                long checkpointId,
                Map<StateHandleID, StreamStateHandle> sharedState,
                Map<StateHandleID, StreamStateHandle> privateState,
                StreamStateHandle metaStateHandle) {
 
-               this.operatorIdentifier = 
Preconditions.checkNotNull(operatorIdentifier);
+               this.backendIdentifier = 
Preconditions.checkNotNull(backendIdentifier);
                this.keyGroupRange = Preconditions.checkNotNull(keyGroupRange);
                this.checkpointId = checkpointId;
                this.sharedState = Preconditions.checkNotNull(sharedState);
@@ -134,8 +137,8 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                return metaStateHandle;
        }
 
-       public String getOperatorIdentifier() {
-               return operatorIdentifier;
+       public UUID getBackendIdentifier() {
+               return backendIdentifier;
        }
 
        @Override
@@ -231,7 +234,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
         */
        @VisibleForTesting
        public SharedStateRegistryKey 
createSharedStateRegistryKeyFromFileName(StateHandleID shId) {
-               return new SharedStateRegistryKey(operatorIdentifier + '-' + 
keyGroupRange, shId);
+               return new 
SharedStateRegistryKey(String.valueOf(backendIdentifier) + '-' + keyGroupRange, 
shId);
        }
 
        /**
@@ -252,7 +255,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                if (getCheckpointId() != that.getCheckpointId()) {
                        return false;
                }
-               if 
(!getOperatorIdentifier().equals(that.getOperatorIdentifier())) {
+               if 
(!getBackendIdentifier().equals(that.getBackendIdentifier())) {
                        return false;
                }
                if (!getKeyGroupRange().equals(that.getKeyGroupRange())) {
@@ -273,7 +276,7 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        @VisibleForTesting
        @Override
        public int hashCode() {
-               int result = getOperatorIdentifier().hashCode();
+               int result = getBackendIdentifier().hashCode();
                result = 31 * result + getKeyGroupRange().hashCode();
                result = 31 * result + (int) (getCheckpointId() ^ 
(getCheckpointId() >>> 32));
                result = 31 * result + getSharedState().hashCode();

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index af9ac9d..e0ca873 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -47,6 +48,7 @@ public class SharedStateRegistry {
        /** Executor for async state deletion */
        private final Executor asyncDisposalExecutor;
 
+       /** Default uses direct executor to delete unreferenced state */
        public SharedStateRegistry() {
                this(Executors.directExecutor());
        }
@@ -83,11 +85,16 @@ public class SharedStateRegistry {
                        entry = registeredStates.get(registrationKey);
 
                        if (entry == null) {
+
+                               // Additional check that should never fail, 
because only state handles that are not placeholders should
+                               // ever be inserted into the registry.
+                               Preconditions.checkState(!isPlaceholder(state), 
"Attempt to reference unknown state: " + registrationKey);
+
                                entry = new 
SharedStateRegistry.SharedStateEntry(state);
                                registeredStates.put(registrationKey, entry);
                        } else {
                                // delete if this is a real duplicate
-                               if (!Objects.equals(state, entry.state)) {
+                               if (!Objects.equals(state, entry.stateHandle)) {
                                        scheduledStateDeletion = state;
                                }
                                entry.increaseReferenceCount();
@@ -95,6 +102,7 @@ public class SharedStateRegistry {
                }
 
                scheduleAsyncDelete(scheduledStateDeletion);
+               LOG.trace("Registered shared state {} under key {}.", entry, 
registrationKey);
                return new Result(entry);
        }
 
@@ -112,9 +120,10 @@ public class SharedStateRegistry {
 
                final Result result;
                final StreamStateHandle scheduledStateDeletion;
+               SharedStateRegistry.SharedStateEntry entry;
 
                synchronized (registeredStates) {
-                       SharedStateRegistry.SharedStateEntry entry = 
registeredStates.get(registrationKey);
+                       entry = registeredStates.get(registrationKey);
 
                        Preconditions.checkState(entry != null,
                                "Cannot unregister a state that is not 
registered.");
@@ -124,7 +133,7 @@ public class SharedStateRegistry {
                        // Remove the state from the registry when it's not 
referenced any more.
                        if (entry.getReferenceCount() <= 0) {
                                registeredStates.remove(registrationKey);
-                               scheduledStateDeletion = entry.getState();
+                               scheduledStateDeletion = entry.getStateHandle();
                                result = new Result(null, 0);
                        } else {
                                scheduledStateDeletion = null;
@@ -132,6 +141,7 @@ public class SharedStateRegistry {
                        }
                }
 
+               LOG.trace("Unregistered shared state {} under key {}.", entry, 
registrationKey);
                scheduleAsyncDelete(scheduledStateDeletion);
                return result;
        }
@@ -142,6 +152,7 @@ public class SharedStateRegistry {
         * @param stateHandles The shared states to register.
         */
        public void registerAll(Iterable<? extends CompositeStateHandle> 
stateHandles) {
+
                if (stateHandles == null) {
                        return;
                }
@@ -156,6 +167,8 @@ public class SharedStateRegistry {
        private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
                // We do the small optimization to not issue discards for 
placeholders, which are NOPs.
                if (streamStateHandle != null && 
!isPlaceholder(streamStateHandle)) {
+
+                       LOG.trace("Scheduled delete of state handle {}.", 
streamStateHandle);
                        asyncDisposalExecutor.execute(
                                new 
SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
                }
@@ -171,18 +184,18 @@ public class SharedStateRegistry {
        private static class SharedStateEntry {
 
                /** The shared state handle */
-               private final StreamStateHandle state;
+               private final StreamStateHandle stateHandle;
 
                /** The current reference count of the state handle */
                private int referenceCount;
 
                SharedStateEntry(StreamStateHandle value) {
-                       this.state = value;
+                       this.stateHandle = value;
                        this.referenceCount = 1;
                }
 
-               StreamStateHandle getState() {
-                       return state;
+               StreamStateHandle getStateHandle() {
+                       return stateHandle;
                }
 
                int getReferenceCount() {
@@ -196,6 +209,14 @@ public class SharedStateRegistry {
                void decreaseReferenceCount() {
                        --referenceCount;
                }
+
+               @Override
+               public String toString() {
+                       return "SharedStateEntry{" +
+                               "stateHandle=" + stateHandle +
+                               ", referenceCount=" + referenceCount +
+                               '}';
+               }
        }
 
        /**
@@ -210,7 +231,7 @@ public class SharedStateRegistry {
                private final int referenceCount;
 
                private Result(SharedStateEntry sharedStateEntry) {
-                       this.reference = sharedStateEntry.getState();
+                       this.reference = sharedStateEntry.getStateHandle();
                        this.referenceCount = 
sharedStateEntry.getReferenceCount();
                }
 
@@ -228,6 +249,14 @@ public class SharedStateRegistry {
                public int getReferenceCount() {
                        return referenceCount;
                }
+
+               @Override
+               public String toString() {
+                       return "Result{" +
+                               "reference=" + reference +
+                               ", referenceCount=" + referenceCount +
+                               '}';
+               }
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index b5854dd..91bab85 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -18,12 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.BackgroundCallback;
-import org.apache.curator.framework.api.CuratorEvent;
-import org.apache.curator.framework.api.CuratorEventType;
-import org.apache.curator.framework.api.ErrorListenerPathable;
-import org.apache.curator.utils.EnsurePath;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
@@ -31,6 +25,13 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.api.ErrorListenerPathable;
+import org.apache.curator.utils.EnsurePath;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
@@ -51,6 +52,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -160,9 +162,12 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                        stateStorage,
                        Executors.directExecutor());
 
-               SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
+               SharedStateRegistry sharedStateRegistry = spy(new 
SharedStateRegistry());
                zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
 
+               verify(retrievableStateHandle1.retrieveState(), 
times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+               verify(retrievableStateHandle2.retrieveState(), 
times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+
                CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 
                // check that we return the latest retrievable checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
index f985573..de1f599 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/CheckpointTestUtils.java
@@ -274,7 +274,7 @@ public class CheckpointTestUtils {
 
        public static IncrementalKeyedStateHandle 
createDummyIncrementalKeyedStateHandle(Random rnd) {
                return new IncrementalKeyedStateHandle(
-                       createRandomUUID(rnd).toString(),
+                       createRandomUUID(rnd),
                        new KeyGroupRange(1, 1),
                        42L,
                        createRandomStateHandleMap(rnd),

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index 2a6975a..c1b3ccd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -23,6 +23,7 @@ import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
+import java.util.UUID;
 
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -187,7 +188,7 @@ public class IncrementalKeyedStateHandleTest {
 
        private static IncrementalKeyedStateHandle create(Random rnd) {
                return new IncrementalKeyedStateHandle(
-                       "test",
+                       UUID.nameUUIDFromBytes("test".getBytes()),
                        KeyGroupRange.of(0, 0),
                        1L,
                        
placeSpies(CheckpointTestUtils.createRandomStateHandleMap(rnd)),

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
index 9e0a6e1..0e3eae5 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala
@@ -26,10 +26,10 @@ import akka.pattern.Patterns._
 import akka.pattern.ask
 import akka.testkit.CallingThreadDispatcher
 import org.apache.flink.api.common.JobID
-import org.apache.flink.configuration.{ConfigConstants, Configuration, 
JobManagerOptions}
+import org.apache.flink.configuration.{Configuration, JobManagerOptions}
 import org.apache.flink.runtime.akka.AkkaUtils
-import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
+import org.apache.flink.runtime.checkpoint.{CheckpointOptions, 
CheckpointRecoveryFactory}
 import org.apache.flink.runtime.clusterframework.FlinkResourceManager
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
@@ -39,11 +39,12 @@ import org.apache.flink.runtime.instance.{ActorGateway, 
InstanceManager}
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler
 import org.apache.flink.runtime.jobmanager.{JobManager, MemoryArchivist, 
SubmittedJobGraphStore}
 import org.apache.flink.runtime.leaderelection.LeaderElectionService
+import org.apache.flink.runtime.messages.JobManagerMessages
 import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.metrics.MetricRegistry
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster
 import org.apache.flink.runtime.taskmanager.TaskManager
-import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ResponseSavepoint
+import 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{CheckpointRequest,
 CheckpointRequestFailure, CheckpointRequestSuccess, ResponseSavepoint}
 import org.apache.flink.runtime.testingUtils.TestingMessages.Alive
 import 
org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager
 import org.apache.flink.runtime.testutils.TestingResourceManager
@@ -372,6 +373,48 @@ class TestingCluster(
       case _ => throw new IOException("Dispose savepoint failed")
     }
   }
+
+  @throws(classOf[IOException])
+  def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = {
+    val jobManagerGateway = getLeaderGateway(timeout)
+
+    // wait until the cluster is ready to take a checkpoint.
+    val allRunning = jobManagerGateway.ask(
+      TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout)
+
+    Await.ready(allRunning, timeout)
+
+    // trigger checkpoint
+    val result = Await.result(
+      jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), 
timeout)
+
+    result match {
+      case success: CheckpointRequestSuccess => success.path
+      case fail: CheckpointRequestFailure => {
+        if (fail.cause.getMessage.contains("tasks not ready")) {
+          // retry if the tasks are not ready yet.
+          Thread.sleep(50)
+          requestCheckpoint(jobId, options)
+        } else {
+          throw new IOException(fail.cause)
+        }
+      }
+      case _ => throw new IllegalStateException("Trigger checkpoint failed")
+    }
+  }
+
+  @throws[Exception]
+  def cancelJob(jobId: JobID): Unit = {
+    if (getCurrentlyRunningJobsJava.contains(jobId)) {
+      val jobManagerGateway = getLeaderGateway(timeout)
+      val cancelFuture = jobManagerGateway.ask(new 
JobManagerMessages.CancelJob(jobId), timeout)
+      val result = Await.result(cancelFuture, timeout)
+      if (!result.isInstanceOf[JobManagerMessages.CancellationSuccess]) {
+        throw new Exception("Cancellation failed")
+      }
+    }
+    else throw new IllegalStateException("Job is not running")
+  }
  }
 
 object TestingCluster {

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
index a3d31f5..3d3af95 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerLike.scala
@@ -22,14 +22,17 @@ import akka.actor.{ActorRef, Cancellable, Terminated}
 import akka.pattern.{ask, pipe}
 import org.apache.flink.api.common.JobID
 import org.apache.flink.runtime.FlinkActor
+import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType
+import org.apache.flink.runtime.checkpoint.CompletedCheckpoint
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore
+import org.apache.flink.runtime.concurrent.BiFunction
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.jobgraph.JobStatus
 import org.apache.flink.runtime.jobmanager.JobManager
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
 import org.apache.flink.runtime.messages.Acknowledge
 import 
org.apache.flink.runtime.messages.ExecutionGraphMessages.JobStatusChanged
-import org.apache.flink.runtime.messages.JobManagerMessages.{GrantLeadership, 
RegisterJobClient, RequestClassloadingProps}
+import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.Disconnect
 import 
org.apache.flink.runtime.messages.RegistrationMessages.RegisterTaskManager
 import org.apache.flink.runtime.messages.TaskManagerMessages.Heartbeat
@@ -336,6 +339,60 @@ trait TestingJobManagerLike extends FlinkActor {
         }
       }
 
+    case CheckpointRequest(jobId, checkpointOptions) =>
+      currentJobs.get(jobId) match {
+        case Some((graph, _)) =>
+          val checkpointCoordinator = graph.getCheckpointCoordinator()
+
+          if (checkpointCoordinator != null) {
+            // Immutable copy for the future
+            val senderRef = sender()
+            try {
+              // Do this async, because checkpoint coordinator operations can
+              // contain blocking calls to the state backend or ZooKeeper.
+              val cpFuture = checkpointCoordinator.triggerCheckpoint(
+                System.currentTimeMillis(),
+                checkpointOptions)
+
+              cpFuture.handleAsync[Void](
+                new BiFunction[CompletedCheckpoint, Throwable, Void] {
+                  override def apply(success: CompletedCheckpoint, cause: 
Throwable): Void = {
+                    if (success != null) {
+                      if (success.getExternalPointer == null &&
+                        
CheckpointType.SAVEPOINT.equals(checkpointOptions.getCheckpointType)) {
+                        senderRef ! CheckpointRequestFailure(
+                          jobId, new Exception("Savepoint has not been 
persisted.")
+                        )
+                      } else {
+                        senderRef ! CheckpointRequestSuccess(
+                          jobId,
+                          success.getCheckpointID,
+                          success.getExternalPointer,
+                          success.getTimestamp)
+                      }
+                    } else {
+                      senderRef ! CheckpointRequestFailure(
+                        jobId, new Exception("Failed to complete checkpoint", 
cause))
+                    }
+                    null
+                  }
+                },
+                context.dispatcher)
+            } catch {
+              case e: Exception =>
+                senderRef ! CheckpointRequestFailure(jobId, new Exception(
+                  "Failed to trigger checkpoint", e))
+            }
+          } else {
+            sender() ! CheckpointRequestFailure(jobId, new 
IllegalStateException(
+              "Checkpointing disabled. You can enable it via the execution 
environment of " +
+                "your job."))
+          }
+
+        case None =>
+          sender() ! CheckpointRequestFailure(jobId, new 
IllegalArgumentException("Unknown job."))
+      }
+
     case NotifyWhenLeader =>
       if (leaderElectionService.hasLeadership) {
         sender() ! true

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
index 48c2bfb..f79c124 100644
--- 
a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
+++ 
b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingJobManagerMessages.scala
@@ -23,10 +23,13 @@ import java.util.Map
 import akka.actor.ActorRef
 import org.apache.flink.api.common.JobID
 import org.apache.flink.api.common.accumulators.Accumulator
+import org.apache.flink.runtime.checkpoint.CheckpointOptions
 import org.apache.flink.runtime.checkpoint.savepoint.Savepoint
-import org.apache.flink.runtime.executiongraph.{AccessExecutionGraph, 
ExecutionAttemptID, ExecutionGraph}
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph
 import org.apache.flink.runtime.instance.ActorGateway
 import org.apache.flink.runtime.jobgraph.JobStatus
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID
+import org.apache.flink.runtime.messages.checkpoint.AbstractCheckpointMessage
 
 object TestingJobManagerMessages {
 
@@ -59,6 +62,40 @@ object TestingJobManagerMessages {
   case class TaskManagerTerminated(taskManager: ActorRef)
 
   /**
+    * Triggers a checkpoint for the specified job.
+    *
+    * The testing JobManager answers with a [[CheckpointRequestSuccess]] once the
+    * triggered checkpoint has completed, or with a [[CheckpointRequestFailure]]
+    * if the request could not be served.
+    *
+    * This is not a subtype of [[AbstractCheckpointMessage]], because it is a
+    * control-flow message, which is *not* part of the checkpointing mechanism
+    * of triggering and acknowledging checkpoints.
+    *
+    * @param jobId   The JobID of the job to trigger the checkpoint for.
+    * @param options The options (e.g. the checkpoint type) to trigger the checkpoint with.
+    */
+  case class CheckpointRequest(
+    jobId: JobID,
+    options: CheckpointOptions) extends RequiresLeaderSessionID
+
+  /**
+    * Response to a [[CheckpointRequest]] after the triggered checkpoint completed.
+    *
+    * @param jobId        The job ID for which the checkpoint was triggered.
+    * @param checkpointId The ID of the completed checkpoint.
+    * @param path         The external pointer (path) of the completed checkpoint. May be
+    *                     null for a regular (non-savepoint) checkpoint that has no
+    *                     external pointer.
+    * @param triggerTime  The timestamp of the completed checkpoint.
+    */
+  case class CheckpointRequestSuccess(
+    jobId: JobID,
+    checkpointId: Long,
+    path: String,
+    triggerTime: Long)
+
+  /**
+    * Response to a [[CheckpointRequest]] that could not be served, e.g. because the
+    * job is unknown, checkpointing is disabled, or the checkpoint could not be
+    * triggered or completed.
+    *
+    * @param jobId The job ID for which the checkpoint was requested.
+    * @param cause The cause of the failure.
+    */
+  case class CheckpointRequestFailure(jobId: JobID, cause: Throwable)
+
+  /**
    * Registers a listener to receive a message when accumulators changed.
    * The change must be explicitly triggered by the TestingTaskManager which 
can receive an
    * 
[[org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages.AccumulatorsChanged]]

http://git-wip-us.apache.org/repos/asf/flink/blob/8cff17fc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
new file mode 100644
index 0000000..e341741
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ExternalizedCheckpointITCase.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.test.state.ManualWindowSpeedITCase;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+
+/**
+ * IT case for externalized checkpoints with {@link org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore}
+ * and {@link org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore}.
+ *
+ * <p>This test covers full and incremental RocksDB checkpoints as well as file system checkpoints and was
+ * introduced to guard against problems like FLINK-6964.
+ */
+public class ExternalizedCheckpointITCase {
+
+       private static final int PARALLELISM = 2;
+       private static final int NUM_TASK_MANAGERS = 2;
+       private static final int SLOTS_PER_TASK_MANAGER = 2;
+
+       /** Number of (submit -> externalized checkpoint -> cancel) rounds; every round after the first restores. */
+       private static final int NUM_ROUNDS = 3;
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       @Test
+       public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
+               final File checkpointDir = temporaryFolder.newFolder();
+               testExternalizedCheckpoints(
+                       checkpointDir,
+                       null,
+                       new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+       }
+
+       @Test
+       public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
+               final File checkpointDir = temporaryFolder.newFolder();
+               testExternalizedCheckpoints(
+                       checkpointDir,
+                       null,
+                       new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+       }
+
+       @Test
+       public void testExternalizedFSCheckpointsStandalone() throws Exception {
+               final File checkpointDir = temporaryFolder.newFolder();
+               testExternalizedCheckpoints(
+                       checkpointDir,
+                       null,
+                       new FsStateBackend(checkpointDir.toURI().toString(), true));
+       }
+
+       @Test
+       public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
+               // try-with-resources: close() stops the server and (per Curator's TestingServer docs) also
+               // cleans up its temporary data directory, which stop() alone does not
+               try (TestingServer zkServer = new TestingServer()) {
+                       zkServer.start();
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       testExternalizedCheckpoints(
+                               checkpointDir,
+                               zkServer.getConnectString(),
+                               new RocksDBStateBackend(checkpointDir.toURI().toString(), true));
+               }
+       }
+
+       @Test
+       public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
+               try (TestingServer zkServer = new TestingServer()) {
+                       zkServer.start();
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       testExternalizedCheckpoints(
+                               checkpointDir,
+                               zkServer.getConnectString(),
+                               new RocksDBStateBackend(checkpointDir.toURI().toString(), false));
+               }
+       }
+
+       @Test
+       public void testExternalizedFSCheckpointsZookeeper() throws Exception {
+               try (TestingServer zkServer = new TestingServer()) {
+                       zkServer.start();
+                       final File checkpointDir = temporaryFolder.newFolder();
+                       testExternalizedCheckpoints(
+                               checkpointDir,
+                               zkServer.getConnectString(),
+                               new FsStateBackend(checkpointDir.toURI().toString(), true));
+               }
+       }
+
+       /**
+        * Runs {@link #NUM_ROUNDS} rounds of: submit job -> let it make progress -> request an externalized
+        * checkpoint -> cancel the job. Every round after the first restores from the externalized checkpoint
+        * taken in the previous round.
+        *
+        * @param checkpointDir directory used as checkpoint directory of the cluster
+        * @param zooKeeperQuorum ZooKeeper connect string, or {@code null} to run without ZooKeeper HA
+        * @param backend state backend used by the job in every round
+        * @throws AssertionError if a requested checkpoint was not externalized, since the next round could
+        *                        then not restore from it
+        */
+       private void testExternalizedCheckpoints(
+               File checkpointDir,
+               String zooKeeperQuorum,
+               AbstractStateBackend backend) throws Exception {
+
+               final Configuration config = new Configuration();
+
+               config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
+               config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, SLOTS_PER_TASK_MANAGER);
+
+               final File savepointDir = temporaryFolder.newFolder();
+
+               config.setString(FsStateBackendFactory.CHECKPOINT_DIRECTORY_URI_CONF_KEY, checkpointDir.toURI().toString());
+               config.setString(CoreOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
+               config.setString(CoreOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
+
+               // ZooKeeper recovery mode?
+               if (zooKeeperQuorum != null) {
+                       final File haDir = temporaryFolder.newFolder();
+                       config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+                       config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
+                       config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+               }
+
+               TestingCluster cluster = new TestingCluster(config);
+               cluster.start();
+
+               String externalCheckpoint = null;
+
+               try {
+                       // main test sequence: start job -> eCP -> (restore job -> eCP) for each further round
+                       for (int round = 0; round < NUM_ROUNDS; ++round) {
+                               JobGraph jobGraph = createJobGraph(backend);
+
+                               // recover from the previous round?
+                               if (externalCheckpoint != null) {
+                                       jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(externalCheckpoint));
+                               }
+
+                               config.addAll(jobGraph.getJobConfiguration());
+                               JobSubmissionResult submissionResult = cluster.submitJobDetached(jobGraph);
+
+                               // let the job do some progress
+                               Thread.sleep(200);
+
+                               externalCheckpoint =
+                                       cluster.requestCheckpoint(submissionResult.getJobID(), CheckpointOptions.forFullCheckpoint());
+
+                               // without a pointer the next round would silently start from scratch instead of restoring
+                               if (externalCheckpoint == null) {
+                                       throw new AssertionError("Checkpoint of round " + round + " was not externalized.");
+                               }
+
+                               cluster.cancelJob(submissionResult.getJobID());
+                       }
+               } finally {
+                       cluster.stop();
+                       cluster.awaitTermination();
+               }
+       }
+
+       /**
+        * Builds the test job: a {@link ManualWindowSpeedITCase.InfiniteTupleSource}, keyed 3 second
+        * ingestion-time windows that sum field {@code f1} per key, and a filter on the key prefix.
+        *
+        * <p>The method is static so that the anonymous functions below do not capture an enclosing
+        * (non-serializable) test instance.
+        *
+        * @param backend state backend to configure on the job
+        * @return the job graph of the test job, named "Test"
+        */
+       private static JobGraph createJobGraph(AbstractStateBackend backend) {
+               final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+               env.setStateBackend(backend);
+               // retain externalized checkpoints on cancellation so that the next round can restore from them
+               env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+               env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
+               env.setParallelism(PARALLELISM);
+
+               env.addSource(new ManualWindowSpeedITCase.InfiniteTupleSource(10_000))
+                       .keyBy(0)
+                       .timeWindow(Time.seconds(3))
+                       .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
+                               private static final long serialVersionUID = 1L;
+
+                               @Override
+                               public Tuple2<String, Integer> reduce(
+                                       Tuple2<String, Integer> value1,
+                                       Tuple2<String, Integer> value2) throws Exception {
+                                       return Tuple2.of(value1.f0, value1.f1 + value2.f1);
+                               }
+                       })
+                       .filter(new FilterFunction<Tuple2<String, Integer>>() {
+                               private static final long serialVersionUID = 1L;
+
+                               @Override
+                               public boolean filter(Tuple2<String, Integer> value) throws Exception {
+                                       return value.f0.startsWith("Tuple 0");
+                               }
+                       });
+
+               StreamGraph streamGraph = env.getStreamGraph();
+               streamGraph.setJobName("Test");
+
+               return streamGraph.getJobGraph();
+       }
+}

Reply via email to