http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index 1881dad..d6d4af7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
 import 
org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
@@ -41,6 +42,7 @@ import 
org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.Preconditions;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -51,7 +53,6 @@ import org.powermock.modules.junit4.PowerMockRunner;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -254,7 +255,7 @@ public class OperatorStateBackendTest {
                broadcastState.put(5, 6);
 
                CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(4096);
-               RunnableFuture<OperatorStateHandle> runnableFuture =
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
runnableFuture =
                        operatorStateBackend.snapshot(1, 1, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
                FutureUtil.runIfNotDoneAndGet(runnableFuture);
 
@@ -375,10 +376,11 @@ public class OperatorStateBackendTest {
 
                CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(4096);
 
-               RunnableFuture<OperatorStateHandle> snapshot =
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
                                operatorStateBackend.snapshot(0L, 0L, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+               SnapshotResult<OperatorStateHandle> snapshotResult = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+               OperatorStateHandle stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
                assertNull(stateHandle);
        }
 
@@ -404,15 +406,16 @@ public class OperatorStateBackendTest {
                OperatorStateHandle stateHandle = null;
 
                try {
-                       RunnableFuture<OperatorStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<OperatorStateHandle>> 
snapshot =
                                        operatorStateBackend.snapshot(0L, 0L, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
-                       stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
+                       SnapshotResult<OperatorStateHandle> snapshotResult = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+                       stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
                        assertNotNull(stateHandle);
 
                        final Map<Integer, Integer> retrieved = new HashMap<>();
 
-                       
operatorStateBackend.restore(Collections.singleton(stateHandle));
+                       
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
                        BroadcastState<Integer, Integer> retrievedState = 
operatorStateBackend.getBroadcastState(broadcastStateDesc);
                        for (Map.Entry<Integer, Integer> e: 
retrievedState.entries()) {
                                retrieved.put(e.getKey(), e.getValue());
@@ -424,10 +427,13 @@ public class OperatorStateBackendTest {
                        expected.remove(1);
 
                        snapshot = operatorStateBackend.snapshot(1L, 1L, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-                       stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
+                       snapshotResult = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+
+                       stateHandle.discardState();
+                       stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                        retrieved.clear();
-                       
operatorStateBackend.restore(Collections.singleton(stateHandle));
+                       
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
                        retrievedState = 
operatorStateBackend.getBroadcastState(broadcastStateDesc);
                        for (Map.Entry<Integer, Integer> e: 
retrievedState.immutableEntries()) {
                                retrieved.put(e.getKey(), e.getValue());
@@ -439,16 +445,24 @@ public class OperatorStateBackendTest {
                        expected.clear();
 
                        snapshot = operatorStateBackend.snapshot(2L, 2L, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-                       stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot);
+                       snapshotResult = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+                       if (stateHandle != null) {
+                               stateHandle.discardState();
+                       }
+                       stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                        retrieved.clear();
-                       
operatorStateBackend.restore(Collections.singleton(stateHandle));
+                       
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
                        retrievedState = 
operatorStateBackend.getBroadcastState(broadcastStateDesc);
                        for (Map.Entry<Integer, Integer> e: 
retrievedState.immutableEntries()) {
                                retrieved.put(e.getKey(), e.getValue());
                        }
                        assertTrue(expected.isEmpty());
                        assertEquals(expected, retrieved);
+                       if (stateHandle != null) {
+                               stateHandle.discardState();
+                               stateHandle = null;
+                       }
                } finally {
                        operatorStateBackend.close();
                        operatorStateBackend.dispose();
@@ -497,9 +511,11 @@ public class OperatorStateBackendTest {
                broadcastState2.put(2, 5);
 
                CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(2 * 4096);
-               RunnableFuture<OperatorStateHandle> runnableFuture =
-                               operatorStateBackend.snapshot(1, 1, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
-               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(runnableFuture);
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot =
+                       operatorStateBackend.snapshot(1L, 1L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
+
+               SnapshotResult<OperatorStateHandle> snapshotResult = 
FutureUtil.runIfNotDoneAndGet(snapshot);
+               OperatorStateHandle stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                try {
 
@@ -510,7 +526,7 @@ public class OperatorStateBackendTest {
                                        createMockEnvironment(),
                                        "testOperator");
 
-                       
operatorStateBackend.restore(Collections.singletonList(stateHandle));
+                       
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
                        assertEquals(3, 
operatorStateBackend.getRegisteredStateNames().size());
                        assertEquals(3, 
operatorStateBackend.getRegisteredBroadcastStateNames().size());
@@ -624,7 +640,7 @@ public class OperatorStateBackendTest {
                streamFactory.setWaiterLatch(waiterLatch);
                streamFactory.setBlockerLatch(blockerLatch);
 
-               RunnableFuture<OperatorStateHandle> runnableFuture =
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
runnableFuture =
                                operatorStateBackend.snapshot(1, 1, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
                ExecutorService executorService = 
Executors.newFixedThreadPool(1);
@@ -657,7 +673,8 @@ public class OperatorStateBackendTest {
                                new ListStateDescriptor<>("test4", new 
JavaSerializer<MutableType>()));
 
                // run the snapshot
-               OperatorStateHandle stateHandle = runnableFuture.get();
+               SnapshotResult<OperatorStateHandle> snapshotResult = 
runnableFuture.get();
+               OperatorStateHandle stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                try {
 
@@ -670,7 +687,7 @@ public class OperatorStateBackendTest {
                                        createMockEnvironment(),
                                        "testOperator");
 
-                       
operatorStateBackend.restore(Collections.singletonList(stateHandle));
+                       
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
                        assertEquals(3, 
operatorStateBackend.getRegisteredStateNames().size());
                        assertEquals(3, 
operatorStateBackend.getRegisteredBroadcastStateNames().size());
@@ -762,7 +779,7 @@ public class OperatorStateBackendTest {
                streamFactory.setWaiterLatch(waiterLatch);
                streamFactory.setBlockerLatch(blockerLatch);
 
-               RunnableFuture<OperatorStateHandle> runnableFuture =
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
runnableFuture =
                                operatorStateBackend.snapshot(1, 1, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
                ExecutorService executorService = 
Executors.newFixedThreadPool(1);
@@ -805,7 +822,7 @@ public class OperatorStateBackendTest {
                streamFactory.setWaiterLatch(waiterLatch);
                streamFactory.setBlockerLatch(blockerLatch);
 
-               RunnableFuture<OperatorStateHandle> runnableFuture =
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
runnableFuture =
                                operatorStateBackend.snapshot(1, 1, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
 
                ExecutorService executorService = 
Executors.newFixedThreadPool(1);
@@ -856,9 +873,11 @@ public class OperatorStateBackendTest {
                listState3.add(20);
 
                CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(4096);
-               RunnableFuture<OperatorStateHandle> runnableFuture =
+               RunnableFuture<SnapshotResult<OperatorStateHandle>> 
runnableFuture =
                        operatorStateBackend.snapshot(1, 1, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-               OperatorStateHandle stateHandle = 
FutureUtil.runIfNotDoneAndGet(runnableFuture);
+
+               SnapshotResult<OperatorStateHandle> snapshotResult = 
FutureUtil.runIfNotDoneAndGet(runnableFuture);
+               OperatorStateHandle stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                try {
 
@@ -875,7 +894,7 @@ public class OperatorStateBackendTest {
                        doThrow(new 
IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
                        
PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
 
-                       
operatorStateBackend.restore(Collections.singletonList(stateHandle));
+                       
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
                        fail("The operator state restore should have failed if 
the previous state serializer could not be loaded.");
                } catch (IOException expected) {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java
deleted file mode 100644
index 88f9cd7..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-public class OperatorStateHandleTest {
-
-       @Test
-       public void testFixedEnumOrder() {
-
-               // Ensure the order / ordinal of all values of enum 'mode' are 
fixed, as this is used for serialization
-               Assert.assertEquals(0, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal());
-               Assert.assertEquals(1, 
OperatorStateHandle.Mode.UNION.ordinal());
-               Assert.assertEquals(2, 
OperatorStateHandle.Mode.BROADCAST.ordinal());
-
-               // Ensure all enum values are registered and fixed forever by 
this test
-               Assert.assertEquals(3, 
OperatorStateHandle.Mode.values().length);
-
-               // Byte is used to encode enum value on serialization
-               Assert.assertTrue(OperatorStateHandle.Mode.values().length <= 
Byte.MAX_VALUE);
-       }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java
new file mode 100644
index 0000000..57c6c64
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class OperatorStreamStateHandleTest {
+
+       @Test
+       public void testFixedEnumOrder() {
+
+               // Ensure the order / ordinal of all values of enum 'mode' are 
fixed, as this is used for serialization
+               Assert.assertEquals(0, 
OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal());
+               Assert.assertEquals(1, 
OperatorStateHandle.Mode.UNION.ordinal());
+               Assert.assertEquals(2, 
OperatorStateHandle.Mode.BROADCAST.ordinal());
+
+               // Ensure all enum values are registered and fixed forever by 
this test
+               Assert.assertEquals(3, 
OperatorStateHandle.Mode.values().length);
+
+               // Byte is used to encode enum value on serialization
+               Assert.assertTrue(OperatorStateHandle.Mode.values().length <= 
Byte.MAX_VALUE);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
new file mode 100644
index 0000000..9b090d6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.UUID;
+
+public class SnapshotDirectoryTest extends TestLogger {
+
+       private static TemporaryFolder temporaryFolder;
+
+       @BeforeClass
+       public static void beforeClass() throws IOException {
+               temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+       }
+
+       @AfterClass
+       public static void afterClass() {
+               temporaryFolder.delete();
+       }
+
+       /**
+        * Tests if mkdirs for snapshot directories works.
+        */
+       @Test
+       public void mkdirs() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File newFolder = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+               File innerNewFolder = new File(newFolder, 
String.valueOf(UUID.randomUUID()));
+               Path path = new Path(innerNewFolder.toURI());
+
+               Assert.assertFalse(newFolder.isDirectory());
+               Assert.assertFalse(innerNewFolder.isDirectory());
+               SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(path);
+               Assert.assertFalse(snapshotDirectory.exists());
+               Assert.assertFalse(newFolder.isDirectory());
+               Assert.assertFalse(innerNewFolder.isDirectory());
+
+               Assert.assertTrue(snapshotDirectory.mkdirs());
+               Assert.assertTrue(newFolder.isDirectory());
+               Assert.assertTrue(innerNewFolder.isDirectory());
+               Assert.assertTrue(snapshotDirectory.exists());
+       }
+
+       /**
+        * Tests if indication of directory existence works.
+        */
+       @Test
+       public void exists() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File folderA = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+
+               Assert.assertFalse(folderA.isDirectory());
+               Path path = new Path(folderA.toURI());
+               SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(path);
+               Assert.assertFalse(snapshotDirectory.exists());
+               Assert.assertTrue(folderA.mkdirs());
+               Assert.assertTrue(snapshotDirectory.exists());
+               Assert.assertTrue(folderA.delete());
+               Assert.assertFalse(snapshotDirectory.exists());
+       }
+
+       /**
+        * Tests that listing of file statuses works like listing on the path 
directly.
+        */
+       @Test
+       public void listStatus() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File folderA = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+               File folderB = new File(folderA, 
String.valueOf(UUID.randomUUID()));
+               Assert.assertTrue(folderB.mkdirs());
+               File file = new File(folderA, "test.txt");
+               Assert.assertTrue(file.createNewFile());
+
+               Path path = new Path(folderA.toURI());
+               FileSystem fileSystem = path.getFileSystem();
+               SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(path);
+               Assert.assertTrue(snapshotDirectory.exists());
+
+               Assert.assertEquals(
+                       Arrays.toString(fileSystem.listStatus(path)),
+                       Arrays.toString(snapshotDirectory.listStatus()));
+       }
+
+       /**
+        * Tests that reporting the handle of a completed snapshot works as 
expected and that the directory for a completed
+        * snapshot is not deleted by {@link #deleteIfNotCompeltedSnapshot()}.
+        */
+       @Test
+       public void completeSnapshotAndGetHandle() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File folderA = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+               Assert.assertTrue(folderA.mkdirs());
+               Path folderAPath = new Path(folderA.toURI());
+
+               SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(folderAPath);
+
+               // check that completed checkpoint dirs are not deleted as 
incomplete.
+               DirectoryStateHandle handle = 
snapshotDirectory.completeSnapshotAndGetHandle();
+               Assert.assertNotNull(handle);
+               Assert.assertTrue(snapshotDirectory.cleanup());
+               Assert.assertTrue(folderA.isDirectory());
+               Assert.assertEquals(folderAPath, handle.getDirectory());
+               handle.discardState();
+
+               Assert.assertFalse(folderA.isDirectory());
+               Assert.assertTrue(folderA.mkdirs());
+               snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
+               Assert.assertTrue(snapshotDirectory.cleanup());
+               try {
+                       snapshotDirectory.completeSnapshotAndGetHandle();
+                       Assert.fail();
+               } catch (IOException ignore) {
+               }
+       }
+
+       /**
+        * Tests that the snapshot directory behaves correctly for delete calls. 
Completed snapshots should not be deleted,
+        * only ongoing snapshots can.
+        */
+       @Test
+       public void deleteIfNotCompeltedSnapshot() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File folderA = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+               File folderB = new File(folderA, 
String.valueOf(UUID.randomUUID()));
+               Assert.assertTrue(folderB.mkdirs());
+               File file = new File(folderA, "test.txt");
+               Assert.assertTrue(file.createNewFile());
+               Path folderAPath = new Path(folderA.toURI());
+               SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(folderAPath);
+               Assert.assertTrue(snapshotDirectory.cleanup());
+               Assert.assertFalse(folderA.isDirectory());
+               Assert.assertTrue(folderA.mkdirs());
+               Assert.assertTrue(file.createNewFile());
+               snapshotDirectory = SnapshotDirectory.permanent(folderAPath);
+               snapshotDirectory.completeSnapshotAndGetHandle();
+               Assert.assertTrue(snapshotDirectory.cleanup());
+               Assert.assertTrue(folderA.isDirectory());
+               Assert.assertTrue(file.exists());
+       }
+
+       /**
+        * This test checks that completing or deleting the snapshot influences 
the SnapshotDirectory#isSnapshotCompleted() flag.
+        */
+       @Test
+       public void isSnapshotOngoing() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File folderA = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+               Assert.assertTrue(folderA.mkdirs());
+               Path pathA = new Path(folderA.toURI());
+               SnapshotDirectory snapshotDirectory = 
SnapshotDirectory.permanent(pathA);
+               Assert.assertFalse(snapshotDirectory.isSnapshotCompleted());
+               
Assert.assertNotNull(snapshotDirectory.completeSnapshotAndGetHandle());
+               Assert.assertTrue(snapshotDirectory.isSnapshotCompleted());
+               snapshotDirectory = SnapshotDirectory.permanent(pathA);
+               Assert.assertFalse(snapshotDirectory.isSnapshotCompleted());
+               snapshotDirectory.cleanup();
+               Assert.assertFalse(snapshotDirectory.isSnapshotCompleted());
+       }
+
+       /**
+        * Tests that temporary directories have the right behavior on 
completion and deletion.
+        */
+       @Test
+       public void temporary() throws Exception {
+               File folderRoot = temporaryFolder.getRoot();
+               File folder = new File(folderRoot, 
String.valueOf(UUID.randomUUID()));
+               Assert.assertTrue(folder.mkdirs());
+               Path folderPath = new Path(folder.toURI());
+               SnapshotDirectory tmpSnapshotDirectory = 
SnapshotDirectory.temporary(folderPath);
+               // temporary snapshot directories should not return a handle, 
because they will be deleted.
+               
Assert.assertNull(tmpSnapshotDirectory.completeSnapshotAndGetHandle());
+               // check that the directory is deleted even after we called 
#completeSnapshotAndGetHandle.
+               Assert.assertTrue(tmpSnapshotDirectory.cleanup());
+               Assert.assertFalse(folder.exists());
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java
new file mode 100644
index 0000000..63a9340
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.TestLogger;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class SnapshotResultTest extends TestLogger {
+
+       @Test
+       public void discardState() throws Exception {
+               SnapshotResult<StateObject> result = 
SnapshotResult.withLocalState(mock(StateObject.class), mock(StateObject.class));
+               result.discardState();
+               verify(result.getJobManagerOwnedSnapshot()).discardState();
+               verify(result.getTaskLocalSnapshot()).discardState();
+       }
+
+       @Test
+       public void getStateSize() {
+               long size = 42L;
+
+               SnapshotResult<StateObject> result = 
SnapshotResult.withLocalState(
+                       new DummyStateObject(size),
+                       new DummyStateObject(size));
+               Assert.assertEquals(size, result.getStateSize());
+       }
+
+       static class DummyStateObject implements StateObject {
+
+               private static final long serialVersionUID = 1L;
+
+               private final long size;
+
+               DummyStateObject(long size) {
+                       this.size = size;
+               }
+
+               @Override
+               public void discardState() {
+               }
+
+               @Override
+               public long getStateSize() {
+                       return size;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7838450..11ae389 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.runtime.state;
 
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
@@ -53,6 +57,7 @@ import org.apache.flink.queryablestate.KvStateID;
 import 
org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -69,15 +74,9 @@ import 
org.apache.flink.runtime.state.internal.InternalValueState;
 import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
 import org.apache.flink.shaded.guava18.com.google.common.base.Joiner;
 import org.apache.flink.types.IntValue;
-import org.apache.flink.util.FutureUtil;
 import org.apache.flink.util.IOUtils;
 import org.apache.flink.util.StateMigrationException;
 import org.apache.flink.util.TestLogger;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -141,7 +140,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        }
 
        protected <K> AbstractKeyedStateBackend<K> 
createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
-               return createKeyedBackend(keySerializer, new 
DummyEnvironment("test", 1, 0));
+               return createKeyedBackend(keySerializer, new 
DummyEnvironment()); // delegates to the (keySerializer, env) overload with a default-constructed DummyEnvironment
        }
 
        protected <K> AbstractKeyedStateBackend<K> 
createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws 
Exception {
@@ -173,7 +172,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        }
 
        protected <K> AbstractKeyedStateBackend<K> 
restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) 
throws Exception {
-               return restoreKeyedBackend(keySerializer, state, new 
DummyEnvironment("test", 1, 0));
+               return restoreKeyedBackend(keySerializer, state, new 
DummyEnvironment()); // passes a default-constructed DummyEnvironment as the third argument
        }
 
        protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(
@@ -204,7 +203,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        keyGroupRange,
                        env.getTaskKvStateRegistry());
 
-               backend.restore(state);
+               backend.restore(new StateObjectCollection<>(state));
 
                return backend;
        }
@@ -274,7 +273,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @SuppressWarnings("unchecked")
        public void testBackendUsesRegisteredKryoDefaultSerializer() throws 
Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
 
                // cast because our test serializer is not typed to TestPojo
@@ -329,7 +328,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @SuppressWarnings("unchecked")
        public void 
testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws 
Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
 
                // cast because our test serializer is not typed to TestPojo
@@ -388,7 +387,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @Test
        public void testBackendUsesRegisteredKryoSerializer() throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
 
                env.getExecutionConfig()
@@ -443,7 +442,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @SuppressWarnings("unchecked")
        public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() 
throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
 
                
env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, 
ExceptionThrowingTestSerializer.class);
@@ -507,7 +506,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @Test
        public void testKryoRegisteringRestoreResilienceWithRegisteredType() 
throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
                AbstractKeyedStateBackend<Integer> backend = 
createKeyedBackend(IntSerializer.INSTANCE, env);
 
                TypeInformation<TestPojo> pojoType = new 
GenericTypeInfo<>(TestPojo.class);
@@ -569,7 +568,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() 
throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
                AbstractKeyedStateBackend<Integer> backend = null;
 
                try {
@@ -671,7 +670,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        public void 
testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception 
{
                CheckpointStreamFactory streamFactory = createStreamFactory();
                SharedStateRegistry sharedStateRegistry = new 
SharedStateRegistry();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
 
                AbstractKeyedStateBackend<Integer> backend = null;
 
@@ -761,7 +760,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @Test
        public void testKryoRestoreResilienceWithDifferentRegistrationOrder() 
throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
 
                // register A first then B
                
env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
@@ -796,7 +795,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                // ========== restore snapshot, with a different registration 
order in the configuration ==========
 
-               env = new DummyEnvironment("test", 1, 0);
+               env = new DummyEnvironment();
 
                
env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class); // this 
time register B first
                
env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class);
@@ -828,7 +827,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
        @Test
        public void testPojoRestoreResilienceWithDifferentRegistrationOrder() 
throws Exception {
                CheckpointStreamFactory streamFactory = createStreamFactory();
-               Environment env = new DummyEnvironment("test", 1, 0);
+               Environment env = new DummyEnvironment();
 
                // register A first then B
                
env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
@@ -863,7 +862,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                // ========== restore snapshot, with a different registration 
order in the configuration ==========
 
-               env = new DummyEnvironment("test", 1, 0);
+               env = new DummyEnvironment();
 
                
env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); // this 
time register B first
                
env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class);
@@ -924,7 +923,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals("1", getSerializedValue(kvState, 1, keySerializer, 
VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer));
 
                // draw a snapshot
-               KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                // make some more modifications
                backend.setCurrentKey(1);
@@ -935,7 +934,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("u3");
 
                // draw another snapshot
-               KeyedStateHandle snapshot2 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot2 = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                // validate the original state
                backend.setCurrentKey(1);
@@ -1110,7 +1109,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                IntSerializer.INSTANCE,
                                1,
                                new KeyGroupRange(0, 0),
-                               new DummyEnvironment("test_op", 1, 0));
+                               new DummyEnvironment());
 
                ValueStateDescriptor<String> desc1 = new 
ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE);
                ValueStateDescriptor<Integer> desc2 = new 
ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE);
@@ -1134,7 +1133,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals(13, (int) state2.value());
 
                // draw a snapshot
-               KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot1 =
+                       runSnapshot(backend.snapshot(682375462378L, 2, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                backend.dispose();
                backend = restoreKeyedBackend(
@@ -1142,7 +1142,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                1,
                                new KeyGroupRange(0, 0),
                                Collections.singletonList(snapshot1),
-                               new DummyEnvironment("test_op", 1, 0));
+                               new DummyEnvironment());
 
                snapshot1.discardState();
 
@@ -1206,7 +1206,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                assertEquals(42L, (long) state.value());
 
                // draw a snapshot
-               KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                backend.dispose();
                backend = restoreKeyedBackend(IntSerializer.INSTANCE, 
snapshot1);
@@ -1934,7 +1934,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                final AggregatingStateDescriptor<Long, MutableLong, Long> 
stateDescr =
                        new AggregatingStateDescriptor<>("my-state", new 
MutableAggregatingAddingFunction(), MutableLong.class);
-               
+
                AbstractKeyedStateBackend<String> keyedBackend = 
createKeyedBackend(StringSerializer.INSTANCE);
 
                try {
@@ -2700,7 +2700,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                IntSerializer.INSTANCE,
                                MAX_PARALLELISM,
                                new KeyGroupRange(0, MAX_PARALLELISM - 1),
-                               new DummyEnvironment("test", 1, 0));
+                               new DummyEnvironment());
 
                ValueStateDescriptor<String> kvId = new 
ValueStateDescriptor<>("id", String.class);
 
@@ -2727,7 +2727,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("ShouldBeInSecondHalf");
 
 
-               KeyedStateHandle snapshot = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                List<KeyedStateHandle> firstHalfKeyGroupStates = 
StateAssignmentOperation.getKeyedStateHandles(
                                Collections.singletonList(snapshot),
@@ -2745,7 +2745,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                MAX_PARALLELISM,
                                new KeyGroupRange(0, 4),
                                firstHalfKeyGroupStates,
-                               new DummyEnvironment("test", 1, 0));
+                               new DummyEnvironment());
 
                // backend for the second half of the key group range
                final AbstractKeyedStateBackend<Integer> secondHalfBackend = 
restoreKeyedBackend(
@@ -2753,7 +2753,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                MAX_PARALLELISM,
                                new KeyGroupRange(5, 9),
                                secondHalfKeyGroupStates,
-                               new DummyEnvironment("test", 1, 0));
+                               new DummyEnvironment());
 
 
                ValueState<String> firstHalfState = 
firstHalfBackend.getPartitionedState(VoidNamespace.INSTANCE, 
VoidNamespaceSerializer.INSTANCE, kvId);
@@ -2794,7 +2794,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                state.update("2");
 
                // draw a snapshot
-               KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                backend.dispose();
 
@@ -2825,7 +2825,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.update("2");
 
                        // draw a snapshot
-                       KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+                       KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -2868,7 +2868,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("2");
 
                        // draw a snapshot
-                       KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+                       KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -2913,7 +2913,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.add("2");
 
                        // draw a snapshot
-                       KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+                       KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -2956,7 +2956,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        state.put("2", "Second");
 
                        // draw a snapshot
-                       KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+                       KeyedStateHandle snapshot1 = 
runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                        backend.dispose();
                        // restore the first snapshot and validate it
@@ -3050,7 +3050,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                IntSerializer.INSTANCE,
                                numberOfKeyGroups,
                                new KeyGroupRange(0, 0),
-                               new DummyEnvironment("test_op", 1, 0));
+                               new DummyEnvironment());
 
                {
                        // ValueState
@@ -3193,7 +3193,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
         */
        @Test
        public void testQueryableStateRegistration() throws Exception {
-               DummyEnvironment env = new DummyEnvironment("test", 1, 0);
+               DummyEnvironment env = new DummyEnvironment();
                KvStateRegistry registry = env.getKvStateRegistry();
 
                CheckpointStreamFactory streamFactory = createStreamFactory();
@@ -3215,7 +3215,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                eq(env.getJobID()), eq(env.getJobVertexId()), 
eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class));
 
 
-               KeyedStateHandle snapshot = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+               KeyedStateHandle snapshot = 
runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
 
                backend.dispose();
 
@@ -3247,7 +3247,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
 
                        // draw a snapshot
                        KeyedStateHandle snapshot =
-                                       
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation()));
+                               runSnapshot(backend.snapshot(682375462379L, 1, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
                        assertNull(snapshot);
                        backend.dispose();
 
@@ -3335,7 +3335,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                valueState.update(i);
                        }
 
-                       RunnableFuture<KeyedStateHandle> snapshot1 =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot1 =
                                backend.snapshot(0L, 0L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                        Thread runner1 = new Thread(snapshot1, 
"snapshot-1-runner");
@@ -3353,7 +3353,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        streamFactory.setWaiterLatch(null);
                        streamFactory.setBlockerLatch(null);
 
-                       RunnableFuture<KeyedStateHandle> snapshot2 =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot2 =
                                backend.snapshot(1L, 1L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                        Thread runner2 = new 
Thread(snapshot2,"snapshot-2-runner");
@@ -3392,7 +3392,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                valueState.update(i);
                        }
 
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                        backend.snapshot(0L, 0L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
                        Thread runner = new Thread(snapshot);
                        runner.start();
@@ -3405,7 +3405,8 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                        }
 
                        runner.join();
-                       stateHandle = snapshot.get();
+                       SnapshotResult<KeyedStateHandle> snapshotResult = 
snapshot.get();
+                       stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                        // test isolation
                        for (int i = 0; i < 20; ++i) {
@@ -3476,7 +3477,7 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                                valueState.update(i);
                        }
 
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                        backend.snapshot(0L, 0L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
 
                        Thread runner = new Thread(snapshot);
@@ -3588,12 +3589,15 @@ public abstract class StateBackendTestBase<B extends 
AbstractStateBackend> exten
                }
        }
 
-       protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> 
snapshotRunnableFuture) throws Exception {
-               if(!snapshotRunnableFuture.isDone()) {
-                       Thread runner = new Thread(snapshotRunnableFuture);
-                       runner.start();
+       protected KeyedStateHandle runSnapshot(
+               RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotRunnableFuture) throws Exception {
+               // Runs the given snapshot future if it is not already done, then unwraps its SnapshotResult.
+               if (!snapshotRunnableFuture.isDone()) {
+                       snapshotRunnableFuture.run(); // executed directly on the calling thread; no separate runner thread is started
                }
-               return snapshotRunnableFuture.get();
+               // get() yields a SnapshotResult; only its job-manager-owned handle is handed back to the caller.
+               SnapshotResult<KeyedStateHandle> snapshotResult = 
snapshotRunnableFuture.get();
+               return snapshotResult.getJobManagerOwnedSnapshot();
        }
 
        public static class TestPojo implements Serializable {

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
index a7a4b9a..7d903cc 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
 import org.apache.flink.runtime.state.internal.InternalValueState;
@@ -32,7 +33,6 @@ import org.apache.commons.io.IOUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.util.Collections;
 import java.util.concurrent.RunnableFuture;
 
 import static org.mockito.Mockito.mock;
@@ -40,7 +40,7 @@ import static org.mockito.Mockito.mock;
 public class StateSnapshotCompressionTest extends TestLogger {
 
        @Test
-       public void testCompressionConfiguration() throws Exception {
+       public void testCompressionConfiguration() {
 
                ExecutionConfig executionConfig = new ExecutionConfig();
                executionConfig.setUseSnapshotCompression(true);
@@ -52,7 +52,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        16,
                        new KeyGroupRange(0, 15),
                        true,
-                       executionConfig);
+                       executionConfig,
+                       TestLocalRecoveryConfig.disabled());
 
                try {
                        Assert.assertTrue(
@@ -73,7 +74,8 @@ public class StateSnapshotCompressionTest extends TestLogger {
                        16,
                        new KeyGroupRange(0, 15),
                        true,
-                       executionConfig);
+                       executionConfig,
+                       TestLocalRecoveryConfig.disabled());
 
                try {
                        Assert.assertTrue(
@@ -112,7 +114,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        16,
                        new KeyGroupRange(0, 15),
                        true,
-                       executionConfig);
+                       executionConfig,
+                       TestLocalRecoveryConfig.disabled());
 
                try {
 
@@ -134,10 +137,11 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        state.setCurrentNamespace(VoidNamespace.INSTANCE);
                        state.update("45");
                        CheckpointStreamFactory streamFactory = new 
MemCheckpointStreamFactory(4 * 1024 * 1024);
-                       RunnableFuture<KeyedStateHandle> snapshot =
+                       RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshot =
                                stateBackend.snapshot(0L, 0L, streamFactory, 
CheckpointOptions.forCheckpointWithDefaultLocation());
                        snapshot.run();
-                       stateHandle = snapshot.get();
+                       SnapshotResult<KeyedStateHandle> snapshotResult = 
snapshot.get();
+                       stateHandle = 
snapshotResult.getJobManagerOwnedSnapshot();
 
                } finally {
                        IOUtils.closeQuietly(stateBackend);
@@ -153,10 +157,11 @@ public class StateSnapshotCompressionTest extends 
TestLogger {
                        16,
                        new KeyGroupRange(0, 15),
                        true,
-                       executionConfig);
+                       executionConfig,
+                       TestLocalRecoveryConfig.disabled());
                try {
 
-                       
stateBackend.restore(Collections.singletonList(stateHandle));
+                       
stateBackend.restore(StateObjectCollection.singleton(stateHandle));
 
                        InternalValueState<VoidNamespace, String> state = 
stateBackend.createValueState(
                                new VoidNamespaceSerializer(),

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
new file mode 100644
index 0000000..2e9b107
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {
+
+       @ClassRule
+       public static TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+       private static final long MEM_SIZE_PARAM = 128L*1024*1024;
+
+       /**
+        * This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
+        * for the {@link TaskExecutorLocalStateStoresManager} with the 
configured root directory.
+        */
+       @Test
+       public void testCreationFromConfig() throws Exception {
+
+               final Configuration config = new Configuration();
+
+               File newFolder = temporaryFolder.newFolder();
+               String tmpDir = newFolder.getAbsolutePath() + File.separator;
+               final String rootDirString = 
"__localStateRoot1,__localStateRoot2,__localStateRoot3".replaceAll("__", 
tmpDir);
+
+               // test configuration of the local state directories
+               
config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS,
 rootDirString);
+
+               // test configuration of the local state mode
+               config.setString(CheckpointingOptions.LOCAL_RECOVERY, 
"ENABLE_FILE_BASED");
+
+               final ResourceID tmResourceID = ResourceID.generate();
+
+               TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+                       
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+               TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
+                       taskManagerServicesConfiguration,
+                       tmResourceID,
+                       Executors.directExecutor(),
+                       MEM_SIZE_PARAM,
+                       MEM_SIZE_PARAM);
+
+               TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskManagerStateStore();
+
+               // verify configured directories for local state
+               String[] split = rootDirString.split(",");
+               File[] rootDirectories = 
taskStateManager.getLocalStateRootDirectories();
+               for (int i = 0; i < split.length; ++i) {
+                       Assert.assertEquals(
+                               new File(split[i], 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+                               rootDirectories[i]);
+               }
+
+               // verify local recovery mode
+               Assert.assertEquals(
+                       LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
+                       taskStateManager.getLocalRecoveryMode());
+
+               Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+               for (File rootDirectory : rootDirectories) {
+                       FileUtils.deleteFileOrDirectory(rootDirectory);
+               }
+       }
+
+       /**
+        * This tests that the creation of {@link TaskManagerServices} 
correctly falls back to the first tmp directory of
+        * the IOManager as default for the local state root directory.
+        */
+       @Test
+       public void testCreationFromConfigDefault() throws Exception {
+
+               final Configuration config = new Configuration();
+
+               final ResourceID tmResourceID = ResourceID.generate();
+
+               TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+                       
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+               TaskManagerServices taskManagerServices = 
TaskManagerServices.fromConfiguration(
+                       taskManagerServicesConfiguration,
+                       tmResourceID,
+                       Executors.directExecutor(),
+                       MEM_SIZE_PARAM,
+                       MEM_SIZE_PARAM);
+
+               TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskManagerStateStore();
+
+               String[] tmpDirPaths = 
taskManagerServicesConfiguration.getTmpDirPaths();
+               File[] localStateRootDirectories = 
taskStateManager.getLocalStateRootDirectories();
+
+               for (int i = 0; i < tmpDirPaths.length; ++i) {
+                       Assert.assertEquals(
+                               new File(tmpDirPaths[i], 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+                               localStateRootDirectories[i]);
+               }
+
+               Assert.assertEquals(
+                       LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
+                       taskStateManager.getLocalRecoveryMode());
+       }
+
+       /**
+        * This tests that the {@link TaskExecutorLocalStateStoresManager} 
creates {@link TaskLocalStateStoreImpl} that have
+        * a properly initialized local state base directory. It also checks 
that subdirectories are correctly deleted on
+        * shutdown.
+        */
+       @Test
+       public void testSubtaskStateStoreDirectoryCreateAndDelete() throws 
Exception {
+
+               JobID jobID = new JobID();
+               JobVertexID jobVertexID = new JobVertexID();
+               AllocationID allocationID = new AllocationID();
+               int subtaskIdx = 23;
+
+               File[] rootDirs = {temporaryFolder.newFolder(), 
temporaryFolder.newFolder(), temporaryFolder.newFolder()};
+               TaskExecutorLocalStateStoresManager storesManager = new 
TaskExecutorLocalStateStoresManager(
+                       LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
+                       rootDirs,
+                       Executors.directExecutor());
+
+               TaskLocalStateStore taskLocalStateStore =
+                       storesManager.localStateStoreForSubtask(jobID, 
allocationID, jobVertexID, subtaskIdx);
+
+               LocalRecoveryDirectoryProvider directoryProvider =
+                       
taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();
+
+               for (int i = 0; i < 10; ++i) {
+                       Assert.assertEquals(
+                               new File(
+                                       rootDirs[(i & Integer.MAX_VALUE) % 
rootDirs.length],
+                                       
storesManager.allocationSubDirString(allocationID)),
+                               directoryProvider.allocationBaseDirectory(i));
+               }
+
+               long chkId = 42L;
+               File allocBaseDirChk42 = 
directoryProvider.allocationBaseDirectory(chkId);
+               File subtaskSpecificCheckpointDirectory = 
directoryProvider.subtaskSpecificCheckpointDirectory(chkId);
+               Assert.assertEquals(
+                       new File(
+                               allocBaseDirChk42,
+                               "jid_" + jobID + File.separator +
+                                       "vtx_" + jobVertexID + "_" +
+                                       "sti_" + subtaskIdx + File.separator +
+                                       "chk_" + chkId),
+                       subtaskSpecificCheckpointDirectory);
+
+               Assert.assertTrue(subtaskSpecificCheckpointDirectory.mkdirs());
+
+               File testFile = new File(subtaskSpecificCheckpointDirectory, 
"test");
+               Assert.assertTrue(testFile.createNewFile());
+
+               // test that local recovery mode is forwarded to the created 
store
+               Assert.assertEquals(
+                       storesManager.getLocalRecoveryMode(),
+                       
taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode());
+
+               Assert.assertTrue(testFile.exists());
+
+               // check cleanup after releasing allocation id
+               storesManager.releaseLocalStateForAllocationId(allocationID);
+               checkRootDirsClean(rootDirs);
+
+               AllocationID otherAllocationID = new AllocationID();
+
+               taskLocalStateStore =
+                       storesManager.localStateStoreForSubtask(jobID, 
otherAllocationID, jobVertexID, subtaskIdx);
+
+               directoryProvider = 
taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();
+
+               File chkDir = 
directoryProvider.subtaskSpecificCheckpointDirectory(23L);
+               Assert.assertTrue(chkDir.mkdirs());
+               testFile = new File(chkDir, "test");
+               Assert.assertTrue(testFile.createNewFile());
+
+               // check cleanup after shutdown
+               storesManager.shutdown();
+               checkRootDirsClean(rootDirs);
+       }
+
+       private void checkRootDirsClean(File[] rootDirs) {
+               for (File rootDir : rootDirs) {
+                       File[] files = rootDir.listFiles();
+                       if (files != null) {
+                               Assert.assertArrayEquals(new File[0], files);
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
new file mode 100644
index 0000000..1641676
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskLocalStateStoreImplTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class TaskLocalStateStoreImplTest {
+
+       private TemporaryFolder temporaryFolder;
+       private JobID jobID;
+       private AllocationID allocationID;
+       private JobVertexID jobVertexID;
+       private int subtaskIdx;
+       private File[] allocationBaseDirs;
+       private TaskLocalStateStoreImpl taskLocalStateStore;
+
+       @Before
+       public void before() throws Exception {
+               this.temporaryFolder = new TemporaryFolder();
+               this.temporaryFolder.create();
+               this.jobID = new JobID();
+               this.allocationID = new AllocationID();
+               this.jobVertexID = new JobVertexID();
+               this.subtaskIdx = 0;
+               this.allocationBaseDirs = new 
File[]{temporaryFolder.newFolder(), temporaryFolder.newFolder()};
+
+               LocalRecoveryDirectoryProviderImpl directoryProvider =
+                       new 
LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, 
subtaskIdx);
+
+               LocalRecoveryConfig localRecoveryConfig =
+                       new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, 
directoryProvider);
+
+               this.taskLocalStateStore = new TaskLocalStateStoreImpl(
+                       jobID,
+                       allocationID,
+                       jobVertexID,
+                       subtaskIdx,
+                       localRecoveryConfig,
+                       Executors.directExecutor());
+       }
+
+       @After
+       public void after() {
+               this.temporaryFolder.delete();
+       }
+
+       /**
+        * Test that the instance delivers a correctly configured 
LocalRecoveryDirectoryProvider.
+        */
+       @Test
+       public void getLocalRecoveryRootDirectoryProvider() {
+
+               LocalRecoveryConfig directoryProvider = 
taskLocalStateStore.getLocalRecoveryConfig();
+               Assert.assertEquals(
+                       allocationBaseDirs.length,
+                       
directoryProvider.getLocalStateDirectoryProvider().allocationBaseDirsCount());
+
+               for (int i = 0; i < allocationBaseDirs.length; ++i) {
+                       Assert.assertEquals(
+                               allocationBaseDirs[i],
+                               
directoryProvider.getLocalStateDirectoryProvider().selectAllocationBaseDirectory(i));
+               }
+       }
+
+       /**
+        * Tests basic store/retrieve of local state.
+        */
+       @Test
+       public void storeAndRetrieve() throws Exception {
+
+               final int chkCount = 3;
+
+               for (int i = 0; i < chkCount; ++i) {
+                       
Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
+               }
+
+               List<TaskStateSnapshot> taskStateSnapshots = 
storeStates(chkCount);
+
+               checkStoredAsExpected(taskStateSnapshots, 0, chkCount);
+
+               
Assert.assertNull(taskLocalStateStore.retrieveLocalState(chkCount + 1));
+       }
+
+       /**
+        * Tests pruning of previous checkpoints if a new checkpoint is 
confirmed.
+        */
+       @Test
+       public void confirmCheckpoint() throws Exception {
+
+               final int chkCount = 3;
+               final int confirmed = chkCount - 1;
+               List<TaskStateSnapshot> taskStateSnapshots = 
storeStates(chkCount);
+               taskLocalStateStore.confirmCheckpoint(confirmed);
+               checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed);
+               checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount);
+       }
+
+       /**
+        * Tests that disposal of a {@link TaskLocalStateStoreImpl} works and 
discards all local states.
+        */
+       @Test
+       public void dispose() throws Exception {
+               final int chkCount = 3;
+               final int confirmed = chkCount - 1;
+               List<TaskStateSnapshot> taskStateSnapshots = 
storeStates(chkCount);
+               taskLocalStateStore.confirmCheckpoint(confirmed);
+               taskLocalStateStore.dispose();
+
+               checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
+       }
+
+       private void checkStoredAsExpected(List<TaskStateSnapshot> history, int 
off, int len) throws Exception {
+               for (int i = off; i < len; ++i) {
+                       TaskStateSnapshot expected = history.get(i);
+                       Assert.assertTrue(expected == 
taskLocalStateStore.retrieveLocalState(i));
+                       Mockito.verify(expected, 
Mockito.never()).discardState();
+               }
+       }
+
+       private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, 
int off, int len) throws Exception {
+               for (int i = off; i < len; ++i) {
+                       
Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
+                       Mockito.verify(history.get(i)).discardState();
+               }
+       }
+
+       private List<TaskStateSnapshot> storeStates(int count) {
+               List<TaskStateSnapshot> taskStateSnapshots = new 
ArrayList<>(count);
+               for (int i = 0; i < count; ++i) {
+                       OperatorID operatorID = new OperatorID();
+                       TaskStateSnapshot taskStateSnapshot = spy(new 
TaskStateSnapshot());
+                       OperatorSubtaskState operatorSubtaskState = new 
OperatorSubtaskState();
+                       
taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
+                       taskLocalStateStore.storeLocalState(i, 
taskStateSnapshot);
+                       taskStateSnapshots.add(taskStateSnapshot);
+               }
+               return taskStateSnapshots;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
index 47bbebb..926c196 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
@@ -23,93 +23,221 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.PrioritizedOperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateHandleDummyUtil;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
-
+import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
-import static org.mockito.Mockito.mock;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.Executor;
 
-public class TaskStateManagerImplTest {
+public class TaskStateManagerImplTest extends TestLogger {
 
+       /**
+        * Test reporting and retrieving prioritized local and remote state.
+        */
        @Test
        public void testStateReportingAndRetrieving() {
 
-               JobID jobID = new JobID(42L, 43L);
-               ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
-               TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
+               JobID jobID = new JobID();
+               ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID();
+
+               TestCheckpointResponder testCheckpointResponder = new 
TestCheckpointResponder();
+               TestTaskLocalStateStore testTaskLocalStateStore = new 
TestTaskLocalStateStore();
 
                TaskStateManager taskStateManager = taskStateManager(
                        jobID,
                        executionAttemptID,
-                       checkpointResponderMock,
-                       null);
+                       testCheckpointResponder,
+                       null,
+                       testTaskLocalStateStore);
 
                //---------------------------------------- test reporting 
-----------------------------------------
 
                CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(74L, 11L);
                CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
-               TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot();
+               TaskStateSnapshot jmTaskStateSnapshot = new TaskStateSnapshot();
 
                OperatorID operatorID_1 = new OperatorID(1L, 1L);
                OperatorID operatorID_2 = new OperatorID(2L, 2L);
                OperatorID operatorID_3 = new OperatorID(3L, 3L);
 
-               
Assert.assertNull(taskStateManager.operatorStates(operatorID_1));
-               
Assert.assertNull(taskStateManager.operatorStates(operatorID_2));
-               
Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
+               
Assert.assertFalse(taskStateManager.prioritizedOperatorState(operatorID_1).isRestored());
+               
Assert.assertFalse(taskStateManager.prioritizedOperatorState(operatorID_2).isRestored());
+               
Assert.assertFalse(taskStateManager.prioritizedOperatorState(operatorID_3).isRestored());
+
+               KeyGroupRange keyGroupRange = new KeyGroupRange(0,1);
+               // Remote state of operator 1 has only managed keyed state.
+               OperatorSubtaskState jmOperatorSubtaskState_1 =
+                       new OperatorSubtaskState(null, null, 
StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange), null);
+               // Remote state of operator 2 has only raw keyed state.
+               OperatorSubtaskState jmOperatorSubtaskState_2 =
+                       new OperatorSubtaskState(null, null, null, 
StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange));
+
+               jmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, 
jmOperatorSubtaskState_1);
+               jmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, 
jmOperatorSubtaskState_2);
 
-               OperatorSubtaskState operatorSubtaskState_1 = new 
OperatorSubtaskState();
-               OperatorSubtaskState operatorSubtaskState_2 = new 
OperatorSubtaskState();
+               TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot();
 
-               taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, 
operatorSubtaskState_1);
-               taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, 
operatorSubtaskState_2);
+               // Only operator 1 has a local alternative for the managed 
keyed state.
+               OperatorSubtaskState tmOperatorSubtaskState_1 =
+                       new OperatorSubtaskState(null, null, 
StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange), null);
 
-               taskStateManager.reportStateHandles(checkpointMetaData, 
checkpointMetrics, taskStateSnapshot);
+               tmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, 
tmOperatorSubtaskState_1);
+
+               taskStateManager.reportTaskStateSnapshots(
+                       checkpointMetaData,
+                       checkpointMetrics,
+                       jmTaskStateSnapshot,
+                       tmTaskStateSnapshot);
 
                TestCheckpointResponder.AcknowledgeReport acknowledgeReport =
-                       checkpointResponderMock.getAcknowledgeReports().get(0);
+                       testCheckpointResponder.getAcknowledgeReports().get(0);
 
+               // checks that the checkpoint responder and the local state 
store received state as expected.
                Assert.assertEquals(checkpointMetaData.getCheckpointId(), 
acknowledgeReport.getCheckpointId());
                Assert.assertEquals(checkpointMetrics, 
acknowledgeReport.getCheckpointMetrics());
                Assert.assertEquals(executionAttemptID, 
acknowledgeReport.getExecutionAttemptID());
                Assert.assertEquals(jobID, acknowledgeReport.getJobID());
-               Assert.assertEquals(taskStateSnapshot, 
acknowledgeReport.getSubtaskState());
+               Assert.assertEquals(jmTaskStateSnapshot, 
acknowledgeReport.getSubtaskState());
+               Assert.assertEquals(tmTaskStateSnapshot, 
testTaskLocalStateStore.retrieveLocalState(checkpointMetaData.getCheckpointId()));
 
-               //---------------------------------------- test retrieving 
-----------------------------------------
+               //-------------------------------------- test prio retrieving 
---------------------------------------
 
                JobManagerTaskRestore taskRestore = new JobManagerTaskRestore(
-                       0L,
+                       checkpointMetaData.getCheckpointId(),
                        acknowledgeReport.getSubtaskState());
 
                taskStateManager = taskStateManager(
                        jobID,
                        executionAttemptID,
-                       checkpointResponderMock,
-                       taskRestore);
+                       testCheckpointResponder,
+                       taskRestore,
+                       testTaskLocalStateStore);
+
+               // this has remote AND local managed keyed state.
+               PrioritizedOperatorSubtaskState prioritized_1 = 
taskStateManager.prioritizedOperatorState(operatorID_1);
+               // this has only remote raw keyed state.
+               PrioritizedOperatorSubtaskState prioritized_2 = 
taskStateManager.prioritizedOperatorState(operatorID_2);
+               // not restored.
+               PrioritizedOperatorSubtaskState prioritized_3 = 
taskStateManager.prioritizedOperatorState(operatorID_3);
+
+               Assert.assertTrue(prioritized_1.isRestored());
+               Assert.assertTrue(prioritized_2.isRestored());
+               Assert.assertFalse(prioritized_3.isRestored());
+               
Assert.assertFalse(taskStateManager.prioritizedOperatorState(new 
OperatorID()).isRestored());
+
+               // checks for operator 1.
+               Iterator<StateObjectCollection<KeyedStateHandle>> 
prioritizedManagedKeyedState_1 =
+                       prioritized_1.getPrioritizedManagedKeyedState();
+
+               Assert.assertTrue(prioritizedManagedKeyedState_1.hasNext());
+               StateObjectCollection<KeyedStateHandle> current = 
prioritizedManagedKeyedState_1.next();
+               KeyedStateHandle keyedStateHandleExp = 
tmOperatorSubtaskState_1.getManagedKeyedState().iterator().next();
+               KeyedStateHandle keyedStateHandleAct = 
current.iterator().next();
+               Assert.assertTrue(keyedStateHandleExp == keyedStateHandleAct);
+               Assert.assertTrue(prioritizedManagedKeyedState_1.hasNext());
+               current = prioritizedManagedKeyedState_1.next();
+               keyedStateHandleExp = 
jmOperatorSubtaskState_1.getManagedKeyedState().iterator().next();
+               keyedStateHandleAct = current.iterator().next();
+               Assert.assertTrue(keyedStateHandleExp == keyedStateHandleAct);
+               Assert.assertFalse(prioritizedManagedKeyedState_1.hasNext());
+
+               // checks for operator 2.
+               Iterator<StateObjectCollection<KeyedStateHandle>> 
prioritizedRawKeyedState_2 =
+                       prioritized_2.getPrioritizedRawKeyedState();
+
+               Assert.assertTrue(prioritizedRawKeyedState_2.hasNext());
+               current = prioritizedRawKeyedState_2.next();
+               keyedStateHandleExp = 
jmOperatorSubtaskState_2.getRawKeyedState().iterator().next();
+               keyedStateHandleAct = current.iterator().next();
+               Assert.assertTrue(keyedStateHandleExp == keyedStateHandleAct);
+               Assert.assertFalse(prioritizedRawKeyedState_2.hasNext());
+       }
+
+       /**
+        * This tests if the {@link TaskStateManager} properly returns the 
subtask local state dir from the
+        * corresponding {@link TaskLocalStateStoreImpl}.
+        */
+       @Test
+       public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() 
throws IOException {
+               JobID jobID = new JobID(42L, 43L);
+               AllocationID allocationID = new AllocationID(4711L, 23L);
+               JobVertexID jobVertexID = new JobVertexID(12L, 34L);
+               ExecutionAttemptID executionAttemptID = new 
ExecutionAttemptID(23L, 24L);
+               TestCheckpointResponder checkpointResponderMock = new 
TestCheckpointResponder();
 
-               Assert.assertEquals(operatorSubtaskState_1, 
taskStateManager.operatorStates(operatorID_1));
-               Assert.assertEquals(operatorSubtaskState_2, 
taskStateManager.operatorStates(operatorID_2));
-               
Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
+               Executor directExecutor = Executors.directExecutor();
+
+               TemporaryFolder tmpFolder = new TemporaryFolder();
+
+               try {
+                       tmpFolder.create();
+
+                       File[] allocBaseDirs = new 
File[]{tmpFolder.newFolder(), tmpFolder.newFolder(), tmpFolder.newFolder()};
+
+                       LocalRecoveryDirectoryProviderImpl directoryProvider =
+                               new 
LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0);
+
+                       LocalRecoveryConfig localRecoveryConfig =
+                               new 
LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, 
directoryProvider);
+
+                       TaskLocalStateStore taskLocalStateStore =
+                               new TaskLocalStateStoreImpl(jobID, 
allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor);
+
+                       TaskStateManager taskStateManager = taskStateManager(
+                               jobID,
+                               executionAttemptID,
+                               checkpointResponderMock,
+                               null,
+                               taskLocalStateStore);
+
+                       LocalRecoveryConfig 
localRecoveryConfFromTaskLocalStateStore =
+                               taskLocalStateStore.getLocalRecoveryConfig();
+
+                       LocalRecoveryConfig 
localRecoveryConfFromTaskStateManager =
+                               taskStateManager.createLocalRecoveryConfig();
+
+
+                       for (int i = 0; i < 10; ++i) {
+                               Assert.assertEquals(allocBaseDirs[i % 
allocBaseDirs.length],
+                                       
localRecoveryConfFromTaskLocalStateStore.getLocalStateDirectoryProvider().allocationBaseDirectory(i));
+                               Assert.assertEquals(allocBaseDirs[i % 
allocBaseDirs.length],
+                                       
localRecoveryConfFromTaskStateManager.getLocalStateDirectoryProvider().allocationBaseDirectory(i));
+                       }
+
+                       Assert.assertEquals(
+                               
localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(),
+                               
localRecoveryConfFromTaskStateManager.getLocalRecoveryMode());
+               } finally {
+                       tmpFolder.delete();
+               }
        }
 
        public static TaskStateManager taskStateManager(
                JobID jobID,
                ExecutionAttemptID executionAttemptID,
                CheckpointResponder checkpointResponderMock,
-               JobManagerTaskRestore jobManagerTaskRestore) {
-
-               // for now just a mock because this is not yet implemented
-               TaskLocalStateStore taskLocalStateStore = 
mock(TaskLocalStateStore.class);
+               JobManagerTaskRestore jobManagerTaskRestore,
+               TaskLocalStateStore localStateStore) {
 
                return new TaskStateManagerImpl(
                        jobID,
                        executionAttemptID,
-                       taskLocalStateStore,
+                       localStateStore,
                        jobManagerTaskRestore,
                        checkpointResponderMock);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
new file mode 100644
index 0000000..7801720
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import java.io.File;
+
+/**
+ * Factory for {@link LocalRecoveryConfig} instances that tests can use when they
+ * need a config object but do not exercise local recovery directories.
+ */
+public class TestLocalRecoveryConfig {
+
+       /** Shared directory provider passed to every config created here; all of its methods throw. */
+       private static final LocalRecoveryDirectoryProvider DUMMY_PROVIDER = new TestDummyLocalDirectoryProvider();
+
+       /**
+        * Creates a config in mode {@link LocalRecoveryConfig.LocalRecoveryMode#DISABLED}.
+        *
+        * @return a new config backed by the shared {@link TestDummyLocalDirectoryProvider}.
+        */
+       public static LocalRecoveryConfig disabled() {
+               final LocalRecoveryConfig.LocalRecoveryMode mode = LocalRecoveryConfig.LocalRecoveryMode.DISABLED;
+               return new LocalRecoveryConfig(mode, DUMMY_PROVIDER);
+       }
+
+       /**
+        * Directory provider stub: every method throws {@link UnsupportedOperationException}.
+        */
+       public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider {
+
+               /** Message carried by every exception thrown from this stub. */
+               private static final String MESSAGE = "Test dummy";
+
+               // Only the enclosing class creates instances.
+               private TestDummyLocalDirectoryProvider() {
+               }
+
+               @Override
+               public File allocationBaseDirectory(long checkpointId) {
+                       throw new UnsupportedOperationException(MESSAGE);
+               }
+
+               @Override
+               public File subtaskBaseDirectory(long checkpointId) {
+                       throw new UnsupportedOperationException(MESSAGE);
+               }
+
+               @Override
+               public File subtaskSpecificCheckpointDirectory(long checkpointId) {
+                       throw new UnsupportedOperationException(MESSAGE);
+               }
+
+               @Override
+               public File selectAllocationBaseDirectory(int idx) {
+                       throw new UnsupportedOperationException(MESSAGE);
+               }
+
+               @Override
+               public File selectSubtaskBaseDirectory(int idx) {
+                       throw new UnsupportedOperationException(MESSAGE);
+               }
+
+               @Override
+               public int allocationBaseDirsCount() {
+                       throw new UnsupportedOperationException(MESSAGE);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java
index 5accc19..8ea76c6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 
 final class TestMemoryCheckpointOutputStream extends 
MemCheckpointStreamFactory.MemoryCheckpointOutputStream {
@@ -41,9 +43,10 @@ final class TestMemoryCheckpointOutputStream extends 
MemCheckpointStreamFactory.
                return this.closed;
        }
 
+       @Nullable
        @Override
        public StreamStateHandle closeAndGetHandle() throws IOException {
                this.closed = true;
                return super.closeAndGetHandle();
        }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
new file mode 100644
index 0000000..12c07dd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+/**
+ * Test implementation of a {@link TaskLocalStateStore} that keeps the stored
+ * snapshots in an in-memory map sorted by checkpoint id.
+ *
+ * <p>After {@link #dispose()} has been called, the store/retrieve/confirm methods and
+ * {@link #getLocalRecoveryConfig()} fail with an {@link IllegalStateException}.
+ */
+public class TestTaskLocalStateStore implements TaskLocalStateStore {
+
+       /** Stored snapshots keyed by checkpoint id; values may be {@code null}. */
+       private final SortedMap<Long, TaskStateSnapshot> taskStateSnapshotsByCheckpointID;
+
+       private final LocalRecoveryConfig localRecoveryConfig;
+
+       /** Set by {@link #dispose()}; never reset. */
+       private boolean disposed;
+
+       /**
+        * Creates a store that uses {@link TestLocalRecoveryConfig#disabled()} as its config.
+        */
+       public TestTaskLocalStateStore() {
+               this(TestLocalRecoveryConfig.disabled());
+       }
+
+       /**
+        * Creates an empty, non-disposed store.
+        *
+        * @param localRecoveryConfig the config returned by {@link #getLocalRecoveryConfig()}.
+        */
+       public TestTaskLocalStateStore(@Nonnull LocalRecoveryConfig localRecoveryConfig) {
+               this.localRecoveryConfig = localRecoveryConfig;
+               this.taskStateSnapshotsByCheckpointID = new TreeMap<>();
+               this.disposed = false;
+       }
+
+       /**
+        * Stores the snapshot under the given checkpoint id, replacing any previous mapping.
+        *
+        * @param checkpointId the checkpoint the snapshot belongs to.
+        * @param localState the snapshot to remember; {@code null} is stored as-is.
+        */
+       @Override
+       public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) {
+               Preconditions.checkState(!disposed);
+               taskStateSnapshotsByCheckpointID.put(checkpointId, localState);
+       }
+
+       /**
+        * Looks up the snapshot stored for the given checkpoint id.
+        *
+        * @return the stored snapshot, or {@code null} if none is mapped to that id.
+        */
+       @Nullable
+       @Override
+       public TaskStateSnapshot retrieveLocalState(long checkpointID) {
+               Preconditions.checkState(!disposed);
+               return taskStateSnapshotsByCheckpointID.get(checkpointID);
+       }
+
+       /**
+        * Marks the store as disposed, discards every stored snapshot and clears the map.
+        * Calling this method again has no effect.
+        */
+       public void dispose() {
+               if (!disposed) {
+                       disposed = true;
+                       for (TaskStateSnapshot stateSnapshot : taskStateSnapshotsByCheckpointID.values()) {
+                               discardSnapshot(stateSnapshot);
+                       }
+                       taskStateSnapshotsByCheckpointID.clear();
+               }
+       }
+
+       @Nonnull
+       @Override
+       public LocalRecoveryConfig getLocalRecoveryConfig() {
+               Preconditions.checkState(!disposed);
+               return Preconditions.checkNotNull(localRecoveryConfig);
+       }
+
+       /**
+        * Removes and discards all snapshots whose checkpoint id is strictly smaller than
+        * the confirmed one. The snapshot of the confirmed checkpoint itself is kept.
+        *
+        * @param confirmedCheckpointId id of the checkpoint that was confirmed.
+        */
+       @Override
+       public void confirmCheckpoint(long confirmedCheckpointId) {
+               Preconditions.checkState(!disposed);
+               Iterator<Map.Entry<Long, TaskStateSnapshot>> iterator = taskStateSnapshotsByCheckpointID.entrySet().iterator();
+               while (iterator.hasNext()) {
+                       Map.Entry<Long, TaskStateSnapshot> entry = iterator.next();
+                       if (entry.getKey() < confirmedCheckpointId) {
+                               // Read the value BEFORE removing: the Map.Entry contract leaves an entry's
+                               // behavior undefined once the backing map has been modified, so calling
+                               // getValue() after remove() may hand back a different snapshot.
+                               TaskStateSnapshot obsoleteSnapshot = entry.getValue();
+                               iterator.remove();
+                               discardSnapshot(obsoleteSnapshot);
+                       } else {
+                               // The map is sorted by checkpoint id, so no later entry can be smaller.
+                               break;
+                       }
+               }
+       }
+
+       public boolean isDisposed() {
+               return disposed;
+       }
+
+       /**
+        * Exposes the internal map (not a copy) so that tests can inspect the stored snapshots.
+        */
+       public SortedMap<Long, TaskStateSnapshot> getTaskStateSnapshotsByCheckpointID() {
+               return taskStateSnapshotsByCheckpointID;
+       }
+
+       /**
+        * Discards the given snapshot, rethrowing any failure as a {@link RuntimeException}.
+        * A {@code null} snapshot (which {@link #storeLocalState} accepts) is skipped.
+        */
+       private static void discardSnapshot(@Nullable TaskStateSnapshot snapshot) {
+               if (snapshot == null) {
+                       return;
+               }
+               try {
+                       snapshot.discardState();
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+}

Reply via email to