This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4e65322dc1b5f80a7f3a42f0f205f978357daa40 Author: Timo Walther <twal...@apache.org> AuthorDate: Mon Jul 19 14:22:18 2021 +0200 [FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode --- .../api/graph/GlobalDataExchangeMode.java | 4 +-- .../flink/streaming/api/graph/StreamEdge.java | 18 ++++++------ .../flink/streaming/api/graph/StreamGraph.java | 23 ++++++++-------- .../api/graph/StreamingJobGraphGenerator.java | 8 +++--- .../transformations/PartitionTransformation.java | 18 ++++++------ .../{ShuffleMode.java => StreamExchangeMode.java} | 6 ++-- .../PartitionTransformationTranslator.java | 2 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 30 ++++++++++---------- ...aphGeneratorWithGlobalDataExchangeModeTest.java | 8 +++--- .../table/planner/plan/nodes/exec/ExecEdge.java | 31 +++++++++++---------- .../plan/nodes/exec/batch/BatchExecExchange.java | 32 +++++++++++----------- .../exec/processor/DeadlockBreakupProcessor.java | 4 +-- .../MultipleInputNodeCreationProcessor.java | 4 +-- .../utils/InputPriorityConflictResolver.java | 21 +++++++------- .../exec/serde/ExecNodeGraphJsonPlanGenerator.java | 18 ++++++------ .../flink/table/planner/utils/ExecutorUtils.java | 2 +- ...ModeUtils.java => StreamExchangeModeUtils.java} | 2 +- .../physical/batch/BatchPhysicalExchange.scala | 12 ++++---- .../utils/InputPriorityConflictResolverTest.java | 19 +++++++------ ...sTest.java => StreamExchangeModeUtilsTest.java} | 28 +++++++++---------- 20 files changed, 149 insertions(+), 141 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java index b2b5f21..3118f7f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java @@ -19,13 +19,13 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; /** * This mode decides the default {@link ResultPartitionType} of job edges. Note that this only - * affects job edges which are {@link ShuffleMode#UNDEFINED}. + * affects job edges which are {@link StreamExchangeMode#UNDEFINED}. */ public enum GlobalDataExchangeMode { /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */ diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java index f76edd5..5ca627c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.api.graph; import org.apache.flink.annotation.Internal; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.util.OutputTag; @@ -58,7 +58,7 @@ public class StreamEdge implements Serializable { /** The name of the operator in the target vertex. */ private final String targetOperatorName; - private final ShuffleMode shuffleMode; + private final StreamExchangeMode exchangeMode; private long bufferTimeout; @@ -78,7 +78,7 @@ public class StreamEdge implements Serializable { ALWAYS_FLUSH_BUFFER_TIMEOUT, outputPartitioner, outputTag, - ShuffleMode.UNDEFINED); + StreamExchangeMode.UNDEFINED); } public StreamEdge( @@ -87,7 +87,7 @@ public class StreamEdge implements Serializable { int typeNumber, StreamPartitioner<?> outputPartitioner, OutputTag outputTag, - ShuffleMode shuffleMode) { + StreamExchangeMode exchangeMode) { this( sourceVertex, @@ -96,7 +96,7 @@ public class StreamEdge implements Serializable { sourceVertex.getBufferTimeout(), outputPartitioner, outputTag, - shuffleMode); + exchangeMode); } public StreamEdge( @@ -106,7 +106,7 @@ public class StreamEdge implements Serializable { long bufferTimeout, StreamPartitioner<?> outputPartitioner, OutputTag outputTag, - ShuffleMode shuffleMode) { + StreamExchangeMode exchangeMode) { this.sourceId = sourceVertex.getId(); this.targetId = targetVertex.getId(); @@ -116,7 +116,7 @@ public class StreamEdge implements Serializable { this.outputTag = outputTag; this.sourceOperatorName = sourceVertex.getOperatorName(); this.targetOperatorName = targetVertex.getOperatorName(); - this.shuffleMode = checkNotNull(shuffleMode); + this.exchangeMode = checkNotNull(exchangeMode); this.edgeId = sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + outputPartitioner; } @@ -141,8 +141,8 @@ public class StreamEdge implements Serializable { return outputPartitioner; } - public ShuffleMode getShuffleMode() { - return shuffleMode; + public StreamExchangeMode getExchangeMode() { + return exchangeMode; } public void setPartitioner(StreamPartitioner<?> partitioner) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index e3f284f..a641898 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory; import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -113,7 +113,8 @@ public class StreamGraph implements Pipeline { private Set<Integer> sources; private Set<Integer> sinks; private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes; - private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> virtualPartitionNodes; + private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> + virtualPartitionNodes; protected Map<Integer, String> vertexIDtoBrokerID; protected Map<Integer, Long> vertexIDtoLoopTimeout; @@ -555,14 +556,14 @@ public class StreamGraph implements Pipeline { Integer originalId, Integer virtualId, StreamPartitioner<?> partitioner, - ShuffleMode shuffleMode) { + StreamExchangeMode exchangeMode) { if (virtualPartitionNodes.containsKey(virtualId)) { throw new IllegalStateException( "Already has virtual partition node with id " + virtualId); } - virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, shuffleMode)); + virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, partitioner, exchangeMode)); } /** Determines the slot sharing group of an operation across virtual nodes. */ @@ -597,7 +598,7 @@ public class StreamGraph implements Pipeline { StreamPartitioner<?> partitioner, List<String> outputNames, OutputTag outputTag, - ShuffleMode shuffleMode) { + StreamExchangeMode exchangeMode) { if (virtualSideOutputNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; @@ -612,14 +613,14 @@ public class StreamGraph implements Pipeline { partitioner, null, outputTag, - shuffleMode); + exchangeMode); } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) { int virtualId = upStreamVertexID; upStreamVertexID = virtualPartitionNodes.get(virtualId).f0; if (partitioner == null) { partitioner = virtualPartitionNodes.get(virtualId).f1; } - shuffleMode = virtualPartitionNodes.get(virtualId).f2; + exchangeMode = virtualPartitionNodes.get(virtualId).f2; addEdgeInternal( upStreamVertexID, downStreamVertexID, @@ -627,7 +628,7 @@ public class StreamGraph implements Pipeline { partitioner, outputNames, outputTag, - shuffleMode); + exchangeMode); } else { StreamNode upstreamNode = getStreamNode(upStreamVertexID); StreamNode downstreamNode = getStreamNode(downStreamVertexID); @@ -657,8 +658,8 @@ public class StreamGraph implements Pipeline { } } - if (shuffleMode == null) { - shuffleMode = ShuffleMode.UNDEFINED; + if (exchangeMode == null) { + exchangeMode = StreamExchangeMode.UNDEFINED; } StreamEdge edge = @@ -668,7 +669,7 @@ public class StreamGraph implements Pipeline { typeNumber, partitioner, outputTag, - shuffleMode); + exchangeMode); getStreamNode(edge.getSourceId()).addOutEdge(edge); getStreamNode(edge.getTargetId()).addInEdge(edge); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 6e96655..a4c0936 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -62,7 +62,7 @@ import org.apache.flink.streaming.api.operators.SourceOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperatorFactory; import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory; import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -780,7 +780,7 @@ public class StreamingJobGraphGenerator { StreamPartitioner<?> partitioner = edge.getPartitioner(); ResultPartitionType resultPartitionType; - switch (edge.getShuffleMode()) { + switch (edge.getExchangeMode()) { case PIPELINED: resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED; break; @@ -792,7 +792,7 @@ public class StreamingJobGraphGenerator { break; default: throw new UnsupportedOperationException( - "Data exchange mode " + edge.getShuffleMode() + " is not supported yet."); + "Data exchange mode " + edge.getExchangeMode() + " is not supported yet."); } checkAndResetBufferTimeout(resultPartitionType, edge); @@ -877,7 +877,7 @@ public class StreamingJobGraphGenerator { if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex) && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) && (edge.getPartitioner() instanceof ForwardPartitioner) - && edge.getShuffleMode() != ShuffleMode.BATCH + && edge.getExchangeMode() != StreamExchangeMode.BATCH && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() && streamGraph.isChainingEnabled())) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java index 6ec9a22..2521539 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java @@ -44,7 +44,7 @@ public class PartitionTransformation<T> extends Transformation<T> { private final StreamPartitioner<T> partitioner; - private final ShuffleMode shuffleMode; + private final StreamExchangeMode exchangeMode; /** * Creates a new {@code PartitionTransformation} from the given input and {@link @@ -54,7 +54,7 @@ public class PartitionTransformation<T> extends Transformation<T> { * @param partitioner The {@code StreamPartitioner} */ public PartitionTransformation(Transformation<T> input, StreamPartitioner<T> partitioner) { - this(input, partitioner, ShuffleMode.UNDEFINED); + this(input, partitioner, StreamExchangeMode.UNDEFINED); } /** @@ -63,14 +63,16 @@ public class PartitionTransformation<T> extends Transformation<T> { * * @param input The input {@code Transformation} * @param partitioner The {@code StreamPartitioner} - * @param shuffleMode The {@code ShuffleMode} + * @param exchangeMode The {@code ShuffleMode} */ public PartitionTransformation( - Transformation<T> input, StreamPartitioner<T> partitioner, ShuffleMode shuffleMode) { + Transformation<T> input, + StreamPartitioner<T> partitioner, + StreamExchangeMode exchangeMode) { super("Partition", input.getOutputType(), input.getParallelism()); this.input = input; this.partitioner = partitioner; - this.shuffleMode = checkNotNull(shuffleMode); + this.exchangeMode = checkNotNull(exchangeMode); } /** @@ -81,9 +83,9 @@ public class PartitionTransformation<T> extends Transformation<T> { return partitioner; } - /** Returns the {@link ShuffleMode} of this {@link PartitionTransformation}. */ - public ShuffleMode getShuffleMode() { - return shuffleMode; + /** Returns the {@link StreamExchangeMode} of this {@link PartitionTransformation}. */ + public StreamExchangeMode getExchangeMode() { + return exchangeMode; } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java similarity index 89% rename from flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java rename to flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java index cc98cc7..4637c07 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java @@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving; /** The shuffle mode defines the data exchange mode between operators. */ @PublicEvolving -public enum ShuffleMode { +public enum StreamExchangeMode { /** * Producer and consumer are online at the same time. Produced data is received by consumer * immediately. @@ -37,8 +37,8 @@ public enum ShuffleMode { /** * The shuffle mode is undefined. It leaves it up to the framework to decide the shuffle mode. - * The framework will pick one of {@link ShuffleMode#BATCH} or {@link ShuffleMode#PIPELINED} in - * the end. + * The framework will pick one of {@link StreamExchangeMode#BATCH} or {@link + * StreamExchangeMode#PIPELINED} in the end. */ UNDEFINED } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java index c459c9e..f427cae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java @@ -76,7 +76,7 @@ public class PartitionTransformationTranslator<OUT> inputId, virtualId, transformation.getPartitioner(), - transformation.getShuffleMode()); + transformation.getExchangeMode()); resultIds.add(virtualId); } return resultIds; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index a251b23..bfb82f3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -82,7 +82,7 @@ import org.apache.flink.streaming.api.operators.YieldingOperatorFactory; import org.apache.flink.streaming.api.transformations.MultipleInputTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; @@ -593,7 +593,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { assertTrue(operatorFactory instanceof SourceOperatorFactory); } - /** Test setting shuffle mode to {@link ShuffleMode#PIPELINED}. */ + /** Test setting shuffle mode to {@link StreamExchangeMode#PIPELINED}. */ @Test public void testShuffleModePipelined() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -606,7 +606,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( sourceDataStream.getTransformation(), new ForwardPartitioner<>(), - ShuffleMode.PIPELINED)); + StreamExchangeMode.PIPELINED)); DataStream<Integer> mapDataStream = partitionAfterSourceDataStream.map(value -> value).setParallelism(1); @@ -616,7 +616,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( mapDataStream.getTransformation(), new RescalePartitioner<>(), - ShuffleMode.PIPELINED)); + StreamExchangeMode.PIPELINED)); partitionAfterMapDataStream.print().setParallelism(2); JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); @@ -633,7 +633,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { sourceAndMapVertex.getProducedDataSets().get(0).getResultType()); } - /** Test setting shuffle mode to {@link ShuffleMode#BATCH}. */ + /** Test setting shuffle mode to {@link StreamExchangeMode#BATCH}. */ @Test public void testShuffleModeBatch() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -646,7 +646,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( sourceDataStream.getTransformation(), new ForwardPartitioner<>(), - ShuffleMode.BATCH)); + StreamExchangeMode.BATCH)); DataStream<Integer> mapDataStream = partitionAfterSourceDataStream.map(value -> value).setParallelism(1); @@ -656,7 +656,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( mapDataStream.getTransformation(), new RescalePartitioner<>(), - ShuffleMode.BATCH)); + StreamExchangeMode.BATCH)); partitionAfterMapDataStream.print().setParallelism(2); JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); @@ -677,7 +677,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { mapVertex.getProducedDataSets().get(0).getResultType()); } - /** Test setting shuffle mode to {@link ShuffleMode#UNDEFINED}. */ + /** Test setting shuffle mode to {@link StreamExchangeMode#UNDEFINED}. */ @Test public void testShuffleModeUndefined() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); @@ -690,7 +690,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( sourceDataStream.getTransformation(), new ForwardPartitioner<>(), - ShuffleMode.UNDEFINED)); + StreamExchangeMode.UNDEFINED)); DataStream<Integer> mapDataStream = partitionAfterSourceDataStream.map(value -> value).setParallelism(1); @@ -700,7 +700,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( mapDataStream.getTransformation(), new RescalePartitioner<>(), - ShuffleMode.UNDEFINED)); + StreamExchangeMode.UNDEFINED)); partitionAfterMapDataStream.print().setParallelism(2); JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph()); @@ -784,15 +784,15 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { @Test(expected = UnsupportedOperationException.class) public void testConflictShuffleModeWithBufferTimeout() { - testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH); + testCompatibleShuffleModeWithBufferTimeout(StreamExchangeMode.BATCH); } @Test public void testNormalShuffleModeWithBufferTimeout() { - testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED); + testCompatibleShuffleModeWithBufferTimeout(StreamExchangeMode.PIPELINED); } - private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode shuffleMode) { + private void testCompatibleShuffleModeWithBufferTimeout(StreamExchangeMode exchangeMode) { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setBufferTimeout(100); @@ -801,7 +801,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( sourceDataStream.getTransformation(), new RebalancePartitioner<>(), - shuffleMode); + exchangeMode); DataStream<Integer> partitionStream = new DataStream<>(env, transformation); partitionStream.map(value -> value).print(); @@ -1396,7 +1396,7 @@ public class StreamingJobGraphGeneratorTest extends TestLogger { new PartitionTransformation<>( source2.getTransformation(), new RebalancePartitioner<>(), - ShuffleMode.BATCH)); + StreamExchangeMode.BATCH)); partitioned.map(v -> v).name("map2"); return env.getStreamGraph(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java index 5170592..7ee6462 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.transformations.PartitionTransformation; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner; import org.apache.flink.util.TestLogger; @@ -148,7 +148,7 @@ public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends Te new PartitionTransformation<>( source.getTransformation(), new ForwardPartitioner<>(), - ShuffleMode.PIPELINED)); + StreamExchangeMode.PIPELINED)); forward.map(i -> i).startNewChain().setParallelism(1); final StreamGraph streamGraph = env.getStreamGraph(); streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING); @@ -178,7 +178,7 @@ public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends Te new PartitionTransformation<>( source.getTransformation(), new ForwardPartitioner<>(), - ShuffleMode.UNDEFINED)); + StreamExchangeMode.UNDEFINED)); final DataStream<Integer> map1 = forward.map(i -> i).startNewChain().setParallelism(1); final DataStream<Integer> rescale = @@ -187,7 +187,7 @@ public class StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends Te new PartitionTransformation<>( map1.getTransformation(), new RescalePartitioner<>(), - ShuffleMode.UNDEFINED)); + StreamExchangeMode.UNDEFINED)); final DataStream<Integer> map2 = rescale.map(i -> i).setParallelism(2); map2.rebalance().print().setParallelism(2); diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java index 9e4f2d0..f6b5216 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec; import org.apache.flink.api.dag.Transformation; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.api.TableException; import org.apache.flink.table.delegation.Planner; import org.apache.flink.table.planner.plan.nodes.exec.serde.ExecNodeGraphJsonPlanGenerator.JsonPlanEdge; @@ -45,22 +45,25 @@ public class ExecEdge { private final ExecNode<?> target; /** The {@link Shuffle} on this edge from source to target. */ private final Shuffle shuffle; - /** The {@link ShuffleMode} defines the data exchange mode on this edge. */ - private final ShuffleMode shuffleMode; + /** The {@link StreamExchangeMode} defines the data exchange mode on this edge. */ + private final StreamExchangeMode exchangeMode; public ExecEdge( - ExecNode<?> source, ExecNode<?> target, Shuffle shuffle, ShuffleMode shuffleMode) { + ExecNode<?> source, + ExecNode<?> target, + Shuffle shuffle, + StreamExchangeMode exchangeMode) { this.source = checkNotNull(source); this.target = checkNotNull(target); this.shuffle = checkNotNull(shuffle); - this.shuffleMode = checkNotNull(shuffleMode); + this.exchangeMode = checkNotNull(exchangeMode); // TODO once FLINK-21224 [Remove BatchExecExchange and StreamExecExchange, and replace their // functionality with ExecEdge] is finished, we should remove the following validation. if (shuffle.getType() != Shuffle.Type.FORWARD) { throw new TableException("Only FORWARD shuffle is supported now."); } - if (shuffleMode != ShuffleMode.PIPELINED) { + if (exchangeMode != StreamExchangeMode.PIPELINED) { throw new TableException("Only PIPELINED shuffle mode is supported now."); } } @@ -77,8 +80,8 @@ public class ExecEdge { return shuffle; } - public ShuffleMode getShuffleMode() { - return shuffleMode; + public StreamExchangeMode getExchangeMode() { + return exchangeMode; } /** Returns the output {@link LogicalType} of the data passing this edge. */ @@ -95,8 +98,8 @@ public class ExecEdge { + target.getDescription() + ", shuffle=" + shuffle - + ", shuffleMode=" - + shuffleMode + + ", exchangeMode=" + + exchangeMode + '}'; } @@ -109,7 +112,7 @@ public class ExecEdge { private ExecNode<?> source; private ExecNode<?> target; private Shuffle shuffle = FORWARD_SHUFFLE; - private ShuffleMode shuffleMode = ShuffleMode.PIPELINED; + private StreamExchangeMode exchangeMode = StreamExchangeMode.PIPELINED; public Builder source(ExecNode<?> source) { this.source = source; @@ -131,13 +134,13 @@ public class ExecEdge { return shuffle(fromRequiredDistribution(requiredDistribution)); } - public Builder shuffleMode(ShuffleMode shuffleMode) { - this.shuffleMode = shuffleMode; + public Builder exchangeMode(StreamExchangeMode exchangeMode) { + this.exchangeMode = exchangeMode; return this; } public ExecEdge build() { - return new ExecEdge(source, target, shuffle, shuffleMode); + return new ExecEdge(source, target, shuffle, exchangeMode); } private Shuffle fromRequiredDistribution( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java index 3d1b5e9..3a5b3aa 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java @@ -24,7 +24,7 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode; import org.apache.flink.streaming.api.transformations.PartitionTransformation; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; @@ -58,14 +58,14 @@ import java.util.Optional; public class BatchExecExchange extends CommonExecExchange implements BatchExecNode<RowData> { // the required shuffle mode for reusable BatchExecExchange // if it's None, use value from configuration - @Nullable private ShuffleMode requiredShuffleMode; + @Nullable private StreamExchangeMode requiredExchangeMode; public BatchExecExchange(InputProperty inputProperty, RowType outputType, String description) { super(getNewNodeId(), Collections.singletonList(inputProperty), outputType, description); } - public void setRequiredShuffleMode(@Nullable ShuffleMode requiredShuffleMode) { - this.requiredShuffleMode = requiredShuffleMode; + public void setRequiredExchangeMode(@Nullable StreamExchangeMode requiredExchangeMode) { + this.requiredExchangeMode = requiredExchangeMode; } @Override @@ -89,7 +89,7 @@ public class BatchExecExchange extends CommonExecExchange implements BatchExecNo sb.append("[").append(String.join(", ", fieldNames)).append("]"); } sb.append("]"); - if (requiredShuffleMode == ShuffleMode.BATCH) { + if (requiredExchangeMode == StreamExchangeMode.BATCH) { sb.append(", shuffle_mode=[BATCH]"); } return String.format("Exchange(%s)", sb.toString()); @@ -141,30 +141,30 @@ public class BatchExecExchange extends CommonExecExchange implements BatchExecNo throw new TableException(distributionType + "is not supported now!"); } - final ShuffleMode shuffleMode = - getShuffleMode(planner.getTableConfig().getConfiguration(), requiredShuffleMode); + final StreamExchangeMode exchangeMode = + getExchangeMode(planner.getTableConfig().getConfiguration(), requiredExchangeMode); final Transformation<RowData> transformation = - new PartitionTransformation<>(inputTransform, partitioner, shuffleMode); + new PartitionTransformation<>(inputTransform, partitioner, exchangeMode); transformation.setParallelism(parallelism); transformation.setOutputType(InternalTypeInfo.of(getOutputType())); return transformation; } - public static ShuffleMode getShuffleMode( - Configuration config, @Nullable ShuffleMode requiredShuffleMode) { - if (requiredShuffleMode == ShuffleMode.BATCH) { - return ShuffleMode.BATCH; + public static StreamExchangeMode getExchangeMode( + Configuration config, @Nullable StreamExchangeMode requiredExchangeMode) { + if (requiredExchangeMode == StreamExchangeMode.BATCH) { + return StreamExchangeMode.BATCH; } if (config.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE) .equalsIgnoreCase(GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString())) { - return ShuffleMode.BATCH; + return StreamExchangeMode.BATCH; } else { - return ShuffleMode.UNDEFINED; + return StreamExchangeMode.UNDEFINED; } } @VisibleForTesting - public Optional<ShuffleMode> getRequiredShuffleMode() { - return Optional.ofNullable(requiredShuffleMode); + public Optional<StreamExchangeMode> getRequiredExchangeMode() { + return Optional.ofNullable(requiredExchangeMode); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java index 970b7c6..fee1411 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.processor; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.api.TableException; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; @@ -43,7 +43,7 @@ public class DeadlockBreakupProcessor implements ExecNodeGraphProcessor { new InputPriorityConflictResolver( execGraph.getRootNodes(), InputProperty.DamBehavior.END_INPUT, - ShuffleMode.BATCH, + StreamExchangeMode.BATCH, context.getPlanner().getTableConfig().getConfiguration()); resolver.detectAndResolve(); return execGraph; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java index 8f4eb5a..54a79e3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java @@ -21,8 +21,8 @@ package org.apache.flink.table.planner.plan.nodes.exec.processor; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.transformations.ShuffleMode; import org.apache.flink.streaming.api.transformations.SourceTransformation; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph; @@ -78,7 +78,7 @@ public class MultipleInputNodeCreationProcessor implements ExecNodeGraphProcesso new InputPriorityConflictResolver( execGraph.getRootNodes(), InputProperty.DamBehavior.BLOCKING, - ShuffleMode.PIPELINED, + StreamExchangeMode.PIPELINED, context.getPlanner().getTableConfig().getConfiguration()); resolver.detectAndResolve(); } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java index b5c5011..bc9a2c3 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java @@ -20,7 +20,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.processor.utils; import org.apache.flink.annotation.Internal; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; @@ -40,7 +40,7 @@ import java.util.List; @Internal public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { - private final ShuffleMode shuffleMode; + private final StreamExchangeMode exchangeMode; private final Configuration configuration; /** @@ -49,16 +49,16 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { * @param roots the first layer of nodes on the output side of the graph * @param safeDamBehavior when checking for conflicts we'll ignore the edges with {@link * InputProperty.DamBehavior} stricter or equal than this - * @param shuffleMode when a conflict occurs we'll insert an {@link BatchExecExchange} node with - * this shuffleMode to resolve conflict + * @param exchangeMode when a conflict occurs we'll insert an {@link BatchExecExchange} node + * with this shuffleMode to resolve conflict */ public InputPriorityConflictResolver( List<ExecNode<?>> roots, InputProperty.DamBehavior safeDamBehavior, - ShuffleMode shuffleMode, + StreamExchangeMode exchangeMode, Configuration configuration) { super(roots, Collections.emptySet(), safeDamBehavior); - this.shuffleMode = shuffleMode; + this.exchangeMode = exchangeMode; this.configuration = configuration; } @@ -86,7 +86,7 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { BatchExecExchange newExchange = new BatchExecExchange( inputProperty, (RowType) exchange.getOutputType(), "Exchange"); - newExchange.setRequiredShuffleMode(shuffleMode); + newExchange.setRequiredExchangeMode(exchangeMode); newExchange.setInputEdges(exchange.getInputEdges()); newNode = newExchange; } else { @@ -96,7 +96,7 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { inputProperty, (RowType) exchange.getOutputType(), exchange.getDescription()); - newExchange.setRequiredShuffleMode(shuffleMode); + newExchange.setRequiredExchangeMode(exchangeMode); newExchange.setInputEdges(exchange.getInputEdges()); newNode = newExchange; } @@ -137,7 +137,7 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { BatchExecExchange exchange = new BatchExecExchange( newInputProperty, (RowType) inputNode.getOutputType(), "Exchange"); - exchange.setRequiredShuffleMode(shuffleMode); + exchange.setRequiredExchangeMode(exchangeMode); ExecEdge execEdge = ExecEdge.builder().source(inputNode).target(exchange).build(); exchange.setInputEdges(Collections.singletonList(execEdge)); return exchange; @@ -168,7 +168,8 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { } private InputProperty.DamBehavior getDamBehavior() { - if (BatchExecExchange.getShuffleMode(configuration, shuffleMode) == ShuffleMode.BATCH) { + if (BatchExecExchange.getExchangeMode(configuration, exchangeMode) + == StreamExchangeMode.BATCH) { return InputProperty.DamBehavior.BLOCKING; } else { return InputProperty.DamBehavior.PIPELINED; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java index f18dcb4..953ea6b 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java @@ -18,7 +18,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.serde; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.api.TableException; import org.apache.flink.table.catalog.ObjectIdentifier; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; @@ -293,7 +293,7 @@ public class ExecNodeGraphJsonPlanGenerator { .source(source) .target(target) .shuffle(edge.getShuffle()) - .shuffleMode(edge.getShuffleMode()) + .exchangeMode(edge.getExchangeMode()) .build(); idToInputEdges .computeIfAbsent(target.getId(), n -> new ArrayList<>()) @@ -343,20 +343,20 @@ public class ExecNodeGraphJsonPlanGenerator { @JsonSerialize(using = ShuffleJsonSerializer.class) @JsonDeserialize(using = ShuffleJsonDeserializer.class) private final ExecEdge.Shuffle shuffle; - /** The {@link ShuffleMode} defines the data exchange mode on this edge. */ + /** The {@link StreamExchangeMode} on this edge. */ @JsonProperty(FIELD_NAME_SHUFFLE_MODE) - private final ShuffleMode shuffleMode; + private final StreamExchangeMode exchangeMode; @JsonCreator public JsonPlanEdge( @JsonProperty(FIELD_NAME_SOURCE) int sourceId, @JsonProperty(FIELD_NAME_TARGET) int targetId, @JsonProperty(FIELD_NAME_SHUFFLE) ExecEdge.Shuffle shuffle, - @JsonProperty(FIELD_NAME_SHUFFLE_MODE) ShuffleMode shuffleMode) { + @JsonProperty(FIELD_NAME_SHUFFLE_MODE) StreamExchangeMode exchangeMode) { this.sourceId = sourceId; this.targetId = targetId; this.shuffle = shuffle; - this.shuffleMode = shuffleMode; + this.exchangeMode = exchangeMode; } @JsonIgnore @@ -375,8 +375,8 @@ public class ExecNodeGraphJsonPlanGenerator { } @JsonIgnore - public ShuffleMode getShuffleMode() { - return shuffleMode; + public StreamExchangeMode getExchangeMode() { + return exchangeMode; } /** Build {@link JsonPlanEdge} from an {@link ExecEdge}. */ @@ -385,7 +385,7 @@ public class ExecNodeGraphJsonPlanGenerator { execEdge.getSource().getId(), execEdge.getTarget().getId(), execEdge.getShuffle(), - execEdge.getShuffleMode()); + execEdge.getExchangeMode()); } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java index cce7280..4871560 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java @@ -84,7 +84,7 @@ public class ExecutorUtils { } private static GlobalDataExchangeMode getGlobalDataExchangeMode(TableConfig tableConfig) { - return ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode( + return StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode( tableConfig.getConfiguration()); } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java similarity index 98% rename from flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java rename to flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java index 54dd0cd..c2e01de 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java @@ -23,7 +23,7 @@ import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode; import org.apache.flink.table.api.config.ExecutionConfigOptions; /** Utility class to load job-wide shuffle mode. */ -public class ShuffleModeUtils { +public class StreamExchangeModeUtils { static final String ALL_EDGES_BLOCKING_LEGACY = "batch"; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala index 81ae96b..fc00199 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala @@ -19,11 +19,11 @@ package org.apache.flink.table.planner.plan.nodes.physical.batch import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode -import org.apache.flink.streaming.api.transformations.ShuffleMode +import org.apache.flink.streaming.api.transformations.StreamExchangeMode import org.apache.flink.table.api.config.ExecutionConfigOptions import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange -import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode} +import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty} import org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalExchange import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil @@ -60,7 +60,7 @@ class BatchPhysicalExchange( throw new UnsupportedOperationException("Range sort is not supported.") } - val damBehavior = if (getShuffleMode eq ShuffleMode.BATCH) { + val damBehavior = if (getExchangeMode eq StreamExchangeMode.BATCH) { InputProperty.DamBehavior.BLOCKING } else { InputProperty.DamBehavior.PIPELINED @@ -72,13 +72,13 @@ class BatchPhysicalExchange( .build } - private def getShuffleMode: ShuffleMode = { + private def getExchangeMode: StreamExchangeMode = { val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this) if (tableConfig.getConfiguration.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE) .equalsIgnoreCase(GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString)) { - ShuffleMode.BATCH + StreamExchangeMode.BATCH } else { - ShuffleMode.UNDEFINED + StreamExchangeMode.UNDEFINED } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java index b9d6f9b..73f6228 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java @@ -19,7 +19,7 @@ package org.apache.flink.table.planner.plan.nodes.exec.processor.utils; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.transformations.ShuffleMode; +import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; @@ -78,21 +78,21 @@ public class InputPriorityConflictResolverTest { new InputPriorityConflictResolver( Collections.singletonList(nodes[7]), InputProperty.DamBehavior.END_INPUT, - ShuffleMode.BATCH, + StreamExchangeMode.BATCH, new Configuration()); resolver.detectAndResolve(); Assert.assertEquals(nodes[1], nodes[7].getInputNodes().get(0)); Assert.assertEquals(nodes[2], nodes[7].getInputNodes().get(1)); Assert.assertTrue(nodes[7].getInputNodes().get(2) instanceof BatchExecExchange); Assert.assertEquals( - Optional.of(ShuffleMode.BATCH), - ((BatchExecExchange) nodes[7].getInputNodes().get(2)).getRequiredShuffleMode()); + Optional.of(StreamExchangeMode.BATCH), + ((BatchExecExchange) nodes[7].getInputNodes().get(2)).getRequiredExchangeMode()); Assert.assertEquals( nodes[3], nodes[7].getInputNodes().get(2).getInputEdges().get(0).getSource()); Assert.assertTrue(nodes[7].getInputNodes().get(3) instanceof BatchExecExchange); Assert.assertEquals( - Optional.of(ShuffleMode.BATCH), - ((BatchExecExchange) nodes[7].getInputNodes().get(3)).getRequiredShuffleMode()); + Optional.of(StreamExchangeMode.BATCH), + ((BatchExecExchange) nodes[7].getInputNodes().get(3)).getRequiredExchangeMode()); Assert.assertEquals( nodes[4], nodes[7].getInputNodes().get(3).getInputEdges().get(0).getSource()); Assert.assertEquals(nodes[5], nodes[7].getInputNodes().get(4)); @@ -117,7 +117,7 @@ public class InputPriorityConflictResolverTest { .build(), (RowType) nodes[0].getOutputType(), "Exchange"); - exchange.setRequiredShuffleMode(ShuffleMode.BATCH); + exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH); ExecEdge execEdge = ExecEdge.builder().source(nodes[0]).target(exchange).build(); exchange.setInputEdges(Collections.singletonList(execEdge)); @@ -128,7 +128,7 @@ public class InputPriorityConflictResolverTest { new InputPriorityConflictResolver( Collections.singletonList(nodes[1]), InputProperty.DamBehavior.END_INPUT, - ShuffleMode.BATCH, + StreamExchangeMode.BATCH, new Configuration()); resolver.detectAndResolve(); @@ -140,7 +140,8 @@ public class InputPriorityConflictResolverTest { execNode -> { Assert.assertTrue(execNode instanceof BatchExecExchange); BatchExecExchange e = (BatchExecExchange) execNode; - Assert.assertEquals(Optional.of(ShuffleMode.BATCH), e.getRequiredShuffleMode()); + Assert.assertEquals( + Optional.of(StreamExchangeMode.BATCH), e.getRequiredExchangeMode()); Assert.assertEquals(nodes[0], e.getInputEdges().get(0).getSource()); }; checkExchange.accept(input0); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java similarity index 75% rename from flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java rename to flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java index 33ecddb..c0078da 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java @@ -27,8 +27,8 @@ import org.junit.Test; import static org.junit.Assert.assertEquals; -/** Tests for {@link ShuffleModeUtils}. */ -public class ShuffleModeUtilsTest extends TestLogger { +/** Tests for {@link StreamExchangeModeUtils}. */ +public class StreamExchangeModeUtilsTest extends TestLogger { @Test public void testGetValidShuffleMode() { @@ -39,28 +39,28 @@ public class ShuffleModeUtilsTest extends TestLogger { GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString()); assertEquals( GlobalDataExchangeMode.ALL_EDGES_BLOCKING, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); configuration.setString( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED.toString()); assertEquals( GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); configuration.setString( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString()); assertEquals( GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); configuration.setString( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, GlobalDataExchangeMode.ALL_EDGES_PIPELINED.toString()); assertEquals( GlobalDataExchangeMode.ALL_EDGES_PIPELINED, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); } @Test @@ -69,17 +69,17 @@ public class ShuffleModeUtilsTest extends TestLogger { configuration.setString( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, - ShuffleModeUtils.ALL_EDGES_BLOCKING_LEGACY); + StreamExchangeModeUtils.ALL_EDGES_BLOCKING_LEGACY); assertEquals( GlobalDataExchangeMode.ALL_EDGES_BLOCKING, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); configuration.setString( ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, - ShuffleModeUtils.ALL_EDGES_PIPELINED_LEGACY); + StreamExchangeModeUtils.ALL_EDGES_PIPELINED_LEGACY); assertEquals( GlobalDataExchangeMode.ALL_EDGES_PIPELINED, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); } @Test @@ -90,12 +90,12 @@ public class ShuffleModeUtilsTest extends TestLogger { ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, "Forward_edges_PIPELINED"); assertEquals( GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); configuration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, "Pipelined"); assertEquals( GlobalDataExchangeMode.ALL_EDGES_PIPELINED, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); } @Test @@ -103,13 +103,13 @@ public class ShuffleModeUtilsTest extends TestLogger { final Configuration configuration = new Configuration(); assertEquals( GlobalDataExchangeMode.ALL_EDGES_BLOCKING, - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration)); } @Test(expected = IllegalArgumentException.class) public void testGetInvalidShuffleMode() { final Configuration configuration = new Configuration(); configuration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, "invalid-value"); - ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration); + StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration); } }