http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
index dea3452..d62b13e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.java
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -40,9 +40,10 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * State handles backed by ZooKeeper.
  *
- * <p>Added state is persisted via {@link StateHandle}s, which in turn are 
written to
- * ZooKeeper. This level of indirection is necessary to keep the amount of 
data in ZooKeeper
- * small. ZooKeeper is build for data in the KB range whereas state can grow 
to multiple MBs.
+ * <p>Added state is persisted via {@link RetrievableStateHandle 
RetrievableStateHandles},
+ * which in turn are written to ZooKeeper. This level of indirection is 
necessary to keep the
+ * amount of data in ZooKeeper small. ZooKeeper is built for data in the KB 
range whereas
+ * state can grow to multiple MBs.
  *
  * <p>State modifications require some care, because it is possible that 
certain failures bring
  * the state handle backend and ZooKeeper out of sync.
@@ -72,7 +73,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        /** Curator ZooKeeper client */
        private final CuratorFramework client;
 
-       private final StateStorageHelper<T> storage;
+       private final RetrievableStateStorageHelper<T> storage;
 
        /**
         * Creates a {@link ZooKeeperStateHandleStore}.
@@ -84,7 +85,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         */
        public ZooKeeperStateHandleStore(
                CuratorFramework client,
-               StateStorageHelper storage) throws IOException {
+               RetrievableStateStorageHelper<T> storage) throws IOException {
 
                this.client = checkNotNull(client, "Curator client");
                this.storage = checkNotNull(storage, "State storage");
@@ -94,9 +95,9 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * Creates a state handle and stores it in ZooKeeper with create mode 
{@link
         * CreateMode#PERSISTENT}.
         *
-        * @see #add(String, Serializable, CreateMode)
+        * @see #add(String, T, CreateMode)
         */
-       public StateHandle<T> add(String pathInZooKeeper, T state) throws 
Exception {
+       public RetrievableStateHandle<T> add(String pathInZooKeeper, T state) 
throws Exception {
                return add(pathInZooKeeper, state, CreateMode.PERSISTENT);
        }
 
@@ -111,39 +112,39 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         *                        start with a '/')
         * @param state           State to be added
         * @param createMode      The create mode for the new path in ZooKeeper
-        * @return Created {@link StateHandle}
+        *
+        * @return The created {@link RetrievableStateHandle}.
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
-       public StateHandle<T> add(
+       public RetrievableStateHandle<T> add(
                        String pathInZooKeeper,
                        T state,
                        CreateMode createMode) throws Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
 
-               StateHandle<T> stateHandle = storage.store(state);
+               RetrievableStateHandle<T> storeHandle = storage.store(state);
 
                boolean success = false;
 
                try {
                        // Serialize the state handle. This writes the state to 
the backend.
-                       byte[] serializedStateHandle = 
InstantiationUtil.serializeObject(stateHandle);
+                       byte[] serializedStoreHandle = 
InstantiationUtil.serializeObject(storeHandle);
 
                        // Write state handle (not the actual state) to 
ZooKeeper. This is expected to be
                        // smaller than the state itself. This level of 
indirection makes sure that data in
                        // ZooKeeper is small, because ZooKeeper is designed 
for data in the KB range, but
                        // the state can be larger.
-                       
client.create().withMode(createMode).forPath(pathInZooKeeper, 
serializedStateHandle);
+                       
client.create().withMode(createMode).forPath(pathInZooKeeper, 
serializedStoreHandle);
 
                        success = true;
-
-                       return stateHandle;
+                       return storeHandle;
                }
                finally {
                        if (!success) {
                                // Cleanup the state handle if it was not 
written to ZooKeeper.
-                               if (stateHandle != null) {
-                                       stateHandle.discardState();
+                               if (storeHandle != null) {
+                                       storeHandle.discardState();
                                }
                        }
                }
@@ -161,31 +162,29 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
                checkNotNull(state, "State");
 
-               StateHandle<T> oldStateHandle = get(pathInZooKeeper);
+               RetrievableStateHandle<T> oldStateHandle = get(pathInZooKeeper);
 
-               StateHandle<T> stateHandle = storage.store(state);
+               RetrievableStateHandle<T> newStateHandle = storage.store(state);
 
                boolean success = false;
 
                try {
                        // Serialize the new state handle. This writes the 
state to the backend.
-                       byte[] serializedStateHandle = 
InstantiationUtil.serializeObject(stateHandle);
+                       byte[] serializedStateHandle = 
InstantiationUtil.serializeObject(newStateHandle);
 
                        // Replace state handle in ZooKeeper.
                        client.setData()
                                        .withVersion(expectedVersion)
                                        .forPath(pathInZooKeeper, 
serializedStateHandle);
-
                        success = true;
-               }
-               finally {
-                       if (success) {
+               } finally {
+                       if(success) {
                                oldStateHandle.discardState();
-                       }
-                       else {
-                               stateHandle.discardState();
+                       } else {
+                               newStateHandle.discardState();
                        }
                }
+
        }
 
        /**
@@ -216,13 +215,11 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
        @SuppressWarnings("unchecked")
-       public StateHandle<T> get(String pathInZooKeeper) throws Exception {
+       public RetrievableStateHandle<T> get(String pathInZooKeeper) throws 
Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
                byte[] data = client.getData().forPath(pathInZooKeeper);
-
-               return (StateHandle<T>) InstantiationUtil
-                               .deserializeObject(data, 
ClassLoader.getSystemClassLoader());
+               return InstantiationUtil.deserializeObject(data);
        }
 
        /**
@@ -234,8 +231,8 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
        @SuppressWarnings("unchecked")
-       public List<Tuple2<StateHandle<T>, String>> getAll() throws Exception {
-               final List<Tuple2<StateHandle<T>, String>> stateHandles = new 
ArrayList<>();
+       public List<Tuple2<RetrievableStateHandle<T>, String>> getAll() throws 
Exception {
+               final List<Tuple2<RetrievableStateHandle<T>, String>> 
stateHandles = new ArrayList<>();
 
                boolean success = false;
 
@@ -254,7 +251,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                                        path = "/" + path;
 
                                        try {
-                                               final StateHandle<T> 
stateHandle = get(path);
+                                               final RetrievableStateHandle<T> 
stateHandle = get(path);
                                                stateHandles.add(new 
Tuple2<>(stateHandle, path));
                                        } catch 
(KeeperException.NoNodeException ignored) {
                                                // Concurrent deletion, retry
@@ -272,6 +269,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                return stateHandles;
        }
 
+
        /**
         * Gets all available state handles from ZooKeeper sorted by name 
(ascending).
         *
@@ -281,8 +279,8 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
        @SuppressWarnings("unchecked")
-       public List<Tuple2<StateHandle<T>, String>> getAllSortedByName() throws 
Exception {
-               final List<Tuple2<StateHandle<T>, String>> stateHandles = new 
ArrayList<>();
+       public List<Tuple2<RetrievableStateHandle<T>, String>> 
getAllSortedByName() throws Exception {
+               final List<Tuple2<RetrievableStateHandle<T>, String>> 
stateHandles = new ArrayList<>();
 
                boolean success = false;
 
@@ -303,7 +301,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                                        path = "/" + path;
 
                                        try {
-                                               final StateHandle<T> 
stateHandle = get(path);
+                                               final RetrievableStateHandle<T> 
stateHandle = get(path);
                                                stateHandles.add(new 
Tuple2<>(stateHandle, path));
                                        } catch 
(KeeperException.NoNodeException ignored) {
                                                // Concurrent deletion, retry
@@ -364,7 +362,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
        public void removeAndDiscardState(String pathInZooKeeper) throws 
Exception {
                checkNotNull(pathInZooKeeper, "Path in ZooKeeper");
 
-               StateHandle<T> stateHandle = get(pathInZooKeeper);
+               RetrievableStateHandle<T> stateHandle = get(pathInZooKeeper);
 
                // Delete the state handle from ZooKeeper first
                
client.delete().deletingChildrenIfNeeded().forPath(pathInZooKeeper);
@@ -381,7 +379,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
         * @throws Exception If a ZooKeeper or state handle operation fails
         */
        public void removeAndDiscardAllState() throws Exception {
-               final List<Tuple2<StateHandle<T>, String>> allStateHandles = 
getAll();
+               final List<Tuple2<RetrievableStateHandle<T>, String>> 
allStateHandles = getAll();
 
                ZKPaths.deleteChildren(
                                client.getZookeeperClient().getZooKeeper(),
@@ -389,7 +387,7 @@ public class ZooKeeperStateHandleStore<T extends 
Serializable> {
                                false);
 
                // Discard the state handles only after they have been 
successfully deleted from ZooKeeper.
-               for (Tuple2<StateHandle<T>, String> stateHandleAndPath : 
allStateHandles) {
+               for (Tuple2<RetrievableStateHandle<T>, String> 
stateHandleAndPath : allStateHandles) {
                        stateHandleAndPath.f0.discardState();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
index 6692ef0..a534b40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/zookeeper/filesystem/FileSystemStateStorageHelper.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.zookeeper.filesystem;
 import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.state.filesystem.FileSerializableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
 
 import java.io.IOException;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 /**
- * {@link StateStorageHelper} implementation which stores the state in the 
given filesystem path.
+ * {@link RetrievableStateStorageHelper} implementation which stores the state 
in the given filesystem path.
  *
- * @param <T>
+ * @param <T> The type of the data that can be stored by this storage helper.
  */
-public class FileSystemStateStorageHelper<T extends Serializable> implements 
StateStorageHelper<T> {
+public class FileSystemStateStorageHelper<T extends Serializable> implements 
RetrievableStateStorageHelper<T> {
 
        private final Path rootPath;
 
@@ -56,7 +56,7 @@ public class FileSystemStateStorageHelper<T extends 
Serializable> implements Sta
        }
 
        @Override
-       public StateHandle<T> store(T state) throws Exception {
+       public RetrievableStateHandle<T> store(T state) throws Exception {
                Exception latestException = null;
 
                for (int attempt = 0; attempt < 10; attempt++) {
@@ -73,8 +73,7 @@ public class FileSystemStateStorageHelper<T extends 
Serializable> implements Sta
                        try(ObjectOutputStream os = new 
ObjectOutputStream(outStream)) {
                                os.writeObject(state);
                        }
-
-                       return new FileSerializableStateHandle<>(filePath);
+                       return new RetrievableStreamStateHandle<T>(filePath);
                }
 
                throw new Exception("Could not open output stream for state 
backend", latestException);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 356f1a9..407fa01 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -738,29 +738,18 @@ class JobManager(
           sender() ! TriggerSavepointFailure(jobId, new 
IllegalArgumentException("Unknown job."))
       }
 
-    case DisposeSavepoint(savepointPath, blobKeys) =>
+    case DisposeSavepoint(savepointPath) =>
       val senderRef = sender()
       future {
         try {
           log.info(s"Disposing savepoint at '$savepointPath'.")
 
-          if (blobKeys.isDefined) {
-            // We don't need a real ID here for the library cache manager
-            val jid = new JobID()
+          val savepoint = savepointStore.loadSavepoint(savepointPath)
 
-            try {
-              libraryCacheManager.registerJob(jid, blobKeys.get, 
java.util.Collections.emptyList())
-              val classLoader = libraryCacheManager.getClassLoader(jid)
+          log.debug(s"$savepoint")
 
-              // Discard with user code loader
-              savepointStore.disposeSavepoint(savepointPath, classLoader)
-            } finally {
-              libraryCacheManager.unregisterJob(jid)
-            }
-          } else {
-            // Discard with system class loader
-            savepointStore.disposeSavepoint(savepointPath, 
getClass.getClassLoader)
-          }
+          // Dispose the savepoint
+          savepointStore.disposeSavepoint(savepointPath)
 
           senderRef ! DisposeSavepointSuccess
         } catch {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 40c4dcf..5e2b547 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -490,13 +490,9 @@ object JobManagerMessages {
     * Disposes a savepoint.
     *
     * @param savepointPath The path of the savepoint to dispose.
-    * @param blobKeys BLOB keys if a user program JAR was uploaded for 
disposal.
-    *                 This is required when we dispose state which contains
-    *                 custom state instances (e.g. reducing state, rocksDB 
state).
     */
   case class DisposeSavepoint(
-      savepointPath: String,
-      blobKeys: Option[java.util.List[BlobKey]] = None)
+      savepointPath: String)
     extends RequiresLeaderSessionID
 
   /** Response after a successful savepoint dispose. */

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
index 1816fc9..5416292 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java
@@ -28,14 +28,19 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.runtime.util.SerializableObject;
-import org.apache.flink.util.SerializedValue;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -55,8 +60,11 @@ public class CheckpointStateRestoreTest {
        @Test
        public void testSetState() {
                try {
-                       final SerializedValue<StateHandle<?>> serializedState = 
new SerializedValue<StateHandle<?>>(
-                                       new 
LocalStateHandle<SerializableObject>(new SerializableObject()));
+
+                       final ChainedStateHandle<StreamStateHandle> 
serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new 
SerializableObject());
+                       KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+                       List<SerializableObject> testStates = 
Collections.singletonList(new SerializableObject());
+                       final List<KeyGroupsStateHandle> 
serializedKeyGroupStates = 
CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
 
                        final JobID jid = new JobID();
                        final JobVertexID statefulId = new JobVertexID();
@@ -106,9 +114,9 @@ public class CheckpointStateRestoreTest {
                        PendingCheckpoint pending = 
coord.getPendingCheckpoints().values().iterator().next();
                        final long checkpointId = pending.getCheckpointId();
 
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, 
serializedState, 0));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, 
serializedState, 0));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, 
serializedState, 0));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, 
serializedState, serializedKeyGroupStates));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId, 
serializedState, serializedKeyGroupStates));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, 
serializedState, serializedKeyGroupStates));
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
 
@@ -119,11 +127,11 @@ public class CheckpointStateRestoreTest {
                        coord.restoreLatestCheckpointedState(map, true, false);
 
                        // verify that each stateful vertex got the state
-                       verify(statefulExec1, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any());
-                       verify(statefulExec2, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any());
-                       verify(statefulExec3, 
times(1)).setInitialState(Mockito.eq(serializedState), Mockito.<Map<Integer, 
SerializedValue<StateHandle<?>>>>any());
-                       verify(statelessExec1, 
times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), 
Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
-                       verify(statelessExec2, 
times(0)).setInitialState(Mockito.<SerializedValue<StateHandle<?>>>any(), 
Mockito.<Map<Integer, SerializedValue<StateHandle<?>>>>any());
+                       verify(statefulExec1, 
times(1)).setInitialState(Mockito.eq(serializedState), 
Mockito.<List<KeyGroupsStateHandle>>any());
+                       verify(statefulExec2, 
times(1)).setInitialState(Mockito.eq(serializedState), 
Mockito.<List<KeyGroupsStateHandle>>any());
+                       verify(statefulExec3, 
times(1)).setInitialState(Mockito.eq(serializedState), 
Mockito.<List<KeyGroupsStateHandle>>any());
+                       verify(statelessExec1, 
times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), 
Mockito.<List<KeyGroupsStateHandle>>any());
+                       verify(statelessExec2, 
times(0)).setInitialState(Mockito.<ChainedStateHandle<StreamStateHandle>>any(), 
Mockito.<List<KeyGroupsStateHandle>>any());
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -134,8 +142,10 @@ public class CheckpointStateRestoreTest {
        @Test
        public void testStateOnlyPartiallyAvailable() {
                try {
-                       final SerializedValue<StateHandle<?>> serializedState = 
new SerializedValue<StateHandle<?>>(
-                                       new 
LocalStateHandle<SerializableObject>(new SerializableObject()));
+                       final ChainedStateHandle<StreamStateHandle> 
serializedState = CheckpointCoordinatorTest.generateChainedStateHandle(new 
SerializableObject());
+                       KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0);
+                       List<SerializableObject> testStates = 
Collections.singletonList(new SerializableObject());
+                       final List<KeyGroupsStateHandle> 
serializedKeyGroupStates = 
CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates);
 
                        final JobID jid = new JobID();
                        final JobVertexID statefulId = new JobVertexID();
@@ -186,9 +196,9 @@ public class CheckpointStateRestoreTest {
                        final long checkpointId = pending.getCheckpointId();
 
                        // the difference to the test "testSetState" is that 
one stateful subtask does not report state
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, 
serializedState, 0));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, 
serializedState, serializedKeyGroupStates));
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec2.getAttemptId(), checkpointId));
-                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, 
serializedState, 0));
+                       coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statefulExec3.getAttemptId(), checkpointId, 
serializedState, serializedKeyGroupStates));
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId));
                        coord.receiveAcknowledgeMessage(new 
AcknowledgeCheckpoint(jid, statelessExec2.getAttemptId(), checkpointId));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
index 634e177..6182ffd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
@@ -107,8 +108,6 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        // The ZooKeeper implementation discards asynchronously
                        expected[i - 1].awaitDiscard();
                        assertTrue(expected[i - 1].isDiscarded());
-                       assertEquals(userClassLoader, expected[i - 
1].getDiscardClassLoader());
-
                        assertEquals(1, 
checkpoints.getNumberOfRetainedCheckpoints());
                }
        }
@@ -183,7 +182,6 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        // The ZooKeeper implementation discards asynchronously
                        checkpoint.awaitDiscard();
                        assertTrue(checkpoint.isDiscarded());
-                       assertEquals(userClassLoader, 
checkpoint.getDiscardClassLoader());
                }
        }
 
@@ -199,14 +197,14 @@ public abstract class CompletedCheckpointStoreTest 
extends TestLogger {
                JobVertexID jvid = new JobVertexID();
 
                Map<JobVertexID, TaskState> taskGroupStates = new HashMap<>();
-               TaskState taskState = new TaskState(jvid, numberOfStates);
+               TaskState taskState = new TaskState(jvid, numberOfStates, 
numberOfStates);
                taskGroupStates.put(jvid, taskState);
 
                for (int i = 0; i < numberOfStates; i++) {
-                       SerializedValue<StateHandle<?>> stateHandle = new 
SerializedValue<StateHandle<?>>(
+                       ChainedStateHandle<StreamStateHandle> stateHandle = 
CheckpointCoordinatorTest.generateChainedStateHandle(
                                        new CheckpointMessagesTest.MyHandle());
 
-                       taskState.putState(i, new SubtaskState(stateHandle, 0, 
0));
+                       taskState.putState(i, new SubtaskState(stateHandle, 0));
                }
 
                return new TestCompletedCheckpoint(new JobID(), id, 0, 
taskGroupStates);
@@ -230,8 +228,6 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                // Latch for test variants which discard asynchronously
                private transient final CountDownLatch discardLatch = new 
CountDownLatch(1);
 
-               private transient ClassLoader discardClassLoader;
-
                public TestCompletedCheckpoint(
                        JobID jobId,
                        long checkpointId,
@@ -242,11 +238,10 @@ public abstract class CompletedCheckpointStoreTest 
extends TestLogger {
                }
 
                @Override
-               public void discard(ClassLoader userClassLoader) throws 
Exception {
-                       super.discard(userClassLoader);
+               public void discardState() throws Exception {
+                       super.discardState();
 
                        if (!isDiscarded) {
-                               this.discardClassLoader = userClassLoader;
                                this.isDiscarded = true;
 
                                if (discardLatch != null) {
@@ -265,10 +260,6 @@ public abstract class CompletedCheckpointStoreTest extends 
TestLogger {
                        }
                }
 
-               public ClassLoader getDiscardClassLoader() {
-                       return discardClassLoader;
-               }
-
                @Override
                public boolean equals(Object o) {
                        if (this == o) return true;

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
index 90a6836..9b04244 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointTest.java
@@ -45,14 +45,14 @@ public class CompletedCheckpointTest {
 
                // Verify discard call is forwarded to state
                CompletedCheckpoint checkpoint = new CompletedCheckpoint(new 
JobID(), 0, 0, 1, taskStates, true);
-               checkpoint.discard(ClassLoader.getSystemClassLoader());
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               checkpoint.discardState();
+               verify(state, times(1)).discardState();
 
                Mockito.reset(state);
 
                // Verify discard call is not forwarded to state
                checkpoint = new CompletedCheckpoint(new JobID(), 0, 0, 1, 
taskStates, false);
-               checkpoint.discard(ClassLoader.getSystemClassLoader());
-               verify(state, 
times(0)).discard(Matchers.any(ClassLoader.class));
+               checkpoint.discardState();
+               verify(state, times(0)).discardState();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index d235e61..fd4e02d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -66,7 +66,7 @@ public class PendingCheckpointTest {
                setTaskState(pending, state);
 
                pending.abortDeclined();
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
 
                // Abort error
                Mockito.reset(state);
@@ -75,7 +75,7 @@ public class PendingCheckpointTest {
                setTaskState(pending, state);
 
                pending.abortError(new Exception("Expected Test Exception"));
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
 
                // Abort expired
                Mockito.reset(state);
@@ -84,7 +84,7 @@ public class PendingCheckpointTest {
                setTaskState(pending, state);
 
                pending.abortExpired();
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
 
                // Abort subsumed
                Mockito.reset(state);
@@ -93,7 +93,7 @@ public class PendingCheckpointTest {
                setTaskState(pending, state);
 
                pending.abortSubsumed();
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
        }
 
        /**
@@ -106,21 +106,20 @@ public class PendingCheckpointTest {
                PendingCheckpoint pending = createPendingCheckpoint();
                PendingCheckpointTest.setTaskState(pending, state);
 
-               pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+               pending.acknowledgeTask(ATTEMPT_ID, null, null);
 
                CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
 
                // Does discard state
-               checkpoint.discard(ClassLoader.getSystemClassLoader());
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               checkpoint.discardState();
+               verify(state, times(1)).discardState();
        }
 
        // 
------------------------------------------------------------------------
 
        private static PendingCheckpoint createPendingCheckpoint() {
-               ClassLoader classLoader = ClassLoader.getSystemClassLoader();
                Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new 
HashMap<>(ACK_TASKS);
-               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks, 
classLoader);
+               return new PendingCheckpoint(new JobID(), 0, 1, ackTasks);
        }
 
        @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
index 6ae6e1c..7258545 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingSavepointTest.java
@@ -71,7 +71,7 @@ public class PendingSavepointTest {
                PendingCheckpointTest.setTaskState(pending, state);
 
                pending.abortDeclined();
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
 
                // Abort error
                Mockito.reset(state);
@@ -81,7 +81,7 @@ public class PendingSavepointTest {
                Future<String> future = pending.getCompletionFuture();
 
                pending.abortError(new Exception("Expected Test Exception"));
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
                assertTrue(future.failed().isCompleted());
 
                // Abort expired
@@ -92,7 +92,7 @@ public class PendingSavepointTest {
                future = pending.getCompletionFuture();
 
                pending.abortExpired();
-               verify(state, 
times(1)).discard(Matchers.any(ClassLoader.class));
+               verify(state, times(1)).discardState();
                assertTrue(future.failed().isCompleted());
 
                // Abort subsumed
@@ -117,13 +117,13 @@ public class PendingSavepointTest {
 
                Future<String> future = pending.getCompletionFuture();
 
-               pending.acknowledgeTask(ATTEMPT_ID, null, 0, null);
+               pending.acknowledgeTask(ATTEMPT_ID, null, null);
 
                CompletedCheckpoint checkpoint = pending.finalizeCheckpoint();
 
                // Does _NOT_ discard state
-               checkpoint.discard(ClassLoader.getSystemClassLoader());
-               verify(state, 
times(0)).discard(Matchers.any(ClassLoader.class));
+               checkpoint.discardState();
+               verify(state, times(0)).discardState();
 
                // Future is completed
                String path = Await.result(future, Duration.Zero());
@@ -133,9 +133,8 @@ public class PendingSavepointTest {
        // 
------------------------------------------------------------------------
 
        private static PendingSavepoint createPendingSavepoint() {
-               ClassLoader classLoader = ClassLoader.getSystemClassLoader();
                Map<ExecutionAttemptID, ExecutionVertex> ackTasks = new 
HashMap<>(ACK_TASKS);
-               return new PendingSavepoint(new JobID(), 0, 1, ackTasks, 
classLoader, new HeapSavepointStore());
+               return new PendingSavepoint(new JobID(), 0, 1, ackTasks, new 
HeapSavepointStore());
        }
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 380ba2c..f273797 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -19,9 +19,8 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -29,6 +28,8 @@ import org.junit.Test;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -61,10 +62,10 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                        int maxNumberOfCheckpointsToRetain, ClassLoader 
userLoader) throws Exception {
 
                return new 
ZooKeeperCompletedCheckpointStore(maxNumberOfCheckpointsToRetain, userLoader,
-                       ZooKeeper.createClient(), CheckpointsPath, new 
StateStorageHelper<CompletedCheckpoint>() {
+                       ZooKeeper.createClient(), CheckpointsPath, new 
RetrievableStateStorageHelper<CompletedCheckpoint>() {
                        @Override
-                       public StateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
-                               return new LocalStateHandle<>(state);
+                       public RetrievableStateHandle<CompletedCheckpoint> 
store(CompletedCheckpoint state) throws Exception {
+                               return new 
HeapRetrievableStateHandle<CompletedCheckpoint>(state);
                        }
                });
        }
@@ -160,4 +161,35 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                CompletedCheckpoint recovered = store.getLatestCheckpoint();
                assertEquals(checkpoint, recovered);
        }
+
+       /**
+        * Test-only {@link RetrievableStateHandle} that simply keeps a reference to the
+        * state object it was created with.
+        *
+        * @param <T> type of the wrapped state
+        */
+       static class HeapRetrievableStateHandle<T extends Serializable> implements RetrievableStateHandle<T> {
+
+               private static final long serialVersionUID = -268548467968932L;
+
+               /** The wrapped state; reset to {@code null} by {@link #discardState()}. */
+               private T state;
+
+               public HeapRetrievableStateHandle(T state) {
+                       this.state = state;
+               }
+
+               /** Returns the wrapped state, or {@code null} once it has been discarded. */
+               @Override
+               public T retrieveState() throws Exception {
+                       return this.state;
+               }
+
+               /** Drops the reference to the wrapped state. */
+               @Override
+               public void discardState() throws Exception {
+                       this.state = null;
+               }
+
+               /** Always reports a size of zero. */
+               @Override
+               public long getStateSize() throws Exception {
+                       return 0;
+               }
+
+               @Override
+               public void close() throws IOException {
+                       // nothing to release
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
index 6b8c651..3e2de80 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/FsSavepointStoreTest.java
@@ -58,7 +58,7 @@ public class FsSavepointStoreTest {
                assertEquals(0, tmp.getRoot().listFiles().length);
 
                // Store
-               SavepointV0 stored = new SavepointV0(1929292, 
SavepointV0Test.createTaskStates(4, 24));
+               SavepointV1 stored = new SavepointV1(1929292, 
SavepointV1Test.createTaskStates(4, 24));
                String path = store.storeSavepoint(stored);
                assertEquals(1, tmp.getRoot().listFiles().length);
 
@@ -67,7 +67,7 @@ public class FsSavepointStoreTest {
                assertEquals(stored, loaded);
 
                // Dispose
-               store.disposeSavepoint(path, 
ClassLoader.getSystemClassLoader());
+               store.disposeSavepoint(path);
 
                assertEquals(0, tmp.getRoot().listFiles().length);
        }
@@ -122,7 +122,7 @@ public class FsSavepointStoreTest {
                assertEquals(1, tmp.getRoot().listFiles().length);
 
                // Savepoint v0
-               Savepoint savepoint = new SavepointV0(checkpointId, 
SavepointV0Test.createTaskStates(4, 32));
+               Savepoint savepoint = new SavepointV1(checkpointId, 
SavepointV1Test.createTaskStates(4, 32));
                String pathSavepoint = store.storeSavepoint(savepoint);
                assertEquals(2, tmp.getRoot().listFiles().length);
 
@@ -208,7 +208,7 @@ public class FsSavepointStoreTest {
                }
 
                @Override
-               public void dispose(ClassLoader classLoader) {
+               public void dispose() {
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
index 6a85195..d703bd6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointLoaderTest.java
@@ -65,7 +65,7 @@ public class SavepointLoaderTest {
                                true);
 
                // Store savepoint
-               SavepointV0 savepoint = new 
SavepointV0(stored.getCheckpointID(), taskStates.values());
+               SavepointV1 savepoint = new 
SavepointV1(stored.getCheckpointID(), taskStates.values());
                SavepointStore store = new HeapSavepointStore();
                String path = store.storeSavepoint(savepoint);
 
@@ -84,8 +84,8 @@ public class SavepointLoaderTest {
                assertEquals(stored.getCheckpointID(), 
loaded.getCheckpointID());
 
                // The loaded checkpoint should not discard state when its 
discarded
-               loaded.discard(ClassLoader.getSystemClassLoader());
-               verify(state, times(0)).discard(any(ClassLoader.class));
+               loaded.discardState();
+               verify(state, times(0)).discardState();
 
                // 2) Load and validate: parallelism mismatch
                when(vertex.getParallelism()).thenReturn(222);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
deleted file mode 100644
index b656d90..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0SerializerTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.commons.io.output.ByteArrayOutputStream;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-
-import static org.junit.Assert.assertEquals;
-
-public class SavepointV0SerializerTest {
-
-       /**
-        * Test serialization of {@link SavepointV0} instance.
-        */
-       @Test
-       public void testSerializeDeserializeV1() throws Exception {
-               SavepointV0 expected = new SavepointV0(123123, 
SavepointV0Test.createTaskStates(8, 32));
-
-               SavepointV0Serializer serializer = 
SavepointV0Serializer.INSTANCE;
-
-               // Serialize
-               ByteArrayOutputStream baos = new ByteArrayOutputStream();
-               serializer.serialize(expected, new 
DataOutputViewStreamWrapper(baos));
-               byte[] bytes = baos.toByteArray();
-
-               // Deserialize
-               ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
-               Savepoint actual = serializer.deserialize(new 
DataInputViewStreamWrapper(bais));
-
-               assertEquals(expected, actual);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
deleted file mode 100644
index 4d72c42..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV0Test.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.checkpoint.savepoint;
-
-import org.apache.flink.runtime.checkpoint.SubtaskState;
-import org.apache.flink.runtime.checkpoint.TaskState;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.messages.CheckpointMessagesTest;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-public class SavepointV0Test {
-
-       /**
-        * Simple test of savepoint methods.
-        */
-       @Test
-       public void testSavepointV0() throws Exception {
-               long checkpointId = 
ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
-               int numTaskStates = 4;
-               int numSubtaskStates = 16;
-
-               Collection<TaskState> expected = 
createTaskStates(numTaskStates, numSubtaskStates);
-
-               SavepointV0 savepoint = new SavepointV0(checkpointId, expected);
-
-               assertEquals(SavepointV0.VERSION, savepoint.getVersion());
-               assertEquals(checkpointId, savepoint.getCheckpointId());
-               assertEquals(expected, savepoint.getTaskStates());
-
-               assertFalse(savepoint.getTaskStates().isEmpty());
-               savepoint.dispose(ClassLoader.getSystemClassLoader());
-               assertTrue(savepoint.getTaskStates().isEmpty());
-       }
-
-       static Collection<TaskState> createTaskStates(int numTaskStates, int 
numSubtaskStates) throws IOException {
-               List<TaskState> taskStates = new ArrayList<>(numTaskStates);
-
-               for (int i = 0; i < numTaskStates; i++) {
-                       TaskState taskState = new TaskState(new JobVertexID(), 
numSubtaskStates);
-                       for (int j = 0; j < numSubtaskStates; j++) {
-                               SerializedValue<StateHandle<?>> stateHandle = 
new SerializedValue<StateHandle<?>>(
-                                               new 
CheckpointMessagesTest.MyHandle());
-
-                               taskState.putState(i, new 
SubtaskState(stateHandle, 0, 0));
-                       }
-
-                       taskStates.add(taskState);
-               }
-
-               return taskStates;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
new file mode 100644
index 0000000..bad836b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1SerializerTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class SavepointV1SerializerTest {
+
+       /**
+        * Round-trips a {@link SavepointV1} through {@link SavepointV1Serializer} and
+        * expects the deserialized savepoint to be equal to the one written.
+        */
+       @Test
+       public void testSerializeDeserializeV1() throws Exception {
+               final SavepointV1 original = new SavepointV1(123123, SavepointV1Test.createTaskStates(8, 32));
+               final SavepointV1Serializer serializer = SavepointV1Serializer.INSTANCE;
+
+               // Write the savepoint into an in-memory buffer
+               final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+               serializer.serialize(original, new DataOutputViewStreamWrapper(buffer));
+               final byte[] serialized = buffer.toByteArray();
+
+               // Read it back from exactly those bytes
+               final Savepoint restored = serializer.deserialize(
+                               new DataInputViewStreamWrapper(new ByteArrayInputStream(serialized)));
+
+               assertEquals(original, restored);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
new file mode 100644
index 0000000..ef10032
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV1Test.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint.savepoint;
+
+import org.apache.flink.runtime.checkpoint.SubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupRangeOffsets;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SavepointV1Test {
+
+       /**
+        * Simple test of savepoint methods.
+        *
+        * <p>Checks that version, checkpoint ID and task states are reported as passed to
+        * the constructor, and that the task state collection is empty after
+        * {@code dispose()}.
+        */
+       @Test
+       public void testSavepointV1() throws Exception {
+               long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE);
+               int numTaskStates = 4;
+               int numSubtaskStates = 16;
+
+               Collection<TaskState> expected = createTaskStates(numTaskStates, numSubtaskStates);
+
+               SavepointV1 savepoint = new SavepointV1(checkpointId, expected);
+
+               assertEquals(SavepointV1.VERSION, savepoint.getVersion());
+               assertEquals(checkpointId, savepoint.getCheckpointId());
+               assertEquals(expected, savepoint.getTaskStates());
+
+               assertFalse(savepoint.getTaskStates().isEmpty());
+               savepoint.dispose();
+               assertTrue(savepoint.getTaskStates().isEmpty());
+       }
+
+       /**
+        * Creates task states for tests.
+        *
+        * <p>Every task state gets a fresh {@link JobVertexID}, one {@link SubtaskState}
+        * per subtask index in {@code [0, numSubtaskStates)} and a single keyed state
+        * registered under index 0.
+        *
+        * @param numTaskStates number of task states to create
+        * @param numSubtaskStates number of subtask states per task state; also passed
+        *                         for both int arguments of the {@link TaskState} constructor
+        *                         (presumably parallelism and max parallelism - TODO confirm)
+        * @return the created task states
+        */
+       static Collection<TaskState> createTaskStates(int numTaskStates, int numSubtaskStates) throws IOException {
+               List<TaskState> taskStates = new ArrayList<>(numTaskStates);
+
+               for (int i = 0; i < numTaskStates; i++) {
+                       TaskState taskState = new TaskState(new JobVertexID(), numSubtaskStates, numSubtaskStates);
+                       for (int j = 0; j < numSubtaskStates; j++) {
+                               StreamStateHandle stateHandle = new ByteStreamStateHandle("Hello".getBytes());
+                               // Register under the subtask index j. Using the task index i here
+                               // would overwrite the same slot on every iteration and leave all
+                               // other subtasks without state.
+                               taskState.putState(j, new SubtaskState(
+                                               new ChainedStateHandle<>(Collections.singletonList(stateHandle)), 0));
+                       }
+
+                       taskState.putKeyedState(
+                                       0,
+                                       new KeyGroupsStateHandle(
+                                                       new KeyGroupRangeOffsets(1, 1, new long[] {42}),
+                                                       new ByteStreamStateHandle("Hello".getBytes())));
+
+                       taskStates.add(taskState);
+               }
+
+               return taskStates;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
index 12bbf82..c513e26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.java
@@ -19,17 +19,18 @@
 package org.apache.flink.runtime.checkpoint.stats;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.Path;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.SubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskState;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -51,7 +52,7 @@ import static org.mockito.Mockito.when;
 public class SimpleCheckpointStatsTrackerTest {
 
        private static final Random RAND = new Random();
-       
+
        @Test
        public void testNoCompletedCheckpointYet() throws Exception {
                CheckpointStatsTracker tracker = new 
SimpleCheckpointStatsTracker(
@@ -154,7 +155,7 @@ public class SimpleCheckpointStatsTrackerTest {
        private static void verifyJobStats(
                        CheckpointStatsTracker tracker,
                        int historySize,
-                       CompletedCheckpoint[] checkpoints) {
+                       CompletedCheckpoint[] checkpoints) throws Exception {
 
                assertTrue(tracker.getJobStats().isDefined());
                JobCheckpointStats jobStats = tracker.getJobStats().get();
@@ -275,14 +276,15 @@ public class SimpleCheckpointStatsTrackerTest {
        }
 
        private static CompletedCheckpoint[] generateRandomCheckpoints(
-                       int numCheckpoints) throws IOException {
+                       int numCheckpoints) throws Exception {
 
                // Config
                JobID jobId = new JobID();
                int minNumOperators = 4;
                int maxNumOperators = 32;
-               int minParallelism = 4;
-               int maxParallelism = 16;
+               int minOperatorParallelism = 4;
+               int maxOperatorParallelism = 16;
+               int maxParallelism = 32;
 
                // Use yuge numbers here in order to test that summing up state 
sizes
                // does not overflow. This was a bug in the initial version, 
because
@@ -299,7 +301,7 @@ public class SimpleCheckpointStatsTrackerTest {
 
                for (int i = 0; i < numOperators; i++) {
                        operatorIds[i] = new JobVertexID();
-                       operatorParallelism[i] = RAND.nextInt(maxParallelism - 
minParallelism + 1) + minParallelism;
+                       operatorParallelism[i] = 
RAND.nextInt(maxOperatorParallelism - minOperatorParallelism + 1) + 
minOperatorParallelism;
                }
 
                // Generate checkpoints
@@ -317,7 +319,7 @@ public class SimpleCheckpointStatsTrackerTest {
                                JobVertexID operatorId = 
operatorIds[operatorIndex];
                                int parallelism = 
operatorParallelism[operatorIndex];
 
-                               TaskState taskState = new TaskState(operatorId, 
parallelism);
+                               TaskState taskState = new TaskState(operatorId, 
parallelism, maxParallelism);
 
                                taskGroupStates.put(operatorId, taskState);
 
@@ -328,9 +330,11 @@ public class SimpleCheckpointStatsTrackerTest {
                                                completionDuration = duration;
                                        }
 
+                                       final long proxySize = minStateSize + 
((long) (RAND.nextDouble() * (maxStateSize - minStateSize)));
+                                       StreamStateHandle proxy = new 
StateHandleProxy(new Path(), proxySize);
+
                                        SubtaskState subtaskState = new 
SubtaskState(
-                                               new 
SerializedValue<StateHandle<?>>(null),
-                                               minStateSize + ((long) 
(RAND.nextDouble() * (maxStateSize - minStateSize))),
+                                               new 
ChainedStateHandle<>(Arrays.asList(proxy)),
                                                duration);
 
                                        taskState.putState(subtaskIndex, 
subtaskState);
@@ -356,10 +360,32 @@ public class SimpleCheckpointStatsTrackerTest {
                        ExecutionJobVertex v = mock(ExecutionJobVertex.class);
                        when(v.getJobVertexId()).thenReturn(operatorId);
                        when(v.getParallelism()).thenReturn(parallelism);
-                       
+
                        jobVertices.add(v);
                }
 
                return jobVertices;
        }
+
+       private static class StateHandleProxy extends FileStateHandle {
+
+               private static final long serialVersionUID = 35356735683568L;
+
+               public StateHandleProxy(Path filePath, long proxySize) {
+                       super(filePath);
+                       this.proxySize = proxySize;
+               }
+
+               private long proxySize;
+
+               @Override
+               public void discardState() throws Exception {
+
+               }
+
+               @Override
+               public long getStateSize() {
+                       return proxySize;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 0e1c7c5..6b80c3d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -53,8 +53,11 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
@@ -62,6 +65,7 @@ import org.apache.flink.runtime.testingUtils.TestingMessages;
 import org.apache.flink.runtime.testingUtils.TestingTaskManager;
 import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
@@ -141,38 +145,38 @@ public class JobManagerHARecoveryTest {
                        instanceManager.addInstanceListener(scheduler);
 
                        archive = system.actorOf(Props.create(
-                               MemoryArchivist.class,
-                               10), "archive");
+                                       MemoryArchivist.class,
+                                       10), "archive");
 
                        Props jobManagerProps = Props.create(
-                               TestingJobManager.class,
-                               flinkConfiguration,
-                               new ForkJoinPool(),
-                               instanceManager,
-                               scheduler,
-                               new BlobLibraryCacheManager(new 
BlobServer(flinkConfiguration), 3600000),
-                               archive,
-                               new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
-                               timeout,
-                               myLeaderElectionService,
-                               mySubmittedJobGraphStore,
-                               checkpointStateFactory,
-                               new HeapSavepointStore(),
-                               jobRecoveryTimeout,
-                               Option.apply(null));
+                                       TestingJobManager.class,
+                                       flinkConfiguration,
+                                       new ForkJoinPool(),
+                                       instanceManager,
+                                       scheduler,
+                                       new BlobLibraryCacheManager(new 
BlobServer(flinkConfiguration), 3600000),
+                                       archive,
+                                       new 
FixedDelayRestartStrategy.FixedDelayRestartStrategyFactory(Int.MaxValue(), 100),
+                                       timeout,
+                                       myLeaderElectionService,
+                                       mySubmittedJobGraphStore,
+                                       checkpointStateFactory,
+                                       new HeapSavepointStore(),
+                                       jobRecoveryTimeout,
+                                       Option.apply(null));
 
                        jobManager = system.actorOf(jobManagerProps, 
"jobmanager");
                        ActorGateway gateway = new AkkaActorGateway(jobManager, 
leaderSessionID);
 
                        taskManager = 
TaskManager.startTaskManagerComponentsAndActor(
-                               flinkConfiguration,
-                               ResourceID.generate(),
-                               system,
-                               "localhost",
-                               Option.apply("taskmanager"),
-                               Option.apply((LeaderRetrievalService) 
myLeaderRetrievalService),
-                               true,
-                               TestingTaskManager.class);
+                                       flinkConfiguration,
+                                       ResourceID.generate(),
+                                       system,
+                                       "localhost",
+                                       Option.apply("taskmanager"),
+                                       Option.apply((LeaderRetrievalService) 
myLeaderRetrievalService),
+                                       true,
+                                       TestingTaskManager.class);
 
                        ActorGateway tmGateway = new 
AkkaActorGateway(taskManager, leaderSessionID);
 
@@ -199,12 +203,12 @@ public class JobManagerHARecoveryTest {
                        
BlockingStatefulInvokable.initializeStaticHelpers(slots);
 
                        Future<Object> isLeader = gateway.ask(
-                               TestingJobManagerMessages.getNotifyWhenLeader(),
-                               deadline.timeLeft());
+                                       
TestingJobManagerMessages.getNotifyWhenLeader(),
+                                       deadline.timeLeft());
 
                        Future<Object> isConnectedToJobManager = tmGateway.ask(
-                               new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
-                               deadline.timeLeft());
+                                       new 
TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager),
+                                       deadline.timeLeft());
 
                        // tell jobManager that he's the leader
                        myLeaderElectionService.isLeader(leaderSessionID);
@@ -216,8 +220,8 @@ public class JobManagerHARecoveryTest {
 
                        // submit blocking job
                        Future<Object> jobSubmitted = gateway.ask(
-                               new JobManagerMessages.SubmitJob(jobGraph, 
ListeningBehaviour.DETACHED),
-                               deadline.timeLeft());
+                                       new 
JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED),
+                                       deadline.timeLeft());
 
                        Await.ready(jobSubmitted, deadline.timeLeft());
 
@@ -298,7 +302,7 @@ public class JobManagerHARecoveryTest {
                public void addCheckpoint(CompletedCheckpoint checkpoint) 
throws Exception {
                        checkpoints.addLast(checkpoint);
                        if (checkpoints.size() > 1) {
-                               
checkpoints.removeFirst().discard(ClassLoader.getSystemClassLoader());
+                               checkpoints.removeFirst().discardState();
                        }
                }
 
@@ -342,10 +346,12 @@ public class JobManagerHARecoveryTest {
                }
 
                @Override
-               public void start() {}
+               public void start() {
+               }
 
                @Override
-               public void stop() {}
+               public void stop() {
+               }
 
                @Override
                public CompletedCheckpointStore createCheckpointStore(JobID 
jobId, ClassLoader userClassLoader) throws Exception {
@@ -408,7 +414,7 @@ public class JobManagerHARecoveryTest {
 
                @Override
                public void invoke() throws Exception {
-                       while(blocking) {
+                       while (blocking) {
                                synchronized (lock) {
                                        lock.wait();
                                }
@@ -424,7 +430,7 @@ public class JobManagerHARecoveryTest {
                }
        }
 
-       public static class BlockingStatefulInvokable extends BlockingInvokable 
implements StatefulTask<StateHandle<Long>> {
+       public static class BlockingStatefulInvokable extends BlockingInvokable 
implements StatefulTask {
 
                private static final int NUM_CHECKPOINTS_TO_COMPLETE = 5;
 
@@ -435,18 +441,28 @@ public class JobManagerHARecoveryTest {
                private int completedCheckpoints = 0;
 
                @Override
-               public void setInitialState(StateHandle<Long> stateHandle) 
throws Exception {
+               public void 
setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, 
List<KeyGroupsStateHandle> keyGroupsState) throws Exception {
                        int subtaskIndex = getIndexInSubtaskGroup();
                        if (subtaskIndex < recoveredStates.length) {
-                               recoveredStates[subtaskIndex] = 
stateHandle.getState(getUserCodeClassLoader());
+                               recoveredStates[subtaskIndex] = 
InstantiationUtil.deserializeObject(chainedState.get(0).openInputStream());
                        }
                }
 
                @Override
                public boolean triggerCheckpoint(long checkpointId, long 
timestamp) {
-                       StateHandle<Long> state = new 
LocalStateHandle<>(checkpointId);
-                       getEnvironment().acknowledgeCheckpoint(checkpointId, 
state);
-                       return true;
+                       try {
+                               ByteStreamStateHandle byteStreamStateHandle = 
new ByteStreamStateHandle(
+                                               
InstantiationUtil.serializeObject(checkpointId));
+                               RetrievableStreamStateHandle<Long> state = new 
RetrievableStreamStateHandle<Long>(byteStreamStateHandle);
+                               ChainedStateHandle<StreamStateHandle> 
chainedStateHandle = new 
ChainedStateHandle<StreamStateHandle>(Collections.singletonList(state));
+                               getEnvironment().acknowledgeCheckpoint(
+                                               checkpointId,
+                                               chainedStateHandle,
+                                               
Collections.<KeyGroupsStateHandle>emptyList());
+                               return true;
+                       } catch (Exception ex) {
+                               throw new RuntimeException(ex);
+                       }
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
index 426dfba..6ef184d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/ZooKeeperSubmittedJobGraphsStoreITCase.java
@@ -19,16 +19,17 @@
 package org.apache.flink.runtime.jobmanager;
 
 import akka.actor.ActorRef;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.akka.ListeningBehaviour;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener;
-import org.apache.flink.runtime.state.LocalStateHandle;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.zookeeper.StateStorageHelper;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -36,6 +37,7 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
@@ -57,10 +59,12 @@ public class ZooKeeperSubmittedJobGraphsStoreITCase extends 
TestLogger {
 
        private final static ZooKeeperTestEnvironment ZooKeeper = new 
ZooKeeperTestEnvironment(1);
 
-       private final static StateStorageHelper<SubmittedJobGraph> 
localStateStorage = new StateStorageHelper<SubmittedJobGraph>() {
+       private final static RetrievableStateStorageHelper<SubmittedJobGraph> 
localStateStorage = new RetrievableStateStorageHelper<SubmittedJobGraph>() {
                @Override
-               public StateHandle<SubmittedJobGraph> store(SubmittedJobGraph 
state) throws Exception {
-                       return new LocalStateHandle<>(state);
+               public RetrievableStateHandle<SubmittedJobGraph> 
store(SubmittedJobGraph state) throws IOException {
+                       ByteStreamStateHandle byteStreamStateHandle = new 
ByteStreamStateHandle(
+                                       
InstantiationUtil.serializeObject(state));
+                       return new 
RetrievableStreamStateHandle<SubmittedJobGraph>(byteStreamStateHandle);
                }
        };
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 73bf204..c6eb249 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -18,32 +18,38 @@
 
 package org.apache.flink.runtime.messages;
 
-import static org.junit.Assert.*;
-
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTest;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
+import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
 import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.util.SerializedValue;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
 
 public class CheckpointMessagesTest {
-       
+
        @Test
        public void testTriggerAndConfirmCheckpoint() {
                try {
                        NotifyCheckpointComplete cc = new 
NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 
45287698767345L, 467L);
                        testSerializabilityEqualsHashCode(cc);
-                       
+
                        TriggerCheckpoint tc = new TriggerCheckpoint(new 
JobID(), new ExecutionAttemptID(), 347652734L, 7576752L);
                        testSerializabilityEqualsHashCode(tc);
-                       
+
                }
                catch (Exception e) {
                        e.printStackTrace();
@@ -55,35 +61,40 @@ public class CheckpointMessagesTest {
        public void testConfirmTaskCheckpointed() {
                try {
                        AcknowledgeCheckpoint noState = new 
AcknowledgeCheckpoint(
-                                                                               
        new JobID(), new ExecutionAttemptID(), 569345L);
+                                       new JobID(), new ExecutionAttemptID(), 
569345L);
+
+                       KeyGroupRange keyGroupRange = KeyGroupRange.of(42,42);
 
                        AcknowledgeCheckpoint withState = new 
AcknowledgeCheckpoint(
-                                                                               
        new JobID(), new ExecutionAttemptID(), 87658976143L, 
-                                                                               
        new SerializedValue<StateHandle<?>>(new MyHandle()), 0);
-                       
+                                       new JobID(),
+                                       new ExecutionAttemptID(),
+                                       87658976143L,
+                                       
CheckpointCoordinatorTest.generateChainedStateHandle(new MyHandle()),
+                                       
CheckpointCoordinatorTest.generateKeyGroupState(
+                                                       keyGroupRange, 
Collections.singletonList(new MyHandle())));
+
                        testSerializabilityEqualsHashCode(noState);
                        testSerializabilityEqualsHashCode(withState);
-               }
-               catch (Exception e) {
+               } catch (Exception e) {
                        e.printStackTrace();
                        fail(e.getMessage());
                }
        }
-       
+
        private static void testSerializabilityEqualsHashCode(Serializable o) 
throws IOException {
                Object copy = CommonTestUtils.createCopySerializable(o);
+               System.out.println(o.getClass() +" "+copy.getClass());
                assertEquals(o, copy);
                assertEquals(o.hashCode(), copy.hashCode());
                assertNotNull(o.toString());
                assertNotNull(copy.toString());
        }
-       
-       public static class MyHandle implements StateHandle<Serializable> {
+
+       public static class MyHandle implements StreamStateHandle {
 
                private static final long serialVersionUID = 
8128146204128728332L;
 
-               @Override
-               public Serializable getState(ClassLoader userCodeClassLoader) {
+               public Serializable get(ClassLoader userCodeClassLoader) {
                        return null;
                }
 
@@ -107,5 +118,10 @@ public class CheckpointMessagesTest {
 
                @Override
                public void close() throws IOException {}
+
+               @Override
+               public FSDataInputStream openInputStream() throws Exception {
+                       return null;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 87540bc..19317f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -36,10 +36,13 @@ import 
org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Future;
 
@@ -149,10 +152,15 @@ public class DummyEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(long checkpointId) {}
+       public void acknowledgeCheckpoint(long checkpointId) {
+
+       }
 
        @Override
-       public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {}
+       public void acknowledgeCheckpoint(long checkpointId,
+                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle,
+                       List<KeyGroupsStateHandle> keyGroupStateHandles) {
+       }
 
        @Override
        public void failExternally(Throwable cause) {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7b966c3..2c76399 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,7 +45,10 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.types.Record;
 import org.apache.flink.util.MutableObjectIterator;
@@ -96,9 +99,13 @@ public class MockEnvironment implements Environment {
        private final int bufferSize;
 
        public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize) {
+               this(taskName, memorySize, inputSplitProvider, bufferSize, new 
Configuration());
+       }
+
+       public MockEnvironment(String taskName, long memorySize, 
MockInputSplitProvider inputSplitProvider, int bufferSize, Configuration 
taskConfiguration) {
                this.taskInfo = new TaskInfo(taskName, 0, 1, 0);
                this.jobConfiguration = new Configuration();
-               this.taskConfiguration = new Configuration();
+               this.taskConfiguration = taskConfiguration;
                this.inputs = new LinkedList<InputGate>();
                this.outputs = new LinkedList<ResultPartitionWriter>();
 
@@ -298,7 +305,9 @@ public class MockEnvironment implements Environment {
        }
 
        @Override
-       public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> 
state) {
+       public void acknowledgeCheckpoint(long checkpointId,
+                       ChainedStateHandle<StreamStateHandle> 
chainedStateHandle,
+                       List<KeyGroupsStateHandle> keyGroupStateHandles) {
                throw new UnsupportedOperationException();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
index ad3339a..40e1852 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/AbstractCloseableHandleTest.java
@@ -85,5 +85,15 @@ public class AbstractCloseableHandleTest {
 
        private static final class CloseableHandle extends 
AbstractCloseableHandle {
                private static final long serialVersionUID = 1L;
+
+               @Override
+               public void discardState() throws Exception {
+
+               }
+
+               @Override
+               public long getStateSize() throws Exception {
+                       return 0;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 0f1c0f7..04fa089 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -24,7 +24,8 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
+
+import org.apache.flink.runtime.state.filesystem.FileStateHandle;
 import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
@@ -86,7 +87,12 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
 
                        // no file operations should be possible right now
                        try {
-                               backend.checkpointStateSerializable("exception train rolling in", 2L, System.currentTimeMillis());
+                               FsStateBackend.FsCheckpointStateOutputStream out = backend.createCheckpointStateOutputStream(
+                                               2L,
+                                               System.currentTimeMillis());
+
+                               out.write(1);
+                               out.closeAndGetHandle();
                                fail("should fail with an exception");
                        } catch (IllegalStateException e) {
                                // supreme!
@@ -114,43 +120,6 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
        }
 
        @Test
-       public void testSerializableState() {
-               File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
-               try {
-                       FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(localFileUri(tempDir)));
-                       backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test-op", IntSerializer.INSTANCE);
-
-                       File checkpointDir = new File(backend.getCheckpointDirectory().toUri().getPath());
-
-                       String state1 = "dummy state";
-                       String state2 = "row row row your boat";
-                       Integer state3 = 42;
-
-                       StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, System.currentTimeMillis());
-                       StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, System.currentTimeMillis());
-                       StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, System.currentTimeMillis());
-
-                       assertEquals(state1, handle1.getState(getClass().getClassLoader()));
-                       handle1.discardState();
-
-                       assertEquals(state2, handle2.getState(getClass().getClassLoader()));
-                       handle2.discardState();
-
-                       assertEquals(state3, handle3.getState(getClass().getClassLoader()));
-                       handle3.discardState();
-
-                       assertTrue(isDirectoryEmpty(checkpointDir));
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-               finally {
-                       deleteDirectorySilently(tempDir);
-               }
-       }
-
-       @Test
        public void testStateOutputStream() {
                File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString());
                try {
@@ -185,16 +154,16 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
                        stream2.write(state2);
                        stream3.write(state3);
 
-                       FileStreamStateHandle handle1 = (FileStreamStateHandle) stream1.closeAndGetHandle();
+                       FileStateHandle handle1 = (FileStateHandle) stream1.closeAndGetHandle();
                        ByteStreamStateHandle handle2 = (ByteStreamStateHandle) stream2.closeAndGetHandle();
                        ByteStreamStateHandle handle3 = (ByteStreamStateHandle) stream3.closeAndGetHandle();
 
                        // use with try-with-resources
-                       FileStreamStateHandle handle4;
+                       StreamStateHandle handle4;
                        try (AbstractStateBackend.CheckpointStateOutputStream stream4 =
                                        backend.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis())) {
                                stream4.write(state4);
-                               handle4 = (FileStreamStateHandle) stream4.closeAndGetHandle();
+                               handle4 = stream4.closeAndGetHandle();
                        }
 
                        // close before accessing handle
@@ -209,18 +178,18 @@ public class FileStateBackendTest extends StateBackendTestBase<FsStateBackend> {
                                // uh-huh
                        }
 
-                       validateBytesInStream(handle1.getState(getClass().getClassLoader()), state1);
+                       validateBytesInStream(handle1.openInputStream(), state1);
                        handle1.discardState();
                        assertFalse(isDirectoryEmpty(checkpointDir));
                        ensureLocalFileDeleted(handle1.getFilePath());
 
-                       validateBytesInStream(handle2.getState(getClass().getClassLoader()), state2);
+                       validateBytesInStream(handle2.openInputStream(), state2);
                        handle2.discardState();
 
-                       validateBytesInStream(handle3.getState(getClass().getClassLoader()), state3);
+                       validateBytesInStream(handle3.openInputStream(), state3);
                        handle3.discardState();
 
-                       validateBytesInStream(handle4.getState(getClass().getClassLoader()), state4);
+                       validateBytesInStream(handle4.openInputStream(), state4);
                        handle4.discardState();
                        assertTrue(isDirectoryEmpty(checkpointDir));
                }

Reply via email to