This is an automated email from the ASF dual-hosted git repository. ron pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 9cac80be18c [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan 9cac80be18c is described below commit 9cac80be18c6aff0cebdfe706327c1693822e884 Author: Yuxin Tan <tanyuxinw...@gmail.com> AuthorDate: Wed Nov 22 14:22:45 2023 +0800 [FLINK-33612][table-planner] Hybrid shuffle mode avoids unnecessary blocking edges in the plan --- .../utils/InputPriorityConflictResolver.java | 6 +- .../planner/utils/StreamExchangeModeUtils.java | 4 + .../physical/batch/BatchPhysicalExchange.scala | 10 +- .../utils/InputPriorityConflictResolverTest.java | 72 +++- .../plan/optimize/ShuffleModePlanOptimizeTest.java | 127 ++++++ .../planner/utils/StreamExchangeModeUtilsTest.java | 11 + .../plan/optimize/ShuffleModePlanOptimizeTest.xml | 445 +++++++++++++++++++++ 7 files changed, 655 insertions(+), 20 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java index 144b527c520..0479993b5bd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolver.java @@ -199,7 +199,11 @@ public class InputPriorityConflictResolver extends InputPriorityGraphGenerator { } private InputProperty.DamBehavior getDamBehavior() { - if (getBatchStreamExchangeMode(tableConfig, exchangeMode) == StreamExchangeMode.BATCH) { + StreamExchangeMode streamExchangeMode = + getBatchStreamExchangeMode(tableConfig, exchangeMode); + if (streamExchangeMode == StreamExchangeMode.BATCH + || streamExchangeMode == StreamExchangeMode.HYBRID_FULL + || streamExchangeMode == StreamExchangeMode.HYBRID_SELECTIVE) { return InputProperty.DamBehavior.BLOCKING; } else { return InputProperty.DamBehavior.PIPELINED; diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java index bd722a07726..312e59bee34 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtils.java @@ -52,6 +52,10 @@ public class StreamExchangeModeUtils { final BatchShuffleMode shuffleMode = config.get(ExecutionOptions.BATCH_SHUFFLE_MODE); if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_BLOCKING) { return StreamExchangeMode.BATCH; + } else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL) { + return StreamExchangeMode.HYBRID_FULL; + } else if (shuffleMode == BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE) { + return StreamExchangeMode.HYBRID_SELECTIVE; } return StreamExchangeMode.UNDEFINED; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala index 3fbddc658e1..0a57f8087f8 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalExchange.scala @@ -60,10 +60,12 @@ class BatchPhysicalExchange( val exchangeMode = getBatchStreamExchangeMode(unwrapTableConfig(this), StreamExchangeMode.UNDEFINED) - val damBehavior = if (exchangeMode eq StreamExchangeMode.BATCH) { - InputProperty.DamBehavior.BLOCKING - } else { - InputProperty.DamBehavior.PIPELINED + val damBehavior = exchangeMode match { + case StreamExchangeMode.BATCH | StreamExchangeMode.HYBRID_FULL | + StreamExchangeMode.HYBRID_SELECTIVE => + InputProperty.DamBehavior.BLOCKING + case _ => + InputProperty.DamBehavior.PIPELINED } InputProperty.builder diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java index 927347defdd..41a85088b3d 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/processor/utils/InputPriorityConflictResolverTest.java @@ -18,7 +18,10 @@ package org.apache.flink.table.planner.plan.nodes.exec.processor.utils; +import org.apache.flink.api.common.BatchShuffleMode; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.ExecutionOptions; import org.apache.flink.streaming.api.transformations.StreamExchangeMode; import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; @@ -30,9 +33,15 @@ import org.apache.flink.table.planner.plan.nodes.exec.batch.BatchExecTableSource import org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSourceSpec; import org.apache.flink.table.types.logical.IntType; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.Optional; import java.util.function.Consumer; @@ -40,9 +49,27 @@ import java.util.function.Consumer; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link InputPriorityConflictResolver}. */ +@ExtendWith(ParameterizedTestExtension.class) class InputPriorityConflictResolverTest { - @Test + @Parameter + public Tuple2<BatchShuffleMode, StreamExchangeMode> batchShuffleModeAndStreamExchangeMode; + + @Parameters(name = "batchShuffleModeAndStreamExchangeMode={0}") + public static Collection<Tuple2<BatchShuffleMode, StreamExchangeMode>> parameters() { + return Arrays.asList( + Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_BLOCKING, StreamExchangeMode.BATCH), + Tuple2.of(BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.BATCH), + Tuple2.of( + BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, StreamExchangeMode.BATCH), + Tuple2.of( + BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL, StreamExchangeMode.HYBRID_FULL), + Tuple2.of( + BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, + StreamExchangeMode.HYBRID_SELECTIVE)); + } + + @TestTemplate void testDetectAndResolve() { // P = InputProperty.DamBehavior.PIPELINED, E = InputProperty.DamBehavior.END_INPUT // P100 = PIPELINED + priority 100 @@ -79,30 +106,35 @@ class InputPriorityConflictResolverTest { nodes[7].addInput(nodes[5], InputProperty.builder().priority(10).build()); nodes[7].addInput(nodes[6], InputProperty.builder().priority(100).build()); + Configuration configuration = new Configuration(); + BatchShuffleMode batchShuffleMode = batchShuffleModeAndStreamExchangeMode.f0; + StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1; + configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode); + InputPriorityConflictResolver resolver = new InputPriorityConflictResolver( Collections.singletonList(nodes[7]), InputProperty.DamBehavior.END_INPUT, - StreamExchangeMode.BATCH, - new Configuration()); + streamExchangeMode, + configuration); resolver.detectAndResolve(); assertThat(nodes[7].getInputNodes().get(0)).isEqualTo(nodes[1]); assertThat(nodes[7].getInputNodes().get(1)).isEqualTo(nodes[2]); assertThat(nodes[7].getInputNodes().get(2)).isInstanceOf(BatchExecExchange.class); assertThat(((BatchExecExchange) nodes[7].getInputNodes().get(2)).getRequiredExchangeMode()) - .isEqualTo(Optional.of(StreamExchangeMode.BATCH)); + .isEqualTo(Optional.of(streamExchangeMode)); assertThat(nodes[7].getInputNodes().get(2).getInputEdges().get(0).getSource()) .isEqualTo(nodes[3]); assertThat(nodes[7].getInputNodes().get(3)).isInstanceOf(BatchExecExchange.class); assertThat(((BatchExecExchange) nodes[7].getInputNodes().get(3)).getRequiredExchangeMode()) - .isEqualTo(Optional.of(StreamExchangeMode.BATCH)); + .isEqualTo(Optional.of(streamExchangeMode)); assertThat(nodes[7].getInputNodes().get(3).getInputEdges().get(0).getSource()) .isEqualTo(nodes[4]); assertThat(nodes[7].getInputNodes().get(4)).isEqualTo(nodes[5]); assertThat(nodes[7].getInputNodes().get(5)).isEqualTo(nodes[6]); } - @Test + @TestTemplate void testDeadlockCausedByExchange() { // P1 = PIPELINED + priority 1 // @@ -113,15 +145,20 @@ class InputPriorityConflictResolverTest { nodes[i] = new TestingBatchExecNode("TestingBatchExecNode" + i); } + Configuration configuration = new Configuration(); + BatchShuffleMode batchShuffleMode = batchShuffleModeAndStreamExchangeMode.f0; + StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1; + configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode); + BatchExecExchange exchange = new BatchExecExchange( - new Configuration(), + configuration, InputProperty.builder() .requiredDistribution(InputProperty.ANY_DISTRIBUTION) .build(), (RowType) nodes[0].getOutputType(), "Exchange"); - exchange.setRequiredExchangeMode(StreamExchangeMode.BATCH); + exchange.setRequiredExchangeMode(streamExchangeMode); ExecEdge execEdge = ExecEdge.builder().source(nodes[0]).target(exchange).build(); exchange.setInputEdges(Collections.singletonList(execEdge)); @@ -132,8 +169,8 @@ class InputPriorityConflictResolverTest { new InputPriorityConflictResolver( Collections.singletonList(nodes[1]), InputProperty.DamBehavior.END_INPUT, - StreamExchangeMode.BATCH, - new Configuration()); + streamExchangeMode, + configuration); resolver.detectAndResolve(); ExecNode<?> input0 = nodes[1].getInputNodes().get(0); @@ -145,14 +182,14 @@ class InputPriorityConflictResolverTest { assertThat(execNode).isInstanceOf(BatchExecExchange.class); BatchExecExchange e = (BatchExecExchange) execNode; assertThat(e.getRequiredExchangeMode()) - .isEqualTo(Optional.of(StreamExchangeMode.BATCH)); + .isEqualTo(Optional.of(streamExchangeMode)); assertThat(e.getInputEdges().get(0).getSource()).isEqualTo(nodes[0]); }; checkExchange.accept(input0); checkExchange.accept(input1); } - @Test + @TestTemplate void testWithDynamicFilteringPlan() { // no conflicts for dpp pattern // 2 --------------------------------------(P1)--- 1 --(P0)--> 0 @@ -188,12 +225,17 @@ class InputPriorityConflictResolverTest { ExecEdge toCollector = ExecEdge.builder().source(nodes[2]).target(collector).build(); collector.setInputEdges(Collections.singletonList(toCollector)); + Configuration configuration = new Configuration(); + BatchShuffleMode batchShuffleMode = batchShuffleModeAndStreamExchangeMode.f0; + StreamExchangeMode streamExchangeMode = batchShuffleModeAndStreamExchangeMode.f1; + configuration.set(ExecutionOptions.BATCH_SHUFFLE_MODE, batchShuffleMode); + InputPriorityConflictResolver resolver = new InputPriorityConflictResolver( Collections.singletonList(nodes[1]), InputProperty.DamBehavior.END_INPUT, - StreamExchangeMode.BATCH, - new Configuration()); + streamExchangeMode, + configuration); resolver.detectAndResolve(); ExecNode<?> input0 = nodes[1].getInputNodes().get(0); diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.java new file mode 100644 index 00000000000..0c14d90d178 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.optimize; + +import org.apache.flink.api.common.BatchShuffleMode; +import org.apache.flink.configuration.ExecutionOptions; +import org.apache.flink.table.api.TableConfig; +import org.apache.flink.table.api.config.OptimizerConfigOptions; +import org.apache.flink.table.planner.factories.TestValuesCatalog; +import org.apache.flink.table.planner.utils.BatchTableTestUtil; +import org.apache.flink.table.planner.utils.TableTestBase; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameter; +import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension; +import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.util.Arrays; +import java.util.Collection; + +/** Test optimized plans for different shuffle mode. */ +@ExtendWith(ParameterizedTestExtension.class) +class ShuffleModePlanOptimizeTest extends TableTestBase { + + @Parameters(name = "mode = {0}") + public static Collection<BatchShuffleMode> parameters() { + return Arrays.asList( + BatchShuffleMode.ALL_EXCHANGES_BLOCKING, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL); + } + + @Parameter public BatchShuffleMode mode; + + private final BatchTableTestUtil util = batchTestUtil(TableConfig.getDefault()); + private final TestValuesCatalog catalog = + new TestValuesCatalog("testCatalog", "test_database", true); + + @BeforeEach + void setup() { + catalog.open(); + util.tableEnv().registerCatalog("testCatalog", catalog); + util.tableEnv().useCatalog("testCatalog"); + TableConfig tableConfig = util.tableEnv().getConfig(); + tableConfig.set(OptimizerConfigOptions.TABLE_OPTIMIZER_DYNAMIC_FILTERING_ENABLED, true); + if (mode != null) { + tableConfig.set(ExecutionOptions.BATCH_SHUFFLE_MODE, mode); + } + + // partition fact table. + util.tableEnv() + .executeSql( + "CREATE TABLE fact_part (\n" + + " id BIGINT,\n" + + " name STRING,\n" + + " amount BIGINT,\n" + + " price BIGINT,\n" + + " fact_date_sk BIGINT\n" + + ") PARTITIONED BY (fact_date_sk)\n" + + "WITH (\n" + + " 'connector' = 'values',\n" + + " 'runtime-source' = 'NewSource',\n" + + " 'partition-list' = 'fact_date_sk:1990;fact_date_sk:1991;fact_date_sk:1992',\n" + + " 'dynamic-filtering-fields' = 'fact_date_sk;amount',\n" + + " 'bounded' = 'true'\n" + + ")"); + + // dim table. + util.tableEnv() + .executeSql( + "CREATE TABLE dim (\n" + + " id BIGINT,\n" + + " male BOOLEAN,\n" + + " amount BIGINT,\n" + + " price BIGINT,\n" + + " dim_date_sk BIGINT\n" + + ") WITH (\n" + + " 'connector' = 'values',\n" + + " 'runtime-source' = 'NewSource',\n" + + " 'bounded' = 'true'\n" + + ")"); + } + + @TestTemplate + void testMultipleInputWithDPP() { + String query = + "SELECT * FROM" + + " (Select count(*) c1 from fact_part, dim " + + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1," + + " (Select count(*) c2 from fact_part, dim " + + "where fact_part.fact_date_sk = dim_date_sk and dim.price < 200) s2," + + " (Select count(*) c3 from fact_part, dim " + + "where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3"; + util.verifyExecPlan(query); + } + + @TestTemplate + void testMultipleInputWithoutDPP() { + String query = + "SELECT * FROM" + + " (Select count(*) c1 from fact_part, dim " + + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1," + + " (Select count(*) c2 from fact_part, dim " + + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 200) s2," + + " (Select count(*) c3 from fact_part, dim " + + "where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3"; + util.verifyExecPlan(query); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java index e5a4d918b15..c350a991390 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/utils/StreamExchangeModeUtilsTest.java @@ -47,6 +47,17 @@ class StreamExchangeModeUtilsTest { assertThat(getBatchStreamExchangeMode(configuration, null)) .isEqualTo(StreamExchangeMode.BATCH); + configuration.set( + ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL); + assertThat(getBatchStreamExchangeMode(configuration, null)) + .isEqualTo(StreamExchangeMode.HYBRID_FULL); + + configuration.set( + ExecutionOptions.BATCH_SHUFFLE_MODE, + BatchShuffleMode.ALL_EXCHANGES_HYBRID_SELECTIVE); + assertThat(getBatchStreamExchangeMode(configuration, null)) + .isEqualTo(StreamExchangeMode.HYBRID_SELECTIVE); + configuration.set( ExecutionOptions.BATCH_SHUFFLE_MODE, BatchShuffleMode.ALL_EXCHANGES_PIPELINED); assertThat(getBatchStreamExchangeMode(configuration, null)) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml new file mode 100644 index 00000000000..65382e2f600 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ShuffleModePlanOptimizeTest.xml @@ -0,0 +1,445 @@ +<?xml version="1.0" ?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<Root> + <TestCase name="testMultipleInputWithDPP[mode = ALL_EXCHANGES_BLOCKING]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (Select count(*) c1 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and dim.price < 200) s2, (Select count(*) c3 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(c1=[$0], c2=[$1], c3=[$2]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{}], c1=[COUNT()]) + : : +- LogicalProject($f0=[0]) + : : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))]) + : : +- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + : +- LogicalAggregate(group=[{}], c2=[COUNT()]) + : +- LogicalProject($f0=[0]) + : +- LogicalFilter(condition=[AND(=($4, $9), <($8, 200))]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{}], c3=[COUNT()]) + +- LogicalProject($f0=[0]) + +- LogicalFilter(condition=[AND(=($4, $9), <($8, 400))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[0,0,1], members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, c3], build=[right], singleRowJoin=[true])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], build=[left], singleRowJoin=[true])\n: :- [#2] Exchange(distribution=[broadcast])\n: +- [#3] HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])\n+- [#1] Exchange(distribution=[broadcast])\n]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n]) +: : :- Exchange(distribution=[any], shuffle_mode=[BATCH]) +: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) +: : : +- Calc(select=[dim_date_sk], where=[(price < 400)])(reuse_id=[1]) +: : : +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])(reuse_id=[2]) +: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk]) +: +- Exchange(distribution=[hash[dim_date_sk]]) +: +- Reused(reference_id=[1]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 100)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], fields=[price, fact_date_sk]) +: +- Exchange(distribution=[hash[dim_date_sk]]) +: +- Calc(select=[dim_date_sk]) +: +- Reused(reference_id=[2]) ++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right]) + :- Exchange(distribution=[hash[fact_date_sk]]) + : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n]) + : :- Exchange(distribution=[any], shuffle_mode=[BATCH]) + : : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + : : +- Calc(select=[dim_date_sk], where=[(price < 200)])(reuse_id=[3]) + : : +- Reused(reference_id=[2]) + : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk]) + +- Exchange(distribution=[hash[dim_date_sk]]) + +- Reused(reference_id=[3]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleInputWithDPP[mode = ALL_EXCHANGES_HYBRID_SELECTIVE]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (Select count(*) c1 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and dim.price < 200) s2, (Select count(*) c3 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(c1=[$0], c2=[$1], c3=[$2]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{}], c1=[COUNT()]) + : : +- LogicalProject($f0=[0]) + : : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))]) + : : +- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + : +- LogicalAggregate(group=[{}], c2=[COUNT()]) + : +- LogicalProject($f0=[0]) + : +- LogicalFilter(condition=[AND(=($4, $9), <($8, 200))]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{}], c3=[COUNT()]) + +- LogicalProject($f0=[0]) + +- LogicalFilter(condition=[AND(=($4, $9), <($8, 400))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[0,0,1], members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, c3], build=[right], singleRowJoin=[true])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], build=[left], singleRowJoin=[true])\n: :- [#2] Exchange(distribution=[broadcast])\n: +- [#3] HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])\n+- [#1] Exchange(distribution=[broadcast])\n]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n]) +: : :- Exchange(distribution=[any], shuffle_mode=[BATCH]) +: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) +: : : +- Calc(select=[dim_date_sk], where=[(price < 400)])(reuse_id=[1]) +: : : +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])(reuse_id=[2]) +: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk]) +: +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH]) +: +- Reused(reference_id=[1]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 100)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], fields=[price, fact_date_sk]) +: +- Exchange(distribution=[hash[dim_date_sk]]) +: +- Calc(select=[dim_date_sk]) +: +- Exchange(distribution=[any], shuffle_mode=[BATCH]) +: +- Reused(reference_id=[2]) ++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right]) + :- Exchange(distribution=[hash[fact_date_sk]]) + : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n]) + : :- Exchange(distribution=[any], shuffle_mode=[BATCH]) + : : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + : : +- Calc(select=[dim_date_sk], where=[(price < 200)])(reuse_id=[3]) + : : +- Reused(reference_id=[2]) + : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk]) + +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH]) + +- Reused(reference_id=[3]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleInputWithDPP[mode = ALL_EXCHANGES_HYBRID_FULL]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (Select count(*) c1 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and dim.price < 200) s2, (Select count(*) c3 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and dim.price < 400) s3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(c1=[$0], c2=[$1], c3=[$2]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{}], c1=[COUNT()]) + : : +- LogicalProject($f0=[0]) + : : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))]) + : : +- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + : +- LogicalAggregate(group=[{}], c2=[COUNT()]) + : +- LogicalProject($f0=[0]) + : +- LogicalFilter(condition=[AND(=($4, $9), <($8, 200))]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{}], c3=[COUNT()]) + +- LogicalProject($f0=[0]) + +- LogicalFilter(condition=[AND(=($4, $9), <($8, 400))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[0,0,1], members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, c3], build=[right], singleRowJoin=[true])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], build=[left], singleRowJoin=[true])\n: :- [#2] Exchange(distribution=[broadcast])\n: +- [#3] HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])\n+- [#1] Exchange(distribution=[broadcast])\n]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n]) +: : :- Exchange(distribution=[any], shuffle_mode=[BATCH]) +: : : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) +: : : +- Calc(select=[dim_date_sk], where=[(price < 400)])(reuse_id=[1]) +: : : +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[price, dim_date_sk], metadata=[]]], fields=[price, dim_date_sk])(reuse_id=[2]) +: : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk]) +: +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH]) +: +- Reused(reference_id=[1]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 100)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], fields=[price, fact_date_sk]) +: +- Exchange(distribution=[hash[dim_date_sk]]) +: +- Calc(select=[dim_date_sk]) +: +- Exchange(distribution=[any], shuffle_mode=[BATCH]) +: +- Reused(reference_id=[2]) ++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[right]) + :- Exchange(distribution=[hash[fact_date_sk]]) + : +- MultipleInput(members=[\nOrderEnforcer\n:- [#1] Exchange(distribution=[any], shuffle_mode=[BATCH])\n+- [#2] DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk])\n]) + : :- Exchange(distribution=[any], shuffle_mode=[BATCH]) + : : +- DynamicFilteringDataCollector(fields=[dim_date_sk]) + : : +- Calc(select=[dim_date_sk], where=[(price < 200)])(reuse_id=[3]) + : : +- Reused(reference_id=[2]) + : +- DynamicFilteringTableSourceScan(table=[[testCatalog, test_database, fact_part, project=[fact_date_sk], metadata=[]]], fields=[fact_date_sk]) + +- Exchange(distribution=[hash[dim_date_sk]], shuffle_mode=[BATCH]) + +- Reused(reference_id=[3]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleInputWithoutDPP[mode = ALL_EXCHANGES_BLOCKING]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (Select count(*) c1 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 200) s2, (Select count(*) c3 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(c1=[$0], c2=[$1], c3=[$2]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{}], c1=[COUNT()]) + : : +- LogicalProject($f0=[0]) + : : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))]) + : : +- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + : +- LogicalAggregate(group=[{}], c2=[COUNT()]) + : +- LogicalProject($f0=[0]) + : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 200))]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{}], c3=[COUNT()]) + +- LogicalProject($f0=[0]) + +- LogicalFilter(condition=[AND(=($4, $9), <($3, 400))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[0,0,1], members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, c3], build=[right], singleRowJoin=[true])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], build=[left], singleRowJoin=[true])\n: :- [#2] Exchange(distribution=[broadcast])\n: +- [#3] HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])\n+- [#1] Exchange(distribution=[broadcast])\n]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 400)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], fields=[price, fact_date_sk])(reuse_id=[1]) +: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2]) +: +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 100)]) +: : +- Reused(reference_id=[1]) +: +- Reused(reference_id=[2]) ++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) + :- Exchange(distribution=[hash[fact_date_sk]]) + : +- Calc(select=[fact_date_sk], where=[(price < 200)]) + : +- Reused(reference_id=[1]) + +- Reused(reference_id=[2]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleInputWithoutDPP[mode = ALL_EXCHANGES_HYBRID_SELECTIVE]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (Select count(*) c1 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 200) s2, (Select count(*) c3 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(c1=[$0], c2=[$1], c3=[$2]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{}], c1=[COUNT()]) + : : +- LogicalProject($f0=[0]) + : : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))]) + : : +- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + : +- LogicalAggregate(group=[{}], c2=[COUNT()]) + : +- LogicalProject($f0=[0]) + : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 200))]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{}], c3=[COUNT()]) + +- LogicalProject($f0=[0]) + +- LogicalFilter(condition=[AND(=($4, $9), <($3, 400))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[0,0,1], members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, c3], build=[right], singleRowJoin=[true])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], build=[left], singleRowJoin=[true])\n: :- [#2] Exchange(distribution=[broadcast])\n: +- [#3] HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])\n+- [#1] Exchange(distribution=[broadcast])\n]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 400)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], fields=[price, fact_date_sk])(reuse_id=[1]) +: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2]) +: +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 100)]) +: : +- Reused(reference_id=[1]) +: +- Reused(reference_id=[2]) ++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) + :- Exchange(distribution=[hash[fact_date_sk]]) + : +- Calc(select=[fact_date_sk], where=[(price < 200)]) + : +- Reused(reference_id=[1]) + +- Reused(reference_id=[2]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultipleInputWithoutDPP[mode = ALL_EXCHANGES_HYBRID_FULL]"> + <Resource name="sql"> + <![CDATA[SELECT * FROM (Select count(*) c1 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 100) s1, (Select count(*) c2 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 200) s2, (Select count(*) c3 from fact_part, dim where fact_part.fact_date_sk = dim_date_sk and fact_part.price < 400) s3]]> + </Resource> + <Resource name="ast"> + <![CDATA[ +LogicalProject(c1=[$0], c2=[$1], c3=[$2]) ++- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalAggregate(group=[{}], c1=[COUNT()]) + : : +- LogicalProject($f0=[0]) + : : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 100))]) + : : +- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + : +- LogicalAggregate(group=[{}], c2=[COUNT()]) + : +- LogicalProject($f0=[0]) + : +- LogicalFilter(condition=[AND(=($4, $9), <($3, 200))]) + : +- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + : +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) + +- LogicalAggregate(group=[{}], c3=[COUNT()]) + +- LogicalProject($f0=[0]) + +- LogicalFilter(condition=[AND(=($4, $9), <($3, 400))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[testCatalog, test_database, fact_part]]) + +- LogicalTableScan(table=[[testCatalog, test_database, dim]]) +]]> + </Resource> + <Resource name="optimized exec plan"> + <![CDATA[ +MultipleInput(readOrder=[0,0,1], members=[\nNestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2, c3], build=[right], singleRowJoin=[true])\n:- NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[c1, c2], build=[left], singleRowJoin=[true])\n: :- [#2] Exchange(distribution=[broadcast])\n: +- [#3] HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2])\n+- [#1] Exchange(distribution=[broadcast])\n]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c3]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 400)]) +: : +- TableSourceScan(table=[[testCatalog, test_database, fact_part, filter=[], project=[price, fact_date_sk], metadata=[]]], fields=[price, fact_date_sk])(reuse_id=[1]) +: +- Exchange(distribution=[hash[dim_date_sk]])(reuse_id=[2]) +: +- TableSourceScan(table=[[testCatalog, test_database, dim, project=[dim_date_sk], metadata=[]]], fields=[dim_date_sk]) +:- Exchange(distribution=[broadcast]) +: +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c1]) +: +- Exchange(distribution=[single]) +: +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) +: +- Calc(select=[0 AS $f0]) +: +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) +: :- Exchange(distribution=[hash[fact_date_sk]]) +: : +- Calc(select=[fact_date_sk], where=[(price < 100)]) +: : +- Reused(reference_id=[1]) +: +- Reused(reference_id=[2]) ++- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS c2]) + +- Exchange(distribution=[single]) + +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0]) + +- Calc(select=[0 AS $f0]) + +- HashJoin(joinType=[InnerJoin], where=[(fact_date_sk = dim_date_sk)], select=[fact_date_sk, dim_date_sk], build=[left]) + :- Exchange(distribution=[hash[fact_date_sk]]) + : +- Calc(select=[fact_date_sk], where=[(price < 200)]) + : +- Reused(reference_id=[1]) + +- Reused(reference_id=[2]) +]]> + </Resource> + </TestCase> +</Root>