This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 8f3a025 [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner 8f3a025 is described below commit 8f3a0251d195ed2532abc31e602029dfcbf7bc77 Author: Lijie Wang <wangdachui9...@gmail.com> AuthorDate: Wed Feb 16 08:36:22 2022 +0800 [FLINK-26168][runtime] The judgment of chainable ignores StreamExchangeMode when partitioner is ForwardForConsecutiveHashPartitioner This closes #18789. --- .../api/graph/StreamingJobGraphGenerator.java | 22 ++++++++++++++++++++-- .../ForwardForConsecutiveHashPartitionerTest.java | 13 +++++++++++++ .../partitioner/StreamPartitionerTestUtils.java | 22 +++++++++++++++++++--- 3 files changed, 52 insertions(+), 5 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index e056048..4801ec7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1064,8 +1064,10 @@ public class StreamingJobGraphGenerator { if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) - && (edge.getPartitioner() instanceof ForwardPartitioner) - && edge.getExchangeMode() != StreamExchangeMode.BATCH + && arePartitionerAndExchangeModeChainable( + edge.getPartitioner(), + edge.getExchangeMode(), + streamGraph.getExecutionConfig().isDynamicGraph()) && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { @@ -1084,6 +1086,22 @@ public class StreamingJobGraphGenerator { } @VisibleForTesting + static boolean arePartitionerAndExchangeModeChainable( + StreamPartitioner<?> partitioner, + StreamExchangeMode exchangeMode, + boolean isDynamicGraph) { + if (partitioner instanceof ForwardForConsecutiveHashPartitioner) { + checkState(isDynamicGraph); + return true; + } else if ((partitioner instanceof ForwardPartitioner) + && exchangeMode != StreamExchangeMode.BATCH) { + return true; + } else { + return false; + } + } + + @VisibleForTesting static boolean areOperatorsChainable( StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) { StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java index 8e1863b..9e0fd6c 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardForConsecutiveHashPartitionerTest.java @@ -21,6 +21,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -36,6 +37,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger { @Test public void testConvertToForwardPartitioner() { + testConvertToForwardPartitioner(StreamExchangeMode.BATCH); + testConvertToForwardPartitioner(StreamExchangeMode.PIPELINED); + testConvertToForwardPartitioner(StreamExchangeMode.UNDEFINED); + } + + private void testConvertToForwardPartitioner(StreamExchangeMode streamExchangeMode) { JobGraph jobGraph = StreamPartitionerTestUtils.createJobGraph( "group1", @@ -53,6 +60,12 @@ public class ForwardForConsecutiveHashPartitionerTest extends TestLogger { @Test public void testConvertToHashPartitioner() { + testConvertToHashPartitioner(StreamExchangeMode.BATCH); + testConvertToHashPartitioner(StreamExchangeMode.PIPELINED); + testConvertToHashPartitioner(StreamExchangeMode.UNDEFINED); + } + + private void testConvertToHashPartitioner(StreamExchangeMode streamExchangeMode) { JobGraph jobGraph = StreamPartitionerTestUtils.createJobGraph( "group1", diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java index 2059aaa..12e9d54 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/partitioner/StreamPartitionerTestUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.DiscardingSink; import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; /** Utility class to test {@link StreamPartitioner}. */ public class StreamPartitionerTestUtils { @@ -31,6 +32,18 @@ public class StreamPartitionerTestUtils { String sourceSlotSharingGroup, String sinkSlotSharingGroup, StreamPartitioner<Long> streamPartitioner) { + return createJobGraph( + sourceSlotSharingGroup, + sinkSlotSharingGroup, + streamPartitioner, + StreamExchangeMode.UNDEFINED); + } + + public static JobGraph createJobGraph( + String sourceSlotSharingGroup, + String sinkSlotSharingGroup, + StreamPartitioner<Long> streamPartitioner, + StreamExchangeMode exchangeMode) { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setRuntimeMode(RuntimeExecutionMode.BATCH); @@ -39,7 +52,7 @@ public class StreamPartitionerTestUtils { final DataStream<Long> source = env.fromSequence(0, 99).slotSharingGroup(sourceSlotSharingGroup).name("source"); - setPartitioner(source, streamPartitioner) + setPartitioner(source, streamPartitioner, exchangeMode) .addSink(new DiscardingSink<>()) .slotSharingGroup(sinkSlotSharingGroup) .name("sink"); @@ -48,10 +61,13 @@ public class StreamPartitionerTestUtils { } private static <T> DataStream<T> setPartitioner( - DataStream<T> dataStream, StreamPartitioner<T> partitioner) { + DataStream<T> dataStream, + StreamPartitioner<T> partitioner, + StreamExchangeMode exchangeMode) { return new DataStream<T>( dataStream.getExecutionEnvironment(), - new PartitionTransformation<T>(dataStream.getTransformation(), partitioner)); + new PartitionTransformation<T>( + dataStream.getTransformation(), partitioner, exchangeMode)); } /** Utility class, should not be instantiated. */