[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/91a4b276
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/91a4b276
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/91a4b276

Branch: refs/heads/master
Commit: 91a4b276171afb760bfff9ccf30593e648e91dfb
Parents: b71154a
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 15 14:56:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  13 +-
 .../checkpoint/CheckpointCoordinator.java       |  32 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |   2 +
 .../checkpoint/CompletedCheckpointStore.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   6 +-
 .../executiongraph/ExecutionJobVertex.java      |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   2 +-
 .../state/IncrementalKeyedStateHandle.java      |  68 ++--
 .../runtime/state/KeyGroupsStateHandle.java     |   2 +-
 .../runtime/state/MultiStreamStateHandle.java   |  10 +-
 .../runtime/state/SharedStateRegistry.java      |  52 ++-
 .../state/SharedStateRegistryFactory.java       |  35 ++
 .../state/memory/ByteStreamStateHandle.java     |   1 +
 ...tCoordinatorExternalizedCheckpointsTest.java |  22 +-
 .../CheckpointCoordinatorFailureTest.java       |   7 +-
 .../CheckpointCoordinatorMasterHooksTest.java   |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 341 +++++++++++++++++--
 .../checkpoint/CheckpointStateRestoreTest.java  |  10 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  25 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |   7 +-
 .../state/IncrementalKeyedStateHandleTest.java  |  75 +++-
 .../RecoverableCompletedCheckpointStore.java    |  33 +-
 .../streaming/runtime/tasks/StreamTask.java     |  21 +-
 ...tractEventTimeWindowCheckpointingITCase.java |  76 +++--
 ...ckendEventTimeWindowCheckpointingITCase.java |  49 +++
 27 files changed, 743 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index bba5b55..756cfdd 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -253,7 +253,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();
                this.backendUID = UUID.randomUUID();
-               LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
+
+               LOG.debug("Setting initial backend ID in 
RocksDBKeyedStateBackend for operator {} to {}.",
+                       this.operatorIdentifier,
+                       this.backendUID);
        }
 
        /**
@@ -883,11 +886,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                void takeSnapshot() throws Exception {
                        assert 
(Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
+                       final long lastCompletedCheckpoint;
+
                        // use the last completed checkpoint as the comparison 
base.
                        synchronized (stateBackend.materializedSstFiles) {
-                               baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+                               lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
+                               baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
                        }
 
+                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
+                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
                        // save meta data
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
                                        : 
stateBackend.kvStateInformation.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 0b64a73..c98d3aa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -40,6 +40,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
        @Nullable
        private CheckpointStatsTracker statsTracker;
 
+       /** A factory for SharedStateRegistry objects */
+       private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
        /** Registry that tracks state which is shared across (incremental) 
checkpoints */
-       private final SharedStateRegistry sharedStateRegistry;
+       private SharedStateRegistry sharedStateRegistry;
 
        // 
--------------------------------------------------------------------------------------------
 
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        @Nullable String checkpointDirectory,
-                       Executor executor) {
+                       Executor executor,
+                       SharedStateRegistryFactory sharedStateRegistryFactory) {
 
                // sanity checks
                checkArgument(baseInterval > 0, "Checkpoint timeout must be 
larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.checkpointDirectory = checkpointDirectory;
                this.executor = checkNotNull(executor);
-               this.sharedStateRegistry = new SharedStateRegistry(executor);
+               this.sharedStateRegistryFactory = 
checkNotNull(sharedStateRegistryFactory);
+               this.sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.masterHooks = new HashMap<>();
@@ -1043,10 +1049,23 @@ public class CheckpointCoordinator {
                                throw new 
IllegalStateException("CheckpointCoordinator is shut down");
                        }
 
-                       // Recover the checkpoints
-                       completedCheckpointStore.recover(sharedStateRegistry);
+                       // We create a new shared state registry object, so 
that all pending async disposal requests from previous
+                       // runs will go against the old object (where they can 
do no harm).
+                       // This must happen under the checkpoint lock.
+                       sharedStateRegistry.close();
+                       sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
+
+                       // Recover the checkpoints, TODO this could be done 
only when there is a new leader, not on each recovery
+                       completedCheckpointStore.recover();
+
+                       // Now, we re-register all (shared) states from the 
checkpoint store with the new registry
+                       for (CompletedCheckpoint completedCheckpoint : 
completedCheckpointStore.getAllCheckpoints()) {
+                               
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+                       }
+
+                       LOG.debug("Status of the shared state registry after 
restore: {}.", sharedStateRegistry);
 
-                       // restore from the latest checkpoint
+                       // Restore from the latest checkpoint
                        CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint();
 
                        if (latest == null) {
@@ -1120,7 +1139,6 @@ public class CheckpointCoordinator {
                CompletedCheckpoint savepoint = 
SavepointLoader.loadAndValidateSavepoint(
                                job, tasks, savepointPath, userClassLoader, 
allowNonRestored);
 
-               
savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                completedCheckpointStore.addCheckpoint(savepoint);
                
                // Reset the checkpoint ID counter

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 7c3edee..d3f61e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
 
        private void doDiscard() throws Exception {
 
+               LOG.trace("Executing discard procedure for {}.", this);
+
                try {
                        // collect exceptions and continue cleanup
                        Exception exception = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
         *
         * <p>After a call to this method, {@link #getLatestCheckpoint()} 
returns the latest
         * available checkpoint.
-        *
-        * @param sharedStateRegistry the shared state registry to register 
recovered states.
         */
-       void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+       void recover() throws Exception;
 
        /**
         * Adds a {@link CompletedCheckpoint} instance to the list of completed 
checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
        }
 
        @Override
-       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
+       public void recover() throws Exception {
                // Nothing to do
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         * that the history of checkpoints is consistent.
         */
        @Override
-       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
+       public void recover() throws Exception {
                LOG.info("Recovering checkpoints from ZooKeeper.");
 
                // Clear local handles in order to prevent duplicates on
                // recovery. The local handles should reflect the state
                // of ZooKeeper.
                completedCheckpoints.clear();
-               sharedStateRegistry.clear();
 
                // Get all there is first
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        try {
                                completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
                                if (completedCheckpoint != null) {
-                                       // Re-register all shared states in the 
checkpoint.
-                                       
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                                        
completedCheckpoints.add(completedCheckpoint);
                                }
                        } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 139f484..2e5f3d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.SerializedThrowable;
@@ -69,8 +70,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -456,7 +457,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        checkpointIDCounter,
                        checkpointStore,
                        checkpointDir,
-                       ioExecutor);
+                       ioExecutor,
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // register the master hooks on the checkpoint coordinator
                for (MasterTriggerRestoreHook<?> hook : masterHooks) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
index 5ee7a9f..e6d49d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.core.io.InputSplitSource;
@@ -39,7 +40,6 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 25df19b..d6019db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -89,9 +89,9 @@ import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedThrowable;
 
 import org.slf4j.Logger;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        private final UUID backendIdentifier;
 
        /**
-        * The key-group range covered by this state handle
+        * The key-group range covered by this state handle.
         */
        private final KeyGroupRange keyGroupRange;
 
        /**
-        * The checkpoint Id
+        * The checkpoint Id.
         */
        private final long checkpointId;
 
        /**
-        * Shared state in the incremental checkpoint. This i
+        * Shared state in the incremental checkpoint.
         */
        private final Map<StateHandleID, StreamStateHandle> sharedState;
 
        /**
-        * Private state in the incremental checkpoint
+        * Private state in the incremental checkpoint.
         */
        private final Map<StateHandleID, StreamStateHandle> privateState;
 
        /**
-        * Primary meta data state of the incremental checkpoint
+        * Primary meta data state of the incremental checkpoint.
         */
        private final StreamStateHandle metaStateHandle;
 
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
 
        @Override
        public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-               if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
-                       return this;
-               } else {
-                       return null;
-               }
+               return 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange))
 ?
+                       null : this;
        }
 
        @Override
        public void discardState() throws Exception {
 
+               SharedStateRegistry registry = this.sharedStateRegistry;
+               final boolean isRegistered = (registry != null);
+
+               LOG.trace("Discarding IncrementalKeyedStateHandle (registered = 
{}) for checkpoint {} from backend with id {}.",
+                       isRegistered,
+                       checkpointId,
+                       backendIdentifier);
+
                try {
                        metaStateHandle.discardState();
                } catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                // If this was not registered, we can delete the shared state. 
We can simply apply this
                // to all handles, because all handles that have not been 
created for the first time for this
                // are only placeholders at this point (disposing them is a 
NOP).
-               if (sharedStateRegistry == null) {
-                       try {
-                               
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
-                       } catch (Exception e) {
-                               LOG.warn("Could not properly discard new sst 
file states.", e);
-                       }
-               } else {
+               if (isRegistered) {
                        // If this was registered, we only unregister all our 
referenced shared states
                        // from the registry.
                        for (StateHandleID stateHandleID : 
sharedState.keySet()) {
-                               sharedStateRegistry.unregisterReference(
+                               registry.unregisterReference(
                                        
createSharedStateRegistryKeyFromFileName(stateHandleID));
                        }
+               } else {
+                       // Otherwise, we assume to own those handles and 
dispose them directly.
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+                       } catch (Exception e) {
+                               LOG.warn("Could not properly discard new sst 
file states.", e);
+                       }
                }
        }
 
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        @Override
        public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-               Preconditions.checkState(sharedStateRegistry == null, "The 
state handle has already registered its shared states.");
+               // This is a quick check to avoid that we register twice with 
the same registry. However, the code allows to
+               // register again with a different registry. The implication is 
that ownership is transferred to this new
+               // registry. This should only happen in case of a restart, when 
the CheckpointCoordinator creates a new
+               // SharedStateRegistry for the current attempt and the old 
registry becomes meaningless. We also assume that
+               // an old registry object from a previous run is due to be GCed 
and will never be used for registration again.
+               Preconditions.checkState(
+                       sharedStateRegistry != stateRegistry,
+                       "The state handle has already registered its shared 
states to the given registry.");
 
                sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
+               LOG.trace("Registering IncrementalKeyedStateHandle for 
checkpoint {} from backend with id {}.",
+                       checkpointId,
+                       backendIdentifier);
+
                for (Map.Entry<StateHandleID, StreamStateHandle> 
sharedStateHandle : sharedState.entrySet()) {
                        SharedStateRegistryKey registryKey =
                                
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                result = 31 * result + getMetaStateHandle().hashCode();
                return result;
        }
+
+       @Override
+       public String toString() {
+               return "IncrementalKeyedStateHandle{" +
+                       "backendIdentifier=" + backendIdentifier +
+                       ", keyGroupRange=" + keyGroupRange +
+                       ", checkpointId=" + checkpointId +
+                       ", sharedState=" + sharedState +
+                       ", privateState=" + privateState +
+                       ", metaStateHandle=" + metaStateHandle +
+                       ", registered=" + (sharedStateRegistry != null) +
+                       '}';
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements 
StreamStateHandle, KeyedStateHandle
        public String toString() {
                return "KeyGroupsStateHandle{" +
                                "groupRangeOffsets=" + groupRangeOffsets +
-                               ", data=" + stateHandle +
+                               ", stateHandle=" + stateHandle +
                                '}';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements 
StreamStateHandle {
        private final List<StreamStateHandle> stateHandles;
        private final long stateSize;
 
-       public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) 
throws IOException {
+       public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
                this.stateHandles = Preconditions.checkNotNull(stateHandles);
                long calculateSize = 0L;
                for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements 
StreamStateHandle {
                return stateSize;
        }
 
+       @Override
+       public String toString() {
+               return "MultiStreamStateHandle{" +
+                       "stateHandles=" + stateHandles +
+                       ", stateSize=" + stateSize +
+                       '}';
+       }
+
        static final class MultiFSDataInputStream extends 
AbstractMultiFSDataInputStream {
 
                private final TreeMap<Long, StreamStateHandle> stateHandleMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
  * maintain the reference count of {@link StreamStateHandle}s by a key that 
(logically) identifies
  * them.
  */
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SharedStateRegistry.class);
 
+       /** A singleton object for the default implementation of a {@link 
SharedStateRegistryFactory}. */
+       public static final SharedStateRegistryFactory DEFAULT_FACTORY = new 
SharedStateRegistryFactory() {
+               @Override
+               public SharedStateRegistry create(Executor deleteExecutor) {
+                       return new SharedStateRegistry(deleteExecutor);
+               }
+       };
+
        /** All registered state objects by an artificial key */
        private final Map<SharedStateRegistryKey, 
SharedStateRegistry.SharedStateEntry> registeredStates;
 
+       /** Whether the registry is still open, i.e. {@link #close()} has not 
been called yet. */
+       private boolean open;
+
        /** Executor for async state deletion */
        private final Executor asyncDisposalExecutor;
 
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
        public SharedStateRegistry(Executor asyncDisposalExecutor) {
                this.registeredStates = new HashMap<>();
                this.asyncDisposalExecutor = 
Preconditions.checkNotNull(asyncDisposalExecutor);
+               this.open = true;
        }
 
        /**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
                SharedStateRegistry.SharedStateEntry entry;
 
                synchronized (registeredStates) {
+
+                       Preconditions.checkState(open, "Attempt to register 
state to closed SharedStateRegistry.");
+
                        entry = registeredStates.get(registrationKey);
 
                        if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
                                // delete if this is a real duplicate
                                if (!Objects.equals(state, entry.stateHandle)) {
                                        scheduledStateDeletion = state;
+                                       LOG.trace("Identified duplicate state 
registration under key {}. New state {} was determined to " +
+                                                       "be an unnecessary copy 
of existing state {} and will be dropped.",
+                                               registrationKey,
+                                               state,
+                                               entry.stateHandle);
                                }
                                entry.increaseReferenceCount();
                        }
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
         *
         * @param registrationKey the shared state for which we release a 
reference.
         * @return the result of the request, consisting of the reference count 
after this operation
-        * and the state handle, or null if the state handle was deleted 
through this request.
+        * and the state handle, or null if the state handle was deleted 
through this request. Unlike registration, this method does not check
+        * whether the registry was closed.
         */
        public Result unregisterReference(SharedStateRegistryKey 
registrationKey) {
 
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
                SharedStateRegistry.SharedStateEntry entry;
 
                synchronized (registeredStates) {
+
                        entry = registeredStates.get(registrationKey);
 
                        Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
                }
        }
 
+       @Override
+       public String toString() {
+               synchronized (registeredStates) {
+                       return "SharedStateRegistry{" +
+                               "registeredStates=" + registeredStates +
+                               '}';
+               }
+       }
+
        private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
                // We do the small optimization to not issue discards for 
placeholders, which are NOPs.
                if (streamStateHandle != null && 
!isPlaceholder(streamStateHandle)) {
-
                        LOG.trace("Scheduled delete of state handle {}.", 
streamStateHandle);
                        asyncDisposalExecutor.execute(
                                new 
SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
                return stateHandle instanceof PlaceholderStreamStateHandle;
        }
 
+       @Override
+       public void close() {
+               synchronized (registeredStates) {
+                       open = false;
+               }
+       }
+
        /**
         * An entry in the registry, tracking the handle and the corresponding 
reference count.
         */
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
                        }
                }
        }
-
-       /**
-        * Clears the registry.
-        */
-       public void clear() {
-               synchronized (registeredStates) {
-                       registeredStates.clear();
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects. See {@link SharedStateRegistry#DEFAULT_FACTORY} for the default implementation.
+ */
+public interface SharedStateRegistryFactory {
+
+       /**
+        * Creates a {@link SharedStateRegistry} that runs its state deletions on the given executor.
+        *
+        * @param deleteExecutor executor used to run the registry's (async) deletes of state handles.
+        * @return the created {@link SharedStateRegistry}.
+        */
+       SharedStateRegistry create(Executor deleteExecutor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements 
StreamStateHandle {
        public String toString() {
                return "ByteStreamStateHandle{" +
                        "handleName='" + handleName + '\'' +
+                       ", dataBytes=" + data.length +
                        '}';
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * CheckpointCoordinator tests for externalized checkpoints.
  *
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        checkpointDir.getAbsolutePath(),
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 88b95f5..26db772 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -79,7 +79,8 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                        new StandaloneCheckpointIDCounter(),
                        new FailingCompletedCheckpointStore(),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                coord.triggerCheckpoint(triggerTimestamp, false);
 
@@ -111,7 +112,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                
when(subtaskState.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(vertex.getJobvertexId()))).thenReturn(operatorSubtaskState);
 
                AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new 
CheckpointMetrics(), subtaskState);
-               
+
                try {
                        coord.receiveAcknowledgeMessage(acknowledgeMessage);
                        fail("Expected a checkpoint exception because the 
completed checkpoint store could not " +
@@ -135,7 +136,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
        private static final class FailingCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
                @Override
-               public void recover(SharedStateRegistry sharedStateRegistry) 
throws Exception {
+               public void recover() throws Exception {
                        throw new UnsupportedOperationException("Not 
implemented.");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index e23f6a2..2f860e0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -28,9 +28,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -46,14 +46,12 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -404,7 +402,8 @@ public class CheckpointCoordinatorMasterHooksTest {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(10),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
        }
 
        private static <T> T mockGeneric(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index d9af879..45cbbc3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -36,32 +36,36 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
+import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
 import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
-import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.mockito.verification.VerificationMode;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -139,7 +143,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -199,7 +204,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -250,7 +256,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -302,7 +309,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -406,7 +414,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -525,7 +534,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -692,7 +702,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -822,7 +833,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(10),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -986,7 +998,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // trigger a checkpoint, partially acknowledged
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1063,7 +1076,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1126,7 +1140,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1258,7 +1273,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
 
                        coord.startCheckpointScheduler();
@@ -1350,7 +1366,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                "dummy-path",
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                try {
                        coord.startCheckpointScheduler();
@@ -1423,7 +1440,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1574,7 +1592,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        counter,
                        new StandaloneCompletedCheckpointStore(10),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1680,7 +1699,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        coord.startCheckpointScheduler();
 
@@ -1753,7 +1773,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        coord.startCheckpointScheduler();
 
@@ -1835,7 +1856,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        coord.startCheckpointScheduler();
 
@@ -1887,7 +1909,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        checkpointIDCounter,
                        new StandaloneCompletedCheckpointStore(2),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                List<CompletableFuture<CompletedCheckpoint>> savepointFutures = 
new ArrayList<>();
 
@@ -1940,7 +1963,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(2),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2002,7 +2026,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        store,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2116,7 +2141,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2237,7 +2263,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2395,7 +2422,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2686,7 +2714,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        standaloneCompletedCheckpointStore,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2847,7 +2876,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                "fake-directory",
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -3351,7 +3381,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // Periodic
                CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
@@ -3529,7 +3560,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
                coord.setCheckpointStatsTracker(tracker);
@@ -3567,7 +3599,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        store,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                store.addCheckpoint(new CompletedCheckpoint(
                        new JobID(),
@@ -3623,7 +3656,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        checkpointIDCounter,
                        completedCheckpointStore,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger a first checkpoint
                assertTrue(
@@ -3673,4 +3707,245 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        "The latest completed (proper) checkpoint should have 
been added to the completed checkpoint store.",
                        
completedCheckpointStore.getLatestCheckpoint().getCheckpointID() == 
checkpointIDCounter.getLast());
        }
+
+       @Test
+       public void testSharedStateRegistrationOnRestore() throws Exception {
+
+               final JobID jid = new JobID();
+               final long timestamp = System.currentTimeMillis();
+
+               final JobVertexID jobVertexID1 = new JobVertexID();
+
+               int parallelism1 = 2;
+               int maxParallelism1 = 4;
+
+               final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
+                       jobVertexID1,
+                       parallelism1,
+                       maxParallelism1);
+
+               List<ExecutionVertex> allExecutionVertices = new 
ArrayList<>(parallelism1);
+
+               
allExecutionVertices.addAll(Arrays.asList(jobVertex1.getTaskVertices()));
+
+               ExecutionVertex[] arrayExecutionVertices =
+                       allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
+
+               RecoverableCompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore(10);
+
+               final List<SharedStateRegistry> createdSharedStateRegistries = 
new ArrayList<>(2);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       jid,
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       arrayExecutionVertices,
+                       new StandaloneCheckpointIDCounter(),
+                       store,
+                       null,
+                       Executors.directExecutor(),
+                       new SharedStateRegistryFactory() {
+                               @Override
+                               public SharedStateRegistry create(Executor 
deleteExecutor) {
+                                       SharedStateRegistry instance = new 
SharedStateRegistry(deleteExecutor);
+                                       
createdSharedStateRegistries.add(instance);
+                                       return instance;
+                               }
+                       });
+
+               final int numCheckpoints = 3;
+
+               List<KeyGroupRange> keyGroupPartitions1 =
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
+
+               for (int i = 0; i < numCheckpoints; ++i) {
+                       performIncrementalCheckpoint(jid, coord, jobVertex1, 
keyGroupPartitions1, timestamp + i, i);
+               }
+
+               List<CompletedCheckpoint> completedCheckpoints = 
coord.getSuccessfulCheckpoints();
+               assertEquals(numCheckpoints, completedCheckpoints.size());
+
+               int sharedHandleCount = 0;
+
+               List<Map<StateHandleID, StreamStateHandle>> 
sharedHandlesByCheckpoint = new ArrayList<>(numCheckpoints);
+
+               for (int i = 0; i < numCheckpoints; ++i) {
+                       sharedHandlesByCheckpoint.add(new 
HashMap<StateHandleID, StreamStateHandle>(2));
+               }
+
+               int cp = 0;
+               for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
+                       for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
+                               for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
+                                       for (KeyedStateHandle keyedStateHandle 
: subtaskState.getManagedKeyedState()) {
+                                               // test we are once registered 
with the current registry
+                                               verify(keyedStateHandle, 
times(1)).registerSharedStates(createdSharedStateRegistries.get(0));
+                                               IncrementalKeyedStateHandle 
incrementalKeyedStateHandle = (IncrementalKeyedStateHandle) keyedStateHandle;
+
+                                               
sharedHandlesByCheckpoint.get(cp).putAll(incrementalKeyedStateHandle.getSharedState());
+
+                                               for (StreamStateHandle 
streamStateHandle : incrementalKeyedStateHandle.getSharedState().values()) {
+                                                       
assertTrue(!(streamStateHandle instanceof PlaceholderStreamStateHandle));
+                                                       
verify(streamStateHandle, never()).discardState();
+                                                       ++sharedHandleCount;
+                                               }
+
+                                               for (StreamStateHandle 
streamStateHandle : incrementalKeyedStateHandle.getPrivateState().values()) {
+                                                       
verify(streamStateHandle, never()).discardState();
+                                               }
+
+                                               
verify(incrementalKeyedStateHandle.getMetaStateHandle(), 
never()).discardState();
+                                       }
+
+                                       verify(subtaskState, 
never()).discardState();
+                               }
+                       }
+                       ++cp;
+               }
+
+               // 2 (parallelism) x (1 (CP0) + 2 (CP1) + 2 (CP2)) = 10
+               assertEquals(10, sharedHandleCount);
+
+               // discard CP0
+               store.removeOldestCheckpoint();
+
+               // we expect no shared state was discarded because the state of 
CP0 is still referenced by CP1
+               for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
+                       for (StreamStateHandle streamStateHandle : 
cpList.values()) {
+                               verify(streamStateHandle, 
never()).discardState();
+                       }
+               }
+
+               // shutdown the store
+               store.shutdown(JobStatus.SUSPENDED);
+
+               // restore the store
+               Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+               tasks.put(jobVertexID1, jobVertex1);
+               coord.restoreLatestCheckpointedState(tasks, true, false);
+
+               // validate that all shared states are registered again after 
the recovery.
+               cp = 0;
+               for (CompletedCheckpoint completedCheckpoint : 
completedCheckpoints) {
+                       for (OperatorState taskState : 
completedCheckpoint.getOperatorStates().values()) {
+                               for (OperatorSubtaskState subtaskState : 
taskState.getStates()) {
+                                       for (KeyedStateHandle keyedStateHandle 
: subtaskState.getManagedKeyedState()) {
+                                               VerificationMode 
verificationMode;
+                                               // test we are once registered 
with the new registry
+                                               if (cp > 0) {
+                                                       verificationMode = 
times(1);
+                                               } else {
+                                                       verificationMode = 
never();
+                                               }
+
+                                               //check that all are registered 
with the new registry
+                                               verify(keyedStateHandle, 
verificationMode).registerSharedStates(createdSharedStateRegistries.get(1));
+                                       }
+                               }
+                       }
+                       ++cp;
+               }
+
+               // discard CP1
+               store.removeOldestCheckpoint();
+
+               // we expect that all shared state from CP0 is no longer 
referenced and discarded. CP2 is still live and also
+               // references the state from CP1, so we expect they are not 
discarded.
+               for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
+                       for (Map.Entry<StateHandleID, StreamStateHandle> entry 
: cpList.entrySet()) {
+                               String key = entry.getKey().getKeyString();
+                               int belongToCP = 
Integer.parseInt(String.valueOf(key.charAt(key.length() - 1)));
+                               if (belongToCP == 0) {
+                                       verify(entry.getValue(), 
times(1)).discardState();
+                               } else {
+                                       verify(entry.getValue(), 
never()).discardState();
+                               }
+                       }
+               }
+
+               // discard CP2
+               store.removeOldestCheckpoint();
+
+               // we expect all shared state was discarded now, because all 
CPs are
+               for (Map<StateHandleID, StreamStateHandle> cpList : 
sharedHandlesByCheckpoint) {
+                       for (StreamStateHandle streamStateHandle : 
cpList.values()) {
+                               verify(streamStateHandle, 
times(1)).discardState();
+                       }
+               }
+       }
+
+       /**
+        * Triggers a checkpoint on the given coordinator and acknowledges it from every subtask of the given vertex
+        * with a spied {@link IncrementalKeyedStateHandle}, reported for the vertex's first operator. Each subtask
+        * reports one private state handle, one new shared state handle with id {@code "shared-<cpSequenceNumber>"}
+        * and, for all but the first checkpoint, a placeholder for the shared state of the previous checkpoint.
+        *
+        * @param jid the job id used in the acknowledge messages
+        * @param coord the coordinator on which the checkpoint is triggered and acknowledged
+        * @param jobVertex1 the job vertex whose subtasks acknowledge the checkpoint
+        * @param keyGroupPartitions1 the key-group range for each subtask index
+        * @param timestamp the timestamp used to trigger the checkpoint
+        * @param cpSequenceNumber the sequence number of this checkpoint, used to name the shared state handles
+        */
+       private void performIncrementalCheckpoint(
+               JobID jid,
+               CheckpointCoordinator coord,
+               ExecutionJobVertex jobVertex1,
+               List<KeyGroupRange> keyGroupPartitions1,
+               long timestamp,
+               int cpSequenceNumber) throws Exception {
+
+               // trigger the checkpoint
+               assertTrue(coord.triggerCheckpoint(timestamp, false));
+
+               assertEquals(1, coord.getPendingCheckpoints().size());
+               long checkpointId = Iterables.getOnlyElement(coord.getPendingCheckpoints().keySet());
+
+               for (int index = 0; index < jobVertex1.getParallelism(); index++) {
+
+                       KeyGroupRange keyGroupRange = keyGroupPartitions1.get(index);
+
+                       Map<StateHandleID, StreamStateHandle> privateState = new HashMap<>();
+                       privateState.put(
+                               new StateHandleID("private-1"),
+                               spy(new ByteStreamStateHandle("private-1", new byte[]{'p'})));
+
+                       Map<StateHandleID, StreamStateHandle> sharedState = new HashMap<>();
+
+                       // let all but the first CP overlap by one shared state.
+                       if (cpSequenceNumber > 0) {
+                               sharedState.put(
+                                       new StateHandleID("shared-" + (cpSequenceNumber - 1)),
+                                       spy(new PlaceholderStreamStateHandle()));
+                       }
+
+                       sharedState.put(
+                               new StateHandleID("shared-" + cpSequenceNumber),
+                               spy(new ByteStreamStateHandle("shared-" + cpSequenceNumber + "-" + keyGroupRange, new byte[]{'s'})));
+
+                       IncrementalKeyedStateHandle managedState =
+                               spy(new IncrementalKeyedStateHandle(
+                                       new UUID(42L, 42L),
+                                       keyGroupRange,
+                                       checkpointId,
+                                       sharedState,
+                                       privateState,
+                                       spy(new ByteStreamStateHandle("meta", new byte[]{'m'}))));
+
+                       OperatorSubtaskState operatorSubtaskState =
+                               spy(new OperatorSubtaskState(null,
+                                       Collections.<OperatorStateHandle>emptyList(),
+                                       Collections.<OperatorStateHandle>emptyList(),
+                                       Collections.<KeyedStateHandle>singletonList(managedState),
+                                       Collections.<KeyedStateHandle>emptyList()));
+
+                       Map<OperatorID, OperatorSubtaskState> opStates = new HashMap<>();
+
+                       opStates.put(jobVertex1.getOperatorIDs().get(0), operatorSubtaskState);
+
+                       TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(opStates);
+
+                       AcknowledgeCheckpoint acknowledgeCheckpoint = new AcknowledgeCheckpoint(
+                               jid,
+                               jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               taskStateSnapshot);
+
+                       coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 6ce071b..791bffa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.SerializableObject;
 
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -186,7 +188,8 @@ public class CheckpointStateRestoreTest {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        try {
                                coord.restoreLatestCheckpointedState(new 
HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -243,7 +246,8 @@ public class CheckpointStateRestoreTest {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                StreamStateHandle serializedState = CheckpointCoordinatorTest
                                .generateChainedStateHandle(new 
SerializableObject())

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 77423c2..dc2b11e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
                // Recover
-               sharedStateRegistry.clear();
-               checkpoints.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               sharedStateRegistry = new SharedStateRegistry();
+               checkpoints.recover();
 
                assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertNull(client.checkExists().forPath(CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-               sharedStateRegistry.clear();
-               store.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               store.recover();
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
        }
@@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals("The checkpoint node should not be locked.", 0, 
stat.getNumChildren());
 
                // Recover again
-               sharedStateRegistry.clear();
-               store.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               store.recover();
 
                CompletedCheckpoint recovered = store.getLatestCheckpoint();
                assertEquals(checkpoint, recovered);
@@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                        checkpointStore.addCheckpoint(checkpoint);
                }
 
-               sharedStateRegistry.clear();
-               checkpointStore.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               checkpointStore.recover();
 
                CompletedCheckpoint latestCheckpoint = 
checkpointStore.getLatestCheckpoint();
 
@@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
                // recover the checkpoint by a different checkpoint store
-               sharedStateRegistry.clear();
-               zkCheckpointStore2.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               sharedStateRegistry = new SharedStateRegistry();
+               zkCheckpointStore2.recover();
 
                CompletedCheckpoint recoveredCheckpoint = 
zkCheckpointStore2.getLatestCheckpoint();
                assertTrue(recoveredCheckpoint instanceof 
TestCompletedCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 91bab85..3171f1f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                        stateStorage,
                        Executors.directExecutor());
 
-               SharedStateRegistry sharedStateRegistry = spy(new 
SharedStateRegistry());
-               zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
-
-               verify(retrievableStateHandle1.retrieveState(), 
times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
-               verify(retrievableStateHandle2.retrieveState(), 
times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+               zooKeeperCompletedCheckpointStore.recover();
 
                CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index c1b3ccd..9f6f88e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+
 import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest {
        @Test
        public void testSharedStateDeRegistration() throws Exception {
 
-               Random rnd = new Random(42);
-
                SharedStateRegistry registry = spy(new SharedStateRegistry());
 
                // Create two state handles with overlapping shared state
@@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest {
                verify(stateHandle2.getMetaStateHandle(), 
times(1)).discardState();
        }
 
+       /**
+        * Checks that shared state which was registered with one {@link SharedStateRegistry} can later be registered
+        * with another one. This mirrors a recovery from a checkpoint, where the checkpoint coordinator creates a new
+        * shared state registry and re-registers all live checkpoint states with it.
+        */
+       @Test
+       public void testSharedStateReRegistration() throws Exception {
+
+               SharedStateRegistry firstRegistry = spy(new SharedStateRegistry());
+
+               IncrementalKeyedStateHandle handleX = create(new Random(1));
+               IncrementalKeyedStateHandle handleY = create(new Random(2));
+               IncrementalKeyedStateHandle handleZ = create(new Random(3));
+
+               // initial registration of all three handles with the first registry
+               handleX.registerSharedStates(firstRegistry);
+               handleY.registerSharedStates(firstRegistry);
+               handleZ.registerSharedStates(firstRegistry);
+
+               // registering the same handle twice with one registry must be rejected
+               try {
+                       handleX.registerSharedStates(firstRegistry);
+                       fail("Should not be able to register twice with the same registry.");
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
+
+               // discarding Z must delete its meta state and all of its shared state
+               handleZ.discardState();
+               verify(handleZ.getMetaStateHandle(), times(1)).discardState();
+               for (StreamStateHandle sharedHandle : handleZ.getSharedState().values()) {
+                       verify(sharedHandle, times(1)).discardState();
+               }
+
+               firstRegistry.close();
+
+               // a closed registry must reject new registrations
+               try {
+                       create(new Random(4)).registerSharedStates(firstRegistry);
+                       fail("Should not be able to register new state to closed registry.");
+               } catch (IllegalStateException expected) {
+                       // expected
+               }
+
+               // discarding Y still deletes all of its state, even though its registry is closed
+               handleY.discardState();
+               verify(handleY.getMetaStateHandle(), times(1)).discardState();
+               for (StreamStateHandle sharedHandle : handleY.getSharedState().values()) {
+                       verify(sharedHandle, times(1)).discardState();
+               }
+
+               // X has not been discarded, so none of its state may have been deleted
+               verify(handleX.getMetaStateHandle(), never()).discardState();
+               for (StreamStateHandle sharedHandle : handleX.getSharedState().values()) {
+                       verify(sharedHandle, never()).discardState();
+               }
+
+               // register X again, now with a fresh registry, and discard it
+               SharedStateRegistry secondRegistry = spy(new SharedStateRegistry());
+               handleX.registerSharedStates(secondRegistry);
+               handleX.discardState();
+
+               // X is tracked by the second registry, so all of its state must be deleted
+               verify(handleX.getMetaStateHandle(), times(1)).discardState();
+               for (StreamStateHandle sharedHandle : handleX.getSharedState().values()) {
+                       verify(sharedHandle, times(1)).discardState();
+               }
+
+               secondRegistry.close();
+       }
+
        private static IncrementalKeyedStateHandle create(Random rnd) {
                return new IncrementalKeyedStateHandle(
                        UUID.nameUUIDFromBytes("test".getBytes()),

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a0c4412..037ecd1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
        private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2);
 
+       private final int maxRetainedCheckpoints;
+
+       public RecoverableCompletedCheckpointStore() {
+               this(1);
+       }
+
+       public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+               Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+               this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+       }
+
        @Override
-       public void recover(SharedStateRegistry sharedStateRegistry) throws Exception {
+       public void recover() throws Exception {
                checkpoints.addAll(suspended);
                suspended.clear();
-
-               for (CompletedCheckpoint checkpoint : checkpoints) {
-                       checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
-               }
        }
 
        @Override
@@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
                checkpoints.addLast(checkpoint);
 
-
-               if (checkpoints.size() > 1) {
-                       CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
-                       checkpointToSubsume.discardOnSubsume();
+               if (checkpoints.size() > maxRetainedCheckpoints) {
+                       removeOldestCheckpoint();
                }
        }
 
+       public void removeOldestCheckpoint() throws Exception {
+               CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst();
+               checkpointToSubsume.discardOnSubsume();
+       }
+
        @Override
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
                return checkpoints.isEmpty() ? null : checkpoints.getLast();
@@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS
 
        @Override
        public int getMaxNumberOfRetainedCheckpoints() {
-               return 1;
+               return maxRetainedCheckpoints;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index cb8639b..1ba5fb1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.TaskInfo;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -779,9 +780,11 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
        }
 
        private String createOperatorIdentifier(StreamOperator<?> operator, int vertexId) {
+
+               TaskInfo taskInfo = getEnvironment().getTaskInfo();
                return operator.getClass().getSimpleName() +
-                               "_" + vertexId +
-                               "_" + getEnvironment().getTaskInfo().getIndexOfThisSubtask();
+                       "_" + operator.getOperatorID() +
+                       "_(" + taskInfo.getIndexOfThisSubtask() + "/" + taskInfo.getNumberOfParallelSubtasks() + ")";
        }
 
        /**
@@ -892,18 +895,22 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>>
                                if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
                                                CheckpointingOperation.AsynCheckpointState.COMPLETED)) {
 
+                                       TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null;
+
                                        // we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
                                        // to stateless tasks on restore. This enables simple job modifications that only concern
                                        // stateless without the need to assign them uids to match their (always empty) states.
                                        owner.getEnvironment().acknowledgeCheckpoint(
                                                checkpointMetaData.getCheckpointId(),
                                                checkpointMetrics,
-                                               hasState ? taskOperatorSubtaskStates : null);
+                                               acknowledgedState);
+
+                                       LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
+                                               owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
+
+                                       LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
+                                               owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState);
 
-                                       if (LOG.isDebugEnabled()) {
-                                               LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
-                                                       owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-                                       }
                                } else {
                                        LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
                                                owner.getName(),

http://git-wip-us.apache.org/repos/asf/flink/blob/91a4b276/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 22ed847..c525a37 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
@@ -48,21 +49,22 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.curator.test.TestingServer;
 import org.junit.After;
-import org.junit.AfterClass;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.TestName;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -87,6 +89,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        private static TestStreamEnvironment env;
 
+       private static TestingServer zkServer;
+
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -101,11 +105,27 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
        }
 
        enum StateBackendEnum {
-               MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC
+               MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
        }
 
-       @BeforeClass
-       public static void startTestCluster() {
+       @Before
+       public void startTestCluster() throws Exception {
+
+               // print a message when starting a test method to avoid Travis' <tt>"Maven produced no
+               // output for xxx seconds."</tt> messages
+               System.out.println(
+                       "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
+
+               // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints
+               if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+                       zkServer = new TestingServer();
+                       zkServer.start();
+               }
+
+               TemporaryFolder temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+               final File haDir = temporaryFolder.newFolder();
+
                Configuration config = new Configuration();
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2);
@@ -113,28 +133,18 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case
                config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB
 
+               if (zkServer != null) {
+                       config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
+                       config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+                       config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
+               }
+
                cluster = new LocalFlinkMiniCluster(config, false);
                cluster.start();
 
                env = new TestStreamEnvironment(cluster, PARALLELISM);
                env.getConfig().setUseSnapshotCompression(true);
-       }
-
-       @AfterClass
-       public static void stopTestCluster() {
-               if (cluster != null) {
-                       cluster.stop();
-               }
-       }
-
-       @Before
-       public void beforeTest() throws IOException {
-               // print a message when starting a test method to avoid Travis' <tt>"Maven produced no
-               // output for xxx seconds."</tt> messages
-               System.out.println(
-                       "Starting " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
 
-               // init state back-end
                switch (stateBackendEnum) {
                        case MEM:
                                this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -159,7 +169,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                                this.stateBackend = rdb;
                                break;
                        }
-                       case ROCKSDB_INCREMENTAL: {
+                       case ROCKSDB_INCREMENTAL:
+                       case ROCKSDB_INCREMENTAL_ZK: {
                                String rocksDb = tempFolder.newFolder().getAbsolutePath();
                                String backups = tempFolder.newFolder().getAbsolutePath();
                                // we use the fs backend with small threshold here to test the behaviour with file
@@ -173,16 +184,25 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog
                                this.stateBackend = rdb;
                                break;
                        }
-
+                       default:
+                               throw new IllegalStateException("No backend selected.");
                }
        }
 
-       /**
-        * Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
-        * for xxx seconds."</tt> messages.
-        */
        @After
-       public void afterTest() {
+       public void stopTestCluster() throws IOException {
+               if (cluster != null) {
+                       cluster.stop();
+                       cluster = null;
+               }
+
+               if (zkServer != null) {
+                       zkServer.stop();
+                       zkServer = null;
+               }
+
+               //Prints a message when finishing a test method to avoid Travis' <tt>"Maven produced no output
+               // for xxx seconds."</tt> messages.
                System.out.println(
                        "Finished " + getClass().getCanonicalName() + "#" + name.getMethodName() + ".");
        }

Reply via email to