http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java index 1881dad..d6d4af7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java @@ -34,6 +34,7 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.util.FutureUtil; import org.apache.flink.util.Preconditions; + import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @@ -51,7 +53,6 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.io.File; import java.io.IOException; import java.io.Serializable; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -254,7 +255,7 @@ public class OperatorStateBackendTest { broadcastState.put(5, 6); CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096); - RunnableFuture<OperatorStateHandle> runnableFuture = + 
RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); FutureUtil.runIfNotDoneAndGet(runnableFuture); @@ -375,10 +376,11 @@ public class OperatorStateBackendTest { CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096); - RunnableFuture<OperatorStateHandle> snapshot = + RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot = operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); assertNull(stateHandle); } @@ -404,15 +406,16 @@ public class OperatorStateBackendTest { OperatorStateHandle stateHandle = null; try { - RunnableFuture<OperatorStateHandle> snapshot = + RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot = operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot); + stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); assertNotNull(stateHandle); final Map<Integer, Integer> retrieved = new HashMap<>(); - operatorStateBackend.restore(Collections.singleton(stateHandle)); + operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); BroadcastState<Integer, Integer> retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc); for (Map.Entry<Integer, Integer> e: retrievedState.entries()) { retrieved.put(e.getKey(), e.getValue()); @@ -424,10 +427,13 @@ public class OperatorStateBackendTest { expected.remove(1); snapshot = operatorStateBackend.snapshot(1L, 1L, 
streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot); + + stateHandle.discardState(); + stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); retrieved.clear(); - operatorStateBackend.restore(Collections.singleton(stateHandle)); + operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc); for (Map.Entry<Integer, Integer> e: retrievedState.immutableEntries()) { retrieved.put(e.getKey(), e.getValue()); @@ -439,16 +445,24 @@ public class OperatorStateBackendTest { expected.clear(); snapshot = operatorStateBackend.snapshot(2L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - stateHandle = FutureUtil.runIfNotDoneAndGet(snapshot); + snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot); + if (stateHandle != null) { + stateHandle.discardState(); + } + stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); retrieved.clear(); - operatorStateBackend.restore(Collections.singleton(stateHandle)); + operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); retrievedState = operatorStateBackend.getBroadcastState(broadcastStateDesc); for (Map.Entry<Integer, Integer> e: retrievedState.immutableEntries()) { retrieved.put(e.getKey(), e.getValue()); } assertTrue(expected.isEmpty()); assertEquals(expected, retrieved); + if (stateHandle != null) { + stateHandle.discardState(); + stateHandle = null; + } } finally { operatorStateBackend.close(); operatorStateBackend.dispose(); @@ -497,9 +511,11 @@ public class OperatorStateBackendTest { broadcastState2.put(2, 5); CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(2 * 4096); - RunnableFuture<OperatorStateHandle> runnableFuture = - operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - 
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture); + RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot = + operatorStateBackend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); + + SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(snapshot); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); try { @@ -510,7 +526,7 @@ public class OperatorStateBackendTest { createMockEnvironment(), "testOperator"); - operatorStateBackend.restore(Collections.singletonList(stateHandle)); + operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); assertEquals(3, operatorStateBackend.getRegisteredBroadcastStateNames().size()); @@ -624,7 +640,7 @@ public class OperatorStateBackendTest { streamFactory.setWaiterLatch(waiterLatch); streamFactory.setBlockerLatch(blockerLatch); - RunnableFuture<OperatorStateHandle> runnableFuture = + RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -657,7 +673,8 @@ public class OperatorStateBackendTest { new ListStateDescriptor<>("test4", new JavaSerializer<MutableType>())); // run the snapshot - OperatorStateHandle stateHandle = runnableFuture.get(); + SnapshotResult<OperatorStateHandle> snapshotResult = runnableFuture.get(); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); try { @@ -670,7 +687,7 @@ public class OperatorStateBackendTest { createMockEnvironment(), "testOperator"); - operatorStateBackend.restore(Collections.singletonList(stateHandle)); + operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); assertEquals(3, operatorStateBackend.getRegisteredStateNames().size()); 
assertEquals(3, operatorStateBackend.getRegisteredBroadcastStateNames().size()); @@ -762,7 +779,7 @@ public class OperatorStateBackendTest { streamFactory.setWaiterLatch(waiterLatch); streamFactory.setBlockerLatch(blockerLatch); - RunnableFuture<OperatorStateHandle> runnableFuture = + RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -805,7 +822,7 @@ public class OperatorStateBackendTest { streamFactory.setWaiterLatch(waiterLatch); streamFactory.setBlockerLatch(blockerLatch); - RunnableFuture<OperatorStateHandle> runnableFuture = + RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); ExecutorService executorService = Executors.newFixedThreadPool(1); @@ -856,9 +873,11 @@ public class OperatorStateBackendTest { listState3.add(20); CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096); - RunnableFuture<OperatorStateHandle> runnableFuture = + RunnableFuture<SnapshotResult<OperatorStateHandle>> runnableFuture = operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); - OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture); + + SnapshotResult<OperatorStateHandle> snapshotResult = FutureUtil.runIfNotDoneAndGet(runnableFuture); + OperatorStateHandle stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); try { @@ -875,7 +894,7 @@ public class OperatorStateBackendTest { doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class)); PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy); - operatorStateBackend.restore(Collections.singletonList(stateHandle)); + 
operatorStateBackend.restore(StateObjectCollection.singleton(stateHandle)); fail("The operator state restore should have failed if the previous state serializer could not be loaded."); } catch (IOException expected) {
http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java deleted file mode 100644 index 88f9cd7..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateHandleTest.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.state; - -import org.junit.Assert; -import org.junit.Test; - -public class OperatorStateHandleTest { - - @Test - public void testFixedEnumOrder() { - - // Ensure the order / ordinal of all values of enum 'mode' are fixed, as this is used for serialization - Assert.assertEquals(0, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal()); - Assert.assertEquals(1, OperatorStateHandle.Mode.UNION.ordinal()); - Assert.assertEquals(2, OperatorStateHandle.Mode.BROADCAST.ordinal()); - - // Ensure all enum values are registered and fixed forever by this test - Assert.assertEquals(3, OperatorStateHandle.Mode.values().length); - - // Byte is used to encode enum value on serialization - Assert.assertTrue(OperatorStateHandle.Mode.values().length <= Byte.MAX_VALUE); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java new file mode 100644 index 0000000..57c6c64 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStreamStateHandleTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.junit.Assert; +import org.junit.Test; + +public class OperatorStreamStateHandleTest { + + @Test + public void testFixedEnumOrder() { + + // Ensure the order / ordinal of all values of enum 'mode' are fixed, as this is used for serialization + Assert.assertEquals(0, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE.ordinal()); + Assert.assertEquals(1, OperatorStateHandle.Mode.UNION.ordinal()); + Assert.assertEquals(2, OperatorStateHandle.Mode.BROADCAST.ordinal()); + + // Ensure all enum values are registered and fixed forever by this test + Assert.assertEquals(3, OperatorStateHandle.Mode.values().length); + + // Byte is used to encode enum value on serialization + Assert.assertTrue(OperatorStateHandle.Mode.values().length <= Byte.MAX_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java new file mode 100644 index 0000000..9b090d6 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotDirectoryTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.util.TestLogger; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.UUID; + +public class SnapshotDirectoryTest extends TestLogger { + + private static TemporaryFolder temporaryFolder; + + @BeforeClass + public static void beforeClass() throws IOException { + temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + } + + @AfterClass + public static void afterClass() { + temporaryFolder.delete(); + } + + /** + * Tests if mkdirs for snapshot directories works. 
+ */ + @Test + public void mkdirs() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File newFolder = new File(folderRoot, String.valueOf(UUID.randomUUID())); + File innerNewFolder = new File(newFolder, String.valueOf(UUID.randomUUID())); + Path path = new Path(innerNewFolder.toURI()); + + Assert.assertFalse(newFolder.isDirectory()); + Assert.assertFalse(innerNewFolder.isDirectory()); + SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path); + Assert.assertFalse(snapshotDirectory.exists()); + Assert.assertFalse(newFolder.isDirectory()); + Assert.assertFalse(innerNewFolder.isDirectory()); + + Assert.assertTrue(snapshotDirectory.mkdirs()); + Assert.assertTrue(newFolder.isDirectory()); + Assert.assertTrue(innerNewFolder.isDirectory()); + Assert.assertTrue(snapshotDirectory.exists()); + } + + /** + * Tests if indication of directory existence works. + */ + @Test + public void exists() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID())); + + Assert.assertFalse(folderA.isDirectory()); + Path path = new Path(folderA.toURI()); + SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path); + Assert.assertFalse(snapshotDirectory.exists()); + Assert.assertTrue(folderA.mkdirs()); + Assert.assertTrue(snapshotDirectory.exists()); + Assert.assertTrue(folderA.delete()); + Assert.assertFalse(snapshotDirectory.exists()); + } + + /** + * Tests listing of file statuses works like listing on the path directly. 
+ */ + @Test + public void listStatus() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID())); + File folderB = new File(folderA, String.valueOf(UUID.randomUUID())); + Assert.assertTrue(folderB.mkdirs()); + File file = new File(folderA, "test.txt"); + Assert.assertTrue(file.createNewFile()); + + Path path = new Path(folderA.toURI()); + FileSystem fileSystem = path.getFileSystem(); + SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(path); + Assert.assertTrue(snapshotDirectory.exists()); + + Assert.assertEquals( + Arrays.toString(fileSystem.listStatus(path)), + Arrays.toString(snapshotDirectory.listStatus())); + } + + /** + * Tests that reporting the handle of a completed snapshot works as expected and that the directory for completed + * snapshot is not deleted by {@link #deleteIfNotCompeltedSnapshot()}. + */ + @Test + public void completeSnapshotAndGetHandle() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID())); + Assert.assertTrue(folderA.mkdirs()); + Path folderAPath = new Path(folderA.toURI()); + + SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath); + + // check that completed checkpoint dirs are not deleted as incomplete. 
+ DirectoryStateHandle handle = snapshotDirectory.completeSnapshotAndGetHandle(); + Assert.assertNotNull(handle); + Assert.assertTrue(snapshotDirectory.cleanup()); + Assert.assertTrue(folderA.isDirectory()); + Assert.assertEquals(folderAPath, handle.getDirectory()); + handle.discardState(); + + Assert.assertFalse(folderA.isDirectory()); + Assert.assertTrue(folderA.mkdirs()); + snapshotDirectory = SnapshotDirectory.permanent(folderAPath); + Assert.assertTrue(snapshotDirectory.cleanup()); + try { + snapshotDirectory.completeSnapshotAndGetHandle(); + Assert.fail(); + } catch (IOException ignore) { + } + } + + /** + * Tests that snapshot directory behaves correctly for delete calls. Completed snapshots should not be deleted, + * only ongoing snapshots can. + */ + @Test + public void deleteIfNotCompeltedSnapshot() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID())); + File folderB = new File(folderA, String.valueOf(UUID.randomUUID())); + Assert.assertTrue(folderB.mkdirs()); + File file = new File(folderA, "test.txt"); + Assert.assertTrue(file.createNewFile()); + Path folderAPath = new Path(folderA.toURI()); + SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(folderAPath); + Assert.assertTrue(snapshotDirectory.cleanup()); + Assert.assertFalse(folderA.isDirectory()); + Assert.assertTrue(folderA.mkdirs()); + Assert.assertTrue(file.createNewFile()); + snapshotDirectory = SnapshotDirectory.permanent(folderAPath); + snapshotDirectory.completeSnapshotAndGetHandle(); + Assert.assertTrue(snapshotDirectory.cleanup()); + Assert.assertTrue(folderA.isDirectory()); + Assert.assertTrue(file.exists()); + } + + /** + * This test checks that completing or deleting the snapshot influences the #isSnapshotOngoing() flag. 
+ */ + @Test + public void isSnapshotOngoing() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File folderA = new File(folderRoot, String.valueOf(UUID.randomUUID())); + Assert.assertTrue(folderA.mkdirs()); + Path pathA = new Path(folderA.toURI()); + SnapshotDirectory snapshotDirectory = SnapshotDirectory.permanent(pathA); + Assert.assertFalse(snapshotDirectory.isSnapshotCompleted()); + Assert.assertNotNull(snapshotDirectory.completeSnapshotAndGetHandle()); + Assert.assertTrue(snapshotDirectory.isSnapshotCompleted()); + snapshotDirectory = SnapshotDirectory.permanent(pathA); + Assert.assertFalse(snapshotDirectory.isSnapshotCompleted()); + snapshotDirectory.cleanup(); + Assert.assertFalse(snapshotDirectory.isSnapshotCompleted()); + } + + /** + * Tests that temporary directories have the right behavior on completion and deletion. + */ + @Test + public void temporary() throws Exception { + File folderRoot = temporaryFolder.getRoot(); + File folder = new File(folderRoot, String.valueOf(UUID.randomUUID())); + Assert.assertTrue(folder.mkdirs()); + Path folderPath = new Path(folder.toURI()); + SnapshotDirectory tmpSnapshotDirectory = SnapshotDirectory.temporary(folderPath); + // temporary snapshot directories should not return a handle, because they will be deleted. + Assert.assertNull(tmpSnapshotDirectory.completeSnapshotAndGetHandle()); + // check that the directory is deleted even after we called #completeSnapshotAndGetHandle. 
+ Assert.assertTrue(tmpSnapshotDirectory.cleanup()); + Assert.assertFalse(folder.exists()); + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java new file mode 100644 index 0000000..63a9340 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SnapshotResultTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.TestLogger; +import org.junit.Assert; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +public class SnapshotResultTest extends TestLogger { + + @Test + public void discardState() throws Exception { + SnapshotResult<StateObject> result = SnapshotResult.withLocalState(mock(StateObject.class), mock(StateObject.class)); + result.discardState(); + verify(result.getJobManagerOwnedSnapshot()).discardState(); + verify(result.getTaskLocalSnapshot()).discardState(); + } + + @Test + public void getStateSize() { + long size = 42L; + + SnapshotResult<StateObject> result = SnapshotResult.withLocalState( + new DummyStateObject(size), + new DummyStateObject(size)); + Assert.assertEquals(size, result.getStateSize()); + } + + static class DummyStateObject implements StateObject { + + private static final long serialVersionUID = 1L; + + private final long size; + + DummyStateObject(long size) { + this.size = size; + } + + @Override + public void discardState() { + } + + @Override + public long getStateSize() { + return size; + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 7838450..11ae389 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -18,6 +18,10 @@ package org.apache.flink.runtime.state; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import 
org.apache.commons.io.output.ByteArrayOutputStream; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; @@ -53,6 +57,7 @@ import org.apache.flink.queryablestate.KvStateID; import org.apache.flink.queryablestate.client.state.serialization.KvStateSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.StateAssignmentOperation; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -69,15 +74,9 @@ import org.apache.flink.runtime.state.internal.InternalValueState; import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.shaded.guava18.com.google.common.base.Joiner; import org.apache.flink.types.IntValue; -import org.apache.flink.util.FutureUtil; import org.apache.flink.util.IOUtils; import org.apache.flink.util.StateMigrationException; import org.apache.flink.util.TestLogger; - -import com.esotericsoftware.kryo.Kryo; -import com.esotericsoftware.kryo.io.Input; -import com.esotericsoftware.kryo.io.Output; -import org.apache.commons.io.output.ByteArrayOutputStream; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -141,7 +140,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception { - return createKeyedBackend(keySerializer, new DummyEnvironment("test", 1, 0)); + return createKeyedBackend(keySerializer, new DummyEnvironment()); } protected <K> AbstractKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception { @@ -173,7 
+172,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception { - return restoreKeyedBackend(keySerializer, state, new DummyEnvironment("test", 1, 0)); + return restoreKeyedBackend(keySerializer, state, new DummyEnvironment()); } protected <K> AbstractKeyedStateBackend<K> restoreKeyedBackend( @@ -204,7 +203,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten keyGroupRange, env.getTaskKvStateRegistry()); - backend.restore(state); + backend.restore(new StateObjectCollection<>(state)); return backend; } @@ -274,7 +273,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked") public void testBackendUsesRegisteredKryoDefaultSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); // cast because our test serializer is not typed to TestPojo @@ -329,7 +328,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked") public void testBackendUsesRegisteredKryoDefaultSerializerUsingGetOrCreate() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); // cast because our test serializer is not typed to TestPojo @@ -388,7 +387,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @Test public void testBackendUsesRegisteredKryoSerializer() throws Exception { CheckpointStreamFactory 
streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); env.getExecutionConfig() @@ -443,7 +442,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @SuppressWarnings("unchecked") public void testBackendUsesRegisteredKryoSerializerUsingGetOrCreate() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); env.getExecutionConfig().registerTypeWithKryoSerializer(TestPojo.class, ExceptionThrowingTestSerializer.class); @@ -507,7 +506,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @Test public void testKryoRegisteringRestoreResilienceWithRegisteredType() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE, env); TypeInformation<TestPojo> pojoType = new GenericTypeInfo<>(TestPojo.class); @@ -569,7 +568,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten public void testKryoRegisteringRestoreResilienceWithDefaultSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = null; try { @@ -671,7 +670,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten public void 
testKryoRegisteringRestoreResilienceWithRegisteredSerializer() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); SharedStateRegistry sharedStateRegistry = new SharedStateRegistry(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); AbstractKeyedStateBackend<Integer> backend = null; @@ -761,7 +760,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @Test public void testKryoRestoreResilienceWithDifferentRegistrationOrder() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); // register A first then B env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class); @@ -796,7 +795,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten // ========== restore snapshot, with a different registration order in the configuration ========== - env = new DummyEnvironment("test", 1, 0); + env = new DummyEnvironment(); env.getExecutionConfig().registerKryoType(TestNestedPojoClassB.class); // this time register B first env.getExecutionConfig().registerKryoType(TestNestedPojoClassA.class); @@ -828,7 +827,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten @Test public void testPojoRestoreResilienceWithDifferentRegistrationOrder() throws Exception { CheckpointStreamFactory streamFactory = createStreamFactory(); - Environment env = new DummyEnvironment("test", 1, 0); + Environment env = new DummyEnvironment(); // register A first then B env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class); @@ -863,7 +862,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten // ========== restore snapshot, with a different registration order in the configuration ========== - env = new DummyEnvironment("test", 1, 0); + env = new 
DummyEnvironment(); env.getExecutionConfig().registerPojoType(TestNestedPojoClassB.class); // this time register B first env.getExecutionConfig().registerPojoType(TestNestedPojoClassA.class); @@ -924,7 +923,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals("1", getSerializedValue(kvState, 1, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); // make some more modifications backend.setCurrentKey(1); @@ -935,7 +934,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("u3"); // draw another snapshot - KeyedStateHandle snapshot2 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot2 = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); // validate the original state backend.setCurrentKey(1); @@ -1110,7 +1109,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten IntSerializer.INSTANCE, 1, new KeyGroupRange(0, 0), - new DummyEnvironment("test_op", 1, 0)); + new DummyEnvironment()); ValueStateDescriptor<String> desc1 = new ValueStateDescriptor<>("a-string", StringSerializer.INSTANCE); ValueStateDescriptor<Integer> desc2 = new ValueStateDescriptor<>("an-integer", IntSerializer.INSTANCE); @@ -1134,7 +1133,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(13, (int) state2.value()); // draw a snapshot - KeyedStateHandle snapshot1 = 
FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = + runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); backend = restoreKeyedBackend( @@ -1142,7 +1142,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten 1, new KeyGroupRange(0, 0), Collections.singletonList(snapshot1), - new DummyEnvironment("test_op", 1, 0)); + new DummyEnvironment()); snapshot1.discardState(); @@ -1206,7 +1206,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten assertEquals(42L, (long) state.value()); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); backend = restoreKeyedBackend(IntSerializer.INSTANCE, snapshot1); @@ -1934,7 +1934,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten final AggregatingStateDescriptor<Long, MutableLong, Long> stateDescr = new AggregatingStateDescriptor<>("my-state", new MutableAggregatingAddingFunction(), MutableLong.class); - + AbstractKeyedStateBackend<String> keyedBackend = createKeyedBackend(StringSerializer.INSTANCE); try { @@ -2700,7 +2700,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten IntSerializer.INSTANCE, MAX_PARALLELISM, new KeyGroupRange(0, MAX_PARALLELISM - 1), - new DummyEnvironment("test", 1, 0)); + new DummyEnvironment()); ValueStateDescriptor<String> kvId = new ValueStateDescriptor<>("id", String.class); @@ -2727,7 +2727,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten 
state.update("ShouldBeInSecondHalf"); - KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot = runSnapshot(backend.snapshot(0, 0, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); List<KeyedStateHandle> firstHalfKeyGroupStates = StateAssignmentOperation.getKeyedStateHandles( Collections.singletonList(snapshot), @@ -2745,7 +2745,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten MAX_PARALLELISM, new KeyGroupRange(0, 4), firstHalfKeyGroupStates, - new DummyEnvironment("test", 1, 0)); + new DummyEnvironment()); // backend for the second half of the key group range final AbstractKeyedStateBackend<Integer> secondHalfBackend = restoreKeyedBackend( @@ -2753,7 +2753,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten MAX_PARALLELISM, new KeyGroupRange(5, 9), secondHalfKeyGroupStates, - new DummyEnvironment("test", 1, 0)); + new DummyEnvironment()); ValueState<String> firstHalfState = firstHalfBackend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId); @@ -2794,7 +2794,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("2"); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); @@ -2825,7 +2825,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.update("2"); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + 
KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); // restore the first snapshot and validate it @@ -2868,7 +2868,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); // restore the first snapshot and validate it @@ -2913,7 +2913,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.add("2"); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); // restore the first snapshot and validate it @@ -2956,7 +2956,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten state.put("2", "Second"); // draw a snapshot - KeyedStateHandle snapshot1 = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot1 = runSnapshot(backend.snapshot(682375462378L, 2, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); // restore the first snapshot and validate it @@ -3050,7 +3050,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten IntSerializer.INSTANCE, numberOfKeyGroups, new KeyGroupRange(0, 0), - new DummyEnvironment("test_op", 1, 0)); + new 
DummyEnvironment()); { // ValueState @@ -3193,7 +3193,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten */ @Test public void testQueryableStateRegistration() throws Exception { - DummyEnvironment env = new DummyEnvironment("test", 1, 0); + DummyEnvironment env = new DummyEnvironment(); KvStateRegistry registry = env.getKvStateRegistry(); CheckpointStreamFactory streamFactory = createStreamFactory(); @@ -3215,7 +3215,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten eq(env.getJobID()), eq(env.getJobVertexId()), eq(expectedKeyGroupRange), eq("banana"), any(KvStateID.class)); - KeyedStateHandle snapshot = FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + KeyedStateHandle snapshot = runSnapshot(backend.snapshot(682375462379L, 4, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); backend.dispose(); @@ -3247,7 +3247,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten // draw a snapshot KeyedStateHandle snapshot = - FutureUtil.runIfNotDoneAndGet(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); + runSnapshot(backend.snapshot(682375462379L, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation())); assertNull(snapshot); backend.dispose(); @@ -3335,7 +3335,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten valueState.update(i); } - RunnableFuture<KeyedStateHandle> snapshot1 = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot1 = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); Thread runner1 = new Thread(snapshot1, "snapshot-1-runner"); @@ -3353,7 +3353,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten streamFactory.setWaiterLatch(null); 
streamFactory.setBlockerLatch(null); - RunnableFuture<KeyedStateHandle> snapshot2 = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot2 = backend.snapshot(1L, 1L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); Thread runner2 = new Thread(snapshot2,"snapshot-2-runner"); @@ -3392,7 +3392,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten valueState.update(i); } - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); Thread runner = new Thread(snapshot); runner.start(); @@ -3405,7 +3405,8 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } runner.join(); - stateHandle = snapshot.get(); + SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get(); + stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); // test isolation for (int i = 0; i < 20; ++i) { @@ -3476,7 +3477,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten valueState.update(i); } - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = backend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); Thread runner = new Thread(snapshot); @@ -3588,12 +3589,15 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> exten } } - protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> snapshotRunnableFuture) throws Exception { - if(!snapshotRunnableFuture.isDone()) { - Thread runner = new Thread(snapshotRunnableFuture); - runner.start(); + protected KeyedStateHandle runSnapshot( + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture) throws Exception { + + if (!snapshotRunnableFuture.isDone()) { + snapshotRunnableFuture.run(); } - return snapshotRunnableFuture.get(); + + 
SnapshotResult<KeyedStateHandle> snapshotResult = snapshotRunnableFuture.get(); + return snapshotResult.getJobManagerOwnedSnapshot(); } public static class TestPojo implements Serializable { http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java index a7a4b9a..7d903cc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateSnapshotCompressionTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeutils.base.StringSerializer; import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import org.apache.flink.runtime.query.TaskKvStateRegistry; import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend; import org.apache.flink.runtime.state.internal.InternalValueState; @@ -32,7 +33,6 @@ import org.apache.commons.io.IOUtils; import org.junit.Assert; import org.junit.Test; -import java.util.Collections; import java.util.concurrent.RunnableFuture; import static org.mockito.Mockito.mock; @@ -40,7 +40,7 @@ import static org.mockito.Mockito.mock; public class StateSnapshotCompressionTest extends TestLogger { @Test - public void testCompressionConfiguration() throws Exception { + public void testCompressionConfiguration() { ExecutionConfig executionConfig = new ExecutionConfig(); executionConfig.setUseSnapshotCompression(true); @@ -52,7 +52,8 @@ public class StateSnapshotCompressionTest extends 
TestLogger { 16, new KeyGroupRange(0, 15), true, - executionConfig); + executionConfig, + TestLocalRecoveryConfig.disabled()); try { Assert.assertTrue( @@ -73,7 +74,8 @@ public class StateSnapshotCompressionTest extends TestLogger { 16, new KeyGroupRange(0, 15), true, - executionConfig); + executionConfig, + TestLocalRecoveryConfig.disabled()); try { Assert.assertTrue( @@ -112,7 +114,8 @@ public class StateSnapshotCompressionTest extends TestLogger { 16, new KeyGroupRange(0, 15), true, - executionConfig); + executionConfig, + TestLocalRecoveryConfig.disabled()); try { @@ -134,10 +137,11 @@ public class StateSnapshotCompressionTest extends TestLogger { state.setCurrentNamespace(VoidNamespace.INSTANCE); state.update("45"); CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4 * 1024 * 1024); - RunnableFuture<KeyedStateHandle> snapshot = + RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot = stateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()); snapshot.run(); - stateHandle = snapshot.get(); + SnapshotResult<KeyedStateHandle> snapshotResult = snapshot.get(); + stateHandle = snapshotResult.getJobManagerOwnedSnapshot(); } finally { IOUtils.closeQuietly(stateBackend); @@ -153,10 +157,11 @@ public class StateSnapshotCompressionTest extends TestLogger { 16, new KeyGroupRange(0, 15), true, - executionConfig); + executionConfig, + TestLocalRecoveryConfig.disabled()); try { - stateBackend.restore(Collections.singletonList(stateHandle)); + stateBackend.restore(StateObjectCollection.singleton(stateHandle)); InternalValueState<VoidNamespace, String> state = stateBackend.createValueState( new VoidNamespaceSerializer(), http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java ---------------------------------------------------------------------- diff --git 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
import org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.File;
import java.net.InetAddress;

/**
 * Tests for {@link TaskExecutorLocalStateStoresManager}: creation through {@link TaskManagerServices}
 * (with explicitly configured and with default local state root directories) and the creation / cleanup
 * of the per-subtask local state directories.
 */
public class TaskExecutorLocalStateStoresManagerTest extends TestLogger {

	@ClassRule
	public static final TemporaryFolder temporaryFolder = new TemporaryFolder();

	private static final long MEM_SIZE_PARAM = 128L * 1024 * 1024;

	/**
	 * This tests that the creation of {@link TaskManagerServices} correctly creates the local state root directory
	 * for the {@link TaskExecutorLocalStateStoresManager} with the configured root directory.
	 */
	@Test
	public void testCreationFromConfig() throws Exception {

		final Configuration config = new Configuration();

		File newFolder = temporaryFolder.newFolder();
		String tmpDir = newFolder.getAbsolutePath() + File.separator;

		// Literal replacement on purpose: String#replaceAll would interpret '\' and '$' inside tmpDir as
		// escape / group references of the regex replacement syntax and corrupt (or reject) such paths.
		final String rootDirString = "__localStateRoot1,__localStateRoot2,__localStateRoot3".replace("__", tmpDir);

		// test configuration of the local state directories
		config.setString(CheckpointingOptions.LOCAL_RECOVERY_TASK_MANAGER_STATE_ROOT_DIRS, rootDirString);

		// test configuration of the local state mode
		config.setString(CheckpointingOptions.LOCAL_RECOVERY, "ENABLE_FILE_BASED");

		final ResourceID tmResourceID = ResourceID.generate();

		TaskManagerServicesConfiguration taskManagerServicesConfiguration =
			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true);

		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
			taskManagerServicesConfiguration,
			tmResourceID,
			Executors.directExecutor(),
			MEM_SIZE_PARAM,
			MEM_SIZE_PARAM);

		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();

		// verify configured directories for local state
		String[] split = rootDirString.split(",");
		File[] rootDirectories = taskStateManager.getLocalStateRootDirectories();
		for (int i = 0; i < split.length; ++i) {
			Assert.assertEquals(
				new File(split[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
				rootDirectories[i]);
		}

		// verify local recovery mode
		Assert.assertEquals(
			LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
			taskStateManager.getLocalRecoveryMode());

		Assert.assertEquals("localState", TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);

		// remove the directories that were reported as local state roots
		for (File rootDirectory : rootDirectories) {
			FileUtils.deleteFileOrDirectory(rootDirectory);
		}
	}

	/**
	 * This tests that the creation of {@link TaskManagerServices} correctly falls back to the tmp directories
	 * of the task manager configuration as default for the local state root directories, and that local
	 * recovery is disabled by default.
	 */
	@Test
	public void testCreationFromConfigDefault() throws Exception {

		final Configuration config = new Configuration();

		final ResourceID tmResourceID = ResourceID.generate();

		TaskManagerServicesConfiguration taskManagerServicesConfiguration =
			TaskManagerServicesConfiguration.fromConfiguration(config, InetAddress.getLocalHost(), true);

		TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
			taskManagerServicesConfiguration,
			tmResourceID,
			Executors.directExecutor(),
			MEM_SIZE_PARAM,
			MEM_SIZE_PARAM);

		TaskExecutorLocalStateStoresManager taskStateManager = taskManagerServices.getTaskManagerStateStore();

		String[] tmpDirPaths = taskManagerServicesConfiguration.getTmpDirPaths();
		File[] localStateRootDirectories = taskStateManager.getLocalStateRootDirectories();

		for (int i = 0; i < tmpDirPaths.length; ++i) {
			Assert.assertEquals(
				new File(tmpDirPaths[i], TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
				localStateRootDirectories[i]);
		}

		Assert.assertEquals(
			LocalRecoveryConfig.LocalRecoveryMode.DISABLED,
			taskStateManager.getLocalRecoveryMode());
	}

	/**
	 * This tests that the {@link TaskExecutorLocalStateStoresManager} creates {@link TaskLocalStateStoreImpl}
	 * instances that have a properly initialized local state base directory. It also checks that subdirectories
	 * are correctly deleted when the allocation is released and on shutdown.
	 */
	@Test
	public void testSubtaskStateStoreDirectoryCreateAndDelete() throws Exception {

		JobID jobID = new JobID();
		JobVertexID jobVertexID = new JobVertexID();
		AllocationID allocationID = new AllocationID();
		int subtaskIdx = 23;

		File[] rootDirs = {temporaryFolder.newFolder(), temporaryFolder.newFolder(), temporaryFolder.newFolder()};
		TaskExecutorLocalStateStoresManager storesManager = new TaskExecutorLocalStateStoresManager(
			LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED,
			rootDirs,
			Executors.directExecutor());

		TaskLocalStateStore taskLocalStateStore =
			storesManager.localStateStoreForSubtask(jobID, allocationID, jobVertexID, subtaskIdx);

		LocalRecoveryDirectoryProvider directoryProvider =
			taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();

		// the expected base directory rotates over the root directories with the checkpoint id
		for (int i = 0; i < 10; ++i) {
			Assert.assertEquals(
				new File(
					rootDirs[(i & Integer.MAX_VALUE) % rootDirs.length],
					storesManager.allocationSubDirString(allocationID)),
				directoryProvider.allocationBaseDirectory(i));
		}

		long chkId = 42L;
		File allocBaseDirChk42 = directoryProvider.allocationBaseDirectory(chkId);
		File subtaskSpecificCheckpointDirectory = directoryProvider.subtaskSpecificCheckpointDirectory(chkId);
		Assert.assertEquals(
			new File(
				allocBaseDirChk42,
				"jid_" + jobID + File.separator +
					"vtx_" + jobVertexID + "_" +
					"sti_" + subtaskIdx + File.separator +
					"chk_" + chkId),
			subtaskSpecificCheckpointDirectory);

		Assert.assertTrue(subtaskSpecificCheckpointDirectory.mkdirs());

		File testFile = new File(subtaskSpecificCheckpointDirectory, "test");
		Assert.assertTrue(testFile.createNewFile());

		// test that local recovery mode is forwarded to the created store
		Assert.assertEquals(
			storesManager.getLocalRecoveryMode(),
			taskLocalStateStore.getLocalRecoveryConfig().getLocalRecoveryMode());

		Assert.assertTrue(testFile.exists());

		// check cleanup after releasing allocation id
		storesManager.releaseLocalStateForAllocationId(allocationID);
		checkRootDirsClean(rootDirs);

		AllocationID otherAllocationID = new AllocationID();

		taskLocalStateStore =
			storesManager.localStateStoreForSubtask(jobID, otherAllocationID, jobVertexID, subtaskIdx);

		directoryProvider = taskLocalStateStore.getLocalRecoveryConfig().getLocalStateDirectoryProvider();

		File chkDir = directoryProvider.subtaskSpecificCheckpointDirectory(23L);
		Assert.assertTrue(chkDir.mkdirs());
		testFile = new File(chkDir, "test");
		Assert.assertTrue(testFile.createNewFile());

		// check cleanup after shutdown
		storesManager.shutdown();
		checkRootDirsClean(rootDirs);
	}

	/**
	 * Asserts that none of the given root directories contains any entries. A root directory for which
	 * {@link File#listFiles()} returns {@code null} is accepted as clean.
	 */
	private void checkRootDirsClean(File[] rootDirs) {
		for (File rootDir : rootDirs) {
			File[] files = rootDir.listFiles();
			if (files != null) {
				Assert.assertArrayEquals(new File[0], files);
			}
		}
	}
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;

import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

import java.io.File;
import java.util.ArrayList;
import java.util.List;

import static org.powermock.api.mockito.PowerMockito.spy;

/**
 * Tests for {@link TaskLocalStateStoreImpl}: store / retrieve of local state snapshots, pruning on
 * checkpoint confirmation, and discarding of all state on disposal.
 */
public class TaskLocalStateStoreImplTest {

	private TemporaryFolder temporaryFolder;
	private JobID jobID;
	private AllocationID allocationID;
	private JobVertexID jobVertexID;
	private int subtaskIdx;
	private File[] allocationBaseDirs;
	private TaskLocalStateStoreImpl taskLocalStateStore;

	@Before
	public void before() throws Exception {
		this.temporaryFolder = new TemporaryFolder();
		this.temporaryFolder.create();
		this.jobID = new JobID();
		this.allocationID = new AllocationID();
		this.jobVertexID = new JobVertexID();
		this.subtaskIdx = 0;
		this.allocationBaseDirs = new File[]{temporaryFolder.newFolder(), temporaryFolder.newFolder()};

		LocalRecoveryDirectoryProviderImpl directoryProvider =
			new LocalRecoveryDirectoryProviderImpl(allocationBaseDirs, jobID, jobVertexID, subtaskIdx);

		LocalRecoveryConfig localRecoveryConfig =
			new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, directoryProvider);

		this.taskLocalStateStore = new TaskLocalStateStoreImpl(
			jobID,
			allocationID,
			jobVertexID,
			subtaskIdx,
			localRecoveryConfig,
			Executors.directExecutor());
	}

	@After
	public void after() {
		this.temporaryFolder.delete();
	}

	/**
	 * Test that the instance delivers a correctly configured LocalRecoveryDirectoryProvider.
	 */
	@Test
	public void getLocalRecoveryRootDirectoryProvider() {

		LocalRecoveryConfig localRecoveryConfig = taskLocalStateStore.getLocalRecoveryConfig();
		LocalRecoveryDirectoryProvider directoryProvider = localRecoveryConfig.getLocalStateDirectoryProvider();

		Assert.assertEquals(
			allocationBaseDirs.length,
			directoryProvider.allocationBaseDirsCount());

		for (int i = 0; i < allocationBaseDirs.length; ++i) {
			Assert.assertEquals(
				allocationBaseDirs[i],
				directoryProvider.selectAllocationBaseDirectory(i));
		}
	}

	/**
	 * Tests basic store/retrieve of local state.
	 */
	@Test
	public void storeAndRetrieve() throws Exception {

		final int chkCount = 3;

		// nothing is stored yet, so nothing must be retrievable
		for (int i = 0; i < chkCount; ++i) {
			Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
		}

		List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);

		checkStoredAsExpected(taskStateSnapshots, 0, chkCount);

		Assert.assertNull(taskLocalStateStore.retrieveLocalState(chkCount + 1));
	}

	/**
	 * Tests pruning of previous checkpoints if a new checkpoint is confirmed.
	 */
	@Test
	public void confirmCheckpoint() throws Exception {

		final int chkCount = 3;
		final int confirmed = chkCount - 1;
		List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
		taskLocalStateStore.confirmCheckpoint(confirmed);
		checkPrunedAndDiscarded(taskStateSnapshots, 0, confirmed);
		checkStoredAsExpected(taskStateSnapshots, confirmed, chkCount);
	}

	/**
	 * Tests that disposal of a {@link TaskLocalStateStoreImpl} works and discards all local states.
	 */
	@Test
	public void dispose() throws Exception {
		final int chkCount = 3;
		final int confirmed = chkCount - 1;
		List<TaskStateSnapshot> taskStateSnapshots = storeStates(chkCount);
		taskLocalStateStore.confirmCheckpoint(confirmed);
		taskLocalStateStore.dispose();

		checkPrunedAndDiscarded(taskStateSnapshots, 0, chkCount);
	}

	/**
	 * Asserts that for every checkpoint id in {@code [off, endExclusive)} the store returns the very same
	 * snapshot instance that was stored, and that this snapshot was never discarded.
	 */
	private void checkStoredAsExpected(List<TaskStateSnapshot> history, int off, int endExclusive) throws Exception {
		for (int i = off; i < endExclusive; ++i) {
			TaskStateSnapshot expected = history.get(i);
			// identity (not equality) is intended; assertSame reports both objects on failure
			Assert.assertSame(expected, taskLocalStateStore.retrieveLocalState(i));
			Mockito.verify(expected, Mockito.never()).discardState();
		}
	}

	/**
	 * Asserts that for every checkpoint id in {@code [off, endExclusive)} the store no longer returns a
	 * snapshot, and that the previously stored snapshot was discarded.
	 */
	private void checkPrunedAndDiscarded(List<TaskStateSnapshot> history, int off, int endExclusive) throws Exception {
		for (int i = off; i < endExclusive; ++i) {
			Assert.assertNull(taskLocalStateStore.retrieveLocalState(i));
			Mockito.verify(history.get(i)).discardState();
		}
	}

	/**
	 * Stores {@code count} spied snapshots under the checkpoint ids {@code 0 .. count - 1} and returns them
	 * in checkpoint id order, so that interactions with them can be verified later.
	 */
	private List<TaskStateSnapshot> storeStates(int count) {
		List<TaskStateSnapshot> taskStateSnapshots = new ArrayList<>(count);
		for (int i = 0; i < count; ++i) {
			OperatorID operatorID = new OperatorID();
			TaskStateSnapshot taskStateSnapshot = spy(new TaskStateSnapshot());
			OperatorSubtaskState operatorSubtaskState = new OperatorSubtaskState();
			taskStateSnapshot.putSubtaskStateByOperatorID(operatorID, operatorSubtaskState);
			taskLocalStateStore.storeLocalState(i, taskStateSnapshot);
			taskStateSnapshots.add(taskStateSnapshot);
		}
		return taskStateSnapshots;
	}
}
TaskStateManagerImplTest extends TestLogger { + /** + * Test reporting and retrieving prioritized local and remote state. + */ @Test public void testStateReportingAndRetrieving() { - JobID jobID = new JobID(42L, 43L); - ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); - TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); + JobID jobID = new JobID(); + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(); + + TestCheckpointResponder testCheckpointResponder = new TestCheckpointResponder(); + TestTaskLocalStateStore testTaskLocalStateStore = new TestTaskLocalStateStore(); TaskStateManager taskStateManager = taskStateManager( jobID, executionAttemptID, - checkpointResponderMock, - null); + testCheckpointResponder, + null, + testTaskLocalStateStore); //---------------------------------------- test reporting ----------------------------------------- CheckpointMetaData checkpointMetaData = new CheckpointMetaData(74L, 11L); CheckpointMetrics checkpointMetrics = new CheckpointMetrics(); - TaskStateSnapshot taskStateSnapshot = new TaskStateSnapshot(); + TaskStateSnapshot jmTaskStateSnapshot = new TaskStateSnapshot(); OperatorID operatorID_1 = new OperatorID(1L, 1L); OperatorID operatorID_2 = new OperatorID(2L, 2L); OperatorID operatorID_3 = new OperatorID(3L, 3L); - Assert.assertNull(taskStateManager.operatorStates(operatorID_1)); - Assert.assertNull(taskStateManager.operatorStates(operatorID_2)); - Assert.assertNull(taskStateManager.operatorStates(operatorID_3)); + Assert.assertFalse(taskStateManager.prioritizedOperatorState(operatorID_1).isRestored()); + Assert.assertFalse(taskStateManager.prioritizedOperatorState(operatorID_2).isRestored()); + Assert.assertFalse(taskStateManager.prioritizedOperatorState(operatorID_3).isRestored()); + + KeyGroupRange keyGroupRange = new KeyGroupRange(0,1); + // Remote state of operator 1 has only managed keyed state. 
+ OperatorSubtaskState jmOperatorSubtaskState_1 = + new OperatorSubtaskState(null, null, StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange), null); + // Remote state of operator 2 has only raw keyed state. + OperatorSubtaskState jmOperatorSubtaskState_2 = + new OperatorSubtaskState(null, null, null, StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange)); + + jmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, jmOperatorSubtaskState_1); + jmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, jmOperatorSubtaskState_2); - OperatorSubtaskState operatorSubtaskState_1 = new OperatorSubtaskState(); - OperatorSubtaskState operatorSubtaskState_2 = new OperatorSubtaskState(); + TaskStateSnapshot tmTaskStateSnapshot = new TaskStateSnapshot(); - taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, operatorSubtaskState_1); - taskStateSnapshot.putSubtaskStateByOperatorID(operatorID_2, operatorSubtaskState_2); + // Only operator 1 has a local alternative for the managed keyed state. + OperatorSubtaskState tmOperatorSubtaskState_1 = + new OperatorSubtaskState(null, null, StateHandleDummyUtil.createNewKeyedStateHandle(keyGroupRange), null); - taskStateManager.reportStateHandles(checkpointMetaData, checkpointMetrics, taskStateSnapshot); + tmTaskStateSnapshot.putSubtaskStateByOperatorID(operatorID_1, tmOperatorSubtaskState_1); + + taskStateManager.reportTaskStateSnapshots( + checkpointMetaData, + checkpointMetrics, + jmTaskStateSnapshot, + tmTaskStateSnapshot); TestCheckpointResponder.AcknowledgeReport acknowledgeReport = - checkpointResponderMock.getAcknowledgeReports().get(0); + testCheckpointResponder.getAcknowledgeReports().get(0); + // checks that the checkpoint responder and the local state store received state as expected.
Assert.assertEquals(checkpointMetaData.getCheckpointId(), acknowledgeReport.getCheckpointId()); Assert.assertEquals(checkpointMetrics, acknowledgeReport.getCheckpointMetrics()); Assert.assertEquals(executionAttemptID, acknowledgeReport.getExecutionAttemptID()); Assert.assertEquals(jobID, acknowledgeReport.getJobID()); - Assert.assertEquals(taskStateSnapshot, acknowledgeReport.getSubtaskState()); + Assert.assertEquals(jmTaskStateSnapshot, acknowledgeReport.getSubtaskState()); + Assert.assertEquals(tmTaskStateSnapshot, testTaskLocalStateStore.retrieveLocalState(checkpointMetaData.getCheckpointId())); - //---------------------------------------- test retrieving ----------------------------------------- + //-------------------------------------- test prio retrieving --------------------------------------- JobManagerTaskRestore taskRestore = new JobManagerTaskRestore( - 0L, + checkpointMetaData.getCheckpointId(), acknowledgeReport.getSubtaskState()); taskStateManager = taskStateManager( jobID, executionAttemptID, - checkpointResponderMock, - taskRestore); + testCheckpointResponder, + taskRestore, + testTaskLocalStateStore); + + // this has remote AND local managed keyed state. + PrioritizedOperatorSubtaskState prioritized_1 = taskStateManager.prioritizedOperatorState(operatorID_1); + // this has only remote raw keyed state. + PrioritizedOperatorSubtaskState prioritized_2 = taskStateManager.prioritizedOperatorState(operatorID_2); + // not restored. + PrioritizedOperatorSubtaskState prioritized_3 = taskStateManager.prioritizedOperatorState(operatorID_3); + + Assert.assertTrue(prioritized_1.isRestored()); + Assert.assertTrue(prioritized_2.isRestored()); + Assert.assertFalse(prioritized_3.isRestored()); + Assert.assertFalse(taskStateManager.prioritizedOperatorState(new OperatorID()).isRestored()); + + // checks for operator 1. 
+ Iterator<StateObjectCollection<KeyedStateHandle>> prioritizedManagedKeyedState_1 = + prioritized_1.getPrioritizedManagedKeyedState(); + + Assert.assertTrue(prioritizedManagedKeyedState_1.hasNext()); + StateObjectCollection<KeyedStateHandle> current = prioritizedManagedKeyedState_1.next(); + KeyedStateHandle keyedStateHandleExp = tmOperatorSubtaskState_1.getManagedKeyedState().iterator().next(); + KeyedStateHandle keyedStateHandleAct = current.iterator().next(); + Assert.assertTrue(keyedStateHandleExp == keyedStateHandleAct); + Assert.assertTrue(prioritizedManagedKeyedState_1.hasNext()); + current = prioritizedManagedKeyedState_1.next(); + keyedStateHandleExp = jmOperatorSubtaskState_1.getManagedKeyedState().iterator().next(); + keyedStateHandleAct = current.iterator().next(); + Assert.assertTrue(keyedStateHandleExp == keyedStateHandleAct); + Assert.assertFalse(prioritizedManagedKeyedState_1.hasNext()); + + // checks for operator 2. + Iterator<StateObjectCollection<KeyedStateHandle>> prioritizedRawKeyedState_2 = + prioritized_2.getPrioritizedRawKeyedState(); + + Assert.assertTrue(prioritizedRawKeyedState_2.hasNext()); + current = prioritizedRawKeyedState_2.next(); + keyedStateHandleExp = jmOperatorSubtaskState_2.getRawKeyedState().iterator().next(); + keyedStateHandleAct = current.iterator().next(); + Assert.assertTrue(keyedStateHandleExp == keyedStateHandleAct); + Assert.assertFalse(prioritizedRawKeyedState_2.hasNext()); + } + + /** + * This tests if the {@link TaskStateManager} properly returns the subtask local state dir from the + * corresponding {@link TaskLocalStateStoreImpl}.
+ */ + @Test + public void testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws IOException { + JobID jobID = new JobID(42L, 43L); + AllocationID allocationID = new AllocationID(4711L, 23L); + JobVertexID jobVertexID = new JobVertexID(12L, 34L); + ExecutionAttemptID executionAttemptID = new ExecutionAttemptID(23L, 24L); + TestCheckpointResponder checkpointResponderMock = new TestCheckpointResponder(); - Assert.assertEquals(operatorSubtaskState_1, taskStateManager.operatorStates(operatorID_1)); - Assert.assertEquals(operatorSubtaskState_2, taskStateManager.operatorStates(operatorID_2)); - Assert.assertNull(taskStateManager.operatorStates(operatorID_3)); + Executor directExecutor = Executors.directExecutor(); + + TemporaryFolder tmpFolder = new TemporaryFolder(); + + try { + tmpFolder.create(); + + File[] allocBaseDirs = new File[]{tmpFolder.newFolder(), tmpFolder.newFolder(), tmpFolder.newFolder()}; + + LocalRecoveryDirectoryProviderImpl directoryProvider = + new LocalRecoveryDirectoryProviderImpl(allocBaseDirs, jobID, jobVertexID, 0); + + LocalRecoveryConfig localRecoveryConfig = + new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.ENABLE_FILE_BASED, directoryProvider); + + TaskLocalStateStore taskLocalStateStore = + new TaskLocalStateStoreImpl(jobID, allocationID, jobVertexID, 13, localRecoveryConfig, directExecutor); + + TaskStateManager taskStateManager = taskStateManager( + jobID, + executionAttemptID, + checkpointResponderMock, + null, + taskLocalStateStore); + + LocalRecoveryConfig localRecoveryConfFromTaskLocalStateStore = + taskLocalStateStore.getLocalRecoveryConfig(); + + LocalRecoveryConfig localRecoveryConfFromTaskStateManager = + taskStateManager.createLocalRecoveryConfig(); + + + for (int i = 0; i < 10; ++i) { + Assert.assertEquals(allocBaseDirs[i % allocBaseDirs.length], + localRecoveryConfFromTaskLocalStateStore.getLocalStateDirectoryProvider().allocationBaseDirectory(i)); + Assert.assertEquals(allocBaseDirs[i % 
allocBaseDirs.length], + localRecoveryConfFromTaskStateManager.getLocalStateDirectoryProvider().allocationBaseDirectory(i)); + } + + Assert.assertEquals( + localRecoveryConfFromTaskLocalStateStore.getLocalRecoveryMode(), + localRecoveryConfFromTaskStateManager.getLocalRecoveryMode()); + } finally { + tmpFolder.delete(); + } } public static TaskStateManager taskStateManager( JobID jobID, ExecutionAttemptID executionAttemptID, CheckpointResponder checkpointResponderMock, - JobManagerTaskRestore jobManagerTaskRestore) { - - // for now just a mock because this is not yet implemented - TaskLocalStateStore taskLocalStateStore = mock(TaskLocalStateStore.class); + JobManagerTaskRestore jobManagerTaskRestore, + TaskLocalStateStore localStateStore) { return new TaskStateManagerImpl( jobID, executionAttemptID, - taskLocalStateStore, + localStateStore, jobManagerTaskRestore, checkpointResponderMock); } http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java new file mode 100644 index 0000000..7801720 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestLocalRecoveryConfig.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import java.io.File; + +/** + * Helper methods to easily create a {@link LocalRecoveryConfig} for tests. + */ +public class TestLocalRecoveryConfig { + + private static final LocalRecoveryDirectoryProvider INSTANCE = new TestDummyLocalDirectoryProvider(); + + public static LocalRecoveryConfig disabled() { + return new LocalRecoveryConfig(LocalRecoveryConfig.LocalRecoveryMode.DISABLED, INSTANCE); + } + + public static class TestDummyLocalDirectoryProvider implements LocalRecoveryDirectoryProvider { + + private TestDummyLocalDirectoryProvider() { + } + + @Override + public File allocationBaseDirectory(long checkpointId) { + throw new UnsupportedOperationException("Test dummy"); + } + + @Override + public File subtaskBaseDirectory(long checkpointId) { + throw new UnsupportedOperationException("Test dummy"); + } + + @Override + public File subtaskSpecificCheckpointDirectory(long checkpointId) { + throw new UnsupportedOperationException("Test dummy"); + } + + @Override + public File selectAllocationBaseDirectory(int idx) { + throw new UnsupportedOperationException("Test dummy"); + } + + @Override + public File selectSubtaskBaseDirectory(int idx) { + throw new UnsupportedOperationException("Test dummy"); + } + + @Override + public int allocationBaseDirsCount() { + throw new UnsupportedOperationException("Test dummy"); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java index 5accc19..8ea76c6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestMemoryCheckpointOutputStream.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; +import javax.annotation.Nullable; + import java.io.IOException; final class TestMemoryCheckpointOutputStream extends MemCheckpointStreamFactory.MemoryCheckpointOutputStream { @@ -41,9 +43,10 @@ final class TestMemoryCheckpointOutputStream extends MemCheckpointStreamFactory. return this.closed; } + @Nullable @Override public StreamStateHandle closeAndGetHandle() throws IOException { this.closed = true; return super.closeAndGetHandle(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/flink/blob/df3e6bb7/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java new file mode 100644 index 0000000..12c07dd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskLocalStateStore.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Iterator; +import java.util.Map; +import java.util.SortedMap; +import java.util.TreeMap; + +/** + * Test implementation of a {@link TaskLocalStateStore}. + */ +public class TestTaskLocalStateStore implements TaskLocalStateStore { + + private final SortedMap<Long, TaskStateSnapshot> taskStateSnapshotsByCheckpointID; + + private final LocalRecoveryConfig localRecoveryConfig; + + private boolean disposed; + + public TestTaskLocalStateStore() { + this(TestLocalRecoveryConfig.disabled()); + } + + public TestTaskLocalStateStore(@Nonnull LocalRecoveryConfig localRecoveryConfig) { + this.localRecoveryConfig = localRecoveryConfig; + this.taskStateSnapshotsByCheckpointID = new TreeMap<>(); + this.disposed = false; + } + + @Override + public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot localState) { + Preconditions.checkState(!disposed); + taskStateSnapshotsByCheckpointID.put(checkpointId, localState); + } + + @Nullable + @Override + public TaskStateSnapshot retrieveLocalState(long checkpointID) { + Preconditions.checkState(!disposed); + return taskStateSnapshotsByCheckpointID.get(checkpointID); + } + + public void dispose() { + if 
(!disposed) { + disposed = true; + for (TaskStateSnapshot stateSnapshot : taskStateSnapshotsByCheckpointID.values()) { + try { + stateSnapshot.discardState(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + taskStateSnapshotsByCheckpointID.clear(); + } + } + + @Nonnull + @Override + public LocalRecoveryConfig getLocalRecoveryConfig() { + Preconditions.checkState(!disposed); + return Preconditions.checkNotNull(localRecoveryConfig); + } + + @Override + public void confirmCheckpoint(long confirmedCheckpointId) { + Preconditions.checkState(!disposed); + Iterator<Map.Entry<Long, TaskStateSnapshot>> iterator = taskStateSnapshotsByCheckpointID.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<Long, TaskStateSnapshot> entry = iterator.next(); + if (entry.getKey() < confirmedCheckpointId) { + iterator.remove(); + try { + entry.getValue().discardState(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } else { + break; + } + } + } + + public boolean isDisposed() { + return disposed; + } + + public SortedMap<Long, TaskStateSnapshot> getTaskStateSnapshotsByCheckpointID() { + return taskStateSnapshotsByCheckpointID; + } +}