zentol commented on a change in pull request #8446: [FLINK-12414] [runtime] Implement ExecutionGraph to SchedulingTopology
URL: https://github.com/apache/flink/pull/8446#discussion_r284697765
 
 

 ##########
 File path: flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adapter/DefaultResultPartitionTest.java
 ##########
 @@ -62,121 +49,73 @@
 /**
  * Unit tests for {@link DefaultResultPartition}.
  */
-public class DefaultResultPartitionTest {
+public class DefaultResultPartitionTest extends TestLogger {
 
-       private final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
+       private final DefaultExecutionVertexTest.ExecutionStateProviderTest stateProvider = new DefaultExecutionVertexTest.ExecutionStateProviderTest();
 
-       private final TestRestartStrategy triggeredRestartStrategy = TestRestartStrategy.manuallyTriggered();
+       private List<SchedulingExecutionVertex> schedulingExecutionVertices;
 
-       private ExecutionGraph executionGraph;
-
-       private ExecutionGraphToSchedulingTopologyAdapter adapter;
-
-       private List<IntermediateResultPartition> intermediateResultPartitions;
-
-       private List<SchedulingResultPartition> schedulingResultPartitions;
+       private DefaultResultPartition resultPartition;
 
        @Before
-       public void setUp() throws Exception {
-               final int parallelism = 3;
-               JobVertex[] jobVertices = new JobVertex[2];
-               jobVertices[0] = createNoOpVertex(parallelism);
-               jobVertices[1] = createNoOpVertex(parallelism);
-               jobVertices[1].connectNewDataSetAsInput(jobVertices[0], ALL_TO_ALL, BLOCKING);
-               jobVertices[0].setInputDependencyConstraint(ALL);
-               jobVertices[1].setInputDependencyConstraint(ANY);
-               executionGraph = createSimpleTestGraph(
-                       new JobID(),
-                       taskManagerGateway,
-                       triggeredRestartStrategy,
-                       jobVertices);
-               adapter = new ExecutionGraphToSchedulingTopologyAdapter(executionGraph);
-
-               intermediateResultPartitions = new ArrayList<>();
-               schedulingResultPartitions = new ArrayList<>();
-
-               for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
-                       for (Map.Entry<IntermediateResultPartitionID, IntermediateResultPartition> entry
-                               : vertex.getProducedPartitions().entrySet()) {
-                               intermediateResultPartitions.add(entry.getValue());
-                               schedulingResultPartitions.add(adapter.getResultPartition(entry.getKey())
-                                       .orElseThrow(() -> new IllegalArgumentException("can not find partition" + entry.getKey())));
-                       }
-               }
-               assertEquals(parallelism, intermediateResultPartitions.size());
-       }
-
-       @Test
-       public void testBasicInfo() {
-               for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-                       final IntermediateResultPartition partition = intermediateResultPartitions.get(idx);
-                       final SchedulingResultPartition schedulingResultPartition = schedulingResultPartitions.get(idx);
-                       assertEquals(partition.getPartitionId(), schedulingResultPartition.getId());
-                       assertEquals(partition.getIntermediateResult().getId(), schedulingResultPartition.getResultId());
-                       assertEquals(partition.getResultType(), schedulingResultPartition.getPartitionType());
-               }
+       public void setUp() {
+               schedulingExecutionVertices = new ArrayList<>(2);
+               resultPartition = new DefaultResultPartition(
+                       new IntermediateResultPartitionID(),
+                       new IntermediateDataSetID(),
+                       BLOCKING);
+
+               DefaultExecutionVertex vertex1 = new DefaultExecutionVertex(
+                       new ExecutionVertexID(new JobVertexID(), 0),
+                       Collections.singletonList(resultPartition),
+                       ALL,
+                       stateProvider);
+               resultPartition.setProducer(vertex1);
+               DefaultExecutionVertex vertex2 = new DefaultExecutionVertex(
+                       new ExecutionVertexID(new JobVertexID(), 0),
+                       java.util.Collections.emptyList(),
+                       ALL,
+                       stateProvider);
+               resultPartition.addConsumer(vertex2);
+               schedulingExecutionVertices.add(vertex1);
+               schedulingExecutionVertices.add(vertex2);
        }
 
        @Test
        public void testGetConsumers() {
-               for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-                       Collection<ExecutionVertexID> schedulingConsumers = schedulingResultPartitions.get(idx).getConsumers()
-                               .stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());
+               Collection<ExecutionVertexID> schedulingConsumers = resultPartition.getConsumers()
+                       .stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());
 
-                       Set<ExecutionVertexID> executionConsumers = new HashSet<>();
-                       for (List<ExecutionEdge> list : intermediateResultPartitions.get(idx).getConsumers()) {
-                               for (ExecutionEdge edge : list) {
-                                       final ExecutionVertex vertex = edge.getTarget();
-                                       executionConsumers.add(new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
-                               }
-                       }
-                       assertThat(schedulingConsumers, containsInAnyOrder(executionConsumers.toArray()));
-               }
+               List<ExecutionVertexID> executionConsumers = Collections.singletonList(schedulingExecutionVertices.get(1).getId());
+               assertThat(schedulingConsumers, containsInAnyOrder(executionConsumers.toArray()));
        }
 
        @Test
        public void testGetProducer() {
-               for (int idx = 0; idx < intermediateResultPartitions.size(); idx++) {
-                       final ExecutionVertex vertex = intermediateResultPartitions.get(idx).getProducer();
-                       assertEquals(schedulingResultPartitions.get(idx).getProducer().getId(),
-                               new ExecutionVertexID(vertex.getJobvertexId(), vertex.getParallelSubtaskIndex()));
-               }
+               assertEquals(resultPartition.getProducer().getId(), schedulingExecutionVertices.get(0).getId());
        }
 
        @Test
        public void testGetPartitionState() {
-               List<SchedulingExecutionVertex> schedulingExecutionVertices = new ArrayList<>();
-               executionGraph.getAllExecutionVertices().forEach(
-                       vertex -> schedulingExecutionVertices.add(new DefaultExecutionVertex(vertex)));
-
                final ExecutionState[] states = ExecutionState.values();
 
 Review comment:
   in-line, as above
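
   For readers without the earlier thread: the "in-line" suggestion is not spelled out in this comment, and the sketch below is only one plausible reading of it (an assumption on the editor's part, not zentol's confirmed intent): fold a single-use local straight into the assertion that consumes it. Applied to the new testGetConsumers from the diff above, that would look roughly like:

       @Test
       public void testGetConsumers() {
               // Hypothetical inlined form (assumption): the single-use
               // executionConsumers local is folded into the assertion.
               Collection<ExecutionVertexID> schedulingConsumers = resultPartition.getConsumers()
                       .stream().map(SchedulingExecutionVertex::getId).collect(Collectors.toList());

               assertThat(schedulingConsumers, containsInAnyOrder(schedulingExecutionVertices.get(1).getId()));
       }

   The same shape would apply to whichever earlier spot the "as above" actually points at.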

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
