[FLINK-7268] [checkpoints] Scope SharedStateRegistry objects per (re)start

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/09caa9ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/09caa9ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/09caa9ff

Branch: refs/heads/release-1.3
Commit: 09caa9ffdc8168610c7d0260360c034ea87f904c
Parents: 0225db2
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Tue Jul 25 12:04:16 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Fri Jul 28 15:42:28 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBKeyedStateBackend.java         |  13 +-
 .../checkpoint/CheckpointCoordinator.java       |  32 +-
 .../runtime/checkpoint/CompletedCheckpoint.java |   3 +-
 .../checkpoint/CompletedCheckpointStore.java    |   5 +-
 .../StandaloneCompletedCheckpointStore.java     |   4 +-
 .../ZooKeeperCompletedCheckpointStore.java      |  12 +-
 .../runtime/executiongraph/ExecutionGraph.java  |   6 +-
 .../state/IncrementalKeyedStateHandle.java      |  68 ++-
 .../runtime/state/KeyGroupsStateHandle.java     |   2 +-
 .../runtime/state/MultiStreamStateHandle.java   |  10 +-
 .../runtime/state/SharedStateRegistry.java      |  52 ++-
 .../state/SharedStateRegistryFactory.java       |  35 ++
 .../state/memory/ByteStreamStateHandle.java     |   1 +
 ...tCoordinatorExternalizedCheckpointsTest.java |  22 +-
 .../CheckpointCoordinatorFailureTest.java       |   7 +-
 .../CheckpointCoordinatorMasterHooksTest.java   |   7 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 437 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  10 +-
 ...ZooKeeperCompletedCheckpointStoreITCase.java |  25 +-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |   7 +-
 .../state/IncrementalKeyedStateHandleTest.java  |  75 +++-
 .../RecoverableCompletedCheckpointStore.java    |  33 +-
 .../streaming/runtime/tasks/StreamTask.java     |   1 -
 ...tractEventTimeWindowCheckpointingITCase.java |  85 +++-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |  51 +++
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 ...ckendEventTimeWindowCheckpointingITCase.java |   4 +-
 31 files changed, 688 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index a6b53ec..7e0910e 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -231,7 +231,10 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                this.restoredKvStateMetaInfos = new HashMap<>();
                this.materializedSstFiles = new TreeMap<>();
                this.backendUID = UUID.randomUUID();
-               LOG.debug("Setting initial keyed backend uid for operator {} to 
{}.", this.operatorIdentifier, this.backendUID);
+
+               LOG.debug("Setting initial backend ID in 
RocksDBKeyedStateBackend for operator {} to {}.",
+                       this.operatorIdentifier,
+                       this.backendUID);
        }
 
        /**
@@ -835,11 +838,17 @@ public class RocksDBKeyedStateBackend<K> extends 
AbstractKeyedStateBackend<K> {
                void takeSnapshot() throws Exception {
                        assert 
(Thread.holdsLock(stateBackend.asyncSnapshotLock));
 
+                       final long lastCompletedCheckpoint;
+
                        // use the last completed checkpoint as the comparison 
base.
                        synchronized (stateBackend.materializedSstFiles) {
-                               baseSstFiles = 
stateBackend.materializedSstFiles.get(stateBackend.lastCompletedCheckpointId);
+                               lastCompletedCheckpoint = 
stateBackend.lastCompletedCheckpointId;
+                               baseSstFiles = 
stateBackend.materializedSstFiles.get(lastCompletedCheckpoint);
                        }
 
+                       LOG.trace("Taking incremental snapshot for checkpoint 
{}. Snapshot is based on last completed checkpoint {} " +
+                               "assuming the following (shared) files as base: 
{}.", checkpointId, lastCompletedCheckpoint, baseSstFiles);
+
                        // save meta data
                        for (Map.Entry<String, Tuple2<ColumnFamilyHandle, 
RegisteredKeyedBackendStateMetaInfo<?, ?>>> stateMetaInfoEntry
                                        : 
stateBackend.kvStateInformation.entrySet()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 82933ac..fe94d25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistryFactory;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
@@ -174,8 +175,11 @@ public class CheckpointCoordinator {
        @Nullable
        private CheckpointStatsTracker statsTracker;
 
+       /** A factory for SharedStateRegistry objects */
+       private final SharedStateRegistryFactory sharedStateRegistryFactory;
+
        /** Registry that tracks state which is shared across (incremental) 
checkpoints */
-       private final SharedStateRegistry sharedStateRegistry;
+       private SharedStateRegistry sharedStateRegistry;
 
        // 
--------------------------------------------------------------------------------------------
 
@@ -192,7 +196,8 @@ public class CheckpointCoordinator {
                        CheckpointIDCounter checkpointIDCounter,
                        CompletedCheckpointStore completedCheckpointStore,
                        @Nullable String checkpointDirectory,
-                       Executor executor) {
+                       Executor executor,
+                       SharedStateRegistryFactory sharedStateRegistryFactory) {
 
                // sanity checks
                checkArgument(baseInterval > 0, "Checkpoint timeout must be 
larger than zero");
@@ -230,7 +235,8 @@ public class CheckpointCoordinator {
                this.completedCheckpointStore = 
checkNotNull(completedCheckpointStore);
                this.checkpointDirectory = checkpointDirectory;
                this.executor = checkNotNull(executor);
-               this.sharedStateRegistry = new SharedStateRegistry(executor);
+               this.sharedStateRegistryFactory = 
checkNotNull(sharedStateRegistryFactory);
+               this.sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
 
                this.recentPendingCheckpoints = new 
ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
                this.masterHooks = new HashMap<>();
@@ -1044,10 +1050,23 @@ public class CheckpointCoordinator {
                                throw new 
IllegalStateException("CheckpointCoordinator is shut down");
                        }
 
-                       // Recover the checkpoints
-                       completedCheckpointStore.recover(sharedStateRegistry);
+                       // We create a new shared state registry object, so 
that all pending async disposal requests from previous
+                       // runs will go against the old object (where they can 
do no harm).
+                       // This must happen under the checkpoint lock.
+                       sharedStateRegistry.close();
+                       sharedStateRegistry = 
sharedStateRegistryFactory.create(executor);
+
+                       // Recover the checkpoints, TODO this could be done 
only when there is a new leader, not on each recovery
+                       completedCheckpointStore.recover();
+
+                       // Now, we re-register all (shared) states from the 
checkpoint store with the new registry
+                       for (CompletedCheckpoint completedCheckpoint : 
completedCheckpointStore.getAllCheckpoints()) {
+                               
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
+                       }
+
+                       LOG.debug("Status of the shared state registry after 
restore: {}.", sharedStateRegistry);
 
-                       // restore from the latest checkpoint
+                       // Restore from the latest checkpoint
                        CompletedCheckpoint latest = 
completedCheckpointStore.getLatestCheckpoint();
 
                        if (latest == null) {
@@ -1121,7 +1140,6 @@ public class CheckpointCoordinator {
                CompletedCheckpoint savepoint = 
SavepointLoader.loadAndValidateSavepoint(
                                job, tasks, savepointPath, userClassLoader, 
allowNonRestored);
 
-               
savepoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                completedCheckpointStore.addCheckpoint(savepoint);
                
                // Reset the checkpoint ID counter

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
index 56aa19d..76d1580 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
@@ -209,6 +209,8 @@ public class CompletedCheckpoint implements Serializable {
 
        private void doDiscard() throws Exception {
 
+               LOG.trace("Executing discard procedure for {}.", this);
+
                try {
                        // collect exceptions and continue cleanup
                        Exception exception = null;
@@ -225,7 +227,6 @@ public class CompletedCheckpoint implements Serializable {
                        // discard private state objects
                        try {
                                Collection<OperatorState> values = 
operatorStates.values();
-                               LOG.trace("About to discard operator states 
{}.", values);
                                
StateUtil.bestEffortDiscardAllStateObjects(values);
                        } catch (Exception e) {
                                exception = ExceptionUtils.firstOrSuppressed(e, 
exception);

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
index 45d407e..82193b5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import java.util.List;
 
@@ -33,10 +32,8 @@ public interface CompletedCheckpointStore {
         *
         * <p>After a call to this method, {@link #getLatestCheckpoint()} 
returns the latest
         * available checkpoint.
-        *
-        * @param sharedStateRegistry the shared state registry to register 
recovered states.
         */
-       void recover(SharedStateRegistry sharedStateRegistry) throws Exception;
+       void recover() throws Exception;
 
        /**
         * Adds a {@link CompletedCheckpoint} instance to the list of completed 
checkpoints.

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
index fbb0198..63e7468 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,7 +57,7 @@ public class StandaloneCompletedCheckpointStore implements 
CompletedCheckpointSt
        }
 
        @Override
-       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
+       public void recover() throws Exception {
                // Nothing to do
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
index c4cb6bc..88dd0d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java
@@ -18,20 +18,21 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
-import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
 import org.apache.flink.util.FlinkException;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.utils.ZKPaths;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
@@ -138,14 +139,13 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
         * that the history of checkpoints is consistent.
         */
        @Override
-       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
+       public void recover() throws Exception {
                LOG.info("Recovering checkpoints from ZooKeeper.");
 
                // Clear local handles in order to prevent duplicates on
                // recovery. The local handles should reflect the state
                // of ZooKeeper.
                completedCheckpoints.clear();
-               sharedStateRegistry.clear();
 
                // Get all there is first
                List<Tuple2<RetrievableStateHandle<CompletedCheckpoint>, 
String>> initialCheckpoints;
@@ -170,8 +170,6 @@ public class ZooKeeperCompletedCheckpointStore implements 
CompletedCheckpointSto
                        try {
                                completedCheckpoint = 
retrieveCompletedCheckpoint(checkpointStateHandle);
                                if (completedCheckpoint != null) {
-                                       // Re-register all shared states in the 
checkpoint.
-                                       
completedCheckpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
                                        
completedCheckpoints.add(completedCheckpoint);
                                }
                        } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f9d2d69..c105d2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -66,6 +66,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.util.SerializedThrowable;
@@ -74,8 +75,8 @@ import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
-
 import org.apache.flink.util.StringUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -459,7 +460,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph, Archiveable<Archive
                        checkpointIDCounter,
                        checkpointStore,
                        checkpointDir,
-                       ioExecutor);
+                       ioExecutor,
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // register the master hooks on the checkpoint coordinator
                for (MasterTriggerRestoreHook<?> hook : masterHooks) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
index 0085890..0268b10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandle.java
@@ -65,27 +65,27 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        private final UUID backendIdentifier;
 
        /**
-        * The key-group range covered by this state handle
+        * The key-group range covered by this state handle.
         */
        private final KeyGroupRange keyGroupRange;
 
        /**
-        * The checkpoint Id
+        * The checkpoint Id.
         */
        private final long checkpointId;
 
        /**
-        * Shared state in the incremental checkpoint. This i
+        * Shared state in the incremental checkpoint.
         */
        private final Map<StateHandleID, StreamStateHandle> sharedState;
 
        /**
-        * Private state in the incremental checkpoint
+        * Private state in the incremental checkpoint.
         */
        private final Map<StateHandleID, StreamStateHandle> privateState;
 
        /**
-        * Primary meta data state of the incremental checkpoint
+        * Primary meta data state of the incremental checkpoint.
         */
        private final StreamStateHandle metaStateHandle;
 
@@ -143,16 +143,21 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
 
        @Override
        public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
-               if (this.keyGroupRange.getIntersection(keyGroupRange) != 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE) {
-                       return this;
-               } else {
-                       return null;
-               }
+               return 
KeyGroupRange.EMPTY_KEY_GROUP_RANGE.equals(this.keyGroupRange.getIntersection(keyGroupRange))
 ?
+                       null : this;
        }
 
        @Override
        public void discardState() throws Exception {
 
+               SharedStateRegistry registry = this.sharedStateRegistry;
+               final boolean isRegistered = (registry != null);
+
+               LOG.trace("Discarding IncrementalKeyedStateHandle (registered = 
{}) for checkpoint {} from backend with id {}.",
+                       isRegistered,
+                       checkpointId,
+                       backendIdentifier);
+
                try {
                        metaStateHandle.discardState();
                } catch (Exception e) {
@@ -168,19 +173,20 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                // If this was not registered, we can delete the shared state. 
We can simply apply this
                // to all handles, because all handles that have not been 
created for the first time for this
                // are only placeholders at this point (disposing them is a 
NOP).
-               if (sharedStateRegistry == null) {
-                       try {
-                               
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
-                       } catch (Exception e) {
-                               LOG.warn("Could not properly discard new sst 
file states.", e);
-                       }
-               } else {
+               if (isRegistered) {
                        // If this was registered, we only unregister all our 
referenced shared states
                        // from the registry.
                        for (StateHandleID stateHandleID : 
sharedState.keySet()) {
-                               sharedStateRegistry.unregisterReference(
+                               registry.unregisterReference(
                                        
createSharedStateRegistryKeyFromFileName(stateHandleID));
                        }
+               } else {
+                       // Otherwise, we assume to own those handles and 
dispose them directly.
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(sharedState.values());
+                       } catch (Exception e) {
+                               LOG.warn("Could not properly discard new sst 
file states.", e);
+                       }
                }
        }
 
@@ -202,10 +208,21 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
        @Override
        public void registerSharedStates(SharedStateRegistry stateRegistry) {
 
-               Preconditions.checkState(sharedStateRegistry == null, "The 
state handle has already registered its shared states.");
+               // This is a quick check to avoid registering twice with 
the same registry. However, the code also allows us to
+               // register again with a different registry. The implication is 
that ownership is transferred to this new
+               // registry. This should only happen in case of a restart, when 
the CheckpointCoordinator creates a new
+               // SharedStateRegistry for the current attempt and the old 
registry becomes meaningless. We also assume that
+               // an old registry object from a previous run is due to be GCed 
and will never be used for registration again.
+               Preconditions.checkState(
+                       sharedStateRegistry != stateRegistry,
+                       "The state handle has already registered its shared 
states to the given registry.");
 
                sharedStateRegistry = Preconditions.checkNotNull(stateRegistry);
 
+               LOG.trace("Registering IncrementalKeyedStateHandle for 
checkpoint {} from backend with id {}.",
+                       checkpointId,
+                       backendIdentifier);
+
                for (Map.Entry<StateHandleID, StreamStateHandle> 
sharedStateHandle : sharedState.entrySet()) {
                        SharedStateRegistryKey registryKey =
                                
createSharedStateRegistryKeyFromFileName(sharedStateHandle.getKey());
@@ -284,5 +301,18 @@ public class IncrementalKeyedStateHandle implements 
KeyedStateHandle {
                result = 31 * result + getMetaStateHandle().hashCode();
                return result;
        }
+
+       @Override
+       public String toString() {
+               return "IncrementalKeyedStateHandle{" +
+                       "backendIdentifier=" + backendIdentifier +
+                       ", keyGroupRange=" + keyGroupRange +
+                       ", checkpointId=" + checkpointId +
+                       ", sharedState=" + sharedState +
+                       ", privateState=" + privateState +
+                       ", metaStateHandle=" + metaStateHandle +
+                       ", registered=" + (sharedStateRegistry != null) +
+                       '}';
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 8e38ad4..8092f6c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -141,7 +141,7 @@ public class KeyGroupsStateHandle implements 
StreamStateHandle, KeyedStateHandle
        public String toString() {
                return "KeyGroupsStateHandle{" +
                                "groupRangeOffsets=" + groupRangeOffsets +
-                               ", data=" + stateHandle +
+                               ", stateHandle=" + stateHandle +
                                '}';
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
index b95dace..1960c1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/MultiStreamStateHandle.java
@@ -38,7 +38,7 @@ public class MultiStreamStateHandle implements 
StreamStateHandle {
        private final List<StreamStateHandle> stateHandles;
        private final long stateSize;
 
-       public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) 
throws IOException {
+       public MultiStreamStateHandle(List<StreamStateHandle> stateHandles) {
                this.stateHandles = Preconditions.checkNotNull(stateHandles);
                long calculateSize = 0L;
                for(StreamStateHandle stateHandle : stateHandles) {
@@ -62,6 +62,14 @@ public class MultiStreamStateHandle implements 
StreamStateHandle {
                return stateSize;
        }
 
+       @Override
+       public String toString() {
+               return "MultiStreamStateHandle{" +
+                       "stateHandles=" + stateHandles +
+                       ", stateSize=" + stateSize +
+                       '}';
+       }
+
        static final class MultiFSDataInputStream extends 
AbstractMultiFSDataInputStream {
 
                private final TreeMap<Long, StreamStateHandle> stateHandleMap;

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
index e0ca873..347f30c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistry.java
@@ -38,13 +38,24 @@ import java.util.concurrent.Executor;
  * maintain the reference count of {@link StreamStateHandle}s by a key that 
(logically) identifies
  * them.
  */
-public class SharedStateRegistry {
+public class SharedStateRegistry implements AutoCloseable {
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SharedStateRegistry.class);
 
+       /** A singleton object for the default implementation of a {@link 
SharedStateRegistryFactory} */
+       public static final SharedStateRegistryFactory DEFAULT_FACTORY = new 
SharedStateRegistryFactory() {
+               @Override
+               public SharedStateRegistry create(Executor deleteExecutor) {
+                       return new SharedStateRegistry(deleteExecutor);
+               }
+       };
+
        /** All registered state objects by an artificial key */
        private final Map<SharedStateRegistryKey, 
SharedStateRegistry.SharedStateEntry> registeredStates;
 
+       /** This flag indicates whether the registry is open or 
close() has already been called */
+       private boolean open;
+
        /** Executor for async state deletion */
        private final Executor asyncDisposalExecutor;
 
@@ -56,6 +67,7 @@ public class SharedStateRegistry {
        public SharedStateRegistry(Executor asyncDisposalExecutor) {
                this.registeredStates = new HashMap<>();
                this.asyncDisposalExecutor = Preconditions.checkNotNull(asyncDisposalExecutor);
+               this.open = true;
        }
 
        /**
@@ -82,6 +94,9 @@ public class SharedStateRegistry {
                SharedStateRegistry.SharedStateEntry entry;
 
                synchronized (registeredStates) {
+
+                       Preconditions.checkState(open, "Attempt to register state to closed SharedStateRegistry.");
+
                        entry = registeredStates.get(registrationKey);
 
                        if (entry == null) {
@@ -96,6 +111,11 @@ public class SharedStateRegistry {
                                // delete if this is a real duplicate
                                if (!Objects.equals(state, entry.stateHandle)) {
                                        scheduledStateDeletion = state;
+                                       LOG.trace("Identified duplicate state registration under key {}. New state {} was determined to " +
+                                                       "be an unnecessary copy of existing state {} and will be dropped.",
+                                               registrationKey,
+                                               state,
+                                               entry.stateHandle);
                                }
                                entry.increaseReferenceCount();
                        }
@@ -112,7 +132,8 @@ public class SharedStateRegistry {
         *
         * @param registrationKey the shared state for which we release a reference.
         * @return the result of the request, consisting of the reference count after this operation
-        * and the state handle, or null if the state handle was deleted through this request.
+        * and the state handle, or null if the state handle was deleted through this request. Returns null if the registry
+        * was previously closed.
         */
        public Result unregisterReference(SharedStateRegistryKey registrationKey) {
 
@@ -123,6 +144,7 @@ public class SharedStateRegistry {
                SharedStateRegistry.SharedStateEntry entry;
 
                synchronized (registeredStates) {
+
                        entry = registeredStates.get(registrationKey);
 
                        Preconditions.checkState(entry != null,
@@ -164,10 +186,18 @@ public class SharedStateRegistry {
                }
        }
 
+       @Override
+       public String toString() {
+               synchronized (registeredStates) {
+                       return "SharedStateRegistry{" +
+                               "registeredStates=" + registeredStates +
+                               '}';
+               }
+       }
+
        private void scheduleAsyncDelete(StreamStateHandle streamStateHandle) {
                // We do the small optimization to not issue discards for placeholders, which are NOPs.
                if (streamStateHandle != null && !isPlaceholder(streamStateHandle)) {
-
                        LOG.trace("Scheduled delete of state handle {}.", streamStateHandle);
                        asyncDisposalExecutor.execute(
                                new SharedStateRegistry.AsyncDisposalRunnable(streamStateHandle));
@@ -178,6 +208,13 @@ public class SharedStateRegistry {
                return stateHandle instanceof PlaceholderStreamStateHandle;
        }
 
+       @Override
+       public void close() {
+               synchronized (registeredStates) {
+                       open = false;
+               }
+       }
+
        /**
         * An entry in the registry, tracking the handle and the corresponding reference count.
         */
@@ -279,13 +316,4 @@ public class SharedStateRegistry {
                        }
                }
        }
-
-       /**
-        * Clears the registry.
-        */
-       public void clear() {
-               synchronized (registeredStates) {
-                       registeredStates.clear();
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
new file mode 100644
index 0000000..05c9825
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SharedStateRegistryFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Simple factory to produce {@link SharedStateRegistry} objects.
+ */
+public interface SharedStateRegistryFactory {
+
+       /**
+        * Factory method for {@link SharedStateRegistry}.
+        *
+        * @param deleteExecutor executor used to run (async) deletes.
+        * @return a SharedStateRegistry object
+        */
+       SharedStateRegistry create(Executor deleteExecutor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 9ba9d35..3a43d4f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -95,6 +95,7 @@ public class ByteStreamStateHandle implements StreamStateHandle {
        public String toString() {
                return "ByteStreamStateHandle{" +
                        "handleName='" + handleName + '\'' +
+                       ", dataBytes=" + data.length +
                        '}';
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
index d293eea..edc29fe 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorExternalizedCheckpointsTest.java
@@ -18,14 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.concurrent.Executors;
@@ -37,11 +29,22 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
 /**
  * CheckpointCoordinator tests for externalized checkpoints.
  *
@@ -91,7 +94,8 @@ public class CheckpointCoordinatorExternalizedCheckpointsTest 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        checkpointDir.getAbsolutePath(),
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
index 344b340..5cca94f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java
@@ -78,7 +78,8 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                        new StandaloneCheckpointIDCounter(),
                        new FailingCompletedCheckpointStore(),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                coord.triggerCheckpoint(triggerTimestamp, false);
 
@@ -113,7 +114,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
                
when(subtaskState.getManagedKeyedState()).thenReturn(managedRawHandle);
                
                AcknowledgeCheckpoint acknowledgeMessage = new 
AcknowledgeCheckpoint(jid, executionAttemptId, checkpointId, new 
CheckpointMetrics(), subtaskState);
-               
+
                try {
                        coord.receiveAcknowledgeMessage(acknowledgeMessage);
                        fail("Expected a checkpoint exception because the 
completed checkpoint store could not " +
@@ -136,7 +137,7 @@ public class CheckpointCoordinatorFailureTest extends 
TestLogger {
        private static final class FailingCompletedCheckpointStore implements 
CompletedCheckpointStore {
 
                @Override
-               public void recover(SharedStateRegistry sharedStateRegistry) 
throws Exception {
+               public void recover() throws Exception {
                        throw new UnsupportedOperationException("Not 
implemented.");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
index d6daa4e..94063a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -47,14 +47,12 @@ import java.util.List;
 import java.util.concurrent.Executor;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest.mockExecutionVertex;
-
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isNull;
 import static org.mockito.Mockito.any;
@@ -405,7 +403,8 @@ public class CheckpointCoordinatorMasterHooksTest {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(10),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
        }
 
        private static <T> T mockGeneric(Class<?> clazz) {

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 186a819..16a89ea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -56,6 +54,9 @@ import 
org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -140,7 +141,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -200,7 +202,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -251,7 +254,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // nothing should be happening
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -303,7 +307,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -407,7 +412,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -526,7 +532,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -698,7 +705,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -828,7 +836,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(10),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
                        assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -992,7 +1001,8 @@ public class CheckpointCoordinatorTest extends TestLogger {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // trigger a checkpoint, partially acknowledged
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
@@ -1019,8 +1029,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                Thread.sleep(250);
                        }
                        while (!checkpoint.isDiscarded() &&
-                                       coord.getNumberOfPendingCheckpoints() > 
0 &&
-                                       System.currentTimeMillis() < deadline);
+                               coord.getNumberOfPendingCheckpoints() > 0 &&
+                               System.currentTimeMillis() < deadline);
 
                        assertTrue("Checkpoint was not canceled by the 
timeout", checkpoint.isDiscarded());
                        assertEquals(0, coord.getNumberOfPendingCheckpoints());
@@ -1071,7 +1081,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1134,7 +1145,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -1274,7 +1286,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
 
                        coord.startCheckpointScheduler();
@@ -1296,7 +1309,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        int numCallsSoFar = numCalls.get();
                        Thread.sleep(400);
                        assertTrue(numCallsSoFar == numCalls.get() ||
-                                       numCallsSoFar+1 == numCalls.get());
+                               numCallsSoFar+1 == numCalls.get());
 
                        // start another sequence of periodic scheduling
                        numCalls.set(0);
@@ -1318,7 +1331,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        numCallsSoFar = numCalls.get();
                        Thread.sleep(400);
                        assertTrue(numCallsSoFar == numCalls.get() ||
-                                       numCallsSoFar + 1 == numCalls.get());
+                               numCallsSoFar + 1 == numCalls.get());
 
                        coord.shutdown(JobStatus.FINISHED);
                }
@@ -1354,19 +1367,20 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                final long delay = 50;
 
                final CheckpointCoordinator coord = new CheckpointCoordinator(
-                               jid,
-                               2,           // periodic interval is 2 ms
-                               200_000,     // timeout is very long (200 s)
-                               delay,       // 50 ms delay between checkpoints
-                               1,
-                               ExternalizedCheckpointSettings.none(),
-                               new ExecutionVertex[] { vertex },
-                               new ExecutionVertex[] { vertex },
-                               new ExecutionVertex[] { vertex },
-                               new StandaloneCheckpointIDCounter(),
-                               new StandaloneCompletedCheckpointStore(2),
-                               "dummy-path",
-                               Executors.directExecutor());
+                       jid,
+                       2,           // periodic interval is 2 ms
+                       200_000,     // timeout is very long (200 s)
+                       delay,       // 50 ms delay between checkpoints
+                       1,
+                       ExternalizedCheckpointSettings.none(),
+                       new ExecutionVertex[] { vertex },
+                       new ExecutionVertex[] { vertex },
+                       new ExecutionVertex[] { vertex },
+                       new StandaloneCheckpointIDCounter(),
+                       new StandaloneCompletedCheckpointStore(2),
+                       "dummy-path",
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                try {
                        coord.startCheckpointScheduler();
@@ -1439,7 +1453,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                assertEquals(0, coord.getNumberOfPendingCheckpoints());
                assertEquals(0, 
coord.getNumberOfRetainedSuccessfulCheckpoints());
@@ -1596,7 +1611,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        counter,
                        new StandaloneCompletedCheckpointStore(10),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -1702,7 +1718,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        coord.startCheckpointScheduler();
 
@@ -1715,12 +1732,12 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                Thread.sleep(20);
                        }
                        while ((now = System.currentTimeMillis()) < minDuration 
||
-                                       (numCalls.get() < maxConcurrentAttempts 
&& now < timeout));
+                               (numCalls.get() < maxConcurrentAttempts && now 
< timeout));
 
                        assertEquals(maxConcurrentAttempts, numCalls.get());
 
                        verify(triggerVertex.getCurrentExecutionAttempt(), 
times(maxConcurrentAttempts))
-                                       .triggerCheckpoint(anyLong(), 
anyLong(), any(CheckpointOptions.class));
+                               .triggerCheckpoint(anyLong(), anyLong(), 
any(CheckpointOptions.class));
 
                        // now, once we acknowledge one checkpoint, it should 
trigger the next one
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, ackAttemptID, 1L));
@@ -1775,7 +1792,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        coord.startCheckpointScheduler();
 
@@ -1788,7 +1806,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                Thread.sleep(20);
                        }
                        while ((now = System.currentTimeMillis()) < minDuration 
||
-                                       (coord.getNumberOfPendingCheckpoints() 
< maxConcurrentAttempts && now < timeout));
+                               (coord.getNumberOfPendingCheckpoints() < 
maxConcurrentAttempts && now < timeout));
 
                        // validate that the pending checkpoints are there
                        assertEquals(maxConcurrentAttempts, 
coord.getNumberOfPendingCheckpoints());
@@ -1806,7 +1824,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                Thread.sleep(20);
                        }
                        while (coord.getPendingCheckpoints().get(4L) == null &&
-                                       System.currentTimeMillis() < 
newTimeout);
+                               System.currentTimeMillis() < newTimeout);
 
                        // do the final check
                        assertEquals(maxConcurrentAttempts, 
coord.getNumberOfPendingCheckpoints());
@@ -1837,12 +1855,12 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                        final AtomicReference<ExecutionState> currentState = 
new AtomicReference<>(ExecutionState.CREATED);
                        
when(triggerVertex.getCurrentExecutionAttempt().getState()).thenAnswer(
-                                       new Answer<ExecutionState>() {
-                                               @Override
-                                               public ExecutionState 
answer(InvocationOnMock invocation){
-                                                       return 
currentState.get();
-                                               }
-                                       });
+                               new Answer<ExecutionState>() {
+                                       @Override
+                                       public ExecutionState 
answer(InvocationOnMock invocation){
+                                               return currentState.get();
+                                       }
+                               });
 
                        CheckpointCoordinator coord = new CheckpointCoordinator(
                                jid,
@@ -1857,7 +1875,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(2),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        coord.startCheckpointScheduler();
 
@@ -1874,7 +1893,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                Thread.sleep(20);
                        }
                        while (System.currentTimeMillis() < timeout &&
-                                       coord.getNumberOfPendingCheckpoints() 
== 0);
+                               coord.getNumberOfPendingCheckpoints() == 0);
 
                        assertTrue(coord.getNumberOfPendingCheckpoints() > 0);
                }
@@ -1909,7 +1928,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        checkpointIDCounter,
                        new StandaloneCompletedCheckpointStore(2),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                List<Future<CompletedCheckpoint>> savepointFutures = new 
ArrayList<>();
 
@@ -1962,7 +1982,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(2),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                String savepointDir = tmpFolder.newFolder().getAbsolutePath();
 
@@ -2006,7 +2027,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
                ExecutionVertex[] arrayExecutionVertices =
-                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
+                       allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                CompletedCheckpointStore store = new 
RecoverableCompletedCheckpointStore();
 
@@ -2024,7 +2045,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        store,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2051,11 +2073,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        SubtaskState subtaskState = 
mockSubtaskState(jobVertexID1, index, keyGroupPartitions1.get(index));
 
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       subtaskState);
+                               jid,
+                               
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               subtaskState);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2064,11 +2086,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        SubtaskState subtaskState = 
mockSubtaskState(jobVertexID2, index, keyGroupPartitions2.get(index));
 
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       subtaskState);
+                               jid,
+                               
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               subtaskState);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2150,7 +2172,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2167,11 +2190,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(jobVertexID1, keyGroupPartitions1.get(index), false);
                        SubtaskState checkpointStateHandles = new 
SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       checkpointStateHandles);
+                               jid,
+                               
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2182,11 +2205,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(jobVertexID2, keyGroupPartitions2.get(index), false);
                        SubtaskState checkpointStateHandles = new 
SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       checkpointStateHandles);
+                               jid,
+                               
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2251,7 +2274,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
                ExecutionVertex[] arrayExecutionVertices =
-                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
+                       allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2267,7 +2290,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2277,22 +2301,22 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
                List<KeyGroupRange> keyGroupPartitions1 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
                List<KeyGroupRange> keyGroupPartitions2 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
 
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
                        ChainedStateHandle<StreamStateHandle> valueSizeTuple = 
generateStateForVertex(jobVertexID1, index);
                        KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID1, 
keyGroupPartitions1.get(index), false);
+                               jobVertexID1, keyGroupPartitions1.get(index), 
false);
 
                        SubtaskState checkpointStateHandles = new 
SubtaskState(valueSizeTuple, null, null, keyGroupState, null);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       checkpointStateHandles);
+                               jid,
+                               
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2302,15 +2326,15 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                        ChainedStateHandle<StreamStateHandle> state = 
generateStateForVertex(jobVertexID2, index);
                        KeyGroupsStateHandle keyGroupState = 
generateKeyGroupState(
-                                       jobVertexID2, 
keyGroupPartitions2.get(index), false);
+                               jobVertexID2, keyGroupPartitions2.get(index), 
false);
 
                        SubtaskState checkpointStateHandles = new 
SubtaskState(state, null, null, keyGroupState, null);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       checkpointStateHandles);
+                               jid,
+                               
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2390,13 +2414,13 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                int newParallelism2 = scaleOut ? 13 : 2;
 
                final ExecutionJobVertex jobVertex1 = mockExecutionJobVertex(
-                               jobVertexID1,
-                               parallelism1,
-                               maxParallelism1);
+                       jobVertexID1,
+                       parallelism1,
+                       maxParallelism1);
                final ExecutionJobVertex jobVertex2 = mockExecutionJobVertex(
-                               jobVertexID2,
-                               parallelism2,
-                               maxParallelism2);
+                       jobVertexID2,
+                       parallelism2,
+                       maxParallelism2);
 
                List<ExecutionVertex> allExecutionVertices = new 
ArrayList<>(parallelism1 + parallelism2);
 
@@ -2404,7 +2428,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                
allExecutionVertices.addAll(Arrays.asList(jobVertex2.getTaskVertices()));
 
                ExecutionVertex[] arrayExecutionVertices =
-                               allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
+                       allExecutionVertices.toArray(new 
ExecutionVertex[allExecutionVertices.size()]);
 
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord = new CheckpointCoordinator(
@@ -2420,7 +2444,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger the checkpoint
                coord.triggerCheckpoint(timestamp, false);
@@ -2430,9 +2455,9 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(checkpointId, 0L);
 
                List<KeyGroupRange> keyGroupPartitions1 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism1, 
parallelism1);
                List<KeyGroupRange> keyGroupPartitions2 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
 
                //vertex 1
                for (int index = 0; index < jobVertex1.getParallelism(); 
index++) {
@@ -2443,11 +2468,11 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                        SubtaskState checkpointStateHandles = new 
SubtaskState(valueSizeTuple, opStateBackend, null, keyedStateBackend, 
keyedStateRaw);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       checkpointStateHandles);
+                               jid,
+                               
jobVertex1.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2463,14 +2488,14 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        expectedOpStatesBackend.add(opStateBackend);
                        expectedOpStatesRaw.add(opStateRaw);
                        SubtaskState checkpointStateHandles =
-                                       new SubtaskState(new 
ChainedStateHandle<>(
-                                                       
Collections.<StreamStateHandle>singletonList(null)), opStateBackend, 
opStateRaw, keyedStateBackend, keyedStateRaw);
+                               new SubtaskState(new ChainedStateHandle<>(
+                                       
Collections.<StreamStateHandle>singletonList(null)), opStateBackend, 
opStateRaw, keyedStateBackend, keyedStateRaw);
                        AcknowledgeCheckpoint acknowledgeCheckpoint = new 
AcknowledgeCheckpoint(
-                                       jid,
-                                       
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
-                                       checkpointId,
-                                       new CheckpointMetrics(),
-                                       checkpointStateHandles);
+                               jid,
+                               
jobVertex2.getTaskVertices()[index].getCurrentExecutionAttempt().getAttemptId(),
+                               checkpointId,
+                               new CheckpointMetrics(),
+                               checkpointStateHandles);
 
                        coord.receiveAcknowledgeMessage(acknowledgeCheckpoint);
                }
@@ -2482,18 +2507,18 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
 
                List<KeyGroupRange> newKeyGroupPartitions2 =
-                               
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
newParallelism2);
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
newParallelism2);
 
                final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
-                               jobVertexID1,
-                               parallelism1,
-                               maxParallelism1);
+                       jobVertexID1,
+                       parallelism1,
+                       maxParallelism1);
 
                // rescale vertex 2
                final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
-                               jobVertexID2,
-                               newParallelism2,
-                               maxParallelism2);
+                       jobVertexID2,
+                       newParallelism2,
+                       maxParallelism2);
 
                tasks.put(jobVertexID1, newJobVertex1);
                tasks.put(jobVertexID2, newJobVertex2);
@@ -2534,7 +2559,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
                return new Tuple2<>(jobVertexID, operatorID);
        }
-       
+
        /**
         * old topology
         * [operator1,operator2] * parallelism1 -> [operator3,operator4] * 
parallelism2
@@ -2575,7 +2600,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        OperatorState taskState = new OperatorState(id.f1, 
parallelism1, maxParallelism1);
                        operatorStates.put(id.f1, taskState);
                        for (int index = 0; index < taskState.getParallelism(); 
index++) {
-                               StreamStateHandle subNonPartitionedState = 
+                               StreamStateHandle subNonPartitionedState =
                                        generateStateForVertex(id.f0, index)
                                                .get(0);
                                OperatorStateHandle subManagedOperatorState =
@@ -2673,15 +2698,15 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        spy(new StandaloneCompletedCheckpointStore(1));
 
                CompletedCheckpoint completedCheckpoint = new 
CompletedCheckpoint(
-                               jobID,
-                               2,
-                               System.currentTimeMillis(),
-                               System.currentTimeMillis() + 3000,
-                               operatorStates,
-                               Collections.<MasterState>emptyList(),
-                               CheckpointProperties.forStandardCheckpoint(),
-                               null,
-                               null);
+                       jobID,
+                       2,
+                       System.currentTimeMillis(),
+                       System.currentTimeMillis() + 3000,
+                       operatorStates,
+                       Collections.<MasterState>emptyList(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       null);
 
                
when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
 
@@ -2699,7 +2724,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        standaloneCompletedCheckpointStore,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                coord.restoreLatestCheckpointedState(tasks, false, true);
 
@@ -2832,7 +2858,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                "fake-directory",
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        assertTrue(coord.triggerCheckpoint(timestamp, false));
 
@@ -2862,14 +2889,14 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
 
                OperatorStateRepartitioner repartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
                List<Collection<OperatorStateHandle>> repartitionedStates =
-                               
repartitioner.repartitionState(Collections.singletonList(osh), 3);
+                       
repartitioner.repartitionState(Collections.singletonList(osh), 3);
 
                Map<String, Integer> checkCounts = new HashMap<>(3);
 
                for (Collection<OperatorStateHandle> operatorStateHandles : 
repartitionedStates) {
                        for (OperatorStateHandle operatorStateHandle : 
operatorStateHandles) {
                                for (Map.Entry<String, 
OperatorStateHandle.StateMetaInfo> stateNameToMetaInfo :
-                                               
operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
+                                       
operatorStateHandle.getStateNameToPartitionOffsets().entrySet()) {
 
                                        String stateName = 
stateNameToMetaInfo.getKey();
                                        Integer count = 
checkCounts.get(stateName);
@@ -2900,8 +2927,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
        // 
------------------------------------------------------------------------
 
        public static KeyGroupsStateHandle generateKeyGroupState(
-                       JobVertexID jobVertexID,
-                       KeyGroupRange keyGroupPartition, boolean rawState) 
throws IOException {
+               JobVertexID jobVertexID,
+               KeyGroupRange keyGroupPartition, boolean rawState) throws 
IOException {
 
                List<Integer> testStatesLists = new 
ArrayList<>(keyGroupPartition.getNumberOfKeyGroups());
 
@@ -2918,27 +2945,27 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
        }
 
        public static KeyGroupsStateHandle generateKeyGroupState(
-                       KeyGroupRange keyGroupRange,
-                       List<? extends Serializable> states) throws IOException 
{
+               KeyGroupRange keyGroupRange,
+               List<? extends Serializable> states) throws IOException {
 
                
Preconditions.checkArgument(keyGroupRange.getNumberOfKeyGroups() == 
states.size());
 
                Tuple2<byte[], List<long[]>> serializedDataWithOffsets =
-                               
serializeTogetherAndTrackOffsets(Collections.<List<? extends 
Serializable>>singletonList(states));
+                       serializeTogetherAndTrackOffsets(Collections.<List<? 
extends Serializable>>singletonList(states));
 
                KeyGroupRangeOffsets keyGroupRangeOffsets = new 
KeyGroupRangeOffsets(keyGroupRange, serializedDataWithOffsets.f1.get(0));
 
                ByteStreamStateHandle allSerializedStatesHandle = new 
TestByteStreamStateHandleDeepCompare(
-                               String.valueOf(UUID.randomUUID()),
-                               serializedDataWithOffsets.f0);
+                       String.valueOf(UUID.randomUUID()),
+                       serializedDataWithOffsets.f0);
                KeyGroupsStateHandle keyGroupsStateHandle = new 
KeyGroupsStateHandle(
-                               keyGroupRangeOffsets,
-                               allSerializedStatesHandle);
+                       keyGroupRangeOffsets,
+                       allSerializedStatesHandle);
                return keyGroupsStateHandle;
        }
 
        public static Tuple2<byte[], List<long[]>> 
serializeTogetherAndTrackOffsets(
-                       List<List<? extends Serializable>> serializables) 
throws IOException {
+               List<List<? extends Serializable>> serializables) throws 
IOException {
 
                List<long[]> offsets = new ArrayList<>(serializables.size());
                List<byte[]> serializedGroupValues = new ArrayList<>();
@@ -2962,19 +2989,19 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                runningGroupsOffset = 0;
                for (byte[] serializedGroupValue : serializedGroupValues) {
                        System.arraycopy(
-                                       serializedGroupValue,
-                                       0,
-                                       allSerializedValuesConcatenated,
-                                       runningGroupsOffset,
-                                       serializedGroupValue.length);
+                               serializedGroupValue,
+                               0,
+                               allSerializedValuesConcatenated,
+                               runningGroupsOffset,
+                               serializedGroupValue.length);
                        runningGroupsOffset += serializedGroupValue.length;
                }
                return new Tuple2<>(allSerializedValuesConcatenated, offsets);
        }
 
        public static ChainedStateHandle<StreamStateHandle> 
generateStateForVertex(
-                       JobVertexID jobVertexID,
-                       int index) throws IOException {
+               JobVertexID jobVertexID,
+               int index) throws IOException {
 
                Random random = new Random(jobVertexID.hashCode() + index);
                int value = random.nextInt();
@@ -2982,17 +3009,17 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
        }
 
        public static ChainedStateHandle<StreamStateHandle> 
generateChainedStateHandle(
-                       Serializable value) throws IOException {
+               Serializable value) throws IOException {
                return ChainedStateHandle.wrapSingleHandle(
-                               
TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()),
 value));
+                       
TestByteStreamStateHandleDeepCompare.fromSerializable(String.valueOf(UUID.randomUUID()),
 value));
        }
 
        public static ChainedStateHandle<OperatorStateHandle> 
generateChainedPartitionableStateHandle(
-                       JobVertexID jobVertexID,
-                       int index,
-                       int namedStates,
-                       int partitionsPerState,
-                       boolean rawState) throws IOException {
+               JobVertexID jobVertexID,
+               int index,
+               int namedStates,
+               int partitionsPerState,
+               boolean rawState) throws IOException {
 
                Map<String, List<? extends Serializable>> statesListsMap = new 
HashMap<>(namedStates);
 
@@ -3015,7 +3042,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
        }
 
        private static ChainedStateHandle<OperatorStateHandle> 
generateChainedPartitionableStateHandle(
-                       Map<String, List<? extends Serializable>> states) 
throws IOException {
+               Map<String, List<? extends Serializable>> states) throws 
IOException {
 
                List<List<? extends Serializable>> namedStateSerializables = 
new ArrayList<>(states.size());
 
@@ -3030,26 +3057,26 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                int idx = 0;
                for (Map.Entry<String, List<? extends Serializable>> entry : 
states.entrySet()) {
                        offsetsMap.put(
-                                       entry.getKey(),
-                                       new OperatorStateHandle.StateMetaInfo(
-                                                       
serializationWithOffsets.f1.get(idx),
-                                                       
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
+                               entry.getKey(),
+                               new OperatorStateHandle.StateMetaInfo(
+                                       serializationWithOffsets.f1.get(idx),
+                                       
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE));
                        ++idx;
                }
 
                ByteStreamStateHandle streamStateHandle = new 
TestByteStreamStateHandleDeepCompare(
-                               String.valueOf(UUID.randomUUID()),
-                               serializationWithOffsets.f0);
+                       String.valueOf(UUID.randomUUID()),
+                       serializationWithOffsets.f0);
 
                OperatorStateHandle operatorStateHandle =
-                               new OperatorStateHandle(offsetsMap, 
streamStateHandle);
+                       new OperatorStateHandle(offsetsMap, streamStateHandle);
                return ChainedStateHandle.wrapSingleHandle(operatorStateHandle);
        }
 
        static ExecutionJobVertex mockExecutionJobVertex(
-                       JobVertexID jobVertexID,
-                       int parallelism,
-                       int maxParallelism) {
+               JobVertexID jobVertexID,
+               int parallelism,
+               int maxParallelism) {
 
                return mockExecutionJobVertex(
                        jobVertexID,
@@ -3131,7 +3158,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                ExecutionJobVertex jobVertex = mock(ExecutionJobVertex.class);
                when(jobVertex.getOperatorIDs()).thenReturn(jobVertexIDs);
-               
+
                when(vertex.getJobVertex()).thenReturn(jobVertex);
 
                return vertex;
@@ -3158,8 +3185,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
        }
 
        public static void verifyStateRestore(
-                       JobVertexID jobVertexID, ExecutionJobVertex 
executionJobVertex,
-                       List<KeyGroupRange> keyGroupPartitions) throws 
Exception {
+               JobVertexID jobVertexID, ExecutionJobVertex executionJobVertex,
+               List<KeyGroupRange> keyGroupPartitions) throws Exception {
 
                for (int i = 0; i < executionJobVertex.getParallelism(); i++) {
 
@@ -3168,28 +3195,28 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        ChainedStateHandle<StreamStateHandle> 
expectNonPartitionedState = generateStateForVertex(jobVertexID, i);
                        ChainedStateHandle<StreamStateHandle> 
actualNonPartitionedState = taskStateHandles.getLegacyOperatorState();
                        assertTrue(CommonTestUtils.isSteamContentEqual(
-                                       
expectNonPartitionedState.get(0).openInputStream(),
-                                       
actualNonPartitionedState.get(0).openInputStream()));
+                               
expectNonPartitionedState.get(0).openInputStream(),
+                               
actualNonPartitionedState.get(0).openInputStream()));
 
                        ChainedStateHandle<OperatorStateHandle> 
expectedOpStateBackend =
-                                       
generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
+                               
generateChainedPartitionableStateHandle(jobVertexID, i, 2, 8, false);
 
                        List<Collection<OperatorStateHandle>> 
actualPartitionableState = taskStateHandles.getManagedOperatorState();
 
                        assertTrue(CommonTestUtils.isSteamContentEqual(
-                                       
expectedOpStateBackend.get(0).openInputStream(),
-                                       
actualPartitionableState.get(0).iterator().next().openInputStream()));
+                               expectedOpStateBackend.get(0).openInputStream(),
+                               
actualPartitionableState.get(0).iterator().next().openInputStream()));
 
                        KeyGroupsStateHandle expectPartitionedKeyGroupState = 
generateKeyGroupState(
-                                       jobVertexID, keyGroupPartitions.get(i), 
false);
+                               jobVertexID, keyGroupPartitions.get(i), false);
                        Collection<KeyedStateHandle> 
actualPartitionedKeyGroupState = taskStateHandles.getManagedKeyedState();
                        
compareKeyedState(Collections.singletonList(expectPartitionedKeyGroupState), 
actualPartitionedKeyGroupState);
                }
        }
 
        public static void compareKeyedState(
-                       Collection<KeyGroupsStateHandle> 
expectPartitionedKeyGroupState,
-                       Collection<? extends KeyedStateHandle> 
actualPartitionedKeyGroupState) throws Exception {
+               Collection<KeyGroupsStateHandle> expectPartitionedKeyGroupState,
+               Collection<? extends KeyedStateHandle> 
actualPartitionedKeyGroupState) throws Exception {
 
                KeyGroupsStateHandle expectedHeadOpKeyGroupStateHandle = 
expectPartitionedKeyGroupState.iterator().next();
                int expectedTotalKeyGroups = 
expectedHeadOpKeyGroupStateHandle.getKeyGroupRange().getNumberOfKeyGroups();
@@ -3207,7 +3234,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                long offset = 
expectedHeadOpKeyGroupStateHandle.getOffsetForKeyGroup(groupId);
                                inputStream.seek(offset);
                                int expectedKeyGroupState =
-                                               
InstantiationUtil.deserializeObject(inputStream, 
Thread.currentThread().getContextClassLoader());
+                                       
InstantiationUtil.deserializeObject(inputStream, 
Thread.currentThread().getContextClassLoader());
                                for (KeyedStateHandle oneActualKeyedStateHandle 
: actualPartitionedKeyGroupState) {
 
                                        assertTrue(oneActualKeyedStateHandle 
instanceof KeyGroupsStateHandle);
@@ -3218,7 +3245,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                                try (FSDataInputStream 
actualInputStream = oneActualKeyGroupStateHandle.openInputStream()) {
                                                        
actualInputStream.seek(actualOffset);
                                                        int actualGroupState = 
InstantiationUtil.
-                                                                       
deserializeObject(actualInputStream, 
Thread.currentThread().getContextClassLoader());
+                                                               
deserializeObject(actualInputStream, 
Thread.currentThread().getContextClassLoader());
                                                        
assertEquals(expectedKeyGroupState, actualGroupState);
                                                }
                                        }
@@ -3228,8 +3255,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
        }
 
        public static void comparePartitionableState(
-                       List<ChainedStateHandle<OperatorStateHandle>> expected,
-                       List<List<Collection<OperatorStateHandle>>> actual) 
throws Exception {
+               List<ChainedStateHandle<OperatorStateHandle>> expected,
+               List<List<Collection<OperatorStateHandle>>> actual) throws 
Exception {
 
                List<String> expectedResult = new ArrayList<>();
                for (ChainedStateHandle<OperatorStateHandle> chainedStateHandle 
: expected) {
@@ -3263,7 +3290,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                                for (long offset : 
entry.getValue().getOffsets()) {
                                        in.seek(offset);
                                        Integer state = InstantiationUtil.
-                                                       deserializeObject(in, 
Thread.currentThread().getContextClassLoader());
+                                               deserializeObject(in, 
Thread.currentThread().getContextClassLoader());
                                        resultCollector.add(opIdx + " : " + 
entry.getKey() + " : " + state);
                                }
                        }
@@ -3308,24 +3335,25 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // Periodic
                CheckpointTriggerResult triggerResult = coord.triggerCheckpoint(
-                               System.currentTimeMillis(),
-                               CheckpointProperties.forStandardCheckpoint(),
-                               null,
-                               true);
+                       System.currentTimeMillis(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       true);
 
                assertTrue(triggerResult.isFailure());
                
assertEquals(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN, 
triggerResult.getFailureReason());
 
                // Not periodic
                triggerResult = coord.triggerCheckpoint(
-                               System.currentTimeMillis(),
-                               CheckpointProperties.forStandardCheckpoint(),
-                               null,
-                               false);
+                       System.currentTimeMillis(),
+                       CheckpointProperties.forStandardCheckpoint(),
+                       null,
+                       false);
 
                assertFalse(triggerResult.isFailure());
        }
@@ -3352,12 +3380,12 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                        int maxPartitionsPerState = 1 + r.nextInt(9);
 
                        doTestPartitionableStateRepartitioning(
-                                       r, oldParallelism, newParallelism, 
numNamedStates, maxPartitionsPerState);
+                               r, oldParallelism, newParallelism, 
numNamedStates, maxPartitionsPerState);
                }
        }
 
        private void doTestPartitionableStateRepartitioning(
-                       Random r, int oldParallelism, int newParallelism, int 
numNamedStates, int maxPartitionsPerState) {
+               Random r, int oldParallelism, int newParallelism, int 
numNamedStates, int maxPartitionsPerState) {
 
                List<OperatorStateHandle> previousParallelOpInstanceStates = 
new ArrayList<>(oldParallelism);
 
@@ -3374,15 +3402,15 @@ public class CheckpointCoordinatorTest extends 
TestLogger {
                                }
 
                                OperatorStateHandle.Mode mode = r.nextInt(10) 
== 0 ?
-                                               
OperatorStateHandle.Mode.BROADCAST : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
+                                       OperatorStateHandle.Mode.BROADCAST : 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
                                namedStatesToOffsets.put(
-                                               "State-" + s,
-                                               new 
OperatorStateHandle.StateMetaInfo(offs, mode));
+                                       "State-" + s,
+                                       new 
OperatorStateHandle.StateMetaInfo(offs, mode));
 
                        }
 
                        previousParallelOpInstanceStates.add(
-                                       new 
OperatorStateHandle(namedStatesToOffsets, new FileStateHandle(fakePath, -1)));
+                               new OperatorStateHandle(namedStatesToOffsets, 
new FileStateHandle(fakePath, -1)));
                }
 
                Map<StreamStateHandle, Map<String, List<Long>>> expected = new 
HashMap<>();
@@ -3395,7 +3423,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
 
                                long[] offs = e.getValue().getOffsets();
                                int replication = 
e.getValue().getDistributionMode().equals(OperatorStateHandle.Mode.BROADCAST) ?
-                                               newParallelism : 1;
+                                       newParallelism : 1;
 
                                expectedTotalPartitions += replication * 
offs.length;
                                List<Long> offsList = new 
ArrayList<>(offs.length);
@@ -3413,7 +3441,7 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                OperatorStateRepartitioner repartitioner = 
RoundRobinOperatorStateRepartitioner.INSTANCE;
 
                List<Collection<OperatorStateHandle>> pshs =
-                               
repartitioner.repartitionState(previousParallelOpInstanceStates, 
newParallelism);
+                       
repartitioner.repartitionState(previousParallelOpInstanceStates, 
newParallelism);
 
                Map<StreamStateHandle, Map<String, List<Long>>> actual = new 
HashMap<>();
 
@@ -3486,7 +3514,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                CheckpointStatsTracker tracker = 
mock(CheckpointStatsTracker.class);
                coord.setCheckpointStatsTracker(tracker);
@@ -3524,7 +3553,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        new StandaloneCheckpointIDCounter(),
                        store,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                store.addCheckpoint(new CompletedCheckpoint(
                        new JobID(),
@@ -3580,7 +3610,8 @@ public class CheckpointCoordinatorTest extends TestLogger 
{
                        checkpointIDCounter,
                        completedCheckpointStore,
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                // trigger a first checkpoint
                assertTrue(

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 7d24568..0888cff 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.util.SerializableObject;
@@ -109,7 +110,8 @@ public class CheckpointStateRestoreTest {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        // create ourselves a checkpoint with state
                        final long timestamp = 34623786L;
@@ -183,7 +185,8 @@ public class CheckpointStateRestoreTest {
                                new StandaloneCheckpointIDCounter(),
                                new StandaloneCompletedCheckpointStore(1),
                                null,
-                               Executors.directExecutor());
+                               Executors.directExecutor(),
+                               SharedStateRegistry.DEFAULT_FACTORY);
 
                        try {
                                coord.restoreLatestCheckpointedState(new 
HashMap<JobVertexID, ExecutionJobVertex>(), true, false);
@@ -240,7 +243,8 @@ public class CheckpointStateRestoreTest {
                        new StandaloneCheckpointIDCounter(),
                        new StandaloneCompletedCheckpointStore(1),
                        null,
-                       Executors.directExecutor());
+                       Executors.directExecutor(),
+                       SharedStateRegistry.DEFAULT_FACTORY);
 
                StreamStateHandle serializedState = CheckpointCoordinatorTest
                                .generateChainedStateHandle(new 
SerializableObject())

Reply via email to