zhuzhurk commented on a change in pull request #11774:
URL: https://github.com/apache/flink/pull/11774#discussion_r414270078



##########
File path: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
##########
@@ -898,6 +825,116 @@ public void 
testSlotSharingOnAllVerticesInSameSlotSharingGroupByDefaultDisabled(
                assertDistinctSharingGroups(source1Vertex, source2Vertex, 
map2Vertex);
        }
 
+       @Test
+       public void testDefaultGlobalDataExchangeModeIsAllEdgesPipelined() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               assertThat(streamGraph.getGlobalDataExchangeMode(), 
is(GlobalDataExchangeMode.ALL_EDGES_PIPELINED));
+       }
+
+       @Test
+       public void testAllEdgesBlockingMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.BLOCKING, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testAllEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testForwardEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void testPointwiseEdgesPipelinedMode() {
+               final StreamGraph streamGraph = 
createStreamGraphForGlobalDataExchangeModeTests();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED);
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+               final JobVertex map1Vertex = verticesSorted.get(1);
+               final JobVertex map2Vertex = verticesSorted.get(2);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
map1Vertex.getProducedDataSets().get(0).getResultType());
+               assertEquals(ResultPartitionType.BLOCKING, 
map2Vertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       @Test
+       public void 
testGlobalDataExchangeModeDoesNotOverrideSpecifiedShuffleMode() {
+               final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+               final DataStream<Integer> source = env.fromElements(1, 2, 
3).setParallelism(1);
+               final DataStream<Integer> forward = new DataStream<>(env, new 
PartitionTransformation<>(
+                       source.getTransformation(), new ForwardPartitioner<>(), 
ShuffleMode.PIPELINED));
+               forward.map(i -> i).startNewChain().setParallelism(1);
+               final StreamGraph streamGraph = env.getStreamGraph();
+               
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
+
+               final JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(streamGraph);
+
+               final List<JobVertex> verticesSorted = 
jobGraph.getVerticesSortedTopologicallyFromSources();
+               final JobVertex sourceVertex = verticesSorted.get(0);
+
+               assertEquals(ResultPartitionType.PIPELINED_BOUNDED, 
sourceVertex.getProducedDataSets().get(0).getResultType());
+       }
+
+       /**
+        * Topology: source(parallelism=1) --(forward)--> map1(parallelism=1)
+        *           --(rescale)--> map2(parallelism=2) --(rebalance)--> 
sink(parallelism=2).
+        */
+       private StreamGraph createStreamGraphForGlobalDataExchangeModeTests() {

Review comment:
       OK. I have moved the tests to
`StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest`
   and also made this method static.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to