This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cac80be18c [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan
9cac80be18c is described below

commit 9cac80be18c6aff0cebdfe706327c1693822e884
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Wed Nov 22 14:22:45 2023 +0800

    [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan
---
 .../utils/InputPriorityConflictResolver.java       |   6 +-
 .../planner/utils/StreamExchangeModeUtils.java     |   4 +
 .../physical/batch/BatchPhysicalExchange.scala     |  10 +-
 .../utils/InputPriorityConflictResolverTest.java   |  72 +++-
 .../plan/optimize/ShuffleModePlanOptimizeTest.java | 127 ++++++
 .../planner/utils/StreamExchangeModeUtilsTest.java |  11 +
 .../plan/optimize/ShuffleModePlanOptimizeTest.xml  | 445 +++++++++++++++++++++
 7 files changed, 655 insertions(+), 20 deletions(-)
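
At a glance: before this patch the planner treated only StreamExchangeMode.BATCH as a blocking-capable (dam) edge, so plans compiled with hybrid shuffle still picked up extra blocking exchanges from input-priority conflict resolution. With this patch, HYBRID_FULL and HYBRID_SELECTIVE also map to DamBehavior.BLOCKING. The snippet below is a minimal illustrative sketch of that decision, not code from the commit; the class and method names are made up.

import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;

/** Illustrative sketch only; names are hypothetical and not part of this commit. */
final class DamBehaviorSketch {

    /** Hybrid exchange modes are now treated like blocking ones when deciding dam behavior. */
    static InputProperty.DamBehavior decideDamBehavior(StreamExchangeMode mode) {
        switch (mode) {
            case BATCH:
            case HYBRID_FULL:
            case HYBRID_SELECTIVE:
                // These modes persist some or all intermediate data, so the edge can
                // already act as a dam and the resolver does not need to force an
                // additional blocking exchange into the plan.
                return InputProperty.DamBehavior.BLOCKING;
            default:
                return InputProperty.DamBehavior.PIPELINED;
        }
    }
}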

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
index 144b527c520..0479993b5bd 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java
@@ -199,7 +199,11 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator {
     }
 
     private InputProperty.DamBehavior getDamBehavior() {
-        if (getBatchStreamExchangeMode(tableConfig, exchangeMode) == StreamExchangeMode.BATCH) {
+        StreamExchangeMode streamExchangeMode =
+                getBatchStreamExchangeMode(tableConfig, exchangeMode);
+        if (streamExchangeMode == StreamExchangeMode.BATCH
+                || streamExchangeMode == StreamExchangeMode.HYBRID_FULL
+                || streamExchangeMode == StreamExchangeMode.HYBRID_SELECTIVE) {
             return InputProperty.DamBehavior.BLOCKING;
         } else {
             return InputProperty.DamBehavior.PIPELINED;
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
index bd722a07726..312e59bee34 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java
@@ -52,6 +52,10 @@ public class StreamExchangeModeUtils {
         final BatchShuffleMode shuffleMode = config.get(ExecutionOptions.BATCH_SHUFFLE_MODE);
         if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) {
             return StreamExchangeMode.BATCH;
+        } else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL) {
+            return StreamExchangeMode.HYBRID_FULL;
+        } else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE) {
+            return StreamExchangeMode.HYBRID_SELECTIVE;
         }
 
         return StreamExchangeMode.UNDEFINED;
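
For reference, the two new branches mirror the existing BATCH mapping: each hybrid BatchShuffleMode now resolves to the corresponding StreamExchangeMode instead of falling through to UNDEFINED. The snippet below is an illustration (modeled on the updated StreamExchangeModeUtilsTest further down), not part of the commit.

import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;

import static org.apache.flink.table.planner.utils.StreamExchangeModeUtils.getBatchStreamExchangeMode;

final class ShuffleModeMappingSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();

        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
        // Resolves to StreamExchangeMode.HYBRID_FULL after this change.
        StreamExchangeMode full = getBatchStreamExchangeMode(conf, null);

        conf.set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
        // Resolves to StreamExchangeMode.HYBRID_SELECTIVE after this change.
        StreamExchangeMode selective = getBatchStreamExchangeMode(conf, null);

        System.out.println(full + " / " + selective);
    }
}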
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
index 3fbddc658e1..0a57f8087f8 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala
@@ -60,10 +60,12 @@ class BatchPhysicalExchange(
     val exchangeMode =
       getBatchStreamExchangeMode(unwrapTableConfig(this), StreamExchangeMode.UNDEFINED)
 
-    val damBehavior = if (exchangeMode eq StreamExchangeMode.BATCH) {
-      InputProperty.DamBehavior.BLOCKING
-    } else {
-      InputProperty.DamBehavior.PIPELINED
+    val damBehavior = exchangeMode match {
+      case StreamExchangeMode.BATCH | StreamExchangeMode.HYBRID_FULL |
+          StreamExchangeMode.HYBRID_SELECTIVE =>
+        InputProperty.DamBehavior.BLOCKING
+      case _ =>
+        InputProperty.DamBehavior.PIPELINED
     }
 
     InputProperty.builder
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
index 927347defdd..41a85088b3d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.table.planner.plan.nodes.exec.processor.utils;
 
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ExecutionOptions;
 import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
@@ -30,9 +33,15 @@ import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSource
 import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -40,9 +49,27 @@ import java.util.function.Consumer;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link InputPriorityConflictResolver}. */
+@ExtendWith(ParameterizedTestExtension.class)
 class InputPriorityConflictResolverTest {
 
-    @Test
+    @Parameter
+    public Tuple2<BatchShuffleMode, StreamExchangeMode> batchShuffleModeAndStreamExchangeMode;
+
+    @Parameters(name = "batchShuffleModeAndStreamExchangeMode={0}")
+    public static Collection<Tuple2<BatchShuffleMode, StreamExchangeMode>> parameters() {
+        return Arrays.asList(
+                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, StreamExchangeMode.BATCH),
+                Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.BATCH),
+                Tuple2.of(
+                        BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, StreamExchangeMode.BATCH),
+                Tuple2.of(
+                        BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.HYBRID_FULL),
+                Tuple2.of(
+                        BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
+                        StreamExchangeMode.HYBRID_SELECTIVE));
+    }
+
+    @TestTemplate
     void testDetectAndResolve() {
         // P = InputProperty.DamBehavior.PIPELINED, E = InputProperty.DamBehavior.END_INPUT
         // P100 = PIPELINED + priority 100
@@ -79,30 +106,35 @@ class InputPriorityConflictResolverTest {
         nodes[7].addInput(nodes[5], InputProperty.builder().priority(10).build());
         nodes[7].addInput(nodes[6], InputProperty.builder().priority(100).build());
 
+        Configuration configuration = new Configuration();
+        BatchShuffleMode batchShuffleMode = batchShuffleModeAndStreamExchangeMode.f0;
+        StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);
+
         InputPriorityConflictResolver resolver =
                 new InputPriorityConflictResolver(
                         Collections.singletonList(nodes[7]),
                         InputProperty.DamBehavior.END_INPUT,
-                        StreamExchangeMode.BATCH,
-                        new Configuration());
+                        streamExchangeMode,
+                        configuration);
         resolver.detectAndResolve();
         assertThat(nodes[7].getInputNodes().get(0)).isEqualTo(nodes[1]);
         assertThat(nodes[7].getInputNodes().get(1)).isEqualTo(nodes[2]);
         assertThat(nodes[7].getInputNodes().get(2)).isInstanceOf(BatchExecExchange.class);
         assertThat(((BatchExecExchange) nodes[7].getInputNodes().get(2)).getRequiredExchangeMode())
-                .isEqualTo(Optional.of(StreamExchangeMode.BATCH));
+                .isEqualTo(Optional.of(streamExchangeMode));
         assertThat(nodes[7].getInputNodes().get(2).getInputEdges().get(0).getSource())
                 .isEqualTo(nodes[3]);
         assertThat(nodes[7].getInputNodes().get(3)).isInstanceOf(BatchExecExchange.class);
         assertThat(((BatchExecExchange) nodes[7].getInputNodes().get(3)).getRequiredExchangeMode())
-                .isEqualTo(Optional.of(StreamExchangeMode.BATCH));
+                .isEqualTo(Optional.of(streamExchangeMode));
         assertThat(nodes[7].getInputNodes().get(3).getInputEdges().get(0).getSource())
                 .isEqualTo(nodes[4]);
         assertThat(nodes[7].getInputNodes().get(4)).isEqualTo(nodes[5]);
         assertThat(nodes[7].getInputNodes().get(5)).isEqualTo(nodes[6]);
     }
 
-    @Test
+    @TestTemplate
     void testDeadlockCausedByExchange() {
         // P1 = PIPELINED + priority 1
         //
@@ -113,15 +145,20 @@ class InputPriorityConflictResolverTest {
             nodes[i] = new TestingBatchExecNode("TestingBatchExecNode" + i);
         }
 
+        Configuration configuration = new Configuration();
+        BatchShuffleMode batchShuffleMode = batchShuffleModeAndStreamExchangeMode.f0;
+        StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);
+
         BatchExecExchange exchange =
                 new BatchExecExchange(
-                        new Configuration(),
+                        configuration,
                         InputProperty.builder()
                                 .requiredDistribution(InputProperty.ANY_DISTRIBUTION)
                                 .build(),
                         (RowType) nodes[0].getOutputType(),
                         "Exchange");
-        exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH);
+        exchange.setRequiredExchangeMode(streamExchangeMode);
         ExecEdge execEdge = ExecEdge.builder().source(nodes[0]).target(exchange).build();
         exchange.setInputEdges(Collections.singletonList(execEdge));
 
@@ -132,8 +169,8 @@ class InputPriorityConflictResolverTest {
                 new InputPriorityConflictResolver(
                         Collections.singletonList(nodes[1]),
                         InputProperty.DamBehavior.END_INPUT,
-                        StreamExchangeMode.BATCH,
-                        new Configuration());
+                        streamExchangeMode,
+                        configuration);
         resolver.detectAndResolve();
 
         ExecNode<?> input0 = nodes[1].getInputNodes().get(0);
@@ -145,14 +182,14 @@ class InputPriorityConflictResolverTest {
                     assertThat(execNode).isInstanceOf(BatchExecExchange.class);
                     BatchExecExchange e = (BatchExecExchange) execNode;
                     assertThat(e.getRequiredExchangeMode())
-                            .isEqualTo(Optional.of(StreamExchangeMode.BATCH));
+                            .isEqualTo(Optional.of(streamExchangeMode));
                     
assertThat(e.getInputEdges().get(0).getSource()).isEqualTo(nodes[0]);
                 };
         checkExchange.accept(input0);
         checkExchange.accept(input1);
     }
 
-    @Test
+    @TestTemplate
     void testWithDynamicFilteringPlan() {
         // no conflicts for dpp pattern
         // 2 --------------------------------------(P1)--- 1 --(P0)--> 0
@@ -188,12 +225,17 @@ class InputPriorityConflictResolverTest {
         ExecEdge toCollector = ExecEdge.builder().source(nodes[2]).target(collector).build();
         collector.setInputEdges(Collections.singletonList(toCollector));
 
+        Configuration configuration = new Configuration();
+        BatchShuffleMode batchShuffleMode = batchShuffleModeAndStreamExchangeMode.f0;
+        StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1;
+        configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode);
+
         InputPriorityConflictResolver resolver =
                 new InputPriorityConflictResolver(
                         Collections.singletonList(nodes[1]),
                         InputProperty.DamBehavior.END_INPUT,
-                        StreamExchangeMode.BATCH,
-                        new Configuration());
+                        streamExchangeMode,
+                        configuration);
         resolver.detectAndResolve();
 
         ExecNode<?> input0 = nodes[1].getInputNodes().get(0);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.java
new file mode 100644
index 00000000000..0c14d90d178
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.optimize;
+
+import org.apache.flink.api.common.BatchShuffleMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.OptimizerConfigOptions;
+import org.apache.flink.table.planner.factories.TestValuesCatalog;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
+import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
+import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/** Test optimized plans for different shuffle mode. */
+@ExtendWith(ParameterizedTestExtension.class)
+class ShuffleModePlanOptimizeTest extends TableTestBase {
+
+    @Parameters(name = "mode = {0}")
+    public static Collection<BatchShuffleMode> parameters() {
+        return Arrays.asList(
+                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
+    }
+
+    @Parameter public BatchShuffleMode mode;
+
+    private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault());
+    private final TestValuesCatalog catalog =
+            new TestValuesCatalog("testCatalog", "test_database", true);
+
+    @BeforeEach
+    void setup() {
+        catalog.open();
+        util.tableEnv().registerCatalog("testCatalog", catalog);
+        util.tableEnv().useCatalog("testCatalog");
+        TableConfig tableConfig = util.tableEnv().getConfig();
+        tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true);
+        if (mode != null) {
+            tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, mode);
+        }
+
+        // partition fact table.
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE fact_part (\n"
+                                + "  id BIGINT,\n"
+                                + "  name STRING,\n"
+                                + "  amount BIGINT,\n"
+                                + "  price BIGINT,\n"
+                                + "  fact_date_sk BIGINT\n"
+                                + ") PARTITIONED BY (fact_date_sk)\n"
+                                + "WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'runtime-source' = 'NewSource',\n"
+                                + " 'partition-list' = 'fact_date_sk:1990;fact_date_sk:1991;fact_date_sk:1992',\n"
+                                + " 'dynamic-filtering-fields' = 'fact_date_sk;amount',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+
+        // dim table.
+        util.tableEnv()
+                .executeSql(
+                        "CREATE TABLE dim (\n"
+                                + "  id BIGINT,\n"
+                                + "  male BOOLEAN,\n"
+                                + "  amount BIGINT,\n"
+                                + "  price BIGINT,\n"
+                                + "  dim_date_sk BIGINT\n"
+                                + ") WITH (\n"
+                                + " 'connector' = 'values',\n"
+                                + " 'runtime-source' = 'NewSource',\n"
+                                + " 'bounded' = 'true'\n"
+                                + ")");
+    }
+
+    @TestTemplate
+    void testMultipleInputWithDPP() {
+        String query =
+                "SELECT * FROM"
+                        + " (Select count(*) c1 from fact_part, dim "
+                        + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1,"
+                        + " (Select count(*) c2 from fact_part, dim "
+                        + "where fact_part.fact_date_sk = dim_date_sk and dim.price < 200) s2,"
+                        + " (Select count(*) c3 from fact_part, dim "
+                        + "where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3";
+        util.verifyExecPlan(query);
+    }
+
+    @TestTemplate
+    void testMultipleInputWithoutDPP() {
+        String query =
+                "SELECT * FROM"
+                        + " (Select count(*) c1 from fact_part, dim "
+                        + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1,"
+                        + " (Select count(*) c2 from fact_part, dim "
+                        + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 200) s2,"
+                        + " (Select count(*) c3 from fact_part, dim "
+                        + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3";
+        util.verifyExecPlan(query);
+    }
+}
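
The parameterized modes above are the same ones a user would set through ExecutionOptions.BATCH_SHUFFLE_MODE. As a hedged usage sketch (not from the commit), the equivalent configuration on a plain batch TableEnvironment looks like this:

import org.apache.flink.api.common.BatchShuffleMode;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

final class HybridShuffleConfigExample {
    public static void main(String[] args) {
        // Batch job with hybrid shuffle enabled; table definitions and queries are up to the user.
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
        tEnv.getConfig().set(ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
        // With this commit the planner treats hybrid exchanges as blocking-capable, so the
        // optimized exec plan no longer contains unnecessary extra blocking edges.
    }
}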
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java
index e5a4d918b15..c350a991390 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java
@@ -47,6 +47,17 @@ class StreamExchangeModeUtilsTest {
         assertThat(getBatchStreamExchangeMode(configuration, null))
                 .isEqualTo(StreamExchangeMode.BATCH);
 
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL);
+        assertThat(getBatchStreamExchangeMode(configuration, null))
+                .isEqualTo(StreamExchangeMode.HYBRID_FULL);
+
+        configuration.set(
+                ExecutionOptions.BATCH_SHUFFLE_MODE,
+                BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE);
+        assertThat(getBatchStreamExchangeMode(configuration, null))
+                .isEqualTo(StreamExchangeMode.HYBRID_SELECTIVE);
+
         configuration.set(
                 ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED);
         assertThat(getBatchStreamExchangeMode(configuration, null))
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
new file mode 100644
index 00000000000..65382e2f600
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml
@@ -0,0 +1,445 @@
+<?xml version="1.0" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<Root>
+       <TestCase name="testMultipleInputWithDPP[mode = ALL_EXCHANGES_BLOCKING]">
+               <Resource name="sql">
+                       <![CDATA[SELECT * FROM (Select count(*) c1 from 
fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 
100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = 
dim_date_sk and dim.price < 200) s2, (Select count(*) c3 from fact_part, dim 
where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(c1=[$0], c2=[$1], c3=[$2])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[inner])
+   :  :- LogicalAggregate(group=[{}], c1=[COUNT()])
+   :  :  +- LogicalProject($f0=[0])
+   :  :     +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))])
+   :  :        +- LogicalJoin(condition=[true], joinType=[inner])
+   :  :           :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :  :           +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   :  +- LogicalAggregate(group=[{}], c2=[COUNT()])
+   :     +- LogicalProject($f0=[0])
+   :        +- LogicalFilter(condition=[AND(=($4, $9), <($8, 200))])
+   :           +- LogicalJoin(condition=[true], joinType=[inner])
+   :              :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :              +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   +- LogicalAggregate(group=[{}], c3=[COUNT()])
+      +- LogicalProject($f0=[0])
+         +- LogicalFilter(condition=[AND(=($4, $9), <($8, 400))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, 
c3], build=[right], singleRowJoin=[true])\n:- 
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], 
build=[left], singleRowJoin=[true])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c2])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+:                 :     :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                 :     :  +- 
DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :     :     +- Calc(select=[dim_date_sk], where=[(price < 
400)])(reuse_id=[1])
+:                 :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])(reuse_id=[2])
+:                 :     +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Reused(reference_id=[1])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Calc(select=[dim_date_sk])
+:                       +- Reused(reference_id=[2])
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+               :     :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+               :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+               :     :     +- Calc(select=[dim_date_sk], where=[(price < 
200)])(reuse_id=[3])
+               :     :        +- Reused(reference_id=[2])
+               :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part, project=[fact_date_sk], metadata=[]]], 
fields=[fact_date_sk])
+               +- Exchange(distribution=[hash[dim_date_sk]])
+                  +- Reused(reference_id=[3])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testMultipleInputWithDPP[mode = ALL_EXCHANGES_HYBRID_SELECTIVE]">
+               <Resource name="sql">
+                       <![CDATA[SELECT * FROM (Select count(*) c1 from 
fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 
100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = 
dim_date_sk and dim.price < 200) s2, (Select count(*) c3 from fact_part, dim 
where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(c1=[$0], c2=[$1], c3=[$2])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[inner])
+   :  :- LogicalAggregate(group=[{}], c1=[COUNT()])
+   :  :  +- LogicalProject($f0=[0])
+   :  :     +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))])
+   :  :        +- LogicalJoin(condition=[true], joinType=[inner])
+   :  :           :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :  :           +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   :  +- LogicalAggregate(group=[{}], c2=[COUNT()])
+   :     +- LogicalProject($f0=[0])
+   :        +- LogicalFilter(condition=[AND(=($4, $9), <($8, 200))])
+   :           +- LogicalJoin(condition=[true], joinType=[inner])
+   :              :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :              +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   +- LogicalAggregate(group=[{}], c3=[COUNT()])
+      +- LogicalProject($f0=[0])
+         +- LogicalFilter(condition=[AND(=($4, $9), <($8, 400))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, 
c3], build=[right], singleRowJoin=[true])\n:- 
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], 
build=[left], singleRowJoin=[true])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c2])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+:                 :     :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                 :     :  +- 
DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :     :     +- Calc(select=[dim_date_sk], where=[(price < 
400)])(reuse_id=[1])
+:                 :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])(reuse_id=[2])
+:                 :     +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+:                    +- Reused(reference_id=[1])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Calc(select=[dim_date_sk])
+:                       +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                          +- Reused(reference_id=[2])
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+               :     :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+               :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+               :     :     +- Calc(select=[dim_date_sk], where=[(price < 
200)])(reuse_id=[3])
+               :     :        +- Reused(reference_id=[2])
+               :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part, project=[fact_date_sk], metadata=[]]], 
fields=[fact_date_sk])
+               +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+                  +- Reused(reference_id=[3])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testMultipleInputWithDPP[mode = ALL_EXCHANGES_HYBRID_FULL]">
+               <Resource name="sql">
+                       <![CDATA[SELECT * FROM (Select count(*) c1 from 
fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 
100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = 
dim_date_sk and dim.price < 200) s2, (Select count(*) c3 from fact_part, dim 
where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(c1=[$0], c2=[$1], c3=[$2])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[inner])
+   :  :- LogicalAggregate(group=[{}], c1=[COUNT()])
+   :  :  +- LogicalProject($f0=[0])
+   :  :     +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))])
+   :  :        +- LogicalJoin(condition=[true], joinType=[inner])
+   :  :           :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :  :           +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   :  +- LogicalAggregate(group=[{}], c2=[COUNT()])
+   :     +- LogicalProject($f0=[0])
+   :        +- LogicalFilter(condition=[AND(=($4, $9), <($8, 200))])
+   :           +- LogicalJoin(condition=[true], joinType=[inner])
+   :              :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :              +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   +- LogicalAggregate(group=[{}], c3=[COUNT()])
+      +- LogicalProject($f0=[0])
+         +- LogicalFilter(condition=[AND(=($4, $9), <($8, 400))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, 
c3], build=[right], singleRowJoin=[true])\n:- 
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], 
build=[left], singleRowJoin=[true])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c2])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+:                 :     :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                 :     :  +- 
DynamicFilteringDataCollector(fields=[dim_date_sk])
+:                 :     :     +- Calc(select=[dim_date_sk], where=[(price < 
400)])(reuse_id=[1])
+:                 :     :        +- TableSourceScan(table=[[testCatalog, 
test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, 
dim_date_sk])(reuse_id=[2])
+:                 :     +- 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+:                    +- Reused(reference_id=[1])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])
+:                    +- Calc(select=[dim_date_sk])
+:                       +- Exchange(distribution=[any], shuffle_mode=[BATCH])
+:                          +- Reused(reference_id=[2])
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] 
Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] 
DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, 
project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n])
+               :     :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+               :     :  +- DynamicFilteringDataCollector(fields=[dim_date_sk])
+               :     :     +- Calc(select=[dim_date_sk], where=[(price < 
200)])(reuse_id=[3])
+               :     :        +- Reused(reference_id=[2])
+               :     +- DynamicFilteringTableSourceScan(table=[[testCatalog, 
test_database, fact_part, project=[fact_date_sk], metadata=[]]], 
fields=[fact_date_sk])
+               +- Exchange(distribution=[hash[dim_date_sk]], 
shuffle_mode=[BATCH])
+                  +- Reused(reference_id=[3])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testMultipleInputWithoutDPP[mode = ALL_EXCHANGES_BLOCKING]">
+               <Resource name="sql">
+                       <![CDATA[SELECT * FROM (Select count(*) c1 from 
fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 
100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = 
dim_date_sk and fact_part.price < 200) s2, (Select count(*) c3 from fact_part, 
dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3]]>
+               </Resource>
+               <Resource name="ast">
+               <![CDATA[
+LogicalProject(c1=[$0], c2=[$1], c3=[$2])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[inner])
+   :  :- LogicalAggregate(group=[{}], c1=[COUNT()])
+   :  :  +- LogicalProject($f0=[0])
+   :  :     +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))])
+   :  :        +- LogicalJoin(condition=[true], joinType=[inner])
+   :  :           :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :  :           +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   :  +- LogicalAggregate(group=[{}], c2=[COUNT()])
+   :     +- LogicalProject($f0=[0])
+   :        +- LogicalFilter(condition=[AND(=($4, $9), <($3, 200))])
+   :           +- LogicalJoin(condition=[true], joinType=[inner])
+   :              :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :              +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   +- LogicalAggregate(group=[{}], c3=[COUNT()])
+      +- LogicalProject($f0=[0])
+         +- LogicalFilter(condition=[AND(=($4, $9), <($3, 400))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+               <![CDATA[
+MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, 
c3], build=[right], singleRowJoin=[true])\n:- 
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], 
build=[left], singleRowJoin=[true])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c2])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 400)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])(reuse_id=[1])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, 
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- Reused(reference_id=[1])
+:                 +- Reused(reference_id=[2])
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- Calc(select=[fact_date_sk], where=[(price < 200)])
+               :     +- Reused(reference_id=[1])
+               +- Reused(reference_id=[2])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testMultipleInputWithoutDPP[mode = ALL_EXCHANGES_HYBRID_SELECTIVE]">
+               <Resource name="sql">
+                       <![CDATA[SELECT * FROM (Select count(*) c1 from 
fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 
100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = 
dim_date_sk and fact_part.price < 200) s2, (Select count(*) c3 from fact_part, 
dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(c1=[$0], c2=[$1], c3=[$2])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[inner])
+   :  :- LogicalAggregate(group=[{}], c1=[COUNT()])
+   :  :  +- LogicalProject($f0=[0])
+   :  :     +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))])
+   :  :        +- LogicalJoin(condition=[true], joinType=[inner])
+   :  :           :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :  :           +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   :  +- LogicalAggregate(group=[{}], c2=[COUNT()])
+   :     +- LogicalProject($f0=[0])
+   :        +- LogicalFilter(condition=[AND(=($4, $9), <($3, 200))])
+   :           +- LogicalJoin(condition=[true], joinType=[inner])
+   :              :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :              +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   +- LogicalAggregate(group=[{}], c3=[COUNT()])
+      +- LogicalProject($f0=[0])
+         +- LogicalFilter(condition=[AND(=($4, $9), <($3, 400))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, 
c3], build=[right], singleRowJoin=[true])\n:- 
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], 
build=[left], singleRowJoin=[true])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c2])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 400)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])(reuse_id=[1])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, 
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- Reused(reference_id=[1])
+:                 +- Reused(reference_id=[2])
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- Calc(select=[fact_date_sk], where=[(price < 200)])
+               :     +- Reused(reference_id=[1])
+               +- Reused(reference_id=[2])
+]]>
+               </Resource>
+       </TestCase>
+       <TestCase name="testMultipleInputWithoutDPP[mode = ALL_EXCHANGES_HYBRID_FULL]">
+               <Resource name="sql">
+                       <![CDATA[SELECT * FROM (Select count(*) c1 from 
fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 
100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = 
dim_date_sk and fact_part.price < 200) s2, (Select count(*) c3 from fact_part, 
dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3]]>
+               </Resource>
+               <Resource name="ast">
+                       <![CDATA[
+LogicalProject(c1=[$0], c2=[$1], c3=[$2])
++- LogicalJoin(condition=[true], joinType=[inner])
+   :- LogicalJoin(condition=[true], joinType=[inner])
+   :  :- LogicalAggregate(group=[{}], c1=[COUNT()])
+   :  :  +- LogicalProject($f0=[0])
+   :  :     +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))])
+   :  :        +- LogicalJoin(condition=[true], joinType=[inner])
+   :  :           :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :  :           +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   :  +- LogicalAggregate(group=[{}], c2=[COUNT()])
+   :     +- LogicalProject($f0=[0])
+   :        +- LogicalFilter(condition=[AND(=($4, $9), <($3, 200))])
+   :           +- LogicalJoin(condition=[true], joinType=[inner])
+   :              :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+   :              +- LogicalTableScan(table=[[testCatalog, test_database, 
dim]])
+   +- LogicalAggregate(group=[{}], c3=[COUNT()])
+      +- LogicalProject($f0=[0])
+         +- LogicalFilter(condition=[AND(=($4, $9), <($3, 400))])
+            +- LogicalJoin(condition=[true], joinType=[inner])
+               :- LogicalTableScan(table=[[testCatalog, test_database, 
fact_part]])
+               +- LogicalTableScan(table=[[testCatalog, test_database, dim]])
+]]>
+               </Resource>
+               <Resource name="optimized exec plan">
+                       <![CDATA[
+MultipleInput(readOrder=[0,0,1], 
members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, 
c3], build=[right], singleRowJoin=[true])\n:- 
NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], 
build=[left], singleRowJoin=[true])\n:  :- [#2] 
Exchange(distribution=[broadcast])\n:  +- [#3] HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS c2])\n+- [#1] 
Exchange(distribution=[broadcast])\n])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 400)])
+:                 :     +- TableSourceScan(table=[[testCatalog, test_database, 
fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], 
fields=[price, fact_date_sk])(reuse_id=[1])
+:                 +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2])
+:                    +- TableSourceScan(table=[[testCatalog, test_database, 
dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk])
+:- Exchange(distribution=[broadcast])
+:  +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1])
+:     +- Exchange(distribution=[single])
+:        +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+:           +- Calc(select=[0 AS $f0])
+:              +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+:                 :- Exchange(distribution=[hash[fact_date_sk]])
+:                 :  +- Calc(select=[fact_date_sk], where=[(price < 100)])
+:                 :     +- Reused(reference_id=[1])
+:                 +- Reused(reference_id=[2])
++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])
+   +- Exchange(distribution=[single])
+      +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+         +- Calc(select=[0 AS $f0])
+            +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = 
dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left])
+               :- Exchange(distribution=[hash[fact_date_sk]])
+               :  +- Calc(select=[fact_date_sk], where=[(price < 200)])
+               :     +- Reused(reference_id=[1])
+               +- Reused(reference_id=[2])
+]]>
+               </Resource>
+       </TestCase>
+</Root>
