http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
deleted file mode 100644
index 5964b72..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle;
-import org.apache.flink.runtime.state.filesystem.FsStateBackend;
-import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
-
-import org.junit.Test;
-
-import java.io.File;
-import java.io.InputStream;
-import java.util.Random;
-
-import static org.junit.Assert.*;
-
-public class FsCheckpointStateOutputStreamTest {
-       
-       /** The temp dir, obtained in a platform neutral way */
-       private static final Path TEMP_DIR_PATH = new Path(new 
File(System.getProperty("java.io.tmpdir")).toURI());
-       
-       
-       @Test(expected = IllegalArgumentException.class)
-       public void testWrongParameters() {
-               // this should fail
-               new FsStateBackend.FsCheckpointStateOutputStream(
-                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 
5000);
-       }
-
-
-       @Test
-       public void testEmptyState() throws Exception {
-               AbstractStateBackend.CheckpointStateOutputStream stream = new 
FsStateBackend.FsCheckpointStateOutputStream(
-                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 
512);
-               
-               StreamStateHandle handle = stream.closeAndGetHandle();
-               assertTrue(handle instanceof ByteStreamStateHandle);
-               
-               InputStream inStream = 
handle.getState(ClassLoader.getSystemClassLoader());
-               assertEquals(-1, inStream.read());
-       }
-       
-       @Test
-       public void testStateBlowMemThreshold() throws Exception {
-               runTest(222, 999, 512, false);
-       }
-
-       @Test
-       public void testStateOneBufferAboveThreshold() throws Exception {
-               runTest(896, 1024, 15, true);
-       }
-
-       @Test
-       public void testStateAboveMemThreshold() throws Exception {
-               runTest(576446, 259, 17, true);
-       }
-       
-       @Test
-       public void testZeroThreshold() throws Exception {
-               runTest(16678, 4096, 0, true);
-       }
-       
-       private void runTest(int numBytes, int bufferSize, int threshold, 
boolean expectFile) throws Exception {
-               AbstractStateBackend.CheckpointStateOutputStream stream =
-                       new FsStateBackend.FsCheckpointStateOutputStream(
-                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
bufferSize, threshold);
-               
-               Random rnd = new Random();
-               byte[] original = new byte[numBytes];
-               byte[] bytes = new byte[original.length];
-
-               rnd.nextBytes(original);
-               System.arraycopy(original, 0, bytes, 0, original.length);
-
-               // the test writes a mixture of writing individual bytes and 
byte arrays
-               int pos = 0;
-               while (pos < bytes.length) {
-                       boolean single = rnd.nextBoolean();
-                       if (single) {
-                               stream.write(bytes[pos++]);
-                       }
-                       else {
-                               int num = rnd.nextInt(Math.min(10, bytes.length 
- pos));
-                               stream.write(bytes, pos, num);
-                               pos += num;
-                       }
-               }
-
-               StreamStateHandle handle = stream.closeAndGetHandle();
-               if (expectFile) {
-                       assertTrue(handle instanceof FileStreamStateHandle);
-               } else {
-                       assertTrue(handle instanceof ByteStreamStateHandle);
-               }
-
-               // make sure the writing process did not alter the original 
byte array
-               assertArrayEquals(original, bytes);
-
-               InputStream inStream = 
handle.getState(ClassLoader.getSystemClassLoader());
-               byte[] validation = new byte[bytes.length];
-               int bytesRead = inStream.read(validation);
-
-               assertEquals(numBytes, bytesRead);
-               assertEquals(-1, inStream.read());
-
-               assertArrayEquals(bytes, validation);
-               
-               handle.discardState();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java
new file mode 100644
index 0000000..95564cc
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+public class KeyGroupRangeOffsetTest {
+
+       /**
+        * Verifies {@code KeyGroupRangeOffsets#getIntersection(KeyGroupRange)} on the offsets
+        * 0..8 registered for the key-groups 2..10. Covers a fully contained range, the
+        * identical range, a disjoint range, and ranges overlapping the upper and the lower
+        * bound.
+        */
+       @Test
+       public void testKeyGroupIntersection() {
+               long[] offsets = new long[9];
+               for (int i = 0; i < offsets.length; ++i) {
+                       offsets[i] = i;
+               }
+
+               int startKeyGroup = 2;
+
+               KeyGroupRangeOffsets keyGroupRangeOffsets =
+                               new KeyGroupRangeOffsets(KeyGroupRange.of(startKeyGroup, 10), offsets);
+
+               // range fully contained in [2, 10]
+               KeyGroupRangeOffsets intersection =
+                               keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(3, 7));
+               KeyGroupRangeOffsets expected = new KeyGroupRangeOffsets(
+                               KeyGroupRange.of(3, 7),
+                               Arrays.copyOfRange(offsets, 3 - startKeyGroup, 8 - startKeyGroup));
+               Assert.assertEquals(expected, intersection);
+
+               // intersecting with the own range must yield an equal object
+               Assert.assertEquals(
+                               keyGroupRangeOffsets,
+                               keyGroupRangeOffsets.getIntersection(keyGroupRangeOffsets.getKeyGroupRange()));
+
+               // disjoint range: empty key-group range and nothing to iterate over
+               intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(11, 13));
+               Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, intersection.getKeyGroupRange());
+               Assert.assertFalse(intersection.iterator().hasNext());
+
+               // overlap at the upper bound: [5, 10] is expected
+               intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(5, 13));
+               expected = new KeyGroupRangeOffsets(
+                               KeyGroupRange.of(5, 10),
+                               Arrays.copyOfRange(offsets, 5 - startKeyGroup, 11 - startKeyGroup));
+               Assert.assertEquals(expected, intersection);
+
+               // overlap at the lower bound: [2, 2] is expected
+               intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(0, 2));
+               expected = new KeyGroupRangeOffsets(
+                               KeyGroupRange.of(2, 2),
+                               Arrays.copyOfRange(offsets, 2 - startKeyGroup, 3 - startKeyGroup));
+               // compare against the expected value; a self-comparison could never fail
+               Assert.assertEquals(expected, intersection);
+       }
+
+       /**
+        * Runs the basic checks for several valid key-group ranges, expects an
+        * {@link IllegalArgumentException} for a range with a negative start, and finally
+        * checks that offsets written to an instance created without an offsets array can
+        * be read back.
+        */
+       @Test
+       public void testKeyGroupRangeOffsetsBasics() {
+               // {start, end} pairs, checked in this order
+               final int[][] validRanges = {
+                               {0, 0},
+                               {0, 1},
+                               {1, 2},
+                               {42, 42},
+                               {3, 7},
+                               {0, Short.MAX_VALUE},
+                               {Short.MAX_VALUE - 1, Short.MAX_VALUE}
+               };
+               for (int[] range : validRanges) {
+                       testKeyGroupRangeOffsetsBasicsInternal(range[0], range[1]);
+               }
+
+               try {
+                       testKeyGroupRangeOffsetsBasicsInternal(-3, 2);
+                       Assert.fail();
+               } catch (IllegalArgumentException ignored) {
+                       // a negative start key-group has to be rejected
+               }
+
+               final int first = 3;
+               final int last = 7;
+               KeyGroupRangeOffsets testNoGivenOffsets = new KeyGroupRangeOffsets(first, last);
+
+               // write all offsets first ...
+               for (int keyGroup = first; keyGroup <= last; ++keyGroup) {
+                       testNoGivenOffsets.setKeyGroupOffset(keyGroup, keyGroup + 1);
+               }
+               // ... then read every one of them back
+               for (int keyGroup = first; keyGroup <= last; ++keyGroup) {
+                       Assert.assertEquals(keyGroup + 1, testNoGivenOffsets.getKeyGroupOffset(keyGroup));
+               }
+
+       }
+
+       /**
+        * Runs the basic checks for a {@code KeyGroupRangeOffsets} that covers the key-groups
+        * {@code startKeyGroup..endKeyGroup} and whose i-th offset is initialised to {@code i}:
+        * both constructors produce equal objects, iteration visits the key-groups in order
+        * together with their offsets, and offsets can be overwritten and read back.
+        *
+        * @param startKeyGroup first key-group of the range (inclusive)
+        * @param endKeyGroup last key-group of the range (inclusive)
+        */
+       private void testKeyGroupRangeOffsetsBasicsInternal(int startKeyGroup, int endKeyGroup) {
+
+               long[] offsets = new long[endKeyGroup - startKeyGroup + 1];
+               for (int i = 0; i < offsets.length; ++i) {
+                       offsets[i] = i;
+               }
+
+               KeyGroupRangeOffsets keyGroupRangeOffsets =
+                               new KeyGroupRangeOffsets(startKeyGroup, endKeyGroup, offsets);
+               KeyGroupRangeOffsets sameButDifferentConstr =
+                               new KeyGroupRangeOffsets(KeyGroupRange.of(startKeyGroup, endKeyGroup), offsets);
+               Assert.assertEquals(keyGroupRangeOffsets, sameButDifferentConstr);
+
+               KeyGroupRange keyGroupRange = keyGroupRangeOffsets.getKeyGroupRange();
+               int numberOfKeyGroup = keyGroupRange.getNumberOfKeyGroups();
+               Assert.assertEquals(Math.max(0, endKeyGroup - startKeyGroup + 1), numberOfKeyGroup);
+               if (numberOfKeyGroup > 0) {
+                       Assert.assertEquals(startKeyGroup, keyGroupRange.getStartKeyGroup());
+                       Assert.assertEquals(endKeyGroup, keyGroupRange.getEndKeyGroup());
+
+                       // iteration yields (key-group, offset) pairs in ascending key-group order
+                       int c = startKeyGroup;
+                       for (Tuple2<Integer, Long> tuple : keyGroupRangeOffsets) {
+                               Assert.assertEquals(c, (int) tuple.f0);
+                               Assert.assertTrue(keyGroupRange.contains(tuple.f0));
+                               Assert.assertEquals((long) c - startKeyGroup, (long) tuple.f1);
+                               ++c;
+                       }
+                       // the iteration must have stopped right after endKeyGroup
+                       Assert.assertEquals(endKeyGroup + 1, c);
+
+                       for (int i = startKeyGroup; i <= endKeyGroup; ++i) {
+                               Assert.assertEquals(i - startKeyGroup, keyGroupRangeOffsets.getKeyGroupOffset(i));
+                       }
+
+                       // overwrite every offset with 42, 43, ... and read the values back
+                       int newOffset = 42;
+                       for (int i = startKeyGroup; i <= endKeyGroup; ++i) {
+                               keyGroupRangeOffsets.setKeyGroupOffset(i, newOffset);
+                               ++newOffset;
+                       }
+
+                       for (int i = startKeyGroup; i <= endKeyGroup; ++i) {
+                               Assert.assertEquals(42 + i - startKeyGroup, keyGroupRangeOffsets.getKeyGroupOffset(i));
+                       }
+
+                       Assert.assertFalse(keyGroupRange.contains(startKeyGroup - 1));
+                       Assert.assertFalse(keyGroupRange.contains(endKeyGroup + 1));
+               } else {
+                       // compare range with range; EMPTY_KEY_GROUP is a KeyGroupRange, not a
+                       // KeyGroupRangeOffsets, so comparing it with the offsets object is a type mismatch
+                       Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, keyGroupRange);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java
new file mode 100644
index 0000000..ab0c327
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class KeyGroupRangeTest {
+
+       /**
+        * Checks {@code KeyGroupRange#getIntersection} for a contained range, the identical
+        * range, two disjoint ranges, and partial overlaps at either end. Every two-range
+        * case is additionally evaluated with receiver and argument swapped.
+        */
+       @Test
+       public void testKeyGroupIntersection() {
+               // [3, 7] lies completely inside [0, 10]
+               KeyGroupRange outer = KeyGroupRange.of(0, 10);
+               KeyGroupRange inner = KeyGroupRange.of(3, 7);
+               KeyGroupRange overlap = outer.getIntersection(inner);
+               Assert.assertEquals(3, overlap.getStartKeyGroup());
+               Assert.assertEquals(7, overlap.getEndKeyGroup());
+               Assert.assertEquals(overlap, inner.getIntersection(outer));
+
+               // a range intersected with itself
+               Assert.assertEquals(outer, outer.getIntersection(outer));
+
+               // [0, 5] and [6, 10] have no key-group in common
+               KeyGroupRange lowerHalf = KeyGroupRange.of(0,5);
+               KeyGroupRange upperHalf = KeyGroupRange.of(6,10);
+               overlap = lowerHalf.getIntersection(upperHalf);
+               Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, overlap);
+               Assert.assertEquals(overlap, upperHalf.getIntersection(lowerHalf));
+
+               // the argument extends beyond the end of the receiver
+               KeyGroupRange base = KeyGroupRange.of(0, 10);
+               KeyGroupRange shiftedUp = KeyGroupRange.of(5, 20);
+               overlap = base.getIntersection(shiftedUp);
+               Assert.assertEquals(5, overlap.getStartKeyGroup());
+               Assert.assertEquals(10, overlap.getEndKeyGroup());
+               Assert.assertEquals(overlap, shiftedUp.getIntersection(base));
+
+               // the argument starts before the receiver
+               KeyGroupRange late = KeyGroupRange.of(3, 12);
+               KeyGroupRange early = KeyGroupRange.of(0, 10);
+               overlap = late.getIntersection(early);
+               Assert.assertEquals(3, overlap.getStartKeyGroup());
+               Assert.assertEquals(10, overlap.getEndKeyGroup());
+               Assert.assertEquals(overlap, early.getIntersection(late));
+       }
+
+
+       /**
+        * Runs the basic checks for several valid key-group ranges and expects an
+        * {@link IllegalArgumentException} for a range with a negative start.
+        */
+       @Test
+       public void testKeyGroupRangeBasics() {
+               // {start, end} pairs, checked in this order
+               final int[][] validRanges = {
+                               {0, 0},
+                               {0, 1},
+                               {1, 2},
+                               {42, 42},
+                               {3, 7},
+                               {0, Short.MAX_VALUE},
+                               {Short.MAX_VALUE - 1, Short.MAX_VALUE}
+               };
+               for (int[] range : validRanges) {
+                       testKeyGroupRangeBasicsInternal(range[0], range[1]);
+               }
+
+               try {
+                       testKeyGroupRangeBasicsInternal(-3, 2);
+                       Assert.fail();
+               } catch (IllegalArgumentException ignored) {
+                       // a negative start key-group has to be rejected
+               }
+
+       }
+
+       /**
+        * Creates the range {@code startKeyGroup..endKeyGroup} and checks its size, its
+        * bounds, its iteration order and its {@code contains} answers; a range without
+        * key-groups has to equal {@code KeyGroupRange.EMPTY_KEY_GROUP}.
+        *
+        * @param startKeyGroup first key-group of the range (inclusive)
+        * @param endKeyGroup last key-group of the range (inclusive)
+        */
+       private void testKeyGroupRangeBasicsInternal(int startKeyGroup, int endKeyGroup) {
+               final KeyGroupRange range = KeyGroupRange.of(startKeyGroup, endKeyGroup);
+
+               final int expectedSize = Math.max(0, endKeyGroup - startKeyGroup + 1);
+               final int actualSize = range.getNumberOfKeyGroups();
+               Assert.assertEquals(expectedSize, actualSize);
+
+               // nothing more to check for a range without key-groups
+               if (!(range.getNumberOfKeyGroups() > 0)) {
+                       Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, range);
+                       return;
+               }
+
+               Assert.assertEquals(startKeyGroup, range.getStartKeyGroup());
+               Assert.assertEquals(endKeyGroup, range.getEndKeyGroup());
+
+               // iteration has to visit start..end in ascending order
+               int nextExpected = startKeyGroup;
+               for (int keyGroup : range) {
+                       Assert.assertEquals(nextExpected, keyGroup);
+                       Assert.assertTrue(range.contains(keyGroup));
+                       ++nextExpected;
+               }
+               Assert.assertEquals(endKeyGroup + 1, nextExpected);
+
+               // the direct neighbours on both sides are not part of the range
+               Assert.assertFalse(range.contains(startKeyGroup - 1));
+               Assert.assertFalse(range.contains(endKeyGroup + 1));
+       }
+
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index d3b4dbc..940b337 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -57,37 +57,26 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
        public void testReducingStateRestoreWithWrongSerializers() {}
 
        @Test
-       public void testSerializableState() {
+       public void testOversizedState() {
                try {
-                       MemoryStateBackend backend = new MemoryStateBackend();
+                       MemoryStateBackend backend = new MemoryStateBackend(10);
 
                        HashMap<String, Integer> state = new HashMap<>();
                        state.put("hey there", 2);
                        state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
 
-                       StateHandle<HashMap<String, Integer>> handle = 
backend.checkpointStateSerializable(state, 12, 459);
-                       assertNotNull(handle);
+                       try {
+                               
AbstractStateBackend.CheckpointStateOutputStream outStream = 
backend.createCheckpointStateOutputStream(
+                                               12,
+                                               459);
 
-                       HashMap<String, Integer> restored = 
handle.getState(getClass().getClassLoader());
-                       assertEquals(state, restored);
-               }
-               catch (Exception e) {
-                       e.printStackTrace();
-                       fail(e.getMessage());
-               }
-       }
+                               ObjectOutputStream oos = new 
ObjectOutputStream(outStream);
+                               oos.writeObject(state);
 
-       @Test
-       public void testOversizedState() {
-               try {
-                       MemoryStateBackend backend = new MemoryStateBackend(10);
+                               oos.flush();
 
-                       HashMap<String, Integer> state = new HashMap<>();
-                       state.put("hey there", 2);
-                       state.put("the crazy brown fox stumbles over a sentence 
that does not contain every letter", 77);
+                               outStream.closeAndGetHandle();
 
-                       try {
-                               backend.checkpointStateSerializable(state, 12, 
459);
                                fail("this should cause an exception");
                        }
                        catch (IOException e) {
@@ -117,7 +106,7 @@ public class MemoryStateBackendTest extends 
StateBackendTestBase<MemoryStateBack
 
                        assertNotNull(handle);
 
-                       ObjectInputStream ois = new 
ObjectInputStream(handle.getState(getClass().getClassLoader()));
+                       ObjectInputStream ois = new 
ObjectInputStream(handle.openInputStream());
                        assertEquals(state, ois.readObject());
                        assertTrue(ois.available() <= 0);
                        ois.close();

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7b00b27..834c35c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -84,7 +84,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
 
        @After
        public void teardown() throws Exception {
-               this.backend.dispose();
+               this.backend.discardState();
                cleanup();
        }
 
@@ -154,7 +154,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                assertEquals("u3", state.value());
                assertEquals("u3", getSerializedValue(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-               backend.dispose();
+               backend.discardState();
                backend.initializeForJob(new DummyEnvironment("test", 1, 0), 
"test_op", IntSerializer.INSTANCE);
 
                backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
@@ -174,7 +174,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                assertEquals("2", restored1.value());
                assertEquals("2", getSerializedValue(restoredKvState1, 2, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-               backend.dispose();
+               backend.discardState();
                backend.initializeForJob(new DummyEnvironment("test", 1, 0), 
"test_op", IntSerializer.INSTANCE);
 
                backend.injectKeyValueStateSnapshots((HashMap) snapshot2);
@@ -254,7 +254,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        }
                }
 
-               backend.dispose();
+               backend.discardState();
                backend.initializeForJob(new DummyEnvironment("test", 1, 0), 
"test_op", IntSerializer.INSTANCE);
 
                backend.injectKeyValueStateSnapshots((HashMap) snapshot1);
@@ -334,7 +334,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        assertEquals("u3", joiner.join(state.get()));
                        assertEquals("u3", 
joiner.join(getSerializedList(kvState, 3, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -355,7 +355,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        assertEquals("2", joiner.join(restored1.get()));
                        assertEquals("2", 
joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)));
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the second snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -452,7 +452,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        assertEquals("u3", state.get());
                        assertEquals("u3", getSerializedValue(kvState, 3, 
keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -473,7 +473,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        assertEquals("2", restored1.get());
                        assertEquals("2", getSerializedValue(restoredKvState1, 
2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, 
valueSerializer));
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the second snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -574,7 +574,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        assertEquals("Fold-Initial:,103", state.get());
                        assertEquals("Fold-Initial:,103", 
getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -595,7 +595,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                        assertEquals("Fold-Initial:,2", restored1.get());
                        assertEquals("Fold-Initial:,2", 
getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, 
namespaceSerializer, valueSerializer));
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the second snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -653,7 +653,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                }
                        }
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -713,7 +713,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                }
                        }
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -775,7 +775,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                                }
                        }
 
-                       backend.dispose();
+                       backend.discardState();
 
                        // restore the first snapshot and validate it
                        backend.initializeForJob(new DummyEnvironment("test", 
1, 0), "test_op", IntSerializer.INSTANCE);
@@ -1030,7 +1030,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> {
                }
 
                // Verify unregistered
-               backend.dispose();
+               backend.discardState();
 
                verify(listener, times(1)).notifyKvStateUnregistered(
                                eq(env.getJobID()), eq(env.getJobVertexId()), 
eq(0), eq("banana"));

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
new file mode 100644
index 0000000..1d45115
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.filesystem;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.AbstractStateBackend;
+
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.junit.Test;
+
+import java.io.DataInputStream;
+import java.io.File;
+import java.io.InputStream;
+import java.util.Random;
+
+import static org.junit.Assert.*;
+
+public class FsCheckpointStateOutputStreamTest {
+       
+       /** The temp dir, obtained in a platform neutral way */
+       private static final Path TEMP_DIR_PATH = new Path(new 
File(System.getProperty("java.io.tmpdir")).toURI());
+       
+       
+       @Test(expected = IllegalArgumentException.class)
+       public void testWrongParameters() {
+               // constructing the stream must itself throw IllegalArgumentException (see 'expected' above); presumably because the last argument (5000) exceeds the one before it (4000) - TODO confirm against the constructor
+               new FsStateBackend.FsCheckpointStateOutputStream(
+                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 
5000);
+       }
+
+
+       @Test
+       public void testEmptyState() throws Exception {
+               AbstractStateBackend.CheckpointStateOutputStream stream = new 
FsStateBackend.FsCheckpointStateOutputStream(
+                       TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 
512);
+
+               StreamStateHandle handle = stream.closeAndGetHandle(); // closed without a single byte having been written
+               assertTrue(handle instanceof ByteStreamStateHandle); // an empty stream is expected to yield a ByteStreamStateHandle
+
+               InputStream inStream = handle.openInputStream(); // NOTE(review): inStream is never closed in this test
+               assertEquals(-1, inStream.read()); // read() == -1 signals end of stream, i.e. the handle holds no bytes
+       }
+
+       @Test
+       public void testStateBlowMemThreshold() throws Exception { // NOTE(review): "Blow" is presumably a typo for "Below"
+               runTest(222, 999, 512, false); // 222 bytes, buffer size 999, threshold 512; expects a ByteStreamStateHandle
+       }
+
+       @Test
+       public void testStateOneBufferAboveThreshold() throws Exception {
+               runTest(896, 1024, 15, true); // 896 bytes, buffer size 1024, threshold 15; expects a FileStateHandle
+       }
+
+       @Test
+       public void testStateAboveMemThreshold() throws Exception {
+               runTest(576446, 259, 17, true); // 576446 bytes, buffer size 259, threshold 17; expects a FileStateHandle
+       }
+       
+       @Test
+       public void testZeroThreshold() throws Exception {
+               runTest(16678, 4096, 0, true); // 16678 bytes, buffer size 4096, threshold 0; expects a FileStateHandle
+       }
+       
+       private void runTest(int numBytes, int bufferSize, int threshold, 
boolean expectFile) throws Exception {
+               AbstractStateBackend.CheckpointStateOutputStream stream =
+                       new FsStateBackend.FsCheckpointStateOutputStream(
+                               TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 
bufferSize, threshold);
+               
+               Random rnd = new Random(); // unseeded: payload and write pattern differ on every run
+               byte[] original = new byte[numBytes];
+               byte[] bytes = new byte[original.length];
+
+               rnd.nextBytes(original);
+               System.arraycopy(original, 0, bytes, 0, original.length); // 'bytes' is what gets written; 'original' stays as the reference copy
+
+               // the test writes a random mixture of individual bytes and 
byte arrays
+               int pos = 0;
+               while (pos < bytes.length) {
+                       boolean single = rnd.nextBoolean();
+                       if (single) {
+                               stream.write(bytes[pos++]);
+                       }
+                       else {
+                               int num = rnd.nextInt(Math.min(10, bytes.length 
- pos)); // chunk length in [0, min(10, remaining)); 0 is possible and then nothing is written this round
+                               stream.write(bytes, pos, num);
+                               pos += num;
+                       }
+               }
+
+               StreamStateHandle handle = stream.closeAndGetHandle();
+               if (expectFile) {
+                       assertTrue(handle instanceof FileStateHandle);
+               } else {
+                       assertTrue(handle instanceof ByteStreamStateHandle);
+               }
+
+               // make sure the writing process did not alter the original 
byte array
+               assertArrayEquals(original, bytes);
+
+               InputStream inStream = handle.openInputStream();
+               byte[] validation = new byte[bytes.length];
+
+               DataInputStream dataInputStream = new DataInputStream(inStream);
+               dataInputStream.readFully(validation); // readFully fills the whole array or throws EOFException if the handle holds fewer bytes
+
+               assertArrayEquals(bytes, validation); // round trip: what is read back must equal what was written
+
+               handle.discardState(); // NOTE(review): inStream/dataInputStream are still open at this point and are never closed
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index fc1a5df..429fc6b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -44,16 +44,19 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.memory.MemoryManager;
 
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.ChainedStateHandle;
+import org.apache.flink.runtime.state.KeyGroupsStateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
+
 import org.apache.flink.util.SerializedValue;
 import org.junit.Before;
 import org.junit.Test;
 
 import scala.concurrent.duration.FiniteDuration;
 
-import java.io.Serializable;
 import java.net.URL;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertFalse;
@@ -179,8 +182,8 @@ public class TaskAsyncCallTest {
                                new TaskManagerRuntimeInfo("localhost", new 
Configuration(), System.getProperty("java.io.tmpdir")),
                                mock(TaskMetricGroup.class));
        }
-       
-       public static class CheckpointsInOrderInvokable extends 
AbstractInvokable implements StatefulTask<StateHandle<Serializable>> {
+
+       public static class CheckpointsInOrderInvokable extends 
AbstractInvokable implements StatefulTask {
 
                private volatile long lastCheckpointId = 0;
                
@@ -204,7 +207,10 @@ public class TaskAsyncCallTest {
                }
 
                @Override
-               public void setInitialState(StateHandle<Serializable> 
stateHandle) throws Exception {}
+               public void 
setInitialState(ChainedStateHandle<StreamStateHandle> chainedState,
+                               List<KeyGroupsStateHandle> keyGroupsState) 
throws Exception {
+
+               }
 
                @Override
                public boolean triggerCheckpoint(long checkpointId, long 
timestamp) {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
index 7505bfc..7e8868c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java
@@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.api.BackgroundCallback;
 import org.apache.curator.framework.api.CuratorEvent;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.TestLogger;
 import org.apache.zookeeper.CreateMode;
@@ -97,7 +97,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                // Verify
                // State handle created
                assertEquals(1, store.getAll().size());
-               assertEquals(state, store.get(pathInZooKeeper).getState(null));
+               assertEquals(state, store.get(pathInZooKeeper).retrieveState());
 
                // Path created and is persistent
                Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -106,9 +106,9 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
 
                // Data is equal
                @SuppressWarnings("unchecked")
-               Long actual = ((StateHandle<Long>) 
InstantiationUtil.deserializeObject(
+               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
                                
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                               
ClassLoader.getSystemClassLoader())).getState(null);
+                               
ClassLoader.getSystemClassLoader())).retrieveState();
 
                assertEquals(state, actual);
        }
@@ -149,7 +149,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                        // Verify
                        // State handle created
                        assertEquals(i + 1, store.getAll().size());
-                       assertEquals(state, 
longStateStorage.getStateHandles().get(i).getState(null));
+                       assertEquals(state, 
longStateStorage.getStateHandles().get(i).retrieveState());
 
                        // Path created
                        Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -166,9 +166,9 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
 
                        // Data is equal
                        @SuppressWarnings("unchecked")
-                       Long actual = ((StateHandle<Long>) 
InstantiationUtil.deserializeObject(
+                       Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
                                        
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                                       
ClassLoader.getSystemClassLoader())).getState(null);
+                                       
ClassLoader.getSystemClassLoader())).retrieveState();
 
                        assertEquals(state, actual);
                }
@@ -218,7 +218,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                // Verify
                // State handle created and discarded
                assertEquals(1, stateHandleProvider.getStateHandles().size());
-               assertEquals(state, 
stateHandleProvider.getStateHandles().get(0).getState(null));
+               assertEquals(state, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
                assertEquals(1, 
stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
        }
 
@@ -245,8 +245,8 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                // Verify
                // State handles created
                assertEquals(2, stateHandleProvider.getStateHandles().size());
-               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).getState(null));
-               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).getState(null));
+               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
+               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).retrieveState());
 
                // Path created and is persistent
                Stat stat = 
ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
@@ -255,9 +255,9 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
 
                // Data is equal
                @SuppressWarnings("unchecked")
-               Long actual = ((StateHandle<Long>) 
InstantiationUtil.deserializeObject(
+               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
                                
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                               
ClassLoader.getSystemClassLoader())).getState(null);
+                               
ClassLoader.getSystemClassLoader())).retrieveState();
 
                assertEquals(replaceState, actual);
        }
@@ -267,7 +267,7 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
         */
        @Test(expected = Exception.class)
        public void testReplaceNonExistingPath() throws Exception {
-               StateStorageHelper<Long> stateStorage = new LongStateStorage();
+               RetrievableStateStorageHelper<Long> stateStorage = new 
LongStateStorage();
 
                ZooKeeperStateHandleStore<Long> store = new 
ZooKeeperStateHandleStore<>(
                                ZooKeeper.getClient(), stateStorage);
@@ -307,15 +307,15 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                // Verify
                // State handle created and discarded
                assertEquals(2, stateHandleProvider.getStateHandles().size());
-               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).getState(null));
-               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).getState(null));
+               assertEquals(initialState, 
stateHandleProvider.getStateHandles().get(0).retrieveState());
+               assertEquals(replaceState, 
stateHandleProvider.getStateHandles().get(1).retrieveState());
                assertEquals(1, 
stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
 
                // Initial value
                @SuppressWarnings("unchecked")
-               Long actual = ((StateHandle<Long>) 
InstantiationUtil.deserializeObject(
+               Long actual = ((RetrievableStateHandle<Long>) 
InstantiationUtil.deserializeObject(
                                
ZooKeeper.getClient().getData().forPath(pathInZooKeeper),
-                               
ClassLoader.getSystemClassLoader())).getState(null);
+                               
ClassLoader.getSystemClassLoader())).retrieveState();
 
                assertEquals(initialState, actual);
        }
@@ -339,10 +339,10 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                assertEquals(-1, store.exists(pathInZooKeeper));
 
                store.add(pathInZooKeeper, state);
-               StateHandle<Long> actual = store.get(pathInZooKeeper);
+               RetrievableStateHandle<Long> actual = 
store.get(pathInZooKeeper);
 
                // Verify
-               assertEquals(state, actual.getState(null));
+               assertEquals(state, actual.retrieveState());
                assertTrue(store.exists(pathInZooKeeper) >= 0);
        }
 
@@ -384,8 +384,8 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                        store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
                }
 
-               for (Tuple2<StateHandle<Long>, String> val : store.getAll()) {
-                       assertTrue(expected.remove(val.f0.getState(null)));
+               for (Tuple2<RetrievableStateHandle<Long>, String> val : 
store.getAll()) {
+                       assertTrue(expected.remove(val.f0.retrieveState()));
                }
                assertEquals(0, expected.size());
        }
@@ -412,11 +412,11 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
                        store.add(pathInZooKeeper, val, 
CreateMode.PERSISTENT_SEQUENTIAL);
                }
 
-               List<Tuple2<StateHandle<Long>, String>> actual = 
store.getAllSortedByName();
+               List<Tuple2<RetrievableStateHandle<Long>, String>> actual = 
store.getAllSortedByName();
                assertEquals(expected.length, actual.size());
 
                for (int i = 0; i < expected.length; i++) {
-                       assertEquals(expected[i], 
actual.get(i).f0.getState(null));
+                       assertEquals(expected[i], 
actual.get(i).f0.retrieveState());
                }
        }
 
@@ -540,24 +540,24 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
        // Simple test helpers
        // 
---------------------------------------------------------------------------------------------
 
-       private static class LongStateStorage implements 
StateStorageHelper<Long> {
+       private static class LongStateStorage implements 
RetrievableStateStorageHelper<Long> {
 
-               private final List<LongStateHandle> stateHandles = new 
ArrayList<>();
+               private final List<LongRetrievableStateHandle> stateHandles = 
new ArrayList<>();
 
                @Override
-               public StateHandle<Long> store(Long state) throws Exception {
-                       LongStateHandle stateHandle = new 
LongStateHandle(state);
+               public RetrievableStateHandle<Long> store(Long state) throws 
Exception {
+                       LongRetrievableStateHandle stateHandle = new 
LongRetrievableStateHandle(state);
                        stateHandles.add(stateHandle);
 
                        return stateHandle;
                }
 
-               List<LongStateHandle> getStateHandles() {
+               List<LongRetrievableStateHandle> getStateHandles() {
                        return stateHandles;
                }
        }
 
-       private static class LongStateHandle implements StateHandle<Long> {
+       private static class LongRetrievableStateHandle implements 
RetrievableStateHandle<Long> {
 
                private static final long serialVersionUID = 
-3555329254423838912L;
 
@@ -565,12 +565,12 @@ public class ZooKeeperStateHandleStoreITCase extends 
TestLogger {
 
                private int numberOfDiscardCalls;
 
-               public LongStateHandle(Long state) {
+               public LongRetrievableStateHandle(Long state) {
                        this.state = state;
                }
 
                @Override
-               public Long getState(ClassLoader ignored) throws Exception {
+               public Long retrieveState() throws Exception {
                        return state;
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
index 9388818..4822f91 100644
--- 
a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
+++ 
b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -46,11 +46,10 @@ import 
org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
-import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
+
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
 import org.apache.flink.test.util.TestEnvironment;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -276,8 +275,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
 
        @Override
        protected void verifyResultsIdealCircumstances(
-               OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, 
Tuple3<String, Integer, Integer>> harness,
-               OneInputStreamTask<Tuple3<String, Integer, Integer>, 
Tuple3<String, Integer, Integer>> task,
+               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
 
                ResultSet result = session.execute(SELECT_DATA_QUERY);
@@ -294,8 +292,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
 
        @Override
        protected void verifyResultsDataPersistenceUponMissedNotify(
-               OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, 
Tuple3<String, Integer, Integer>> harness,
-               OneInputStreamTask<Tuple3<String, Integer, Integer>, 
Tuple3<String, Integer, Integer>> task,
+               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
 
                ResultSet result = session.execute(SELECT_DATA_QUERY);
@@ -312,8 +309,7 @@ public class CassandraConnectorITCase extends 
WriteAheadSinkTestBase<Tuple3<Stri
 
        @Override
        protected void verifyResultsDataDiscardingUponRestore(
-               OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, 
Tuple3<String, Integer, Integer>> harness,
-               OneInputStreamTask<Tuple3<String, Integer, Integer>, 
Tuple3<String, Integer, Integer>> task,
+               OneInputStreamOperatorTestHarness<Tuple3<String, Integer, 
Integer>, Tuple3<String, Integer, Integer>> harness,
                CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> 
sink) {
 
                ResultSet result = session.execute(SELECT_DATA_QUERY);

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
index 828dcc9..e274fdd 100644
--- 
a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
+++ 
b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java
@@ -28,13 +28,13 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.operators.StreamSink;
 import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter;
 import org.apache.flink.streaming.connectors.fs.Clock;
 import org.apache.flink.streaming.connectors.fs.SequenceFileWriter;
 import org.apache.flink.streaming.connectors.fs.StringWriter;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider;
 import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -137,11 +137,11 @@ public class BucketingSinkTest {
 
                // snapshot but don't call notify to simulate a notify that 
never
                // arrives, the sink should move pending files in restore() in 
that case
-               StreamTaskState snapshot1 = testHarness.snapshot(0, 0);
+               StreamStateHandle snapshot1 = testHarness.snapshot(0, 0);
 
                testHarness = createTestSink(dataDir, clock);
                testHarness.setup();
-               testHarness.restore(snapshot1, 1);
+               testHarness.restore(snapshot1);
                testHarness.open();
 
                testHarness.processElement(new StreamRecord<>("Hello"));

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index fda5efd..6d67560 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -24,11 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.fs.FileInputSplit;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
@@ -36,13 +35,11 @@ import 
org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.io.Serializable;
@@ -376,14 +373,10 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
        //      ---------------------                   Checkpointing           
        --------------------------
 
        @Override
-       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
-               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
-
-               final AbstractStateBackend.CheckpointStateOutputStream os =
-                       
this.getStateBackend().createCheckpointStateOutputStream(checkpointId, 
timestamp);
+       public void snapshotState(FSDataOutputStream os, long checkpointId, 
long timestamp) throws Exception {
+               super.snapshotState(os, checkpointId, timestamp);
 
                final ObjectOutputStream oos = new ObjectOutputStream(os);
-               final AbstractStateBackend.CheckpointStateOutputView ov = new 
AbstractStateBackend.CheckpointStateOutputView(os);
 
                Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = 
this.reader.getReaderState();
                List<FileInputSplit> pendingSplits = readerState.f0;
@@ -392,35 +385,28 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
 
                // write the current split
                oos.writeObject(currSplit);
-
-               // write the pending ones
-               ov.writeInt(pendingSplits.size());
+               oos.writeInt(pendingSplits.size());
                for (FileInputSplit split : pendingSplits) {
                        oos.writeObject(split);
                }
 
                // write the state of the reading channel
                oos.writeObject(formatState);
-               taskState.setOperatorState(os.closeAndGetHandle());
-               return taskState;
+               oos.flush();
        }
 
        @Override
-       public void restoreState(StreamTaskState state) throws Exception {
-               super.restoreState(state);
-
-               StreamStateHandle stream = (StreamStateHandle) 
state.getOperatorState();
+       public void restoreState(FSDataInputStream is) throws Exception {
+               super.restoreState(is);
 
-               final InputStream is = 
stream.getState(getUserCodeClassloader());
                final ObjectInputStream ois = new ObjectInputStream(is);
-               final DataInputViewStreamWrapper div = new 
DataInputViewStreamWrapper(is);
 
                // read the split that was being read
                FileInputSplit currSplit = (FileInputSplit) ois.readObject();
 
                // read the pending splits list
                List<FileInputSplit> pendingSplits = new LinkedList<>();
-               int noOfSplits = div.readInt();
+               int noOfSplits = ois.readInt();
                for (int i = 0; i < noOfSplits; i++) {
                        FileInputSplit split = (FileInputSplit) 
ois.readObject();
                        pendingSplits.add(split);
@@ -435,6 +421,5 @@ public class ContinuousFileReaderOperator<OUT, S extends 
Serializable> extends A
                        "The reader state has already been initialized.");
 
                this.readerState = new Tuple3<>(pendingSplits, currSplit, 
formatState);
-               div.close();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 15bb384..59ecd15 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.KvStateSnapshot;
@@ -35,11 +38,14 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.util.HashMap;
+import java.util.Set;
 import java.util.concurrent.ScheduledFuture;
 
 /**
@@ -91,7 +97,7 @@ public abstract class AbstractStreamOperator<OUT>
        private transient KeySelector<?, ?> stateKeySelector2;
 
        /** The state backend that stores the state and checkpoints for this 
task */
-       private AbstractStateBackend stateBackend = null;
+       private transient AbstractStateBackend stateBackend;
        protected MetricGroup metrics;
 
        // 
------------------------------------------------------------------------
@@ -164,7 +170,7 @@ public abstract class AbstractStreamOperator<OUT>
                if (stateBackend != null) {
                        try {
                                stateBackend.close();
-                               stateBackend.dispose();
+                               stateBackend.discardState();
                        } catch (Exception e) {
                                throw new RuntimeException("Error while 
closing/disposing state backend.", e);
                        }
@@ -176,30 +182,45 @@ public abstract class AbstractStreamOperator<OUT>
        // 
------------------------------------------------------------------------
 
        @Override
-       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
-               // here, we deal with key/value state snapshots
-               
-               StreamTaskState state = new StreamTaskState();
-
-               if (stateBackend != null) {
-                       HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> 
partitionedSnapshots =
-                               
stateBackend.snapshotPartitionedState(checkpointId, timestamp);
-                       if (partitionedSnapshots != null) {
-                               state.setKvStates(partitionedSnapshots);
+       public void snapshotState(FSDataOutputStream out,
+                       long checkpointId,
+                       long timestamp) throws Exception {
+
+               HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> keyedState =
+                               
stateBackend.snapshotPartitionedState(checkpointId,timestamp);
+
+               // Materialize asynchronous snapshots, if any
+               if (keyedState != null) {
+                       Set<String> keys = keyedState.keySet();
+                       for (String key: keys) {
+                               if (keyedState.get(key) instanceof 
AsynchronousKvStateSnapshot) {
+                                       AsynchronousKvStateSnapshot<?, ?, ?, ?, 
?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) 
keyedState.get(key);
+                                       keyedState.put(key, 
asyncHandle.materialize());
+                               }
                        }
                }
 
-               return state;
+               byte[] serializedSnapshot = 
InstantiationUtil.serializeObject(keyedState);
+
+               DataOutputStream dos = new DataOutputStream(out);
+               dos.writeInt(serializedSnapshot.length);
+               dos.write(serializedSnapshot);
+
+               dos.flush();
+
        }
-       
+
        @Override
-       @SuppressWarnings("rawtypes,unchecked")
-       public void restoreState(StreamTaskState state) throws Exception {
-               // restore the key/value state. the actual restore happens 
lazily, when the function requests
-               // the state again, because the restore method needs 
information provided by the user function
-               if (stateBackend != null) {
-                       
stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates());
-               }
+       public void restoreState(FSDataInputStream in) throws Exception {
+               DataInputStream dis = new DataInputStream(in);
+               int size = dis.readInt();
+               byte[] serializedSnapshot = new byte[size];
+               dis.readFully(serializedSnapshot);
+
+               HashMap<String, KvStateSnapshot> keyedState =
+                               
InstantiationUtil.deserializeObject(serializedSnapshot, 
getUserCodeClassloader());
+
+               stateBackend.injectKeyValueStateSnapshots(keyedState);
        }
        
        @Override
@@ -249,8 +270,8 @@ public abstract class AbstractStreamOperator<OUT>
        }
 
        /**
-        * Register a timer callback. At the specified time the provided {@link 
Triggerable} will
-        * be invoked. This call is guaranteed to not happen concurrently with 
method calls on the operator.
+        * Register a timer callback. At the specified time the {@link 
Triggerable} will be invoked.
+        * This call is guaranteed to not happen concurrently with method calls 
on the operator.
         *
         * @param time The absolute time in milliseconds.
         * @param target The target to be triggered.
@@ -281,11 +302,18 @@ public abstract class AbstractStreamOperator<OUT>
         */
        @SuppressWarnings("unchecked")
        protected <S extends State, N> S getPartitionedState(N namespace, 
TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) 
throws Exception {
-               return getStateBackend().getPartitionedState(namespace, 
(TypeSerializer<Object>) namespaceSerializer,
-                       stateDescriptor);
+               if (stateBackend != null) {
+                       return stateBackend.getPartitionedState(
+                               namespace,
+                               namespaceSerializer,
+                               stateDescriptor);
+               } else {
+                       throw new RuntimeException("Cannot create partitioned 
state. The key grouped state " +
+                               "backend has not been set. This indicates that 
the operator is not " +
+                               "partitioned/keyed.");
+               }
        }
 
-
        @Override
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void setKeyContextElement1(StreamRecord record) throws Exception 
{
@@ -300,17 +328,25 @@ public abstract class AbstractStreamOperator<OUT>
        public void setKeyContextElement2(StreamRecord record) throws Exception 
{
                if (stateKeySelector2 != null) {
                        Object key = ((KeySelector) 
stateKeySelector2).getKey(record.getValue());
-                       getStateBackend().setCurrentKey(key);
+
+                       setKeyContext(key);
                }
        }
 
        @SuppressWarnings({"unchecked", "rawtypes"})
        public void setKeyContext(Object key) {
-               if (stateKeySelector1 != null) {
-                       stateBackend.setCurrentKey(key);
+               if (stateBackend != null) {
+                       try {
+                               stateBackend.setCurrentKey(key);
+                       } catch (Exception e) {
+                               throw new RuntimeException("Exception occurred 
while setting the current key context.", e);
+                       }
+               } else {
+                       throw new RuntimeException("Could not set the current 
key context, because the " +
+                               "AbstractStateBackend has not been 
initialized.");
                }
        }
-       
+
        // 
------------------------------------------------------------------------
        //  Context and chaining properties
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 1ddd934..b1bc531 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 
 import org.apache.flink.annotation.PublicEvolving;
@@ -26,14 +28,13 @@ import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import static java.util.Objects.requireNonNull;
 
@@ -117,8 +118,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function> extends
        // 
------------------------------------------------------------------------
        
        @Override
-       public StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception {
-               StreamTaskState state = 
super.snapshotOperatorState(checkpointId, timestamp);
+       public void snapshotState(FSDataOutputStream out, long checkpointId, 
long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
 
                if (userFunction instanceof Checkpointed) {
                        @SuppressWarnings("unchecked")
@@ -127,45 +128,39 @@ public abstract class AbstractUdfStreamOperator<OUT, F 
extends Function> extends
                        Serializable udfState;
                        try {
                                udfState = 
chkFunction.snapshotState(checkpointId, timestamp);
-                       } 
-                       catch (Exception e) {
-                               throw new Exception("Failed to draw state 
snapshot from function: " + e.getMessage(), e);
-                       }
-                       
-                       if (udfState != null) {
-                               try {
-                                       AbstractStateBackend stateBackend = 
getStateBackend();
-                                       StateHandle<Serializable> handle = 
-                                                       
stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp);
-                                       state.setFunctionState(handle);
-                               }
-                               catch (Exception e) {
-                                       throw new Exception("Failed to add the 
state snapshot of the function to the checkpoint: "
-                                                       + e.getMessage(), e);
+                               if (udfState != null) {
+                                       out.write(1);
+                                       ObjectOutputStream os = new 
ObjectOutputStream(out);
+                                       os.writeObject(udfState);
+                                       os.flush();
+                               } else {
+                                       out.write(0);
                                }
+                       } catch (Exception e) {
+                               throw new Exception("Failed to draw state 
snapshot from function: " + e.getMessage(), e);
                        }
                }
-               
-               return state;
        }
 
        @Override
-       public void restoreState(StreamTaskState state) throws Exception {
-               super.restoreState(state);
-               
-               StateHandle<Serializable> stateHandle =  
state.getFunctionState();
-               
-               if (userFunction instanceof Checkpointed && stateHandle != 
null) {
+       public void restoreState(FSDataInputStream in) throws Exception {
+               super.restoreState(in);
+
+               if (userFunction instanceof Checkpointed) {
                        @SuppressWarnings("unchecked")
                        Checkpointed<Serializable> chkFunction = 
(Checkpointed<Serializable>) userFunction;
-                       
-                       Serializable functionState = 
stateHandle.getState(getUserCodeClassloader());
-                       if (functionState != null) {
-                               try {
-                                       chkFunction.restoreState(functionState);
-                               }
-                               catch (Exception e) {
-                                       throw new Exception("Failed to restore 
state to function: " + e.getMessage(), e);
+
+                       int hasUdfState = in.read();
+
+                       if (hasUdfState == 1) {
+                               ObjectInputStream ois = new 
ObjectInputStream(in);
+                               Serializable functionState = (Serializable) 
ois.readObject();
+                               if (functionState != null) {
+                                       try {
+                                               
chkFunction.restoreState(functionState);
+                                       } catch (Exception e) {
+                                               throw new Exception("Failed to 
restore state to function: " + e.getMessage(), e);
+                                       }
                                }
                        }
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 3e38165..3411a60 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -21,10 +21,11 @@ import java.io.Serializable;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 /**
  * Basic interface for stream operators. Implementers would implement one of
@@ -94,17 +95,15 @@ public interface StreamOperator<OUT> extends Serializable {
         * (if the operator is stateful) and the key/value state (if it is 
being used and has been
         * initialized).
         *
+        * @param out The stream to which we have to write our state.
         * @param checkpointId The ID of the checkpoint.
         * @param timestamp The timestamp of the checkpoint.
         *
-        * @return The StreamTaskState object, possibly containing the 
snapshots for the
-        *         operator and key/value state.
-        *
         * @throws Exception Forwards exceptions that occur while drawing 
snapshots from the operator
         *                   and the key/value state.
         */
-       StreamTaskState snapshotOperatorState(long checkpointId, long 
timestamp) throws Exception;
-       
+       void snapshotState(FSDataOutputStream out, long checkpointId, long 
timestamp) throws Exception;
+
        /**
         * Restores the operator state, if this operator's execution is 
recovering from a checkpoint.
         * This method restores the operator state (if the operator is 
stateful) and the key/value state
@@ -113,13 +112,12 @@ public interface StreamOperator<OUT> extends Serializable 
{
         * <p>This method is called after {@link #setup(StreamTask, 
StreamConfig, Output)}
         * and before {@link #open()}.
         *
-        * @param state The state of operator that was snapshotted as part of 
checkpoint
-        *              from which the execution is restored.
+        * @param in The stream from which we have to restore our state.
         *
         * @throws Exception Exceptions during state restore should be 
forwarded, so that the system can
         *                   properly react to failed state restore and fail 
the execution attempt.
         */
-       void restoreState(StreamTaskState state) throws Exception;
+       void restoreState(FSDataInputStream in) throws Exception;
 
        /**
         * Called when the checkpoint with the given ID is completed and 
acknowledged on the JobManager.

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 34c62fb..8d074cc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -19,17 +19,19 @@ package org.apache.flink.streaming.runtime.operators;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.io.disk.InputViewIterator;
 import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.InstantiationUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,7 +56,7 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
 
        protected static final Logger LOG = 
LoggerFactory.getLogger(GenericWriteAheadSink.class);
        private final CheckpointCommitter committer;
-       private transient AbstractStateBackend.CheckpointStateOutputView out;
+       private transient AbstractStateBackend.CheckpointStateOutputStream out;
        protected final TypeSerializer<IN> serializer;
        private final String id;
 
@@ -89,7 +91,7 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        private void saveHandleInState(final long checkpointId, final long 
timestamp) throws Exception {
                //only add handle if a new OperatorState was created since the 
last snapshot
                if (out != null) {
-                       StateHandle<DataInputView> handle = 
out.closeAndGetHandle();
+                       StreamStateHandle handle = out.closeAndGetHandle();
                        if (state.pendingHandles.containsKey(checkpointId)) {
                                //we already have a checkpoint stored for that 
ID that may have been partially written,
                                //so we discard this "alternate version" and 
use the stored checkpoint
@@ -102,18 +104,21 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
        }
 
        @Override
-       public StreamTaskState snapshotOperatorState(final long checkpointId, 
final long timestamp) throws Exception {
-               StreamTaskState taskState = 
super.snapshotOperatorState(checkpointId, timestamp);
+       public void snapshotState(FSDataOutputStream out,
+                       long checkpointId,
+                       long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
+
                saveHandleInState(checkpointId, timestamp);
-               taskState.setFunctionState(state);
-               return taskState;
+
+               InstantiationUtil.serializeObject(out, state);
        }
 
        @Override
-       public void restoreState(StreamTaskState state) throws Exception {
-               super.restoreState(state);
-               this.state = (ExactlyOnceState) state.getFunctionState();
-               out = null;
+       public void restoreState(FSDataInputStream in) throws Exception {
+               super.restoreState(in);
+
+               this.state = InstantiationUtil.deserializeObject(in, 
getUserCodeClassloader());
        }
 
        private void cleanState() throws Exception {
@@ -142,9 +147,9 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                                if (pastCheckpointId <= checkpointId) {
                                        try {
                                                if 
(!committer.isCheckpointCommitted(pastCheckpointId)) {
-                                                       Tuple2<Long, 
StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId);
-                                                       DataInputView in = 
handle.f1.getState(getUserCodeClassloader());
-                                                       boolean success = 
sendValues(new ReusingMutableToRegularIteratorWrapper<>(new 
InputViewIterator<>(in, serializer), serializer), handle.f0);
+                                                       Tuple2<Long, 
StreamStateHandle> handle = state.pendingHandles.get(pastCheckpointId);
+                                                       FSDataInputStream in = 
handle.f1.openInputStream();
+                                                       boolean success = 
sendValues(new ReusingMutableToRegularIteratorWrapper<>(new 
InputViewIterator<>(new DataInputViewStreamWrapper(in), serializer), 
serializer), handle.f0);
                                                        if (success) { //if the 
sending has failed we will retry on the next notify
                                                                
committer.commitCheckpoint(pastCheckpointId);
                                                                
checkpointsToRemove.add(pastCheckpointId);
@@ -159,7 +164,7 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                                }
                        }
                        for (Long toRemove : checkpointsToRemove) {
-                               Tuple2<Long, StateHandle<DataInputView>> handle 
= state.pendingHandles.get(toRemove);
+                               Tuple2<Long, StreamStateHandle> handle = 
state.pendingHandles.get(toRemove);
                                state.pendingHandles.remove(toRemove);
                                handle.f1.discardState();
                        }
@@ -181,9 +186,9 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
                IN value = element.getValue();
                //generate initial operator state
                if (out == null) {
-                       out = 
getStateBackend().createCheckpointStateOutputView(0, 0);
+                       out = 
getStateBackend().createCheckpointStateOutputStream(0, 0);
                }
-               serializer.serialize(value, out);
+               serializer.serialize(value, new 
DataOutputViewStreamWrapper(out));
        }
 
        @Override
@@ -195,59 +200,21 @@ public abstract class GenericWriteAheadSink<IN> extends 
AbstractStreamOperator<I
         * This state is used to keep a list of all StateHandles (essentially 
references to past OperatorStates) that were
         * used since the last completed checkpoint.
         **/
-       public static class ExactlyOnceState implements 
StateHandle<Serializable> {
+       public static class ExactlyOnceState implements Serializable {
 
                private static final long serialVersionUID = 
-3571063495273460743L;
 
-               protected TreeMap<Long, Tuple2<Long, 
StateHandle<DataInputView>>> pendingHandles;
+               protected TreeMap<Long, Tuple2<Long, StreamStateHandle>> 
pendingHandles;
 
                public ExactlyOnceState() {
                        pendingHandles = new TreeMap<>();
                }
 
-               @Override
-               public TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> 
getState(ClassLoader userCodeClassLoader) throws Exception {
+               public TreeMap<Long, Tuple2<Long, StreamStateHandle>> 
getState(ClassLoader userCodeClassLoader) throws Exception {
                        return pendingHandles;
                }
 
                @Override
-               public void discardState() throws Exception {
-                       //we specifically want the state to survive failed 
jobs, so we don't discard anything
-               }
-
-               @Override
-               public long getStateSize() throws Exception {
-                       int stateSize = 0;
-                       for (Tuple2<Long, StateHandle<DataInputView>> pair : 
pendingHandles.values()) {
-                               stateSize += pair.f1.getStateSize();
-                       }
-                       return stateSize;
-               }
-
-               @Override
-               public void close() throws IOException {
-                       Throwable exception = null;
-
-                       for (Tuple2<Long, StateHandle<DataInputView>> pair : 
pendingHandles.values()) {
-                               StateHandle<DataInputView> handle = pair.f1;
-                               if (handle != null) {
-                                       try {
-                                               handle.close();
-                                       }
-                                       catch (Throwable t) {
-                                               if (exception != null) {
-                                                       exception = t;
-                                               }
-                                       }
-                               }
-                       }
-
-                       if (exception != null) {
-                               ExceptionUtils.rethrowIOException(exception);
-                       }
-               }
-
-               @Override
                public String toString() {
                        return this.pendingHandles.toString();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
index fdc8117..2c95099 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java
@@ -24,18 +24,18 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.MathUtils;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 
 import static java.util.Objects.requireNonNull;
 
@@ -244,36 +244,35 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT,
        // ------------------------------------------------------------------------
 
        @Override
-       public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
-               StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
-               
+       public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
+               super.snapshotState(out, checkpointId, timestamp);
+
                // we write the panes with the key/value maps into the stream, as well as when this state
                // should have triggered and slided
-               AbstractStateBackend.CheckpointStateOutputView out =
-                               getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
 
-               out.writeLong(nextEvaluationTime);
-               out.writeLong(nextSlideTime);
-               panes.writeToOutput(out, keySerializer, stateTypeSerializer);
-               
-               taskState.setOperatorState(out.closeAndGetHandle());
-               return taskState;
+               DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out);
+
+               outView.writeLong(nextEvaluationTime);
+               outView.writeLong(nextSlideTime);
+
+               panes.writeToOutput(outView, keySerializer, stateTypeSerializer);
+
+               outView.flush();
        }
 
        @Override
-       public void restoreState(StreamTaskState taskState) throws Exception {
-               super.restoreState(taskState);
+       public void restoreState(FSDataInputStream in) throws Exception {
+               super.restoreState(in);
 
-               @SuppressWarnings("unchecked")
-               StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-               DataInputView in = inputState.getState(getUserCodeClassloader());
-               
-               final long nextEvaluationTime = in.readLong();
-               final long nextSlideTime = in.readLong();
+               DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in);
+
+               final long nextEvaluationTime = inView.readLong();
+               final long nextSlideTime = inView.readLong();
 
                AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = createPanes(keySelector, function);
-               panes.readFromInput(in, keySerializer, stateTypeSerializer);
-               
+
+               panes.readFromInput(inView, keySerializer, stateTypeSerializer);
+
                restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 12ed60e..dbdd660 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -39,11 +39,13 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
 import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
@@ -59,7 +61,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -857,7 +858,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
        @Override
        @SuppressWarnings("unchecked")
-       public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
+       public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception {
 
                if (mergingWindowsByKey != null) {
                        TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} );
@@ -870,29 +871,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
                        }
                }
 
-               StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
+               snapshotTimers(new DataOutputViewStreamWrapper(out));
 
-               AbstractStateBackend.CheckpointStateOutputView out =
-                       getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);
-
-               snapshotTimers(out);
-
-               taskState.setOperatorState(out.closeAndGetHandle());
-
-               return taskState;
+               super.snapshotState(out, checkpointId, timestamp);
        }
 
        @Override
-       public void restoreState(StreamTaskState taskState) throws Exception {
-               super.restoreState(taskState);
-
-               final ClassLoader userClassloader = getUserCodeClassloader();
-
-               @SuppressWarnings("unchecked")
-               StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState();
-               DataInputView in = inputState.getState(userClassloader);
+       public void restoreState(FSDataInputStream in) throws Exception {
+               restoreTimers(new DataInputViewStreamWrapper(in));
 
-               restoreTimers(in);
+               super.restoreState(in);
        }
 
        private void restoreTimers(DataInputView in ) throws IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
index 5bcf41b..108a3ae 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java
@@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.KeyGroupAssigner;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Preconditions;
 
@@ -60,8 +61,10 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem
                } catch (Exception e) {
                        throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e);
                }
-               returnArray[0] = keyGroupAssigner.getKeyGroupIndex(key) % numberOfOutputChannels;
-
+               returnArray[0] = KeyGroupRange.computeOperatorIndexForKeyGroup(
+                               keyGroupAssigner.getNumberKeyGroups(),
+                               numberOfOutputChannels,
+                               keyGroupAssigner.getKeyGroupIndex(key));
                return returnArray;
        }
 

Reply via email to