This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4e65322dc1b5f80a7f3a42f0f205f978357daa40
Author: Timo Walther <twal...@apache.org>
AuthorDate: Mon Jul 19 14:22:18 2021 +0200

    [FLINK-23402][streaming-java] Refactor ShuffleMode to StreamExchangeMode
---
 .../api/graph/GlobalDataExchangeMode.java          |  4 +--
 .../flink/streaming/api/graph/StreamEdge.java      | 18 ++++++------
 .../flink/streaming/api/graph/StreamGraph.java     | 23 ++++++++--------
 .../api/graph/StreamingJobGraphGenerator.java      |  8 +++---
 .../transformations/PartitionTransformation.java   | 18 ++++++------
 .../{ShuffleMode.java => StreamExchangeMode.java}  |  6 ++--
 .../PartitionTransformationTranslator.java         |  2 +-
 .../api/graph/StreamingJobGraphGeneratorTest.java  | 30 ++++++++++----------
 ...aphGeneratorWithGlobalDataExchangeModeTest.java |  8 +++---
 .../table/planner/plan/nodes/exec/ExecEdge.java    | 31 +++++++++++----------
 .../plan/nodes/exec/batch/BatchExecExchange.java   | 32 +++++++++++-----------
 .../exec/processor/DeadlockBreakupProcessor.java   |  4 +--
 .../MultipleInputNodeCreationProcessor.java        |  4 +--
 .../utils/InputPriorityConflictResolver.java       | 21 +++++++-------
 .../exec/serde/ExecNodeGraphJsonPlanGenerator.java | 18 ++++++------
 .../flink/table/planner/utils/ExecutorUtils.java   |  2 +-
 ...ModeUtils.java => StreamExchangeModeUtils.java} |  2 +-
 .../physical/batch/BatchPhysicalExchange.scala     | 12 ++++----
 .../utils/InputPriorityConflictResolverTest.java   | 19 +++++++------
 ...sTest.java => StreamExchangeModeUtilsTest.java} | 28 +++++++++----------
 20 files changed, 149 insertions(+), 141 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
index b2b5f21..3118f7f 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/GlobalDataExchangeMode.java
@@ -19,13 +19,13 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 
 /**
  * This mode decides the default {@link ResultPartitionType} of job edges. 
Note that this only
- * affects job edges which are {@link ShuffleMode#UNDEFINED}.
+ * affects job edges which are {@link StreamExchangeMode#UNDEFINED}.
  */
 public enum GlobalDataExchangeMode {
     /** Set all job edges to be {@link ResultPartitionType#BLOCKING}. */
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
index f76edd5..5ca627c 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
@@ -18,7 +18,7 @@
 package org.apache.flink.streaming.api.graph;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
 import org.apache.flink.util.OutputTag;
 
@@ -58,7 +58,7 @@ public class StreamEdge implements Serializable {
     /** The name of the operator in the target vertex. */
     private final String targetOperatorName;
 
-    private final ShuffleMode shuffleMode;
+    private final StreamExchangeMode exchangeMode;
 
     private long bufferTimeout;
 
@@ -78,7 +78,7 @@ public class StreamEdge implements Serializable {
                 ALWAYS_FLUSH_BUFFER_TIMEOUT,
                 outputPartitioner,
                 outputTag,
-                ShuffleMode.UNDEFINED);
+                StreamExchangeMode.UNDEFINED);
     }
 
     public StreamEdge(
@@ -87,7 +87,7 @@ public class StreamEdge implements Serializable {
             int typeNumber,
             StreamPartitioner<?> outputPartitioner,
             OutputTag outputTag,
-            ShuffleMode shuffleMode) {
+            StreamExchangeMode exchangeMode) {
 
         this(
                 sourceVertex,
@@ -96,7 +96,7 @@ public class StreamEdge implements Serializable {
                 sourceVertex.getBufferTimeout(),
                 outputPartitioner,
                 outputTag,
-                shuffleMode);
+                exchangeMode);
     }
 
     public StreamEdge(
@@ -106,7 +106,7 @@ public class StreamEdge implements Serializable {
             long bufferTimeout,
             StreamPartitioner<?> outputPartitioner,
             OutputTag outputTag,
-            ShuffleMode shuffleMode) {
+            StreamExchangeMode exchangeMode) {
 
         this.sourceId = sourceVertex.getId();
         this.targetId = targetVertex.getId();
@@ -116,7 +116,7 @@ public class StreamEdge implements Serializable {
         this.outputTag = outputTag;
         this.sourceOperatorName = sourceVertex.getOperatorName();
         this.targetOperatorName = targetVertex.getOperatorName();
-        this.shuffleMode = checkNotNull(shuffleMode);
+        this.exchangeMode = checkNotNull(exchangeMode);
         this.edgeId =
                 sourceVertex + "_" + targetVertex + "_" + typeNumber + "_" + 
outputPartitioner;
     }
@@ -141,8 +141,8 @@ public class StreamEdge implements Serializable {
         return outputPartitioner;
     }
 
-    public ShuffleMode getShuffleMode() {
-        return shuffleMode;
+    public StreamExchangeMode getExchangeMode() {
+        return exchangeMode;
     }
 
     public void setPartitioner(StreamPartitioner<?> partitioner) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index e3f284f..a641898 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -47,7 +47,7 @@ import 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.OutputFormatOperatorFactory;
 import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -113,7 +113,8 @@ public class StreamGraph implements Pipeline {
     private Set<Integer> sources;
     private Set<Integer> sinks;
     private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
-    private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, ShuffleMode>> 
virtualPartitionNodes;
+    private Map<Integer, Tuple3<Integer, StreamPartitioner<?>, 
StreamExchangeMode>>
+            virtualPartitionNodes;
 
     protected Map<Integer, String> vertexIDtoBrokerID;
     protected Map<Integer, Long> vertexIDtoLoopTimeout;
@@ -555,14 +556,14 @@ public class StreamGraph implements Pipeline {
             Integer originalId,
             Integer virtualId,
             StreamPartitioner<?> partitioner,
-            ShuffleMode shuffleMode) {
+            StreamExchangeMode exchangeMode) {
 
         if (virtualPartitionNodes.containsKey(virtualId)) {
             throw new IllegalStateException(
                     "Already has virtual partition node with id " + virtualId);
         }
 
-        virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, 
partitioner, shuffleMode));
+        virtualPartitionNodes.put(virtualId, new Tuple3<>(originalId, 
partitioner, exchangeMode));
     }
 
     /** Determines the slot sharing group of an operation across virtual 
nodes. */
@@ -597,7 +598,7 @@ public class StreamGraph implements Pipeline {
             StreamPartitioner<?> partitioner,
             List<String> outputNames,
             OutputTag outputTag,
-            ShuffleMode shuffleMode) {
+            StreamExchangeMode exchangeMode) {
 
         if (virtualSideOutputNodes.containsKey(upStreamVertexID)) {
             int virtualId = upStreamVertexID;
@@ -612,14 +613,14 @@ public class StreamGraph implements Pipeline {
                     partitioner,
                     null,
                     outputTag,
-                    shuffleMode);
+                    exchangeMode);
         } else if (virtualPartitionNodes.containsKey(upStreamVertexID)) {
             int virtualId = upStreamVertexID;
             upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;
             if (partitioner == null) {
                 partitioner = virtualPartitionNodes.get(virtualId).f1;
             }
-            shuffleMode = virtualPartitionNodes.get(virtualId).f2;
+            exchangeMode = virtualPartitionNodes.get(virtualId).f2;
             addEdgeInternal(
                     upStreamVertexID,
                     downStreamVertexID,
@@ -627,7 +628,7 @@ public class StreamGraph implements Pipeline {
                     partitioner,
                     outputNames,
                     outputTag,
-                    shuffleMode);
+                    exchangeMode);
         } else {
             StreamNode upstreamNode = getStreamNode(upStreamVertexID);
             StreamNode downstreamNode = getStreamNode(downStreamVertexID);
@@ -657,8 +658,8 @@ public class StreamGraph implements Pipeline {
                 }
             }
 
-            if (shuffleMode == null) {
-                shuffleMode = ShuffleMode.UNDEFINED;
+            if (exchangeMode == null) {
+                exchangeMode = StreamExchangeMode.UNDEFINED;
             }
 
             StreamEdge edge =
@@ -668,7 +669,7 @@ public class StreamGraph implements Pipeline {
                             typeNumber,
                             partitioner,
                             outputTag,
-                            shuffleMode);
+                            exchangeMode);
 
             getStreamNode(edge.getSourceId()).addOutEdge(edge);
             getStreamNode(edge.getTargetId()).addInEdge(edge);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6e96655..a4c0936 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -62,7 +62,7 @@ import 
org.apache.flink.streaming.api.operators.SourceOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.UdfStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.CustomPartitionerWrapper;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -780,7 +780,7 @@ public class StreamingJobGraphGenerator {
         StreamPartitioner<?> partitioner = edge.getPartitioner();
 
         ResultPartitionType resultPartitionType;
-        switch (edge.getShuffleMode()) {
+        switch (edge.getExchangeMode()) {
             case PIPELINED:
                 resultPartitionType = ResultPartitionType.PIPELINED_BOUNDED;
                 break;
@@ -792,7 +792,7 @@ public class StreamingJobGraphGenerator {
                 break;
             default:
                 throw new UnsupportedOperationException(
-                        "Data exchange mode " + edge.getShuffleMode() + " is 
not supported yet.");
+                        "Data exchange mode " + edge.getExchangeMode() + " is 
not supported yet.");
         }
 
         checkAndResetBufferTimeout(resultPartitionType, edge);
@@ -877,7 +877,7 @@ public class StreamingJobGraphGenerator {
         if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
                 && areOperatorsChainable(upStreamVertex, downStreamVertex, 
streamGraph)
                 && (edge.getPartitioner() instanceof ForwardPartitioner)
-                && edge.getShuffleMode() != ShuffleMode.BATCH
+                && edge.getExchangeMode() != StreamExchangeMode.BATCH
                 && upStreamVertex.getParallelism() == 
downStreamVertex.getParallelism()
                 && streamGraph.isChainingEnabled())) {
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
index 6ec9a22..2521539 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/PartitionTransformation.java
@@ -44,7 +44,7 @@ public class PartitionTransformation<T> extends 
Transformation<T> {
 
     private final StreamPartitioner<T> partitioner;
 
-    private final ShuffleMode shuffleMode;
+    private final StreamExchangeMode exchangeMode;
 
     /**
      * Creates a new {@code PartitionTransformation} from the given input and 
{@link
@@ -54,7 +54,7 @@ public class PartitionTransformation<T> extends 
Transformation<T> {
      * @param partitioner The {@code StreamPartitioner}
      */
     public PartitionTransformation(Transformation<T> input, 
StreamPartitioner<T> partitioner) {
-        this(input, partitioner, ShuffleMode.UNDEFINED);
+        this(input, partitioner, StreamExchangeMode.UNDEFINED);
     }
 
     /**
@@ -63,14 +63,16 @@ public class PartitionTransformation<T> extends 
Transformation<T> {
      *
      * @param input The input {@code Transformation}
      * @param partitioner The {@code StreamPartitioner}
-     * @param shuffleMode The {@code ShuffleMode}
+     * @param exchangeMode The {@code StreamExchangeMode}
      */
     public PartitionTransformation(
-            Transformation<T> input, StreamPartitioner<T> partitioner, 
ShuffleMode shuffleMode) {
+            Transformation<T> input,
+            StreamPartitioner<T> partitioner,
+            StreamExchangeMode exchangeMode) {
         super("Partition", input.getOutputType(), input.getParallelism());
         this.input = input;
         this.partitioner = partitioner;
-        this.shuffleMode = checkNotNull(shuffleMode);
+        this.exchangeMode = checkNotNull(exchangeMode);
     }
 
     /**
@@ -81,9 +83,9 @@ public class PartitionTransformation<T> extends 
Transformation<T> {
         return partitioner;
     }
 
-    /** Returns the {@link ShuffleMode} of this {@link 
PartitionTransformation}. */
-    public ShuffleMode getShuffleMode() {
-        return shuffleMode;
+    /** Returns the {@link StreamExchangeMode} of this {@link 
PartitionTransformation}. */
+    public StreamExchangeMode getExchangeMode() {
+        return exchangeMode;
     }
 
     @Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
similarity index 89%
rename from 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java
rename to 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
index cc98cc7..4637c07 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/ShuffleMode.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/StreamExchangeMode.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.PublicEvolving;
 
 /** The shuffle mode defines the data exchange mode between operators. */
 @PublicEvolving
-public enum ShuffleMode {
+public enum StreamExchangeMode {
     /**
      * Producer and consumer are online at the same time. Produced data is 
received by consumer
      * immediately.
@@ -37,8 +37,8 @@ public enum ShuffleMode {
 
     /**
      * The shuffle mode is undefined. It leaves it up to the framework to 
decide the shuffle mode.
-     * The framework will pick one of {@link ShuffleMode#BATCH} or {@link 
ShuffleMode#PIPELINED} in
-     * the end.
+     * The framework will pick one of {@link StreamExchangeMode#BATCH} or 
{@link
+     * StreamExchangeMode#PIPELINED} in the end.
      */
     UNDEFINED
 }
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java
index c459c9e..f427cae 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/PartitionTransformationTranslator.java
@@ -76,7 +76,7 @@ public class PartitionTransformationTranslator<OUT>
                     inputId,
                     virtualId,
                     transformation.getPartitioner(),
-                    transformation.getShuffleMode());
+                    transformation.getExchangeMode());
             resultIds.add(virtualId);
         }
         return resultIds;
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index a251b23..bfb82f3 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -82,7 +82,7 @@ import 
org.apache.flink.streaming.api.operators.YieldingOperatorFactory;
 import 
org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
@@ -593,7 +593,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
         assertTrue(operatorFactory instanceof SourceOperatorFactory);
     }
 
-    /** Test setting shuffle mode to {@link ShuffleMode#PIPELINED}. */
+    /** Test setting shuffle mode to {@link StreamExchangeMode#PIPELINED}. */
     @Test
     public void testShuffleModePipelined() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -606,7 +606,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 sourceDataStream.getTransformation(),
                                 new ForwardPartitioner<>(),
-                                ShuffleMode.PIPELINED));
+                                StreamExchangeMode.PIPELINED));
         DataStream<Integer> mapDataStream =
                 partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
 
@@ -616,7 +616,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 mapDataStream.getTransformation(),
                                 new RescalePartitioner<>(),
-                                ShuffleMode.PIPELINED));
+                                StreamExchangeMode.PIPELINED));
         partitionAfterMapDataStream.print().setParallelism(2);
 
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
@@ -633,7 +633,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                 
sourceAndMapVertex.getProducedDataSets().get(0).getResultType());
     }
 
-    /** Test setting shuffle mode to {@link ShuffleMode#BATCH}. */
+    /** Test setting shuffle mode to {@link StreamExchangeMode#BATCH}. */
     @Test
     public void testShuffleModeBatch() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -646,7 +646,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 sourceDataStream.getTransformation(),
                                 new ForwardPartitioner<>(),
-                                ShuffleMode.BATCH));
+                                StreamExchangeMode.BATCH));
         DataStream<Integer> mapDataStream =
                 partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
 
@@ -656,7 +656,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 mapDataStream.getTransformation(),
                                 new RescalePartitioner<>(),
-                                ShuffleMode.BATCH));
+                                StreamExchangeMode.BATCH));
         partitionAfterMapDataStream.print().setParallelism(2);
 
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
@@ -677,7 +677,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                 mapVertex.getProducedDataSets().get(0).getResultType());
     }
 
-    /** Test setting shuffle mode to {@link ShuffleMode#UNDEFINED}. */
+    /** Test setting shuffle mode to {@link StreamExchangeMode#UNDEFINED}. */
     @Test
     public void testShuffleModeUndefined() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -690,7 +690,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 sourceDataStream.getTransformation(),
                                 new ForwardPartitioner<>(),
-                                ShuffleMode.UNDEFINED));
+                                StreamExchangeMode.UNDEFINED));
         DataStream<Integer> mapDataStream =
                 partitionAfterSourceDataStream.map(value -> 
value).setParallelism(1);
 
@@ -700,7 +700,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 mapDataStream.getTransformation(),
                                 new RescalePartitioner<>(),
-                                ShuffleMode.UNDEFINED));
+                                StreamExchangeMode.UNDEFINED));
         partitionAfterMapDataStream.print().setParallelism(2);
 
         JobGraph jobGraph = 
StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph());
@@ -784,15 +784,15 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
 
     @Test(expected = UnsupportedOperationException.class)
     public void testConflictShuffleModeWithBufferTimeout() {
-        testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.BATCH);
+        testCompatibleShuffleModeWithBufferTimeout(StreamExchangeMode.BATCH);
     }
 
     @Test
     public void testNormalShuffleModeWithBufferTimeout() {
-        testCompatibleShuffleModeWithBufferTimeout(ShuffleMode.PIPELINED);
+        
testCompatibleShuffleModeWithBufferTimeout(StreamExchangeMode.PIPELINED);
     }
 
-    private void testCompatibleShuffleModeWithBufferTimeout(ShuffleMode 
shuffleMode) {
+    private void testCompatibleShuffleModeWithBufferTimeout(StreamExchangeMode 
exchangeMode) {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setBufferTimeout(100);
 
@@ -801,7 +801,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                 new PartitionTransformation<>(
                         sourceDataStream.getTransformation(),
                         new RebalancePartitioner<>(),
-                        shuffleMode);
+                        exchangeMode);
 
         DataStream<Integer> partitionStream = new DataStream<>(env, 
transformation);
         partitionStream.map(value -> value).print();
@@ -1396,7 +1396,7 @@ public class StreamingJobGraphGeneratorTest extends 
TestLogger {
                         new PartitionTransformation<>(
                                 source2.getTransformation(),
                                 new RebalancePartitioner<>(),
-                                ShuffleMode.BATCH));
+                                StreamExchangeMode.BATCH));
         partitioned.map(v -> v).name("map2");
 
         return env.getStreamGraph();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
index 5170592..7ee6462 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest.java
@@ -23,7 +23,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
 import org.apache.flink.util.TestLogger;
@@ -148,7 +148,7 @@ public class 
StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends Te
                         new PartitionTransformation<>(
                                 source.getTransformation(),
                                 new ForwardPartitioner<>(),
-                                ShuffleMode.PIPELINED));
+                                StreamExchangeMode.PIPELINED));
         forward.map(i -> i).startNewChain().setParallelism(1);
         final StreamGraph streamGraph = env.getStreamGraph();
         
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
@@ -178,7 +178,7 @@ public class 
StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends Te
                         new PartitionTransformation<>(
                                 source.getTransformation(),
                                 new ForwardPartitioner<>(),
-                                ShuffleMode.UNDEFINED));
+                                StreamExchangeMode.UNDEFINED));
         final DataStream<Integer> map1 = forward.map(i -> 
i).startNewChain().setParallelism(1);
 
         final DataStream<Integer> rescale =
@@ -187,7 +187,7 @@ public class 
StreamingJobGraphGeneratorWithGlobalDataExchangeModeTest extends Te
                         new PartitionTransformation<>(
                                 map1.getTransformation(),
                                 new RescalePartitioner<>(),
-                                ShuffleMode.UNDEFINED));
+                                StreamExchangeMode.UNDEFINED));
         final DataStream<Integer> map2 = rescale.map(i -> i).setParallelism(2);
 
         map2.rebalance().print().setParallelism(2);
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
index 9e4f2d0..f6b5216 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecEdge.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec;
 
 import org.apache.flink.api.dag.Transformation;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.delegation.Planner;
 import 
org.apache.flink.table.planner.plan.nodes.exec.serde.ExecNodeGraphJsonPlanGenerator.JsonPlanEdge;
@@ -45,22 +45,25 @@ public class ExecEdge {
     private final ExecNode<?> target;
     /** The {@link Shuffle} on this edge from source to target. */
     private final Shuffle shuffle;
-    /** The {@link ShuffleMode} defines the data exchange mode on this edge. */
-    private final ShuffleMode shuffleMode;
+    /** The {@link StreamExchangeMode} defines the data exchange mode on this 
edge. */
+    private final StreamExchangeMode exchangeMode;
 
     public ExecEdge(
-            ExecNode<?> source, ExecNode<?> target, Shuffle shuffle, 
ShuffleMode shuffleMode) {
+            ExecNode<?> source,
+            ExecNode<?> target,
+            Shuffle shuffle,
+            StreamExchangeMode exchangeMode) {
         this.source = checkNotNull(source);
         this.target = checkNotNull(target);
         this.shuffle = checkNotNull(shuffle);
-        this.shuffleMode = checkNotNull(shuffleMode);
+        this.exchangeMode = checkNotNull(exchangeMode);
 
         // TODO once FLINK-21224 [Remove BatchExecExchange and 
StreamExecExchange, and replace their
         //  functionality with ExecEdge] is finished, we should remove the 
following validation.
         if (shuffle.getType() != Shuffle.Type.FORWARD) {
             throw new TableException("Only FORWARD shuffle is supported now.");
         }
-        if (shuffleMode != ShuffleMode.PIPELINED) {
+        if (exchangeMode != StreamExchangeMode.PIPELINED) {
             throw new TableException("Only PIPELINED shuffle mode is supported 
now.");
         }
     }
@@ -77,8 +80,8 @@ public class ExecEdge {
         return shuffle;
     }
 
-    public ShuffleMode getShuffleMode() {
-        return shuffleMode;
+    public StreamExchangeMode getExchangeMode() {
+        return exchangeMode;
     }
 
     /** Returns the output {@link LogicalType} of the data passing this edge. 
*/
@@ -95,8 +98,8 @@ public class ExecEdge {
                 + target.getDescription()
                 + ", shuffle="
                 + shuffle
-                + ", shuffleMode="
-                + shuffleMode
+                + ", exchangeMode="
+                + exchangeMode
                 + '}';
     }
 
@@ -109,7 +112,7 @@ public class ExecEdge {
         private ExecNode<?> source;
         private ExecNode<?> target;
         private Shuffle shuffle = FORWARD_SHUFFLE;
-        private ShuffleMode shuffleMode = ShuffleMode.PIPELINED;
+        private StreamExchangeMode exchangeMode = StreamExchangeMode.PIPELINED;
 
         public Builder source(ExecNode<?> source) {
             this.source = source;
@@ -131,13 +134,13 @@ public class ExecEdge {
             return shuffle(fromRequiredDistribution(requiredDistribution));
         }
 
-        public Builder shuffleMode(ShuffleMode shuffleMode) {
-            this.shuffleMode = shuffleMode;
+        public Builder exchangeMode(StreamExchangeMode exchangeMode) {
+            this.exchangeMode = exchangeMode;
             return this;
         }
 
         public ExecEdge build() {
-            return new ExecEdge(source, target, shuffle, shuffleMode);
+            return new ExecEdge(source, target, shuffle, exchangeMode);
         }
 
         private Shuffle fromRequiredDistribution(
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
index 3d1b5e9..3a5b3aa 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecExchange.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
@@ -58,14 +58,14 @@ import java.util.Optional;
 public class BatchExecExchange extends CommonExecExchange implements 
BatchExecNode<RowData> {
     // the required shuffle mode for reusable BatchExecExchange
     // if it's None, use value from configuration
-    @Nullable private ShuffleMode requiredShuffleMode;
+    @Nullable private StreamExchangeMode requiredExchangeMode;
 
     public BatchExecExchange(InputProperty inputProperty, RowType outputType, 
String description) {
         super(getNewNodeId(), Collections.singletonList(inputProperty), 
outputType, description);
     }
 
-    public void setRequiredShuffleMode(@Nullable ShuffleMode 
requiredShuffleMode) {
-        this.requiredShuffleMode = requiredShuffleMode;
+    public void setRequiredExchangeMode(@Nullable StreamExchangeMode 
requiredExchangeMode) {
+        this.requiredExchangeMode = requiredExchangeMode;
     }
 
     @Override
@@ -89,7 +89,7 @@ public class BatchExecExchange extends CommonExecExchange 
implements BatchExecNo
             sb.append("[").append(String.join(", ", fieldNames)).append("]");
         }
         sb.append("]");
-        if (requiredShuffleMode == ShuffleMode.BATCH) {
+        if (requiredExchangeMode == StreamExchangeMode.BATCH) {
             sb.append(", shuffle_mode=[BATCH]");
         }
         return String.format("Exchange(%s)", sb.toString());
@@ -141,30 +141,30 @@ public class BatchExecExchange extends CommonExecExchange 
implements BatchExecNo
                 throw new TableException(distributionType + "is not supported 
now!");
         }
 
-        final ShuffleMode shuffleMode =
-                getShuffleMode(planner.getTableConfig().getConfiguration(), 
requiredShuffleMode);
+        final StreamExchangeMode exchangeMode =
+                getExchangeMode(planner.getTableConfig().getConfiguration(), 
requiredExchangeMode);
         final Transformation<RowData> transformation =
-                new PartitionTransformation<>(inputTransform, partitioner, 
shuffleMode);
+                new PartitionTransformation<>(inputTransform, partitioner, 
exchangeMode);
         transformation.setParallelism(parallelism);
         transformation.setOutputType(InternalTypeInfo.of(getOutputType()));
         return transformation;
     }
 
-    public static ShuffleMode getShuffleMode(
-            Configuration config, @Nullable ShuffleMode requiredShuffleMode) {
-        if (requiredShuffleMode == ShuffleMode.BATCH) {
-            return ShuffleMode.BATCH;
+    public static StreamExchangeMode getExchangeMode(
+            Configuration config, @Nullable StreamExchangeMode 
requiredExchangeMode) {
+        if (requiredExchangeMode == StreamExchangeMode.BATCH) {
+            return StreamExchangeMode.BATCH;
         }
         if (config.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)
                 
.equalsIgnoreCase(GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString())) {
-            return ShuffleMode.BATCH;
+            return StreamExchangeMode.BATCH;
         } else {
-            return ShuffleMode.UNDEFINED;
+            return StreamExchangeMode.UNDEFINED;
         }
     }
 
     @VisibleForTesting
-    public Optional<ShuffleMode> getRequiredShuffleMode() {
-        return Optional.ofNullable(requiredShuffleMode);
+    public Optional<StreamExchangeMode> getRequiredExchangeMode() {
+        return Optional.ofNullable(requiredExchangeMode);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java
index 970b7c6..fee1411 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/DeadlockBreakupProcessor.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.processor;
 
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -43,7 +43,7 @@ public class DeadlockBreakupProcessor implements 
ExecNodeGraphProcessor {
                 new InputPriorityConflictResolver(
                         execGraph.getRootNodes(),
                         InputProperty.DamBehavior.END_INPUT,
-                        ShuffleMode.BATCH,
+                        StreamExchangeMode.BATCH,
                         
context.getPlanner().getTableConfig().getConfiguration());
         resolver.detectAndResolve();
         return execGraph;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java
index 8f4eb5a..54a79e3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/MultipleInputNodeCreationProcessor.java
@@ -21,8 +21,8 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.processor;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
 import org.apache.flink.streaming.api.transformations.SourceTransformation;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeGraph;
@@ -78,7 +78,7 @@ public class MultipleInputNodeCreationProcessor implements 
ExecNodeGraphProcesso
                     new InputPriorityConflictResolver(
                             execGraph.getRootNodes(),
                             InputProperty.DamBehavior.BLOCKING,
-                            ShuffleMode.PIPELINED,
+                            StreamExchangeMode.PIPELINED,
                             
context.getPlanner().getTableConfig().getConfiguration());
             resolver.detectAndResolve();
         }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index b5c5011..bc9a2c3 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -20,7 +20,7 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.processor.utils;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -40,7 +40,7 @@ import java.util.List;
 @Internal
 public class InputPriorityConflictResolver extends InputPriorityGraphGenerator 
{
 
-    private final ShuffleMode shuffleMode;
+    private final StreamExchangeMode exchangeMode;
     private final Configuration configuration;
 
     /**
@@ -49,16 +49,16 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
      * @param roots the first layer of nodes on the output side of the graph
      * @param safeDamBehavior when checking for conflicts we'll ignore the 
edges with {@link
      *     InputProperty.DamBehavior} stricter or equal than this
-     * @param shuffleMode when a conflict occurs we'll insert an {@link 
BatchExecExchange} node with
-     *     this shuffleMode to resolve conflict
+     * @param exchangeMode when a conflict occurs we'll insert an {@link 
BatchExecExchange} node
+     *     with this exchangeMode to resolve conflict
      */
     public InputPriorityConflictResolver(
             List<ExecNode<?>> roots,
             InputProperty.DamBehavior safeDamBehavior,
-            ShuffleMode shuffleMode,
+            StreamExchangeMode exchangeMode,
             Configuration configuration) {
         super(roots, Collections.emptySet(), safeDamBehavior);
-        this.shuffleMode = shuffleMode;
+        this.exchangeMode = exchangeMode;
         this.configuration = configuration;
     }
 
@@ -86,7 +86,7 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
                 BatchExecExchange newExchange =
                         new BatchExecExchange(
                                 inputProperty, (RowType) 
exchange.getOutputType(), "Exchange");
-                newExchange.setRequiredShuffleMode(shuffleMode);
+                newExchange.setRequiredExchangeMode(exchangeMode);
                 newExchange.setInputEdges(exchange.getInputEdges());
                 newNode = newExchange;
             } else {
@@ -96,7 +96,7 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
                                 inputProperty,
                                 (RowType) exchange.getOutputType(),
                                 exchange.getDescription());
-                newExchange.setRequiredShuffleMode(shuffleMode);
+                newExchange.setRequiredExchangeMode(exchangeMode);
                 newExchange.setInputEdges(exchange.getInputEdges());
                 newNode = newExchange;
             }
@@ -137,7 +137,7 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
         BatchExecExchange exchange =
                 new BatchExecExchange(
                         newInputProperty, (RowType) inputNode.getOutputType(), 
"Exchange");
-        exchange.setRequiredShuffleMode(shuffleMode);
+        exchange.setRequiredExchangeMode(exchangeMode);
         ExecEdge execEdge = 
ExecEdge.builder().source(inputNode).target(exchange).build();
         exchange.setInputEdges(Collections.singletonList(execEdge));
         return exchange;
@@ -168,7 +168,8 @@ public class InputPriorityConflictResolver extends 
InputPriorityGraphGenerator {
     }
 
     private InputProperty.DamBehavior getDamBehavior() {
-        if (BatchExecExchange.getShuffleMode(configuration, shuffleMode) == 
ShuffleMode.BATCH) {
+        if (BatchExecExchange.getExchangeMode(configuration, exchangeMode)
+                == StreamExchangeMode.BATCH) {
             return InputProperty.DamBehavior.BLOCKING;
         } else {
             return InputProperty.DamBehavior.PIPELINED;
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
index f18dcb4..953ea6b 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/serde/ExecNodeGraphJsonPlanGenerator.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.serde;
 
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
@@ -293,7 +293,7 @@ public class ExecNodeGraphJsonPlanGenerator {
                                 .source(source)
                                 .target(target)
                                 .shuffle(edge.getShuffle())
-                                .shuffleMode(edge.getShuffleMode())
+                                .exchangeMode(edge.getExchangeMode())
                                 .build();
                 idToInputEdges
                         .computeIfAbsent(target.getId(), n -> new 
ArrayList<>())
@@ -343,20 +343,20 @@ public class ExecNodeGraphJsonPlanGenerator {
         @JsonSerialize(using = ShuffleJsonSerializer.class)
         @JsonDeserialize(using = ShuffleJsonDeserializer.class)
         private final ExecEdge.Shuffle shuffle;
-        /** The {@link ShuffleMode} defines the data exchange mode on this 
edge. */
+        /** The {@link StreamExchangeMode} on this edge. */
         @JsonProperty(FIELD_NAME_SHUFFLE_MODE)
-        private final ShuffleMode shuffleMode;
+        private final StreamExchangeMode exchangeMode;
 
         @JsonCreator
         public JsonPlanEdge(
                 @JsonProperty(FIELD_NAME_SOURCE) int sourceId,
                 @JsonProperty(FIELD_NAME_TARGET) int targetId,
                 @JsonProperty(FIELD_NAME_SHUFFLE) ExecEdge.Shuffle shuffle,
-                @JsonProperty(FIELD_NAME_SHUFFLE_MODE) ShuffleMode 
shuffleMode) {
+                @JsonProperty(FIELD_NAME_SHUFFLE_MODE) StreamExchangeMode 
exchangeMode) {
             this.sourceId = sourceId;
             this.targetId = targetId;
             this.shuffle = shuffle;
-            this.shuffleMode = shuffleMode;
+            this.exchangeMode = exchangeMode;
         }
 
         @JsonIgnore
@@ -375,8 +375,8 @@ public class ExecNodeGraphJsonPlanGenerator {
         }
 
         @JsonIgnore
-        public ShuffleMode getShuffleMode() {
-            return shuffleMode;
+        public StreamExchangeMode getExchangeMode() {
+            return exchangeMode;
         }
 
         /** Build {@link JsonPlanEdge} from an {@link ExecEdge}. */
@@ -385,7 +385,7 @@ public class ExecNodeGraphJsonPlanGenerator {
                     execEdge.getSource().getId(),
                     execEdge.getTarget().getId(),
                     execEdge.getShuffle(),
-                    execEdge.getShuffleMode());
+                    execEdge.getExchangeMode());
         }
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
index cce7280..4871560 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ExecutorUtils.java
@@ -84,7 +84,7 @@ public class ExecutorUtils {
     }
 
     private static GlobalDataExchangeMode 
getGlobalDataExchangeMode(TableConfig tableConfig) {
-        return ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(
+        return StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(
                 tableConfig.getConfiguration());
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
similarity index 98%
rename from 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java
rename to 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
index 54dd0cd..c2e01de 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShuffleModeUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
@@ -23,7 +23,7 @@ import 
org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 
 /** Utility class to load job-wide shuffle mode. */
-public class ShuffleModeUtils {
+public class StreamExchangeModeUtils {
 
     static final String ALL_EDGES_BLOCKING_LEGACY = "batch";
 
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
index 81ae96b..fc00199 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
@@ -19,11 +19,11 @@
 package org.apache.flink.table.planner.plan.nodes.physical.batch
 
 import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode
-import org.apache.flink.streaming.api.transformations.ShuffleMode
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecExchange
-import org.apache.flink.table.planner.plan.nodes.exec.{InputProperty, ExecNode}
+import org.apache.flink.table.planner.plan.nodes.exec.{ExecNode, InputProperty}
 import 
org.apache.flink.table.planner.plan.nodes.physical.common.CommonPhysicalExchange
 import org.apache.flink.table.planner.plan.utils.FlinkRelOptUtil
 
@@ -60,7 +60,7 @@ class BatchPhysicalExchange(
       throw new UnsupportedOperationException("Range sort is not supported.")
     }
 
-    val damBehavior = if (getShuffleMode eq ShuffleMode.BATCH) {
+    val damBehavior = if (getExchangeMode eq StreamExchangeMode.BATCH) {
       InputProperty.DamBehavior.BLOCKING
     } else {
       InputProperty.DamBehavior.PIPELINED
@@ -72,13 +72,13 @@ class BatchPhysicalExchange(
       .build
   }
 
-  private def getShuffleMode: ShuffleMode = {
+  private def getExchangeMode: StreamExchangeMode = {
     val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(this)
     if 
(tableConfig.getConfiguration.getString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE)
       .equalsIgnoreCase(GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString)) {
-      ShuffleMode.BATCH
+      StreamExchangeMode.BATCH
     } else {
-      ShuffleMode.UNDEFINED
+      StreamExchangeMode.UNDEFINED
     }
   }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
index b9d6f9b..73f6228 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
@@ -19,7 +19,7 @@
 package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.transformations.ShuffleMode;
+import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
@@ -78,21 +78,21 @@ public class InputPriorityConflictResolverTest {
                 new InputPriorityConflictResolver(
                         Collections.singletonList(nodes[7]),
                         InputProperty.DamBehavior.END_INPUT,
-                        ShuffleMode.BATCH,
+                        StreamExchangeMode.BATCH,
                         new Configuration());
         resolver.detectAndResolve();
         Assert.assertEquals(nodes[1], nodes[7].getInputNodes().get(0));
         Assert.assertEquals(nodes[2], nodes[7].getInputNodes().get(1));
         Assert.assertTrue(nodes[7].getInputNodes().get(2) instanceof 
BatchExecExchange);
         Assert.assertEquals(
-                Optional.of(ShuffleMode.BATCH),
-                ((BatchExecExchange) 
nodes[7].getInputNodes().get(2)).getRequiredShuffleMode());
+                Optional.of(StreamExchangeMode.BATCH),
+                ((BatchExecExchange) 
nodes[7].getInputNodes().get(2)).getRequiredExchangeMode());
         Assert.assertEquals(
                 nodes[3], 
nodes[7].getInputNodes().get(2).getInputEdges().get(0).getSource());
         Assert.assertTrue(nodes[7].getInputNodes().get(3) instanceof 
BatchExecExchange);
         Assert.assertEquals(
-                Optional.of(ShuffleMode.BATCH),
-                ((BatchExecExchange) 
nodes[7].getInputNodes().get(3)).getRequiredShuffleMode());
+                Optional.of(StreamExchangeMode.BATCH),
+                ((BatchExecExchange) 
nodes[7].getInputNodes().get(3)).getRequiredExchangeMode());
         Assert.assertEquals(
                 nodes[4], 
nodes[7].getInputNodes().get(3).getInputEdges().get(0).getSource());
         Assert.assertEquals(nodes[5], nodes[7].getInputNodes().get(4));
@@ -117,7 +117,7 @@ public class InputPriorityConflictResolverTest {
                                 .build(),
                         (RowType) nodes[0].getOutputType(),
                         "Exchange");
-        exchange.setRequiredShuffleMode(ShuffleMode.BATCH);
+        exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
         ExecEdge execEdge = 
ExecEdge.builder().source(nodes[0]).target(exchange).build();
         exchange.setInputEdges(Collections.singletonList(execEdge));
 
@@ -128,7 +128,7 @@ public class InputPriorityConflictResolverTest {
                 new InputPriorityConflictResolver(
                         Collections.singletonList(nodes[1]),
                         InputProperty.DamBehavior.END_INPUT,
-                        ShuffleMode.BATCH,
+                        StreamExchangeMode.BATCH,
                         new Configuration());
         resolver.detectAndResolve();
 
@@ -140,7 +140,8 @@ public class InputPriorityConflictResolverTest {
                 execNode -> {
                     Assert.assertTrue(execNode instanceof BatchExecExchange);
                     BatchExecExchange e = (BatchExecExchange) execNode;
-                    Assert.assertEquals(Optional.of(ShuffleMode.BATCH), 
e.getRequiredShuffleMode());
+                    Assert.assertEquals(
+                            Optional.of(StreamExchangeMode.BATCH), 
e.getRequiredExchangeMode());
                     Assert.assertEquals(nodes[0], 
e.getInputEdges().get(0).getSource());
                 };
         checkExchange.accept(input0);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java
similarity index 75%
rename from 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java
rename to 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java
index 33ecddb..c0078da 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/ShuffleModeUtilsTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java
@@ -27,8 +27,8 @@ import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
-/** Tests for {@link ShuffleModeUtils}. */
-public class ShuffleModeUtilsTest extends TestLogger {
+/** Tests for {@link StreamExchangeModeUtils}. */
+public class StreamExchangeModeUtilsTest extends TestLogger {
 
     @Test
     public void testGetValidShuffleMode() {
@@ -39,28 +39,28 @@ public class ShuffleModeUtilsTest extends TestLogger {
                 GlobalDataExchangeMode.ALL_EDGES_BLOCKING.toString());
         assertEquals(
                 GlobalDataExchangeMode.ALL_EDGES_BLOCKING,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
 
         configuration.setString(
                 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
                 GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED.toString());
         assertEquals(
                 GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
 
         configuration.setString(
                 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
                 GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED.toString());
         assertEquals(
                 GlobalDataExchangeMode.POINTWISE_EDGES_PIPELINED,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
 
         configuration.setString(
                 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
                 GlobalDataExchangeMode.ALL_EDGES_PIPELINED.toString());
         assertEquals(
                 GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
     }
 
     @Test
@@ -69,17 +69,17 @@ public class ShuffleModeUtilsTest extends TestLogger {
 
         configuration.setString(
                 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
-                ShuffleModeUtils.ALL_EDGES_BLOCKING_LEGACY);
+                StreamExchangeModeUtils.ALL_EDGES_BLOCKING_LEGACY);
         assertEquals(
                 GlobalDataExchangeMode.ALL_EDGES_BLOCKING,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
 
         configuration.setString(
                 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE,
-                ShuffleModeUtils.ALL_EDGES_PIPELINED_LEGACY);
+                StreamExchangeModeUtils.ALL_EDGES_PIPELINED_LEGACY);
         assertEquals(
                 GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
     }
 
     @Test
@@ -90,12 +90,12 @@ public class ShuffleModeUtilsTest extends TestLogger {
                 ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, 
"Forward_edges_PIPELINED");
         assertEquals(
                 GlobalDataExchangeMode.FORWARD_EDGES_PIPELINED,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
 
         
configuration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, 
"Pipelined");
         assertEquals(
                 GlobalDataExchangeMode.ALL_EDGES_PIPELINED,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
     }
 
     @Test
@@ -103,13 +103,13 @@ public class ShuffleModeUtilsTest extends TestLogger {
         final Configuration configuration = new Configuration();
         assertEquals(
                 GlobalDataExchangeMode.ALL_EDGES_BLOCKING,
-                
ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
+                
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration));
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testGetInvalidShuffleMode() {
         final Configuration configuration = new Configuration();
         
configuration.setString(ExecutionConfigOptions.TABLE_EXEC_SHUFFLE_MODE, 
"invalid-value");
-        ShuffleModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration);
+        
StreamExchangeModeUtils.getShuffleModeAsGlobalDataExchangeMode(configuration);
     }
 }

Reply via email to