[FLINK-5892] Add tests for topology modifications

This closes #3770.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c68085f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c68085f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c68085f

Branch: refs/heads/table-retraction
Commit: 2c68085f658873c2d5836fbad6b82be76a79f0f9
Parents: f7980a7
Author: guowei.mgw <guowei....@gmail.com>
Authored: Fri Apr 28 19:40:58 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Fri Apr 28 20:11:39 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinatorTest.java | 292 +++++++++++++++++++
 1 file changed, 292 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c68085f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 0d2e903..41b0e35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2370,6 +2370,22 @@ public class CheckpointCoordinatorTest {
 		testRestoreLatestCheckpointedStateWithChangingParallelism(false);
 	}
 
+	@Test
+	public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+		testStateRecoveryWithTopologyChange(0);
+	}
+
+	@Test
+	public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+		testStateRecoveryWithTopologyChange(1);
+	}
+
+	@Test
+	public void testStateRecoveryWhenTopologyChange() throws Exception {
+		testStateRecoveryWithTopologyChange(2);
+	}
+
+
 	/**
 	 * Tests the checkpoint restoration with changing parallelism of job vertex with partitioned
 	 * state.
@@ -2530,6 +2546,282 @@ public class CheckpointCoordinatorTest {
 		comparePartitionableState(expectedOpStatesRaw, actualOpStatesRaw);
 	}
 
+	private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
+		JobVertexID jobVertexID = new JobVertexID();
+		OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+		return new Tuple2<>(jobVertexID, operatorID);
+	}
+
+	/**
+	 * old topology
+	 * [operator1,operator2] * parallelism1 -> [operator3,operator4] * parallelism2
+	 *
+	 *
+	 * new topology
+	 *
+	 * [operator5,operator1,operator2] * newParallelism1 -> [operator3,operator6] * newParallelism2
+	 *
+	 * scaleType (only the parallelism of the second job vertex is changed):
+	 * 0 increase parallelism
+	 * 1 decrease parallelism
+	 * 2 same parallelism
+	 */
+	private void testStateRecoveryWithTopologyChange(int scaleType) throws Exception {
+
+		/**
+		 * Old topology
+		 * CHAIN(op1 -> op2) * parallelism1 -> CHAIN(op3 -> op4) * parallelism2
+		 */
+		Tuple2<JobVertexID, OperatorID> id1 = generateIDPair();
+		Tuple2<JobVertexID, OperatorID> id2 = generateIDPair();
+		int parallelism1 = 10;
+		int maxParallelism1 = 64;
+
+		Tuple2<JobVertexID, OperatorID> id3 = generateIDPair();
+		Tuple2<JobVertexID, OperatorID> id4 = generateIDPair();
+		int parallelism2 = 10;
+		int maxParallelism2 = 64;
+
+		List<KeyGroupRange> keyGroupPartitions2 =
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, parallelism2);
+
+		Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+
+		//prepare vertex1 state
+		for (Tuple2<JobVertexID, OperatorID> id : Lists.newArrayList(id1, id2)) {
+			OperatorState taskState = new OperatorState(id.f1, parallelism1, maxParallelism1);
+			operatorStates.put(id.f1, taskState);
+			for (int index = 0; index < taskState.getParallelism(); index++) {
+				StreamStateHandle subNonPartitionedState =
+					generateStateForVertex(id.f0, index)
+						.get(0);
+				OperatorStateHandle subManagedOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+						.get(0);
+				OperatorStateHandle subRawOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+						.get(0);
+
+				OperatorSubtaskState subtaskState = new OperatorSubtaskState(subNonPartitionedState,
+					subManagedOperatorState,
+					subRawOperatorState,
+					null,
+					null);
+				taskState.putState(index, subtaskState);
+			}
+		}
+
+		List<List<ChainedStateHandle<OperatorStateHandle>>> expectedManagedOperatorStates = new ArrayList<>();
+		List<List<ChainedStateHandle<OperatorStateHandle>>> expectedRawOperatorStates = new ArrayList<>();
+		//prepare vertex2 state
+		for (Tuple2<JobVertexID, OperatorID> id : Lists.newArrayList(id3, id4)) {
+			OperatorState operatorState = new OperatorState(id.f1, parallelism2, maxParallelism2);
+			operatorStates.put(id.f1, operatorState);
+			List<ChainedStateHandle<OperatorStateHandle>> expectedManagedOperatorState = new ArrayList<>();
+			List<ChainedStateHandle<OperatorStateHandle>> expectedRawOperatorState = new ArrayList<>();
+			expectedManagedOperatorStates.add(expectedManagedOperatorState);
+			expectedRawOperatorStates.add(expectedRawOperatorState);
+
+			for (int index = 0; index < operatorState.getParallelism(); index++) {
+				OperatorStateHandle subManagedOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+						.get(0);
+				OperatorStateHandle subRawOperatorState =
+					generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+						.get(0);
+				KeyGroupsStateHandle subManagedKeyedState = id.f0.equals(id3.f0)
+					? generateKeyGroupState(id.f0, keyGroupPartitions2.get(index), false)
+					: null;
+				KeyGroupsStateHandle subRawKeyedState = id.f0.equals(id3.f0)
+					? generateKeyGroupState(id.f0, keyGroupPartitions2.get(index), true)
+					: null;
+
+				expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle(subManagedOperatorState));
+				expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle(subRawOperatorState));
+
+				OperatorSubtaskState subtaskState = new OperatorSubtaskState(
+					null,
+					subManagedOperatorState,
+					subRawOperatorState,
+					subManagedKeyedState,
+					subRawKeyedState);
+				operatorState.putState(index, subtaskState);
+			}
+		}
+
+		/**
+		 * New topology
+		 * CHAIN(op5 -> op1 -> op2) * newParallelism1 -> CHAIN(op3 -> op6) * newParallelism2
+		 */
+		Tuple2<JobVertexID, OperatorID> id5 = generateIDPair();
+		int newParallelism1 = 10;
+
+		Tuple2<JobVertexID, OperatorID> id6 = generateIDPair();
+		int newParallelism2 = parallelism2;
+
+		if (scaleType == 0) {
+			newParallelism2 = 20;
+		} else if (scaleType == 1) {
+			newParallelism2 = 8;
+		}
+
+		List<KeyGroupRange> newKeyGroupPartitions2 =
+			StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, newParallelism2);
+
+		final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
+			id5.f0,
+			Lists.newArrayList(id2.f1, id1.f1, id5.f1),
+			newParallelism1,
+			maxParallelism1);
+
+		final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
+			id3.f0,
+			Lists.newArrayList(id6.f1, id3.f1),
+			newParallelism2,
+			maxParallelism2);
+
+		Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+
+		tasks.put(id5.f0, newJobVertex1);
+		tasks.put(id3.f0, newJobVertex2);
+
+		JobID jobID = new JobID();
+		StandaloneCompletedCheckpointStore standaloneCompletedCheckpointStore =
+			spy(new StandaloneCompletedCheckpointStore(1));
+
+		CompletedCheckpoint completedCheckpoint = new CompletedCheckpoint(
+			jobID,
+			2,
+			System.currentTimeMillis(),
+			System.currentTimeMillis() + 3000,
+			operatorStates,
+			Collections.<MasterState>emptyList(),
+			CheckpointProperties.forStandardCheckpoint(),
+			null,
+			null);
+
+		when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
+
+		// set up the coordinator for the same job as the stored checkpoint
+		CheckpointCoordinator coord = new CheckpointCoordinator(
+			jobID,
+			600000,
+			600000,
+			0,
+			Integer.MAX_VALUE,
+			ExternalizedCheckpointSettings.none(),
+			newJobVertex1.getTaskVertices(),
+			newJobVertex1.getTaskVertices(),
+			newJobVertex1.getTaskVertices(),
+			new StandaloneCheckpointIDCounter(),
+			standaloneCompletedCheckpointStore,
+			null,
+			Executors.directExecutor());
+
+		coord.restoreLatestCheckpointedState(tasks, false, true);
+
+		for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
+
+			TaskStateHandles taskStateHandles = newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+			ChainedStateHandle<StreamStateHandle> actualSubNonPartitionedState = taskStateHandles.getLegacyOperatorState();
+			List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = taskStateHandles.getManagedOperatorState();
+			List<Collection<OperatorStateHandle>> actualSubRawOperatorState = taskStateHandles.getRawOperatorState();
+
+			assertNull(taskStateHandles.getManagedKeyedState());
+			assertNull(taskStateHandles.getRawKeyedState());
+
+			// operator5
+			{
+				int operatorIndexInChain = 2;
+				assertNull(actualSubNonPartitionedState.get(operatorIndexInChain));
+				assertNull(actualSubManagedOperatorState.get(operatorIndexInChain));
+				assertNull(actualSubRawOperatorState.get(operatorIndexInChain));
+			}
+			// operator1
+			{
+				int operatorIndexInChain = 1;
+				ChainedStateHandle<StreamStateHandle> expectedSubNonPartitionedState = generateStateForVertex(id1.f0, i);
+				ChainedStateHandle<OperatorStateHandle> expectedManagedOpState = generateChainedPartitionableStateHandle(
+					id1.f0, i, 2, 8, false);
+				ChainedStateHandle<OperatorStateHandle> expectedRawOpState = generateChainedPartitionableStateHandle(
+					id1.f0, i, 2, 8, true);
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(
+					expectedSubNonPartitionedState.get(0).openInputStream(),
+					actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+					actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+					actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+			}
+			// operator2
+			{
+				int operatorIndexInChain = 0;
+				ChainedStateHandle<StreamStateHandle> expectedSubNonPartitionedState = generateStateForVertex(id2.f0, i);
+				ChainedStateHandle<OperatorStateHandle> expectedManagedOpState = generateChainedPartitionableStateHandle(
+					id2.f0, i, 2, 8, false);
+				ChainedStateHandle<OperatorStateHandle> expectedRawOpState = generateChainedPartitionableStateHandle(
+					id2.f0, i, 2, 8, true);
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedSubNonPartitionedState.get(0).openInputStream(),
+					actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+					actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+				assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+					actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+			}
+		}
+
+		List<List<Collection<OperatorStateHandle>>> actualManagedOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+		List<List<Collection<OperatorStateHandle>>> actualRawOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+
+		for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
+			TaskStateHandles taskStateHandles = newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+
+			// operator 3
+			{
+				int operatorIndexInChain = 1;
+				List<Collection<OperatorStateHandle>> actualSubManagedOperatorState = new ArrayList<>(1);
+				actualSubManagedOperatorState.add(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+
+				List<Collection<OperatorStateHandle>> actualSubRawOperatorState = new ArrayList<>(1);
+				actualSubRawOperatorState.add(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+
+				actualManagedOperatorStates.add(actualSubManagedOperatorState);
+				actualRawOperatorStates.add(actualSubRawOperatorState);
+
+				assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+			}
+
+			// operator 6
+			{
+				int operatorIndexInChain = 0;
+				assertNull(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+				assertNull(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+				assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+
+			}
+
+			KeyGroupsStateHandle originalKeyedStateBackend = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), false);
+			KeyGroupsStateHandle originalKeyedStateRaw = generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
+
+
+			Collection<KeyedStateHandle> keyedStateBackend = taskStateHandles.getManagedKeyedState();
+			Collection<KeyedStateHandle> keyGroupStateRaw = taskStateHandles.getRawKeyedState();
+
+
+			compareKeyedState(Collections.singletonList(originalKeyedStateBackend), keyedStateBackend);
+			compareKeyedState(Collections.singletonList(originalKeyedStateRaw), keyGroupStateRaw);
+		}
+
+		comparePartitionableState(expectedManagedOperatorStates.get(0), actualManagedOperatorStates);
+		comparePartitionableState(expectedRawOperatorStates.get(0), actualRawOperatorStates);
+	}
+
 	/**
 	 * Tests that the externalized checkpoint configuration is respected.
 	 */