[FLINK-7213] Introduce state management by OperatorID in TaskManager

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b71154a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b71154a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b71154a7

Branch: refs/heads/master
Commit: b71154a734ea9f4489dffe1be6761efbb90cff41
Parents: 3b0321a
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Jun 26 18:07:59 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 15 14:56:54 2017 +0200

----------------------------------------------------------------------
 .../state/RocksDBAsyncSnapshotTest.java         |  21 +-
 .../checkpoint/CheckpointCoordinator.java       |   7 +-
 .../CheckpointCoordinatorGateway.java           |   2 +-
 .../flink/runtime/checkpoint/OperatorState.java |   4 +-
 .../checkpoint/OperatorSubtaskState.java        | 224 ++++++---
 .../runtime/checkpoint/PendingCheckpoint.java   |  54 +-
 .../RoundRobinOperatorStateRepartitioner.java   |   4 +
 .../checkpoint/StateAssignmentOperation.java    | 177 ++++---
 .../runtime/checkpoint/TaskStateSnapshot.java   | 139 ++++++
 .../savepoint/SavepointV2Serializer.java        |  20 +-
 .../deployment/TaskDeploymentDescriptor.java    |   8 +-
 .../flink/runtime/execution/Environment.java    |   9 +-
 .../flink/runtime/executiongraph/Execution.java |   8 +-
 .../runtime/executiongraph/ExecutionVertex.java |   8 +-
 .../runtime/jobgraph/tasks/StatefulTask.java    |   8 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   5 +-
 .../checkpoint/AcknowledgeCheckpoint.java       |   8 +-
 .../state/StateInitializationContextImpl.java   |  11 +-
 .../flink/runtime/state/TaskStateHandles.java   | 172 -------
 .../rpc/RpcCheckpointResponder.java             |   4 +-
 .../ActorGatewayCheckpointResponder.java        |   4 +-
 .../taskmanager/CheckpointResponder.java        |   4 +-
 .../runtime/taskmanager/RuntimeEnvironment.java |   4 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   8 +-
 .../CheckpointCoordinatorFailureTest.java       |  49 +-
 .../checkpoint/CheckpointCoordinatorTest.java   | 498 ++++++++++---------
 .../checkpoint/CheckpointStateRestoreTest.java  |  49 +-
 .../CompletedCheckpointStoreTest.java           |   2 +-
 .../checkpoint/PendingCheckpointTest.java       |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   4 +-
 .../ExecutionVertexLocalityTest.java            |  10 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |  60 +--
 .../messages/CheckpointMessagesTest.java        |  23 +-
 .../operators/testutils/DummyEnvironment.java   |   4 +-
 .../operators/testutils/MockEnvironment.java    |   6 +-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   6 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |  26 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   7 +-
 .../flink/streaming/api/graph/StreamConfig.java |  13 +-
 .../api/graph/StreamingJobGraphGenerator.java   |  11 +-
 .../api/operators/AbstractStreamOperator.java   |  19 +-
 .../streaming/api/operators/StreamOperator.java |   6 +-
 .../runtime/tasks/OperatorStateHandles.java     |  19 -
 .../streaming/runtime/tasks/StreamTask.java     | 196 +++-----
 .../AbstractUdfStreamOperatorLifecycleTest.java |   5 +-
 .../operators/async/AsyncWaitOperatorTest.java  |  16 +-
 .../streaming/runtime/io/BarrierBufferTest.java |   4 +-
 .../runtime/io/BarrierTrackerTest.java          |   4 +-
 .../runtime/operators/StreamTaskTimerTest.java  |   2 +
 .../TestProcessingTimeServiceTest.java          |   2 +
 .../runtime/tasks/BlockingCheckpointsTest.java  |   2 +
 .../tasks/InterruptSensitiveRestoreTest.java    |  55 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   |  34 +-
 .../SourceExternalCheckpointTriggerTest.java    |   2 +
 .../runtime/tasks/SourceStreamTaskTest.java     |   3 +
 .../runtime/tasks/StreamMockEnvironment.java    |   4 +-
 .../StreamTaskCancellationBarrierTest.java      |   3 +
 .../tasks/StreamTaskTerminationTest.java        |   2 +
 .../streaming/runtime/tasks/StreamTaskTest.java |  79 ++-
 .../runtime/tasks/StreamTaskTestHarness.java    |   2 +
 .../runtime/tasks/TwoInputStreamTaskTest.java   |   5 +
 .../util/AbstractStreamOperatorTestHarness.java |  22 +-
 .../test/checkpointing/SavepointITCase.java     |   2 +-
 63 files changed, 1185 insertions(+), 986 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
----------------------------------------------------------------------
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index d2edf0e..c752e53 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -32,8 +32,10 @@ import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
@@ -74,6 +76,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -81,7 +84,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
@@ -137,6 +140,7 @@ public class RocksDBAsyncSnapshotTest {
                streamConfig.setStateBackend(backend);
 
                streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+               streamConfig.setOperatorID(new OperatorID());
 
                final OneShotLatch delayCheckpointLatch = new OneShotLatch();
                final OneShotLatch ensureCheckpointLatch = new OneShotLatch();
@@ -152,7 +156,7 @@ public class RocksDBAsyncSnapshotTest {
                        public void acknowledgeCheckpoint(
                                        long checkpointId,
                                        CheckpointMetrics checkpointMetrics,
-                                       SubtaskState checkpointStateHandles) {
+                                       TaskStateSnapshot 
checkpointStateHandles) {
 
                                super.acknowledgeCheckpoint(checkpointId, 
checkpointMetrics);
 
@@ -164,8 +168,16 @@ public class RocksDBAsyncSnapshotTest {
                                        throw new RuntimeException(e);
                                }
 
+                               boolean hasManagedKeyedState = false;
+                               for (Map.Entry<OperatorID, 
OperatorSubtaskState> entry : checkpointStateHandles.getSubtaskStateMappings()) 
{
+                                       OperatorSubtaskState state = 
entry.getValue();
+                                       if (state != null) {
+                                               hasManagedKeyedState |= 
state.getManagedKeyedState() != null;
+                                       }
+                               }
+
                                // should be one k/v state
-                               
assertNotNull(checkpointStateHandles.getManagedKeyedState());
+                               assertTrue(hasManagedKeyedState);
 
                                // we now know that the checkpoint went through
                                ensureCheckpointLatch.trigger();
@@ -241,6 +253,7 @@ public class RocksDBAsyncSnapshotTest {
                streamConfig.setStateBackend(backend);
 
                streamConfig.setStreamOperator(new AsyncCheckpointOperator());
+               streamConfig.setOperatorID(new OperatorID());
 
                StreamMockEnvironment mockEnv = new StreamMockEnvironment(
                                testHarness.jobConfig,

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 6f41867..0b64a73 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
 import org.apache.flink.runtime.state.SharedStateRegistry;
-import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
@@ -1016,7 +1015,7 @@ public class CheckpointCoordinator {
         * Restores the latest checkpointed state.
         *
         * @param tasks Map of job vertices to restore. State for these 
vertices is
-        * restored via {@link Execution#setInitialState(TaskStateHandles)}.
+        * restored via {@link Execution#setInitialState(TaskStateSnapshot)}.
         * @param errorIfNoCheckpoint Fail if no completed checkpoint is 
available to
         * restore from.
         * @param allowNonRestoredState Allow checkpoint state that cannot be 
mapped
@@ -1102,7 +1101,7 @@ public class CheckpointCoordinator {
         *                         mapped to any job vertex in tasks.
         * @param tasks            Map of job vertices to restore. State for 
these 
         *                         vertices is restored via 
-        *                         {@link 
Execution#setInitialState(TaskStateHandles)}.
+        *                         {@link 
Execution#setInitialState(TaskStateSnapshot)}.
         * @param userClassLoader  The class loader to resolve serialized 
classes in 
         *                         legacy savepoint versions. 
         */
@@ -1256,7 +1255,7 @@ public class CheckpointCoordinator {
                        final JobID jobId,
                        final ExecutionAttemptID executionAttemptID,
                        final long checkpointId,
-                       final SubtaskState subtaskState) {
+                       final TaskStateSnapshot subtaskState) {
 
                if (subtaskState != null) {
                        executor.execute(new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
index 43d66ee..22244f6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorGateway.java
@@ -29,7 +29,7 @@ public interface CheckpointCoordinatorGateway extends 
RpcGateway {
                        final ExecutionAttemptID executionAttemptID,
                        final long checkpointId,
                        final CheckpointMetrics checkpointMetrics,
-                       final SubtaskState subtaskState);
+                       final TaskStateSnapshot subtaskState);
 
        void declineCheckpoint(
                        JobID jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index b153028..145ff6a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -30,8 +30,8 @@ import java.util.Map;
 import java.util.Objects;
 
 /**
- * Simple container class which contains the raw/managed/legacy operator state 
and key-group state handles for the sub
- * tasks of an operator.
+ * Simple container class which contains the raw/managed/legacy operator state 
and key-group state handles from all sub
+ * tasks of an operator and therefore represents the complete state of a 
logical operator.
  */
 public class OperatorState implements CompositeStateHandle {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index e2ae632..296b5ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -25,13 +26,35 @@ import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 
 /**
- * Container for the state of one parallel subtask of an operator. This is 
part of the {@link OperatorState}.
+ * This class encapsulates the state for one parallel instance of an operator. 
The complete state of a (logical)
+ * operator (e.g. a flatmap operator) consists of the union of all {@link 
OperatorSubtaskState}s from all
+ * parallel tasks that physically execute parallelized, physical instances of 
the operator.
+ *
+ * <p>The full state of the logical operator is represented by {@link 
OperatorState} which consists of
+ * {@link OperatorSubtaskState}s.
+ *
+ * <p>Typically, we expect all collections in this class to be of size 0 or 1, 
because there is up to one state handle
+ * produced per state type (e.g. managed-keyed, raw-operator, ...). In 
particular, this holds when taking a snapshot.
+ * The purpose of having the state handles in collections is that this class 
is also reused in restoring state.
+ * Under normal circumstances, the expected size of each collection is still 0 
or 1, except for scale-down. In
+ * scale-down, one operator subtask can become responsible for the state of 
multiple previous subtasks. The collections
+ * can then store all the state handles that are relevant to build up the new 
subtask state.
+ *
+ * <p>There is no collection for legacy state because it is not rescalable.
  */
 public class OperatorSubtaskState implements CompositeStateHandle {
 
@@ -46,27 +69,32 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Can be removed when we remove the APIs for non-repartitionable 
operator state.
         */
        @Deprecated
+       @Nullable
        private final StreamStateHandle legacyOperatorState;
 
        /**
         * Snapshot from the {@link 
org.apache.flink.runtime.state.OperatorStateBackend}.
         */
-       private final OperatorStateHandle managedOperatorState;
+       @Nonnull
+       private final Collection<OperatorStateHandle> managedOperatorState;
 
        /**
         * Snapshot written using {@link 
org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream}.
         */
-       private final OperatorStateHandle rawOperatorState;
+       @Nonnull
+       private final Collection<OperatorStateHandle> rawOperatorState;
 
        /**
         * Snapshot from {@link 
org.apache.flink.runtime.state.KeyedStateBackend}.
         */
-       private final KeyedStateHandle managedKeyedState;
+       @Nonnull
+       private final Collection<KeyedStateHandle> managedKeyedState;
 
        /**
         * Snapshot written using {@link 
org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream}.
         */
-       private final KeyedStateHandle rawKeyedState;
+       @Nonnull
+       private final Collection<KeyedStateHandle> rawKeyedState;
 
        /**
         * The state size. This is also part of the deserialized state handle.
@@ -75,31 +103,79 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         */
        private final long stateSize;
 
+       @VisibleForTesting
+       public OperatorSubtaskState(StreamStateHandle legacyOperatorState) {
+
+               this(legacyOperatorState,
+                       Collections.<OperatorStateHandle>emptyList(),
+                       Collections.<OperatorStateHandle>emptyList(),
+                       Collections.<KeyedStateHandle>emptyList(),
+                       Collections.<KeyedStateHandle>emptyList());
+       }
+
+       /**
+        * Empty state.
+        */
+       public OperatorSubtaskState() {
+               this(null);
+       }
+
        public OperatorSubtaskState(
                StreamStateHandle legacyOperatorState,
-               OperatorStateHandle managedOperatorState,
-               OperatorStateHandle rawOperatorState,
-               KeyedStateHandle managedKeyedState,
-               KeyedStateHandle rawKeyedState) {
+               Collection<OperatorStateHandle> managedOperatorState,
+               Collection<OperatorStateHandle> rawOperatorState,
+               Collection<KeyedStateHandle> managedKeyedState,
+               Collection<KeyedStateHandle> rawKeyedState) {
 
                this.legacyOperatorState = legacyOperatorState;
-               this.managedOperatorState = managedOperatorState;
-               this.rawOperatorState = rawOperatorState;
-               this.managedKeyedState = managedKeyedState;
-               this.rawKeyedState = rawKeyedState;
+               this.managedOperatorState = 
Preconditions.checkNotNull(managedOperatorState);
+               this.rawOperatorState = 
Preconditions.checkNotNull(rawOperatorState);
+               this.managedKeyedState = 
Preconditions.checkNotNull(managedKeyedState);
+               this.rawKeyedState = Preconditions.checkNotNull(rawKeyedState);
 
                try {
                        long calculateStateSize = 
getSizeNullSafe(legacyOperatorState);
-                       calculateStateSize += 
getSizeNullSafe(managedOperatorState);
-                       calculateStateSize += getSizeNullSafe(rawOperatorState);
-                       calculateStateSize += 
getSizeNullSafe(managedKeyedState);
-                       calculateStateSize += getSizeNullSafe(rawKeyedState);
+                       calculateStateSize += sumAllSizes(managedOperatorState);
+                       calculateStateSize += sumAllSizes(rawOperatorState);
+                       calculateStateSize += sumAllSizes(managedKeyedState);
+                       calculateStateSize += sumAllSizes(rawKeyedState);
                        stateSize = calculateStateSize;
                } catch (Exception e) {
                        throw new RuntimeException("Failed to get state size.", 
e);
                }
        }
 
+       /**
+        * Convenience constructor for the common case in which each collection has size 0 or 1. Null values are
+        * translated into empty collections (except for legacy state, which stays nullable).
+        */
+       public OperatorSubtaskState(
+               StreamStateHandle legacyOperatorState,
+               OperatorStateHandle managedOperatorState,
+               OperatorStateHandle rawOperatorState,
+               KeyedStateHandle managedKeyedState,
+               KeyedStateHandle rawKeyedState) {
+
+               this(legacyOperatorState,
+                       singletonOrEmptyOnNull(managedOperatorState),
+                       singletonOrEmptyOnNull(rawOperatorState),
+                       singletonOrEmptyOnNull(managedKeyedState),
+                       singletonOrEmptyOnNull(rawKeyedState));
+       }
+
+       private static <T> Collection<T> singletonOrEmptyOnNull(T element) {
+               return element != null ? Collections.singletonList(element) : 
Collections.<T>emptyList();
+       }
+
+       private static long sumAllSizes(Collection<? extends StateObject> 
stateObject) throws Exception {
+               long size = 0L;
+               for (StateObject object : stateObject) {
+                       size += getSizeNullSafe(object);
+               }
+
+               return size;
+       }
+
        private static long getSizeNullSafe(StateObject stateObject) throws 
Exception {
                return stateObject != null ? stateObject.getStateSize() : 0L;
        }
@@ -111,36 +187,58 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         * Can be removed when we remove the APIs for non-repartitionable 
operator state.
         */
        @Deprecated
+       @Nullable
        public StreamStateHandle getLegacyOperatorState() {
                return legacyOperatorState;
        }
 
-       public OperatorStateHandle getManagedOperatorState() {
+       /**
+        * Returns the handles to the managed operator state; never null, possibly empty.
+        */
+       @Nonnull
+       public Collection<OperatorStateHandle> getManagedOperatorState() {
                return managedOperatorState;
        }
 
-       public OperatorStateHandle getRawOperatorState() {
+       /**
+        * Returns the handles to the raw operator state; never null, possibly empty.
+        */
+       @Nonnull
+       public Collection<OperatorStateHandle> getRawOperatorState() {
                return rawOperatorState;
        }
 
-       public KeyedStateHandle getManagedKeyedState() {
+       /**
+        * Returns the handles to the managed keyed state; never null, possibly empty.
+        */
+       @Nonnull
+       public Collection<KeyedStateHandle> getManagedKeyedState() {
                return managedKeyedState;
        }
 
-       public KeyedStateHandle getRawKeyedState() {
+       /**
+        * Returns the handles to the raw keyed state; never null, possibly empty.
+        */
+       @Nonnull
+       public Collection<KeyedStateHandle> getRawKeyedState() {
                return rawKeyedState;
        }
 
        @Override
        public void discardState() {
                try {
-                       StateUtil.bestEffortDiscardAllStateObjects(
-                               Arrays.asList(
-                                       legacyOperatorState,
-                                       managedOperatorState,
-                                       rawOperatorState,
-                                       managedKeyedState,
-                                       rawKeyedState));
+                       List<StateObject> toDispose =
+                               new ArrayList<>(1 +
+                                       managedOperatorState.size() +
+                                       rawOperatorState.size() +
+                                       managedKeyedState.size() +
+                                       rawKeyedState.size());
+                       toDispose.add(legacyOperatorState);
+                       toDispose.addAll(managedOperatorState);
+                       toDispose.addAll(rawOperatorState);
+                       toDispose.addAll(managedKeyedState);
+                       toDispose.addAll(rawKeyedState);
+                       StateUtil.bestEffortDiscardAllStateObjects(toDispose);
                } catch (Exception e) {
                        LOG.warn("Error while discarding operator states.", e);
                }
@@ -148,12 +246,17 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
 
        @Override
        public void registerSharedStates(SharedStateRegistry 
sharedStateRegistry) {
-               if (managedKeyedState != null) {
-                       
managedKeyedState.registerSharedStates(sharedStateRegistry);
-               }
+               registerSharedState(sharedStateRegistry, managedKeyedState);
+               registerSharedState(sharedStateRegistry, rawKeyedState);
+       }
 
-               if (rawKeyedState != null) {
-                       rawKeyedState.registerSharedStates(sharedStateRegistry);
+       private static void registerSharedState(
+               SharedStateRegistry sharedStateRegistry,
+               Iterable<KeyedStateHandle> stateHandles) {
+               for (KeyedStateHandle stateHandle : stateHandles) {
+                       if (stateHandle != null) {
+                               
stateHandle.registerSharedStates(sharedStateRegistry);
+                       }
                }
        }
 
@@ -175,44 +278,32 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
 
                OperatorSubtaskState that = (OperatorSubtaskState) o;
 
-               if (stateSize != that.stateSize) {
+               if (getStateSize() != that.getStateSize()) {
                        return false;
                }
-
-               if (legacyOperatorState != null ?
-                       !legacyOperatorState.equals(that.legacyOperatorState)
-                       : that.legacyOperatorState != null) {
+               if (getLegacyOperatorState() != null ? 
!getLegacyOperatorState().equals(that.getLegacyOperatorState()) : 
that.getLegacyOperatorState() != null) {
                        return false;
                }
-               if (managedOperatorState != null ?
-                       !managedOperatorState.equals(that.managedOperatorState)
-                       : that.managedOperatorState != null) {
+               if 
(!getManagedOperatorState().equals(that.getManagedOperatorState())) {
                        return false;
                }
-               if (rawOperatorState != null ?
-                       !rawOperatorState.equals(that.rawOperatorState)
-                       : that.rawOperatorState != null) {
+               if (!getRawOperatorState().equals(that.getRawOperatorState())) {
                        return false;
                }
-               if (managedKeyedState != null ?
-                       !managedKeyedState.equals(that.managedKeyedState)
-                       : that.managedKeyedState != null) {
+               if 
(!getManagedKeyedState().equals(that.getManagedKeyedState())) {
                        return false;
                }
-               return rawKeyedState != null ?
-                       rawKeyedState.equals(that.rawKeyedState)
-                       : that.rawKeyedState == null;
-
+               return getRawKeyedState().equals(that.getRawKeyedState());
        }
 
        @Override
        public int hashCode() {
-               int result = legacyOperatorState != null ? 
legacyOperatorState.hashCode() : 0;
-               result = 31 * result + (managedOperatorState != null ? 
managedOperatorState.hashCode() : 0);
-               result = 31 * result + (rawOperatorState != null ? 
rawOperatorState.hashCode() : 0);
-               result = 31 * result + (managedKeyedState != null ? 
managedKeyedState.hashCode() : 0);
-               result = 31 * result + (rawKeyedState != null ? 
rawKeyedState.hashCode() : 0);
-               result = 31 * result + (int) (stateSize ^ (stateSize >>> 32));
+               int result = getLegacyOperatorState() != null ? 
getLegacyOperatorState().hashCode() : 0;
+               result = 31 * result + getManagedOperatorState().hashCode();
+               result = 31 * result + getRawOperatorState().hashCode();
+               result = 31 * result + getManagedKeyedState().hashCode();
+               result = 31 * result + getRawKeyedState().hashCode();
+               result = 31 * result + (int) (getStateSize() ^ (getStateSize() 
>>> 32));
                return result;
        }
 
@@ -227,4 +318,21 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
                        ", stateSize=" + stateSize +
                        '}';
        }
+
+       public boolean hasState() {
+               return legacyOperatorState != null
+                       || hasState(managedOperatorState)
+                       || hasState(rawOperatorState)
+                       || hasState(managedKeyedState)
+                       || hasState(rawKeyedState);
+       }
+
+       private boolean hasState(Iterable<? extends StateObject> states) {
+               for (StateObject state : states) {
+                       if (state != null) {
+                               return true;
+                       }
+               }
+               return false;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
index 3472fc2..16231dd 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java
@@ -25,19 +25,18 @@ import 
org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
-import org.apache.flink.runtime.state.KeyedStateHandle;
-import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -353,13 +352,13 @@ public class PendingCheckpoint {
         * Acknowledges the task with the given execution attempt id and the 
given subtask state.
         *
         * @param executionAttemptId of the acknowledged task
-        * @param subtaskState of the acknowledged task
+        * @param operatorSubtaskStates of the acknowledged task
         * @param metrics Checkpoint metrics for the stats
         * @return TaskAcknowledgeResult of the operation
         */
        public TaskAcknowledgeResult acknowledgeTask(
                        ExecutionAttemptID executionAttemptId,
-                       SubtaskState subtaskState,
+                       TaskStateSnapshot operatorSubtaskStates,
                        CheckpointMetrics metrics) {
 
                synchronized (lock) {
@@ -383,21 +382,19 @@ public class PendingCheckpoint {
                        int subtaskIndex = vertex.getParallelSubtaskIndex();
                        long ackTimestamp = System.currentTimeMillis();
 
-                       long stateSize = 0;
-                       if (subtaskState != null) {
-                               stateSize = subtaskState.getStateSize();
-
-                               @SuppressWarnings("deprecation")
-                               ChainedStateHandle<StreamStateHandle> 
nonPartitionedState =
-                                       subtaskState.getLegacyOperatorState();
-                               ChainedStateHandle<OperatorStateHandle> 
partitioneableState =
-                                       subtaskState.getManagedOperatorState();
-                               ChainedStateHandle<OperatorStateHandle> 
rawOperatorState =
-                                       subtaskState.getRawOperatorState();
-
-                               // break task state apart into separate 
operator states
-                               for (int x = 0; x < operatorIDs.size(); x++) {
-                                       OperatorID operatorID = 
operatorIDs.get(x);
+                       long stateSize = 0L;
+
+                       if (operatorSubtaskStates != null) {
+                               for (OperatorID operatorID : operatorIDs) {
+
+                                       OperatorSubtaskState 
operatorSubtaskState =
+                                               
operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);
+
+                                       // if no real operatorSubtaskState was 
reported, we insert an empty state
+                                       if (operatorSubtaskState == null) {
+                                               operatorSubtaskState = new 
OperatorSubtaskState();
+                                       }
+
                                        OperatorState operatorState = 
operatorStates.get(operatorID);
 
                                        if (operatorState == null) {
@@ -408,23 +405,8 @@ public class PendingCheckpoint {
                                                operatorStates.put(operatorID, 
operatorState);
                                        }
 
-                                       KeyedStateHandle managedKeyedState = 
null;
-                                       KeyedStateHandle rawKeyedState = null;
-
-                                       // only the head operator retains the 
keyed state
-                                       if (x == operatorIDs.size() - 1) {
-                                               managedKeyedState = 
subtaskState.getManagedKeyedState();
-                                               rawKeyedState = 
subtaskState.getRawKeyedState();
-                                       }
-
-                                       OperatorSubtaskState 
operatorSubtaskState = new OperatorSubtaskState(
-                                                       nonPartitionedState != 
null ? nonPartitionedState.get(x) : null,
-                                                       partitioneableState != 
null ? partitioneableState.get(x) : null,
-                                                       rawOperatorState != 
null ? rawOperatorState.get(x) : null,
-                                                       managedKeyedState,
-                                                       rawKeyedState);
-
                                        operatorState.putState(subtaskIndex, 
operatorSubtaskState);
+                                       stateSize += 
operatorSubtaskState.getStateSize();
                                }
                        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
index 046096f..4513ef8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/RoundRobinOperatorStateRepartitioner.java
@@ -89,6 +89,10 @@ public class RoundRobinOperatorStateRepartitioner implements 
OperatorStateRepart
 
                for (OperatorStateHandle psh : previousParallelSubtaskStates) {
 
+                       if (psh == null) {
+                               continue;
+                       }
+
                        for (Map.Entry<String, 
OperatorStateHandle.StateMetaInfo> e :
                                        
psh.getStateNameToPartitionOffsets().entrySet()) {
                                OperatorStateHandle.StateMetaInfo metaInfo = 
e.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
index 5712ea1..b69285e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateAssignmentOperation.java
@@ -23,15 +23,14 @@ import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
-import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -185,7 +184,8 @@ public class StateAssignmentOperation {
                                        subNonPartitionableState);
 
                                // PartitionedState
-                               
reAssignSubPartitionableState(newManagedOperatorStates,
+                               reAssignSubPartitionableState(
+                                       newManagedOperatorStates,
                                        newRawOperatorStates,
                                        subTaskIndex,
                                        operatorIndex,
@@ -193,36 +193,57 @@ public class StateAssignmentOperation {
                                        subRawOperatorState);
 
                                // KeyedState
-                               if (operatorIndex == operatorIDs.size() - 1) {
-                                       subKeyedState = 
reAssignSubKeyedStates(operatorState,
+                               if (isHeadOperator(operatorIndex, operatorIDs)) 
{
+                                       subKeyedState = reAssignSubKeyedStates(
+                                               operatorState,
                                                keyGroupPartitions,
                                                subTaskIndex,
                                                newParallelism,
                                                oldParallelism);
-
                                }
                        }
 
-
                        // check if a stateless task
                        if (!allElementsAreNull(subNonPartitionableState) ||
                                !allElementsAreNull(subManagedOperatorState) ||
                                !allElementsAreNull(subRawOperatorState) ||
                                subKeyedState != null) {
 
-                               TaskStateHandles taskStateHandles = new 
TaskStateHandles(
+                               TaskStateSnapshot taskState = new 
TaskStateSnapshot();
 
-                                       new 
ChainedStateHandle<>(subNonPartitionableState),
-                                       subManagedOperatorState,
-                                       subRawOperatorState,
-                                       subKeyedState != null ? 
subKeyedState.f0 : null,
-                                       subKeyedState != null ? 
subKeyedState.f1 : null);
+                               for (int i = 0; i < operatorIDs.size(); ++i) {
+
+                                       OperatorID operatorID = 
operatorIDs.get(i);
+
+                                       Collection<KeyedStateHandle> rawKeyed = 
Collections.emptyList();
+                                       Collection<KeyedStateHandle> 
managedKeyed = Collections.emptyList();
+
+                                       // keyed state case
+                                       if (subKeyedState != null) {
+                                               managedKeyed = subKeyedState.f0;
+                                               rawKeyed = subKeyedState.f1;
+                                       }
+
+                                       OperatorSubtaskState 
operatorSubtaskState =
+                                               new OperatorSubtaskState(
+                                                       
subNonPartitionableState.get(i),
+                                                       
subManagedOperatorState.get(i),
+                                                       
subRawOperatorState.get(i),
+                                                       managedKeyed,
+                                                       rawKeyed
+                                               );
+
+                                       
taskState.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+                               }
 
-                               
currentExecutionAttempt.setInitialState(taskStateHandles);
+                               
currentExecutionAttempt.setInitialState(taskState);
                        }
                }
        }
 
+       private static boolean isHeadOperator(int opIdx, List<OperatorID> 
operatorIDs) {
+               return opIdx == operatorIDs.size() - 1;
+       }
 
        public void checkParallelismPreconditions(List<OperatorState> 
operatorStates, ExecutionJobVertex executionJobVertex) {
 
@@ -239,18 +260,18 @@ public class StateAssignmentOperation {
                        List<Collection<OperatorStateHandle>> 
subManagedOperatorState,
                        List<Collection<OperatorStateHandle>> 
subRawOperatorState) {
 
-               if (newMangedOperatorStates.get(operatorIndex) != null) {
-                       
subManagedOperatorState.add(newMangedOperatorStates.get(operatorIndex).get(subTaskIndex));
+               if (newMangedOperatorStates.get(operatorIndex) != null && 
!newMangedOperatorStates.get(operatorIndex).isEmpty()) {
+                       Collection<OperatorStateHandle> operatorStateHandles = 
newMangedOperatorStates.get(operatorIndex).get(subTaskIndex);
+                       subManagedOperatorState.add(operatorStateHandles != 
null ? operatorStateHandles : Collections.<OperatorStateHandle>emptyList());
                } else {
-                       subManagedOperatorState.add(null);
+                       
subManagedOperatorState.add(Collections.<OperatorStateHandle>emptyList());
                }
-               if (newRawOperatorStates.get(operatorIndex) != null) {
-                       
subRawOperatorState.add(newRawOperatorStates.get(operatorIndex).get(subTaskIndex));
+               if (newRawOperatorStates.get(operatorIndex) != null && 
!newRawOperatorStates.get(operatorIndex).isEmpty()) {
+                       Collection<OperatorStateHandle> operatorStateHandles = 
newRawOperatorStates.get(operatorIndex).get(subTaskIndex);
+                       subRawOperatorState.add(operatorStateHandles != null ? 
operatorStateHandles : Collections.<OperatorStateHandle>emptyList());
                } else {
-                       subRawOperatorState.add(null);
+                       
subRawOperatorState.add(Collections.<OperatorStateHandle>emptyList());
                }
-
-
        }
 
        private Tuple2<Collection<KeyedStateHandle>, 
Collection<KeyedStateHandle>> reAssignSubKeyedStates(
@@ -265,24 +286,22 @@ public class StateAssignmentOperation {
 
                if (newParallelism == oldParallelism) {
                        if (operatorState.getState(subTaskIndex) != null) {
-                               KeyedStateHandle oldSubManagedKeyedState = 
operatorState.getState(subTaskIndex).getManagedKeyedState();
-                               KeyedStateHandle oldSubRawKeyedState = 
operatorState.getState(subTaskIndex).getRawKeyedState();
-                               subManagedKeyedState = oldSubManagedKeyedState 
!= null ? Collections.singletonList(
-                                       oldSubManagedKeyedState) : null;
-                               subRawKeyedState = oldSubRawKeyedState != null 
? Collections.singletonList(
-                                       oldSubRawKeyedState) : null;
+                               subManagedKeyedState = 
operatorState.getState(subTaskIndex).getManagedKeyedState();
+                               subRawKeyedState = 
operatorState.getState(subTaskIndex).getRawKeyedState();
                        } else {
-                               subManagedKeyedState = null;
-                               subRawKeyedState = null;
+                               subManagedKeyedState = Collections.emptyList();
+                               subRawKeyedState = Collections.emptyList();
                        }
                } else {
                        subManagedKeyedState = 
getManagedKeyedStateHandles(operatorState, 
keyGroupPartitions.get(subTaskIndex));
                        subRawKeyedState = 
getRawKeyedStateHandles(operatorState, keyGroupPartitions.get(subTaskIndex));
                }
-               if (subManagedKeyedState == null && subRawKeyedState == null) {
+
+               if (subManagedKeyedState.isEmpty() && 
subRawKeyedState.isEmpty()) {
                        return null;
+               } else {
+                       return new Tuple2<>(subManagedKeyedState, 
subRawKeyedState);
                }
-               return new Tuple2<>(subManagedKeyedState, subRawKeyedState);
        }
 
 
@@ -318,7 +337,7 @@ public class StateAssignmentOperation {
                        List<List<Collection<OperatorStateHandle>>> 
newManagedOperatorStates,
                        List<List<Collection<OperatorStateHandle>>> 
newRawOperatorStates) {
 
-               //collect the old partitionalbe state
+               //collect the old partitionable state
                List<List<OperatorStateHandle>> oldManagedOperatorStates = new 
ArrayList<>();
                List<List<OperatorStateHandle>> oldRawOperatorStates = new 
ArrayList<>();
 
@@ -351,19 +370,16 @@ public class StateAssignmentOperation {
                        for (int i = 0; i < operatorState.getParallelism(); 
i++) {
                                OperatorSubtaskState operatorSubtaskState = 
operatorState.getState(i);
                                if (operatorSubtaskState != null) {
-                                       if 
(operatorSubtaskState.getManagedOperatorState() != null) {
-                                               if (managedOperatorState == 
null) {
-                                                       managedOperatorState = 
new ArrayList<>();
-                                               }
-                                               
managedOperatorState.add(operatorSubtaskState.getManagedOperatorState());
+
+                                       if (managedOperatorState == null) {
+                                               managedOperatorState = new 
ArrayList<>();
                                        }
+                                       
managedOperatorState.addAll(operatorSubtaskState.getManagedOperatorState());
 
-                                       if 
(operatorSubtaskState.getRawOperatorState() != null) {
-                                               if (rawOperatorState == null) {
-                                                       rawOperatorState = new 
ArrayList<>();
-                                               }
-                                               
rawOperatorState.add(operatorSubtaskState.getRawOperatorState());
+                                       if (rawOperatorState == null) {
+                                               rawOperatorState = new 
ArrayList<>();
                                        }
+                                       
rawOperatorState.addAll(operatorSubtaskState.getRawOperatorState());
                                }
 
                        }
@@ -382,21 +398,19 @@ public class StateAssignmentOperation {
         * @return all managedKeyedStateHandles which have intersection with 
given KeyGroupRange
         */
        public static List<KeyedStateHandle> getManagedKeyedStateHandles(
-                       OperatorState operatorState,
-                       KeyGroupRange subtaskKeyGroupRange) {
+               OperatorState operatorState,
+               KeyGroupRange subtaskKeyGroupRange) {
 
-               List<KeyedStateHandle> subtaskKeyedStateHandles = null;
+               List<KeyedStateHandle> subtaskKeyedStateHandles = new 
ArrayList<>();
 
                for (int i = 0; i < operatorState.getParallelism(); i++) {
-                       if (operatorState.getState(i) != null && 
operatorState.getState(i).getManagedKeyedState() != null) {
-                               KeyedStateHandle intersectedKeyedStateHandle = 
operatorState.getState(i).getManagedKeyedState().getIntersection(subtaskKeyGroupRange);
+                       if (operatorState.getState(i) != null) {
 
-                               if (intersectedKeyedStateHandle != null) {
-                                       if (subtaskKeyedStateHandles == null) {
-                                               subtaskKeyedStateHandles = new 
ArrayList<>();
-                                       }
-                                       
subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
-                               }
+                               Collection<KeyedStateHandle> keyedStateHandles 
= operatorState.getState(i).getManagedKeyedState();
+                               extractIntersectingState(
+                                       keyedStateHandles,
+                                       subtaskKeyGroupRange,
+                                       subtaskKeyedStateHandles);
                        }
                }
 
@@ -415,22 +429,40 @@ public class StateAssignmentOperation {
                OperatorState operatorState,
                KeyGroupRange subtaskKeyGroupRange) {
 
-               List<KeyedStateHandle> subtaskKeyedStateHandles = null;
+               List<KeyedStateHandle> extractedKeyedStateHandles = new 
ArrayList<>();
 
                for (int i = 0; i < operatorState.getParallelism(); i++) {
-                       if (operatorState.getState(i) != null && 
operatorState.getState(i).getRawKeyedState() != null) {
-                               KeyedStateHandle intersectedKeyedStateHandle = 
operatorState.getState(i).getRawKeyedState().getIntersection(subtaskKeyGroupRange);
+                       if (operatorState.getState(i) != null) {
+                               Collection<KeyedStateHandle> rawKeyedState = 
operatorState.getState(i).getRawKeyedState();
+                               extractIntersectingState(
+                                       rawKeyedState,
+                                       subtaskKeyGroupRange,
+                                       extractedKeyedStateHandles);
+                       }
+               }
+
+               return extractedKeyedStateHandles;
+       }
+
+       /**
+        * Extracts certain key group ranges from the given state handles and 
adds them to the collector.
+        */
+       private static void extractIntersectingState(
+               Collection<KeyedStateHandle> originalSubtaskStateHandles,
+               KeyGroupRange rangeToExtract,
+               List<KeyedStateHandle> extractedStateCollector) {
+
+               for (KeyedStateHandle keyedStateHandle : 
originalSubtaskStateHandles) {
+
+                       if (keyedStateHandle != null) {
+
+                               KeyedStateHandle intersectedKeyedStateHandle = 
keyedStateHandle.getIntersection(rangeToExtract);
 
                                if (intersectedKeyedStateHandle != null) {
-                                       if (subtaskKeyedStateHandles == null) {
-                                               subtaskKeyedStateHandles = new 
ArrayList<>();
-                                       }
-                                       
subtaskKeyedStateHandles.add(intersectedKeyedStateHandle);
+                                       
extractedStateCollector.add(intersectedKeyedStateHandle);
                                }
                        }
                }
-
-               return subtaskKeyedStateHandles;
        }
 
        /**
@@ -554,7 +586,7 @@ public class StateAssignmentOperation {
                        int newParallelism) {
 
                if (chainOpParallelStates == null) {
-                       return null;
+                       return Collections.emptyList();
                }
 
                //We only redistribute if the parallelism of the operator 
changed from previous executions
@@ -567,20 +599,23 @@ public class StateAssignmentOperation {
                        List<Collection<OperatorStateHandle>> repackStream = 
new ArrayList<>(newParallelism);
                        for (OperatorStateHandle operatorStateHandle : 
chainOpParallelStates) {
 
-                               Map<String, OperatorStateHandle.StateMetaInfo> 
partitionOffsets =
+                               if (operatorStateHandle != null) {
+                                       Map<String, 
OperatorStateHandle.StateMetaInfo> partitionOffsets =
                                                
operatorStateHandle.getStateNameToPartitionOffsets();
 
-                               for (OperatorStateHandle.StateMetaInfo metaInfo 
: partitionOffsets.values()) {
 
-                                       // if we find any broadcast state, we 
cannot take the shortcut and need to go through repartitioning
-                                       if 
(OperatorStateHandle.Mode.BROADCAST.equals(metaInfo.getDistributionMode())) {
-                                               return 
opStateRepartitioner.repartitionState(
+                                       for (OperatorStateHandle.StateMetaInfo 
metaInfo : partitionOffsets.values()) {
+
+                                               // if we find any broadcast 
state, we cannot take the shortcut and need to go through repartitioning
+                                               if 
(OperatorStateHandle.Mode.BROADCAST.equals(metaInfo.getDistributionMode())) {
+                                                       return 
opStateRepartitioner.repartitionState(
                                                                
chainOpParallelStates,
                                                                newParallelism);
+                                               }
                                        }
-                               }
 
-                               
repackStream.add(Collections.singletonList(operatorStateHandle));
+                                       
repackStream.add(Collections.singletonList(operatorStateHandle));
+                               }
                        }
                        return repackStream;
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
new file mode 100644
index 0000000..c416f3f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CompositeStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateUtil;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class encapsulates state handles to the snapshots of all operator 
instances executed within one task. A task
+ * can run multiple operator instances as a result of operator chaining, and 
all operator instances from the chain can
+ * register their state under their operator id. Each operator instance is a 
physical execution responsible for
+ * processing a partition of the data that goes through a logical operator. 
This partitioning happens to parallelize
+ * execution of logical operators, e.g. distributing a map function.
+ *
+ * <p>One instance of this class contains the information that one task will 
send to acknowledge a checkpoint request by
+ * the checkpoint coordinator. Tasks run operator instances in parallel, so 
the union of all
+ * {@link TaskStateSnapshot} that are collected by the checkpoint coordinator 
from all tasks represents the whole
+ * state of a job at the time of the checkpoint.
+ *
+ * <p>This class should be called TaskState once the old class with this name 
that we keep for backwards
+ * compatibility goes away.
+ */
+public class TaskStateSnapshot implements CompositeStateHandle {
+
+       private static final long serialVersionUID = 1L;
+
+       /** Mapping from an operator id to the state of one subtask of this 
operator. */
+       private final Map<OperatorID, OperatorSubtaskState> 
subtaskStatesByOperatorID;
+
+       public TaskStateSnapshot() {
+               this(10);
+       }
+
+       public TaskStateSnapshot(int size) {
+               this(new HashMap<OperatorID, OperatorSubtaskState>(size));
+       }
+
+       public TaskStateSnapshot(Map<OperatorID, OperatorSubtaskState> 
subtaskStatesByOperatorID) {
+               this.subtaskStatesByOperatorID = 
Preconditions.checkNotNull(subtaskStatesByOperatorID);
+       }
+
+       /**
+        * Returns the subtask state for the given operator id (or null if not 
contained).
+        */
+       public OperatorSubtaskState getSubtaskStateByOperatorID(OperatorID 
operatorID) {
+               return subtaskStatesByOperatorID.get(operatorID);
+       }
+
+       /**
+        * Maps the given operator id to the given subtask state. Returns the 
subtask state of a previous mapping, if such
+        * a mapping existed, or null otherwise.
+        */
+       public OperatorSubtaskState putSubtaskStateByOperatorID(OperatorID 
operatorID, OperatorSubtaskState state) {
+               return subtaskStatesByOperatorID.put(operatorID, 
Preconditions.checkNotNull(state));
+       }
+
+       /**
+        * Returns the set of all mappings from operator id to the 
corresponding subtask state.
+        */
+       public Set<Map.Entry<OperatorID, OperatorSubtaskState>> 
getSubtaskStateMappings() {
+               return subtaskStatesByOperatorID.entrySet();
+       }
+
+       @Override
+       public void discardState() throws Exception {
+               
StateUtil.bestEffortDiscardAllStateObjects(subtaskStatesByOperatorID.values());
+       }
+
+       @Override
+       public long getStateSize() {
+               long size = 0L;
+
+               for (OperatorSubtaskState subtaskState : 
subtaskStatesByOperatorID.values()) {
+                       if (subtaskState != null) {
+                               size += subtaskState.getStateSize();
+                       }
+               }
+
+               return size;
+       }
+
+       @Override
+       public void registerSharedStates(SharedStateRegistry stateRegistry) {
+               for (OperatorSubtaskState operatorSubtaskState : 
subtaskStatesByOperatorID.values()) {
+                       if (operatorSubtaskState != null) {
+                               
operatorSubtaskState.registerSharedStates(stateRegistry);
+                       }
+               }
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+
+               TaskStateSnapshot that = (TaskStateSnapshot) o;
+
+               return 
subtaskStatesByOperatorID.equals(that.subtaskStatesByOperatorID);
+       }
+
+       @Override
+       public int hashCode() {
+               return subtaskStatesByOperatorID.hashCode();
+       }
+
+       @Override
+       public String toString() {
+               return "TaskOperatorSubtaskStates{" +
+                       "subtaskStatesByOperatorID=" + 
subtaskStatesByOperatorID +
+                       '}';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
index 4cbbfcf..15628a0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java
@@ -240,6 +240,18 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
        //  task state (de)serialization methods
        // 
------------------------------------------------------------------------
 
+       /**
+        * Unwraps a collection that is expected to hold at most one element.
+        *
+        * @param collection the collection to unwrap, may be null
+        * @return the single element, or null if the collection is null or empty
+        * @throws IllegalStateException if the collection holds more than one element
+        */
+       private static <T> T extractSingleton(Collection<T> collection) {
+               if (collection == null || collection.isEmpty()) {
+                       return null;
+               }
+
+               final int elementCount = collection.size();
+               if (elementCount != 1) {
+                       throw new IllegalStateException("Expected singleton collection, but found size: " + elementCount);
+               }
+
+               return collection.iterator().next();
+       }
+
        private static void serializeSubtaskState(OperatorSubtaskState 
subtaskState, DataOutputStream dos) throws IOException {
 
                dos.writeLong(-1);
@@ -252,7 +264,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        serializeStreamStateHandle(nonPartitionableState, dos);
                }
 
-               OperatorStateHandle operatorStateBackend = 
subtaskState.getManagedOperatorState();
+               OperatorStateHandle operatorStateBackend = 
extractSingleton(subtaskState.getManagedOperatorState());
 
                len = operatorStateBackend != null ? 1 : 0;
                dos.writeInt(len);
@@ -260,7 +272,7 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        serializeOperatorStateHandle(operatorStateBackend, dos);
                }
 
-               OperatorStateHandle operatorStateFromStream = 
subtaskState.getRawOperatorState();
+               OperatorStateHandle operatorStateFromStream = 
extractSingleton(subtaskState.getRawOperatorState());
 
                len = operatorStateFromStream != null ? 1 : 0;
                dos.writeInt(len);
@@ -268,10 +280,10 @@ class SavepointV2Serializer implements 
SavepointSerializer<SavepointV2> {
                        serializeOperatorStateHandle(operatorStateFromStream, 
dos);
                }
 
-               KeyedStateHandle keyedStateBackend = 
subtaskState.getManagedKeyedState();
+               KeyedStateHandle keyedStateBackend = 
extractSingleton(subtaskState.getManagedKeyedState());
                serializeKeyedStateHandle(keyedStateBackend, dos);
 
-               KeyedStateHandle keyedStateStream = 
subtaskState.getRawKeyedState();
+               KeyedStateHandle keyedStateStream = 
extractSingleton(subtaskState.getRawKeyedState());
                serializeKeyedStateHandle(keyedStateStream, dos);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 0578b78..1fa5eb5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -18,11 +18,11 @@
 
 package org.apache.flink.runtime.deployment;
 
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 
@@ -64,7 +64,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        private final int targetSlotNumber;
 
        /** State handles for the sub task. */
-       private final TaskStateHandles taskStateHandles;
+       private final TaskStateSnapshot taskStateHandles;
 
        public TaskDeploymentDescriptor(
                        SerializedValue<JobInformation> 
serializedJobInformation,
@@ -74,7 +74,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                        int subtaskIndex,
                        int attemptNumber,
                        int targetSlotNumber,
-                       TaskStateHandles taskStateHandles,
+                       TaskStateSnapshot taskStateHandles,
                        Collection<ResultPartitionDeploymentDescriptor> 
resultPartitionDeploymentDescriptors,
                        Collection<InputGateDeploymentDescriptor> 
inputGateDeploymentDescriptors) {
 
@@ -153,7 +153,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                return inputGates;
        }
 
-       public TaskStateHandles getTaskStateHandles() {
+       public TaskStateSnapshot getTaskStateHandles() {
                return taskStateHandles;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 9e9f7c4..203ee85 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.execution;
 
-import java.util.Map;
-import java.util.concurrent.Future;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.TaskInfo;
@@ -28,7 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
@@ -41,6 +39,9 @@ import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
+import java.util.Map;
+import java.util.concurrent.Future;
+
 /**
  * The Environment gives the code executed in a task access to the task's 
properties
  * (such as name, parallelism), the configurations, the data stream readers 
and writers,
@@ -175,7 +176,7 @@ public interface Environment {
         * @param checkpointMetrics metrics for this checkpoint
         * @param subtaskState All state handles for the checkpointed state
         */
-       void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, SubtaskState subtaskState);
+       void acknowledgeCheckpoint(long checkpointId, CheckpointMetrics 
checkpointMetrics, TaskStateSnapshot subtaskState);
 
        /**
         * Declines a checkpoint. This tells the checkpoint coordinator that 
this task will

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index bd5bc7f..2074820 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
@@ -41,7 +42,6 @@ import 
org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.StackTraceSampleResponse;
-import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
 
@@ -133,7 +133,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        private volatile Throwable failureCause;          // once assigned, 
never changes
 
        /** The handle to the state that the task gets on restore */
-       private volatile TaskStateHandles taskState;
+       private volatile TaskStateSnapshot taskState;
 
        // ------------------------ Accumulators & Metrics 
------------------------
 
@@ -253,7 +253,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                return state.isTerminal();
        }
 
-       public TaskStateHandles getTaskStateHandles() {
+       public TaskStateSnapshot getTaskStateSnapshot() {
                return taskState;
        }
 
@@ -263,7 +263,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
         *
         * @param checkpointStateHandles all checkpointed operator state
         */
-       public void setInitialState(TaskStateHandles checkpointStateHandles) {
+       public void setInitialState(TaskStateSnapshot checkpointStateHandles) {
                checkState(state == CREATED, "Can only assign operator state 
when execution attempt is in CREATED");
                this.taskState = checkpointStateHandles;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 0ff71e7..9aac133 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,7 +22,9 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -38,11 +40,9 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.EvictingBoundedList;
 import org.apache.flink.util.ExceptionUtils;
@@ -457,7 +457,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
         */
        public Iterable<TaskManagerLocation> 
getPreferredLocationsBasedOnState() {
                TaskManagerLocation priorLocation;
-               if (currentExecution.getTaskStateHandles() != null && 
(priorLocation = getLatestPriorLocation()) != null) {
+               if (currentExecution.getTaskStateSnapshot() != null && 
(priorLocation = getLatestPriorLocation()) != null) {
                        return Collections.singleton(priorLocation);
                }
                else {
@@ -719,7 +719,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
        TaskDeploymentDescriptor createDeploymentDescriptor(
                        ExecutionAttemptID executionId,
                        SimpleSlot targetSlot,
-                       TaskStateHandles taskStateHandles,
+                       TaskStateSnapshot taskStateHandles,
                        int attemptNumber) throws ExecutionGraphException {
                
                // Produced intermediate results

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
index 0930011..00db01f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/StatefulTask.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.jobgraph.tasks;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.state.TaskStateHandles;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 
 /**
  * This interface must be implemented by any invokable that has recoverable 
state and participates
@@ -35,7 +35,7 @@ public interface StatefulTask {
         *
         * @param taskStateHandles All state handle for the task.
         */
-       void setInitialState(TaskStateHandles taskStateHandles) throws 
Exception;
+       void setInitialState(TaskStateSnapshot taskStateHandles) throws 
Exception;
 
        /**
         * This method is called to trigger a checkpoint, asynchronously by the 
checkpoint
@@ -43,8 +43,8 @@ public interface StatefulTask {
         * 
         * <p>This method is called for tasks that start the checkpoints by 
injecting the initial barriers,
         * i.e., the source tasks. In contrast, checkpoints on downstream 
operators, which are the result of
-        * receiving checkpoint barriers, invoke the {@link 
#triggerCheckpointOnBarrier(CheckpointMetaData, CheckpointMetrics)}
-        * method.
+        * receiving checkpoint barriers, invoke the
+        * {@link #triggerCheckpointOnBarrier(CheckpointMetaData, 
CheckpointOptions, CheckpointMetrics)} method.
         *
         * @param checkpointMetaData Meta data for about this checkpoint
         * @param checkpointOptions Options for performing this checkpoint

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 31036f6..25df19b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -31,7 +31,7 @@ import 
org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -96,6 +96,7 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -586,7 +587,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                        final ExecutionAttemptID executionAttemptID,
                        final long checkpointId,
                        final CheckpointMetrics checkpointMetrics,
-                       final SubtaskState checkpointState) {
+                       final TaskStateSnapshot checkpointState) {
 
                final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
                final AcknowledgeCheckpoint ackMessage = 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
index 9721c2c..65e3019 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.messages.checkpoint;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 /**
@@ -36,7 +36,7 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
 
        private static final long serialVersionUID = -7606214777192401493L;
 
-       private final SubtaskState subtaskState;
+       private final TaskStateSnapshot subtaskState;
 
        private final CheckpointMetrics checkpointMetrics;
 
@@ -47,7 +47,7 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
                        ExecutionAttemptID taskExecutionId,
                        long checkpointId,
                        CheckpointMetrics checkpointMetrics,
-                       SubtaskState subtaskState) {
+                       TaskStateSnapshot subtaskState) {
 
                super(job, taskExecutionId, checkpointId);
 
@@ -64,7 +64,7 @@ public class AcknowledgeCheckpoint extends 
AbstractCheckpointMessage implements
        //  properties
        // 
------------------------------------------------------------------------
 
-       public SubtaskState getSubtaskState() {
+       public TaskStateSnapshot getSubtaskState() {
                return subtaskState;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
index d82af72..031d7c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateInitializationContextImpl.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.state;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -26,6 +25,8 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.io.IOUtils;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -139,6 +140,7 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
        }
 
        private static Collection<KeyGroupsStateHandle> 
transform(Collection<KeyedStateHandle> keyedStateHandles) {
+
                if (keyedStateHandles == null) {
                        return null;
                }
@@ -146,13 +148,14 @@ public class StateInitializationContextImpl implements 
StateInitializationContex
                List<KeyGroupsStateHandle> keyGroupsStateHandles = new 
ArrayList<>();
 
                for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
-                       if (! (keyedStateHandle instanceof 
KeyGroupsStateHandle)) {
+
+                       if (keyedStateHandle instanceof KeyGroupsStateHandle) {
+                               
keyGroupsStateHandles.add((KeyGroupsStateHandle) keyedStateHandle);
+                       } else if (keyedStateHandle != null) {
                                throw new IllegalStateException("Unexpected 
state handle type, " +
                                        "expected: " + 
KeyGroupsStateHandle.class +
                                        ", but found: " + 
keyedStateHandle.getClass() + ".");
                        }
-
-                       keyGroupsStateHandles.add((KeyGroupsStateHandle) 
keyedStateHandle);
                }
 
                return keyGroupsStateHandles;

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
deleted file mode 100644
index 2fde548..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateHandles.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * This class encapsulates all state handles for a task.
- */
-public class TaskStateHandles implements Serializable {
-
-       public static final TaskStateHandles EMPTY = new TaskStateHandles();
-
-       private static final long serialVersionUID = 267686583583579359L;
-
-       /**
-        * State handle with the (non-partitionable) legacy operator state
-        *
-        * @deprecated Non-repartitionable operator state that has been 
deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable 
operator state.
-        */
-       @Deprecated
-       private final ChainedStateHandle<StreamStateHandle> legacyOperatorState;
-
-       /** Collection of handles which represent the managed keyed state of 
the head operator */
-       private final Collection<KeyedStateHandle> managedKeyedState;
-
-       /** Collection of handles which represent the raw/streamed keyed state 
of the head operator */
-       private final Collection<KeyedStateHandle> rawKeyedState;
-
-       /** Outer list represents the operator chain, each collection holds 
handles for managed state of a single operator */
-       private final List<Collection<OperatorStateHandle>> 
managedOperatorState;
-
-       /** Outer list represents the operator chain, each collection holds 
handles for raw/streamed state of a single operator */
-       private final List<Collection<OperatorStateHandle>> rawOperatorState;
-
-       public TaskStateHandles() {
-               this(null, null, null, null, null);
-       }
-
-       public TaskStateHandles(SubtaskState checkpointStateHandles) {
-               this(checkpointStateHandles.getLegacyOperatorState(),
-                               
transform(checkpointStateHandles.getManagedOperatorState()),
-                               
transform(checkpointStateHandles.getRawOperatorState()),
-                               
transform(checkpointStateHandles.getManagedKeyedState()),
-                               
transform(checkpointStateHandles.getRawKeyedState()));
-       }
-
-       public TaskStateHandles(
-                       ChainedStateHandle<StreamStateHandle> 
legacyOperatorState,
-                       List<Collection<OperatorStateHandle>> 
managedOperatorState,
-                       List<Collection<OperatorStateHandle>> rawOperatorState,
-                       Collection<KeyedStateHandle> managedKeyedState,
-                       Collection<KeyedStateHandle> rawKeyedState) {
-
-               this.legacyOperatorState = legacyOperatorState;
-               this.managedKeyedState = managedKeyedState;
-               this.rawKeyedState = rawKeyedState;
-               this.managedOperatorState = managedOperatorState;
-               this.rawOperatorState = rawOperatorState;
-       }
-
-       /**
-        * @deprecated Non-repartitionable operator state that has been 
deprecated.
-        * Can be removed when we remove the APIs for non-repartitionable 
operator state.
-        */
-       @Deprecated
-       public ChainedStateHandle<StreamStateHandle> getLegacyOperatorState() {
-               return legacyOperatorState;
-       }
-
-       public Collection<KeyedStateHandle> getManagedKeyedState() {
-               return managedKeyedState;
-       }
-
-       public Collection<KeyedStateHandle> getRawKeyedState() {
-               return rawKeyedState;
-       }
-
-       public List<Collection<OperatorStateHandle>> getRawOperatorState() {
-               return rawOperatorState;
-       }
-
-       public List<Collection<OperatorStateHandle>> getManagedOperatorState() {
-               return managedOperatorState;
-       }
-
-       private static List<Collection<OperatorStateHandle>> 
transform(ChainedStateHandle<OperatorStateHandle> in) {
-               if (null == in) {
-                       return Collections.emptyList();
-               }
-               List<Collection<OperatorStateHandle>> out = new 
ArrayList<>(in.getLength());
-               for (int i = 0; i < in.getLength(); ++i) {
-                       OperatorStateHandle osh = in.get(i);
-                       out.add(osh != null ? Collections.singletonList(osh) : 
null);
-               }
-               return out;
-       }
-
-       private static <T> List<T> transform(T in) {
-               return in == null ? Collections.<T>emptyList() : 
Collections.singletonList(in);
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               TaskStateHandles that = (TaskStateHandles) o;
-
-               if (legacyOperatorState != null ?
-                               
!legacyOperatorState.equals(that.legacyOperatorState)
-                               : that.legacyOperatorState != null) {
-                       return false;
-               }
-               if (managedKeyedState != null ?
-                               
!managedKeyedState.equals(that.managedKeyedState)
-                               : that.managedKeyedState != null) {
-                       return false;
-               }
-               if (rawKeyedState != null ?
-                               !rawKeyedState.equals(that.rawKeyedState)
-                               : that.rawKeyedState != null) {
-                       return false;
-               }
-
-               if (rawOperatorState != null ?
-                               !rawOperatorState.equals(that.rawOperatorState)
-                               : that.rawOperatorState != null) {
-                       return false;
-               }
-               return managedOperatorState != null ?
-                               
managedOperatorState.equals(that.managedOperatorState)
-                               : that.managedOperatorState == null;
-       }
-
-       @Override
-       public int hashCode() {
-               int result = legacyOperatorState != null ? 
legacyOperatorState.hashCode() : 0;
-               result = 31 * result + (managedKeyedState != null ? 
managedKeyedState.hashCode() : 0);
-               result = 31 * result + (rawKeyedState != null ? 
rawKeyedState.hashCode() : 0);
-               result = 31 * result + (managedOperatorState != null ? 
managedOperatorState.hashCode() : 0);
-               result = 31 * result + (rawOperatorState != null ? 
rawOperatorState.hashCode() : 0);
-               return result;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
index bf60161..aba8bda 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcCheckpointResponder.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.taskexecutor.rpc;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ public class RpcCheckpointResponder implements CheckpointResponder {
                        ExecutionAttemptID executionAttemptID,
                        long checkpointId,
                        CheckpointMetrics checkpointMetrics,
-                       SubtaskState subtaskState) {
+                       TaskStateSnapshot subtaskState) {
 
                checkpointCoordinatorGateway.acknowledgeCheckpoint(
                        jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
index ad0df71..e9f600d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayCheckpointResponder.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
@@ -44,7 +44,7 @@ public class ActorGatewayCheckpointResponder implements CheckpointResponder {
                        ExecutionAttemptID executionAttemptID,
                        long checkpointId,
                        CheckpointMetrics checkpointMetrics,
-                       SubtaskState checkpointStateHandles) {
+                       TaskStateSnapshot checkpointStateHandles) {
 
                AcknowledgeCheckpoint message = new AcknowledgeCheckpoint(
                                jobID, executionAttemptID, checkpointId, checkpointMetrics,

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
index cc66a3f..b3584a6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointResponder.java
@@ -20,7 +20,7 @@ package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 
 /**
@@ -47,7 +47,7 @@ public interface CheckpointResponder {
                ExecutionAttemptID executionAttemptID,
                long checkpointId,
                CheckpointMetrics checkpointMetrics,
-               SubtaskState subtaskState);
+               TaskStateSnapshot subtaskState);
 
        /**
         * Declines the given checkpoint.

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
index 788a590..92b5886 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -245,7 +245,7 @@ public class RuntimeEnvironment implements Environment {
        public void acknowledgeCheckpoint(
                        long checkpointId,
                        CheckpointMetrics checkpointMetrics,
-                       SubtaskState checkpointStateHandles) {
+                       TaskStateSnapshot checkpointStateHandles) {
 
                checkpointResponder.acknowledgeCheckpoint(
                                jobId, executionId, checkpointId, checkpointMetrics,

http://git-wip-us.apache.org/repos/asf/flink/blob/b71154a7/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 596d365..04cb990 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -34,6 +34,7 @@ import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotCheckpointingException;
 import org.apache.flink.runtime.checkpoint.decline.CheckpointDeclineTaskNotReadyException;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -67,16 +68,17 @@ import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.TaskStateHandles;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.WrappingRuntimeException;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.net.URL;
 import java.util.Collection;
@@ -250,7 +252,7 @@ public class Task implements Runnable, TaskActions {
         * The handles to the states that the task was initialized with. Will be set
         * to null after the initialization, to be memory friendly.
         */
-       private volatile TaskStateHandles taskStateHandles;
+       private volatile TaskStateSnapshot taskStateHandles;
 
        /** Initialized from the Flink configuration. May also be set at the ExecutionConfig */
        private long taskCancellationInterval;
@@ -272,7 +274,7 @@ public class Task implements Runnable, TaskActions {
                Collection<ResultPartitionDeploymentDescriptor> resultPartitionDeploymentDescriptors,
                Collection<InputGateDeploymentDescriptor> inputGateDeploymentDescriptors,
                int targetSlotNumber,
-               TaskStateHandles taskStateHandles,
+               TaskStateSnapshot taskStateHandles,
                MemoryManager memManager,
                IOManager ioManager,
                NetworkEnvironment networkEnvironment,

Reply via email to