[FLINK-5892] Add tests for topology modifications

This closes #3770.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2c68085f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2c68085f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2c68085f

Branch: refs/heads/table-retraction
Commit: 2c68085f658873c2d5836fbad6b82be76a79f0f9
Parents: f7980a7
Author: guowei.mgw <guowei....@gmail.com>
Authored: Fri Apr 28 19:40:58 2017 +0200
Committer: zentol <ches...@apache.org>
Committed: Fri Apr 28 20:11:39 2017 +0200

----------------------------------------------------------------------
 .../checkpoint/CheckpointCoordinatorTest.java   | 292 +++++++++++++++++++
 1 file changed, 292 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2c68085f/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
index 0d2e903..41b0e35 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java
@@ -2370,6 +2370,22 @@ public class CheckpointCoordinatorTest {
                
testRestoreLatestCheckpointedStateWithChangingParallelism(false);
        }
 
+       @Test
+       public void testStateRecoveryWhenTopologyChangeOut() throws Exception {
+               testStateRecoveryWithTopologyChange(0);
+       }
+
+       @Test
+       public void testStateRecoveryWhenTopologyChangeIn() throws Exception {
+               testStateRecoveryWithTopologyChange(1);
+       }
+
+       @Test
+       public void testStateRecoveryWhenTopologyChange() throws Exception {
+               testStateRecoveryWithTopologyChange(2);
+       }
+
+
        /**
         * Tests the checkpoint restoration with changing parallelism of job 
vertex with partitioned
         * state.
@@ -2530,6 +2546,282 @@ public class CheckpointCoordinatorTest {
                comparePartitionableState(expectedOpStatesRaw, 
actualOpStatesRaw);
        }
 
+       private static Tuple2<JobVertexID, OperatorID> generateIDPair() {
+               JobVertexID jobVertexID = new JobVertexID();
+               OperatorID operatorID = OperatorID.fromJobVertexID(jobVertexID);
+               return new Tuple2<>(jobVertexID, operatorID);
+       }
+       
+       /**
+        * old topology
+        * [operator1,operator2] * parallelism1 -> [operator3,operator4] * 
parallelism2
+        *
+        *
+        * new topology
+        *
+        * [operator5,operator1,operator3] * newParallelism1 -> [operator3, 
operator6] * newParallelism2
+        *
+        * scaleType:
+        * 0  increase parallelism
+        * 1  decrease parallelism
+        * 2  same parallelism
+        */
+       public void testStateRecoveryWithTopologyChange(int scaleType) throws 
Exception {
+
+               /**
+                * Old topology
+                * CHAIN(op1 -> op2) * parallelism1 -> CHAIN(op3 -> op4) * 
parallelism2
+                */
+               Tuple2<JobVertexID, OperatorID> id1 = generateIDPair();
+               Tuple2<JobVertexID, OperatorID> id2 = generateIDPair();
+               int parallelism1 = 10;
+               int maxParallelism1 = 64;
+
+               Tuple2<JobVertexID, OperatorID> id3 = generateIDPair();
+               Tuple2<JobVertexID, OperatorID> id4 = generateIDPair();
+               int parallelism2 = 10;
+               int maxParallelism2 = 64;
+
+               List<KeyGroupRange> keyGroupPartitions2 =
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
parallelism2);
+
+               Map<OperatorID, OperatorState> operatorStates = new HashMap<>();
+
+               //prepare vertex1 state
+               for (Tuple2<JobVertexID, OperatorID> id : 
Lists.newArrayList(id1, id2)) {
+                       OperatorState taskState = new OperatorState(id.f1, 
parallelism1, maxParallelism1);
+                       operatorStates.put(id.f1, taskState);
+                       for (int index = 0; index < taskState.getParallelism(); 
index++) {
+                               StreamStateHandle subNonPartitionedState = 
+                                       generateStateForVertex(id.f0, index)
+                                               .get(0);
+                               OperatorStateHandle subManagedOperatorState =
+                                       
generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+                                               .get(0);
+                               OperatorStateHandle subRawOperatorState =
+                                       
generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+                                               .get(0);
+
+                               OperatorSubtaskState subtaskState = new 
OperatorSubtaskState(subNonPartitionedState,
+                                       subManagedOperatorState,
+                                       subRawOperatorState,
+                                       null,
+                                       null);
+                               taskState.putState(index, subtaskState);
+                       }
+               }
+
+               List<List<ChainedStateHandle<OperatorStateHandle>>> 
expectedManagedOperatorStates = new ArrayList<>();
+               List<List<ChainedStateHandle<OperatorStateHandle>>> 
expectedRawOperatorStates = new ArrayList<>();
+               //prepare vertex2 state
+               for (Tuple2<JobVertexID, OperatorID> id : 
Lists.newArrayList(id3, id4)) {
+                       OperatorState operatorState = new OperatorState(id.f1, 
parallelism2, maxParallelism2);
+                       operatorStates.put(id.f1, operatorState);
+                       List<ChainedStateHandle<OperatorStateHandle>> 
expectedManagedOperatorState = new ArrayList<>();
+                       List<ChainedStateHandle<OperatorStateHandle>> 
expectedRawOperatorState = new ArrayList<>();
+                       
expectedManagedOperatorStates.add(expectedManagedOperatorState);
+                       expectedRawOperatorStates.add(expectedRawOperatorState);
+
+                       for (int index = 0; index < 
operatorState.getParallelism(); index++) {
+                               OperatorStateHandle subManagedOperatorState =
+                                       
generateChainedPartitionableStateHandle(id.f0, index, 2, 8, false)
+                                               .get(0);
+                               OperatorStateHandle subRawOperatorState =
+                                       
generateChainedPartitionableStateHandle(id.f0, index, 2, 8, true)
+                                               .get(0);
+                               KeyGroupsStateHandle subManagedKeyedState = 
id.f0.equals(id3.f0)
+                                       ? generateKeyGroupState(id.f0, 
keyGroupPartitions2.get(index), false)
+                                       : null;
+                               KeyGroupsStateHandle subRawKeyedState = 
id.f0.equals(id3.f0)
+                                       ? generateKeyGroupState(id.f0, 
keyGroupPartitions2.get(index), true)
+                                       : null;
+
+                               
expectedManagedOperatorState.add(ChainedStateHandle.wrapSingleHandle(subManagedOperatorState));
+                               
expectedRawOperatorState.add(ChainedStateHandle.wrapSingleHandle(subRawOperatorState));
+
+                               OperatorSubtaskState subtaskState = new 
OperatorSubtaskState(
+                                       null,
+                                       subManagedOperatorState,
+                                       subRawOperatorState,
+                                       subManagedKeyedState,
+                                       subRawKeyedState);
+                               operatorState.putState(index, subtaskState);
+                       }
+               }
+
+               /**
+                * New topology
+                * CHAIN(op5 -> op1 -> op2) * newParallelism1 -> CHAIN(op3 -> 
op6) * newParallelism2
+                */
+               Tuple2<JobVertexID, OperatorID> id5 = generateIDPair();
+               int newParallelism1 = 10;
+
+               Tuple2<JobVertexID, OperatorID> id6 = generateIDPair();
+               int newParallelism2 = parallelism2;
+
+               if (scaleType == 0) {
+                       newParallelism2 = 20;
+               } else if (scaleType == 1) {
+                       newParallelism2 = 8;
+               }
+
+               List<KeyGroupRange> newKeyGroupPartitions2 =
+                       
StateAssignmentOperation.createKeyGroupPartitions(maxParallelism2, 
newParallelism2);
+
+               final ExecutionJobVertex newJobVertex1 = mockExecutionJobVertex(
+                       id5.f0,
+                       Lists.newArrayList(id2.f1, id1.f1, id5.f1),
+                       newParallelism1,
+                       maxParallelism1);
+
+               final ExecutionJobVertex newJobVertex2 = mockExecutionJobVertex(
+                       id3.f0,
+                       Lists.newArrayList(id6.f1, id3.f1),
+                       newParallelism2,
+                       maxParallelism2);
+
+               Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
+
+               tasks.put(id5.f0, newJobVertex1);
+               tasks.put(id3.f0, newJobVertex2);
+
+               JobID jobID = new JobID();
+               StandaloneCompletedCheckpointStore 
standaloneCompletedCheckpointStore =
+                       spy(new StandaloneCompletedCheckpointStore(1));
+
+               CompletedCheckpoint completedCheckpoint = new 
CompletedCheckpoint(
+                               jobID,
+                               2,
+                               System.currentTimeMillis(),
+                               System.currentTimeMillis() + 3000,
+                               operatorStates,
+                               Collections.<MasterState>emptyList(),
+                               CheckpointProperties.forStandardCheckpoint(),
+                               null,
+                               null);
+
+               
when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint);
+
+               // set up the coordinator and validate the initial state
+               CheckpointCoordinator coord = new CheckpointCoordinator(
+                       new JobID(),
+                       600000,
+                       600000,
+                       0,
+                       Integer.MAX_VALUE,
+                       ExternalizedCheckpointSettings.none(),
+                       newJobVertex1.getTaskVertices(),
+                       newJobVertex1.getTaskVertices(),
+                       newJobVertex1.getTaskVertices(),
+                       new StandaloneCheckpointIDCounter(),
+                       standaloneCompletedCheckpointStore,
+                       null,
+                       Executors.directExecutor());
+
+               coord.restoreLatestCheckpointedState(tasks, false, true);
+
+               for (int i = 0; i < newJobVertex1.getParallelism(); i++) {
+
+                       TaskStateHandles taskStateHandles = 
newJobVertex1.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+                       ChainedStateHandle<StreamStateHandle> 
actualSubNonPartitionedState = taskStateHandles.getLegacyOperatorState();
+                       List<Collection<OperatorStateHandle>> 
actualSubManagedOperatorState = taskStateHandles.getManagedOperatorState();
+                       List<Collection<OperatorStateHandle>> 
actualSubRawOperatorState = taskStateHandles.getRawOperatorState();
+
+                       assertNull(taskStateHandles.getManagedKeyedState());
+                       assertNull(taskStateHandles.getRawKeyedState());
+
+                       // operator5
+                       {
+                               int operatorIndexInChain = 2;
+                               
assertNull(actualSubNonPartitionedState.get(operatorIndexInChain));
+                               
assertNull(actualSubManagedOperatorState.get(operatorIndexInChain));
+                               
assertNull(actualSubRawOperatorState.get(operatorIndexInChain));
+                       }
+                       // operator1
+                       {
+                               int operatorIndexInChain = 1;
+                               ChainedStateHandle<StreamStateHandle> 
expectSubNonPartitionedState = generateStateForVertex(id1.f0, i);
+                               ChainedStateHandle<OperatorStateHandle> 
expectedManagedOpState = generateChainedPartitionableStateHandle(
+                                       id1.f0, i, 2, 8, false);
+                               ChainedStateHandle<OperatorStateHandle> 
expectedRawOpState = generateChainedPartitionableStateHandle(
+                                       id1.f0, i, 2, 8, true);
+
+                               assertTrue(CommonTestUtils.isSteamContentEqual(
+                                       
expectSubNonPartitionedState.get(0).openInputStream(),
+                                       
actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+                               
assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+                                       
actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+                               
assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+                                       
actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+                       }
+                       // operator2
+                       {
+                               int operatorIndexInChain = 0;
+                               ChainedStateHandle<StreamStateHandle> 
expectSubNonPartitionedState = generateStateForVertex(id2.f0, i);
+                               ChainedStateHandle<OperatorStateHandle> 
expectedManagedOpState = generateChainedPartitionableStateHandle(
+                                       id2.f0, i, 2, 8, false);
+                               ChainedStateHandle<OperatorStateHandle> 
expectedRawOpState = generateChainedPartitionableStateHandle(
+                                       id2.f0, i, 2, 8, true);
+
+                               
assertTrue(CommonTestUtils.isSteamContentEqual(expectSubNonPartitionedState.get(0).openInputStream(),
+                                       
actualSubNonPartitionedState.get(operatorIndexInChain).openInputStream()));
+
+                               
assertTrue(CommonTestUtils.isSteamContentEqual(expectedManagedOpState.get(0).openInputStream(),
+                                       
actualSubManagedOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+
+                               
assertTrue(CommonTestUtils.isSteamContentEqual(expectedRawOpState.get(0).openInputStream(),
+                                       
actualSubRawOperatorState.get(operatorIndexInChain).iterator().next().openInputStream()));
+                       }
+               }
+
+               List<List<Collection<OperatorStateHandle>>> 
actualManagedOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+               List<List<Collection<OperatorStateHandle>>> 
actualRawOperatorStates = new ArrayList<>(newJobVertex2.getParallelism());
+
+               for (int i = 0; i < newJobVertex2.getParallelism(); i++) {
+                       TaskStateHandles taskStateHandles = 
newJobVertex2.getTaskVertices()[i].getCurrentExecutionAttempt().getTaskStateHandles();
+
+                       // operator 3
+                       {
+                               int operatorIndexInChain = 1;
+                               List<Collection<OperatorStateHandle>> 
actualSubManagedOperatorState = new ArrayList<>(1);
+                               
actualSubManagedOperatorState.add(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+
+                               List<Collection<OperatorStateHandle>> 
actualSubRawOperatorState = new ArrayList<>(1);
+                               
actualSubRawOperatorState.add(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+
+                               
actualManagedOperatorStates.add(actualSubManagedOperatorState);
+                               
actualRawOperatorStates.add(actualSubRawOperatorState);
+
+                               
assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+                       }
+
+                       // operator 6
+                       {
+                               int operatorIndexInChain = 0;
+                               
assertNull(taskStateHandles.getManagedOperatorState().get(operatorIndexInChain));
+                               
assertNull(taskStateHandles.getRawOperatorState().get(operatorIndexInChain));
+                               
assertNull(taskStateHandles.getLegacyOperatorState().get(operatorIndexInChain));
+
+                       }
+
+                       KeyGroupsStateHandle originalKeyedStateBackend = 
generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), false);
+                       KeyGroupsStateHandle originalKeyedStateRaw = 
generateKeyGroupState(id3.f0, newKeyGroupPartitions2.get(i), true);
+
+
+                       Collection<KeyedStateHandle> keyedStateBackend = 
taskStateHandles.getManagedKeyedState();
+                       Collection<KeyedStateHandle> keyGroupStateRaw = 
taskStateHandles.getRawKeyedState();
+
+
+                       
compareKeyedState(Collections.singletonList(originalKeyedStateBackend), 
keyedStateBackend);
+                       
compareKeyedState(Collections.singletonList(originalKeyedStateRaw), 
keyGroupStateRaw);
+               }
+
+               comparePartitionableState(expectedManagedOperatorStates.get(0), 
actualManagedOperatorStates);
+               comparePartitionableState(expectedRawOperatorStates.get(0), 
actualRawOperatorStates);
+       }
+
        /**
         * Tests that the externalized checkpoint configuration is respected.
         */

Reply via email to