This is an automated email from the ASF dual-hosted git repository. kurt pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new a6d72fe [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner a6d72fe is described below commit a6d72fed708e99404c96ffeca6ad70e08963e9cd Author: godfreyhe <godfre...@163.com> AuthorDate: Sat Jun 22 12:50:06 2019 +0800 [FLINK-12937][table-planner-blink] Introduce join reorder planner rules in blink planner This closes #8832 --- .../flink/table/api/PlannerConfigOptions.java | 6 + .../plan/optimize/program/FlinkBatchProgram.scala | 22 +- .../plan/optimize/program/FlinkStreamProgram.scala | 23 +- .../table/plan/rules/FlinkBatchRuleSets.scala | 16 + .../table/plan/rules/FlinkStreamRuleSets.scala | 16 + .../logical/RewriteMultiJoinConditionRule.scala | 129 +++++ .../table/plan/batch/sql/join/JoinReorderTest.xml | 600 +++++++++++++++++++ .../logical/RewriteMultiJoinConditionRuleTest.xml | 318 +++++++++++ .../table/plan/stream/sql/join/JoinReorderTest.xml | 635 +++++++++++++++++++++ .../plan/batch/sql/join/JoinReorderTest.scala | 25 + .../table/plan/common/JoinReorderTestBase.scala | 233 ++++++++ .../FlinkAggregateInnerJoinTransposeRuleTest.scala | 2 +- .../logical/FlinkJoinPushExpressionsRuleTest.scala | 10 +- .../RewriteMultiJoinConditionRuleTest.scala | 153 +++++ .../plan/stream/sql/join/JoinReorderTest.scala | 25 + 15 files changed, 2208 insertions(+), 5 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java index 7b05df1..5e7a1a3 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/api/PlannerConfigOptions.java @@ -137,4 +137,10 @@ public class PlannerConfigOptions { .defaultValue(true) .withDescription("Allow trying to push predicate 
down to a FilterableTableSource. " + "the default value is true, means allow the attempt."); + + public static final ConfigOption<Boolean> SQL_OPTIMIZER_JOIN_REORDER_ENABLED = + key("sql.optimizer.join-reorder.enabled") + .defaultValue(false) + .withDescription("Enables join reorder in optimizer cbo. Default is disabled."); + } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala index 090b4fb..e73fde6 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkBatchProgram.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.plan.optimize.program import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.PlannerConfigOptions import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.rules.FlinkBatchRuleSets @@ -33,6 +34,7 @@ object FlinkBatchProgram { val DECORRELATE = "decorrelate" val DEFAULT_REWRITE = "default_rewrite" val PREDICATE_PUSHDOWN = "predicate_pushdown" + val JOIN_REORDER = "join_reorder" val JOIN_REWRITE = "join_rewrite" val WINDOW = "window" val LOGICAL = "logical" @@ -120,7 +122,7 @@ object FlinkBatchProgram { .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(FlinkBatchRuleSets.FILTER_PREPARE_RULES) .build(), "other predicate rewrite") - .setIterations(5).build()) + .setIterations(5).build(), "predicate rewrite") .addProgram( FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) @@ -135,6 +137,24 @@ object FlinkBatchProgram { .build(), "prune empty after predicate push down") .build()) + // join reorder + if 
(config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) { + chainedProgram.addLast( + JOIN_REORDER, + FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext] + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(FlinkBatchRuleSets.JOIN_REORDER_PERPARE_RULES) + .build(), "merge join into MultiJoin") + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(FlinkBatchRuleSets.JOIN_REORDER_RULES) + .build(), "do join reorder") + .build()) + } + // join rewrite chainedProgram.addLast( JOIN_REWRITE, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala index a5f1dac..b64ff31 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/optimize/program/FlinkStreamProgram.scala @@ -18,11 +18,13 @@ package org.apache.flink.table.plan.optimize.program -import org.apache.calcite.plan.hep.HepMatchOrder import org.apache.flink.configuration.Configuration +import org.apache.flink.table.api.PlannerConfigOptions import org.apache.flink.table.plan.nodes.FlinkConventions import org.apache.flink.table.plan.rules.{FlinkBatchRuleSets, FlinkStreamRuleSets} +import org.apache.calcite.plan.hep.HepMatchOrder + /** * Defines a sequence of programs to optimize for stream table plan. 
*/ @@ -34,6 +36,7 @@ object FlinkStreamProgram { val TIME_INDICATOR = "time_indicator" val DEFAULT_REWRITE = "default_rewrite" val PREDICATE_PUSHDOWN = "predicate_pushdown" + val JOIN_REORDER = "join_reorder" val LOGICAL = "logical" val LOGICAL_REWRITE = "logical_rewrite" val PHYSICAL = "physical" @@ -130,6 +133,24 @@ object FlinkStreamProgram { .build(), "prune empty after predicate push down") .build()) + // join reorder + if (config.getBoolean(PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED)) { + chainedProgram.addLast( + JOIN_REORDER, + FlinkGroupProgramBuilder.newBuilder[StreamOptimizeContext] + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(FlinkStreamRuleSets.JOIN_REORDER_PERPARE_RULES) + .build(), "merge join into MultiJoin") + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(FlinkStreamRuleSets.JOIN_REORDER_RULES) + .build(), "do join reorder") + .build()) + } + // optimize the logical plan chainedProgram.addLast( LOGICAL, diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala index 75f1d5a..71fcb23 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala @@ -218,6 +218,22 @@ object FlinkBatchRuleSets { FILTER_RULES.asScala ).asJava) + val JOIN_REORDER_PERPARE_RULES: RuleSet = RuleSets.ofList( + // merge join to MultiJoin + JoinToMultiJoinRule.INSTANCE, + // merge project to MultiJoin + ProjectMultiJoinMergeRule.INSTANCE, + // merge 
filter to MultiJoin + FilterMultiJoinMergeRule.INSTANCE + ) + + val JOIN_REORDER_RULES: RuleSet = RuleSets.ofList( + // equi-join predicates transfer + RewriteMultiJoinConditionRule.INSTANCE, + // join reorder + LoptOptimizeJoinRule.INSTANCE + ) + /** * RuleSet to do logical optimize. * This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]]. diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala index 0f8b219..ca94b8e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala @@ -201,6 +201,22 @@ object FlinkStreamRuleSets { ProjectSetOpTransposeRule.INSTANCE ) + val JOIN_REORDER_PERPARE_RULES: RuleSet = RuleSets.ofList( + // merge project to MultiJoin + ProjectMultiJoinMergeRule.INSTANCE, + // merge filter to MultiJoin + FilterMultiJoinMergeRule.INSTANCE, + // merge join to MultiJoin + JoinToMultiJoinRule.INSTANCE + ) + + val JOIN_REORDER_RULES: RuleSet = RuleSets.ofList( + // equi-join predicates transfer + RewriteMultiJoinConditionRule.INSTANCE, + // join reorder + LoptOptimizeJoinRule.INSTANCE + ) + /** * RuleSet to do logical optimize. * This RuleSet is a sub-set of [[LOGICAL_OPT_RULES]]. 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala new file mode 100644 index 0000000..392e58b --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRule.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.rules.logical + +import org.apache.calcite.plan.RelOptRule._ +import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptUtil} +import org.apache.calcite.rel.core.JoinRelType +import org.apache.calcite.rel.rules.MultiJoin +import org.apache.calcite.rex.{RexCall, RexNode} +import org.apache.calcite.sql.SqlKind +import org.apache.calcite.sql.fun.SqlStdOperatorTable._ + +import scala.collection.JavaConversions._ +import scala.collection.mutable + +/** + * Planner rule to apply transitive closure on [[MultiJoin]] for equi-join predicates. + * + * <p>e.g. 
+ * MJ(A, B, C) ON A.a1=B.b1 AND B.b1=C.c1 → + * MJ(A, B, C) ON A.a1=B.b1 AND B.b1=C.c1 AND A.a1=C.c1 + * + * The advantage of applying this rule is that it increases the choice of join reorder; + * at the same time, the disadvantage is that it will use more CPU for additional join predicates. + */ +class RewriteMultiJoinConditionRule extends RelOptRule( + operand(classOf[MultiJoin], any), + "RewriteMultiJoinConditionRule") { + + override def matches(call: RelOptRuleCall): Boolean = { + val multiJoin: MultiJoin = call.rel(0) + // currently only supports all join types are INNER join + val isAllInnerJoin = multiJoin.getJoinTypes.forall(_ eq JoinRelType.INNER) + val (equiJoinFilters, _) = partitionJoinFilters(multiJoin) + !multiJoin.isFullOuterJoin && isAllInnerJoin && equiJoinFilters.size > 1 + } + + override def onMatch(call: RelOptRuleCall): Unit = { + val multiJoin: MultiJoin = call.rel(0) + val (equiJoinFilters, nonEquiJoinFilters) = partitionJoinFilters(multiJoin) + // there is no `equals` method in RexCall, so the key of this map should be String + val equiJoinFilterMap = mutable.HashMap[String, mutable.ListBuffer[RexNode]]() + equiJoinFilters.foreach { + case c: RexCall => + require(c.isA(SqlKind.EQUALS)) + val left = c.operands.head + val right = c.operands(1) + equiJoinFilterMap.getOrElseUpdate(left.toString, mutable.ListBuffer[RexNode]()) += right + equiJoinFilterMap.getOrElseUpdate(right.toString, mutable.ListBuffer[RexNode]()) += left + } + + val candidateJoinFilters = equiJoinFilterMap.values.filter(_.size > 1) + if (candidateJoinFilters.isEmpty) { + // no transitive closure predicates + return + } + + val newEquiJoinFilters = mutable.ListBuffer[RexNode](equiJoinFilters: _*) + def containEquiJoinFilter(joinFilter: RexNode): Boolean = { + newEquiJoinFilters.exists { f => f.toString.equals(joinFilter.toString) } + } + + val rexBuilder = multiJoin.getCluster.getRexBuilder + candidateJoinFilters.foreach { + candidate => candidate.indices.foreach { + 
startIndex => + val op1 = candidate(startIndex) + candidate.subList(startIndex + 1, candidate.size).foreach { + op2 => + // `a = b` and `b = a` are the same + val newFilter1 = rexBuilder.makeCall(EQUALS, op1, op2) + val newFilter2 = rexBuilder.makeCall(EQUALS, op2, op1) + if (!containEquiJoinFilter(newFilter1) && !containEquiJoinFilter(newFilter2)) { + newEquiJoinFilters += newFilter1 + } + } + } + } + + if (newEquiJoinFilters.size == equiJoinFilters.size) { + // no new join filters added + return + } + + val newJoinFilter = call.builder().and(newEquiJoinFilters.toList ::: nonEquiJoinFilters.toList) + val newMultiJoin = + new MultiJoin( + multiJoin.getCluster, + multiJoin.getInputs, + newJoinFilter, + multiJoin.getRowType, + multiJoin.isFullOuterJoin, + multiJoin.getOuterJoinConditions, + multiJoin.getJoinTypes, + multiJoin.getProjFields, + multiJoin.getJoinFieldRefCountsMap, + multiJoin.getPostJoinFilter) + + call.transformTo(newMultiJoin) + } + + /** + * Partitions MultiJoin condition in equi join filters and non-equi join filters. + */ + private def partitionJoinFilters(multiJoin: MultiJoin): (Seq[RexNode], Seq[RexNode]) = { + val joinFilters = RelOptUtil.conjunctions(multiJoin.getJoinFilter) + joinFilters.partition(f => f.isA(SqlKind.EQUALS)) + } + +} + +object RewriteMultiJoinConditionRule { + val INSTANCE = new RewriteMultiJoinConditionRule +} diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml new file mode 100644 index 0000000..8fe210d --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.xml @@ -0,0 +1,600 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. 
See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testAllFullOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + FULL OUTER JOIN T2 ON a1 = a2 + FULL OUTER JOIN T3 ON a1 = a3 + FULL OUTER JOIN T4 ON a1 = a4 + FULL OUTER JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[full]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[full]) + : :- LogicalJoin(condition=[=($0, $6)], joinType=[full]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[full]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ 
+HashJoin(joinType=[FullOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right]) +:- Exchange(distribution=[hash[a4]]) +: +- HashJoin(joinType=[FullOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], build=[right]) +: :- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right]) +: : :- Exchange(distribution=[hash[a1]]) +: : : +- HashJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right]) +: : : :- Exchange(distribution=[hash[a1]]) +: : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) +: : : +- Exchange(distribution=[hash[a2]]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) +: : +- Exchange(distribution=[single]) +: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +: +- Exchange(distribution=[hash[a4]]) +: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ++- Exchange(distribution=[hash[a5]]) + +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) +]]> + </Resource> + </TestCase> + <TestCase name="testAllLeftOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + LEFT OUTER JOIN T2 ON a1 = a2 + LEFT OUTER JOIN T3 ON a2 = a3 + LEFT OUTER JOIN T4 ON a1 = a4 + LEFT OUTER JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[left]) + :- 
LogicalJoin(condition=[=($0, $9)], joinType=[left]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[left]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[left]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +HashJoin(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], build=[right]) +:- Exchange(distribution=[hash[a4]]) +: +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) +: :- HashJoin(joinType=[LeftOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right]) +: : :- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right]) +: : : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) +: : : +- Exchange(distribution=[broadcast]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) +: : +- Exchange(distribution=[broadcast]) +: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +: +- Exchange(distribution=[broadcast]) +: +- TableSourceScan(table=[[default_catalog, default_database, 
T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ++- Exchange(distribution=[hash[a5]]) + +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) +]]> + </Resource> + </TestCase> + <TestCase name="testAllRightOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + RIGHT OUTER JOIN T2 ON a1 = a2 + RIGHT OUTER JOIN T3 ON a2 = a3 + RIGHT OUTER JOIN T4 ON a1 = a4 + RIGHT OUTER JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[right]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[right]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[right]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[broadcast]) + +- 
HashJoin(joinType=[RightOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[left]) + :- Exchange(distribution=[broadcast]) + : +- SortMergeJoin(joinType=[RightOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3]) + : :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right]) + : : :- Exchange(distribution=[hash[a1]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testBushyJoinCondition1"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($0, $3), =($3, $6), =($0, $9), =($6, $12))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: 
[TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase 
name="testBushyJoinCondition2"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($1, $4), =($4, $7), =($1, $10), =($7, $13))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right]) + : :- Exchange(distribution=[hash[b1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[b4]]) + : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], 
build=[right]) + : :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)]) + : : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right]) + : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : : +- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +]]> + </Resource> + </TestCase> + <TestCase name="testInnerAndFullOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + JOIN T2 ON a1 = a2 + FULL OUTER JOIN T3 ON a2 = a3 + JOIN T4 ON a1 = a4 + JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[inner]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[full]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, 
c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) + :- NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], build=[right]) + : :- Exchange(distribution=[single]) + : : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], isBroadcast=[true], build=[right]) + : : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[single]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)]) + +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testInnerAndLeftOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + JOIN T2 ON a1 = a2 + JOIN T3 ON a2 = a3 + LEFT OUTER JOIN T4 ON a1 = a4 + JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + 
<![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[left]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[broadcast]) + +- HashJoin(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[broadcast]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a3, b3, c3], 
isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testInnerAndRightOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + RIGHT OUTER JOIN T2 ON a1 = a2 + JOIN T3 ON a2 = a3 + JOIN T4 ON a1 = a4 + JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[inner]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, 
b5, c5, a4, b4, c4, a3, b3, c3], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) + : :- HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], build=[right]) + : : :- Exchange(distribution=[hash[a1]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- Calc(select=[a5, b5, c5, a4, b4, c4], where=[=(a4, a5)]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinWithFilter"> + <Resource name="sql"> + <![CDATA[ +WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10), + V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000), + V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100) + +SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($0, $12), 
<($13, 15))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11]) + : +- LogicalFilter(condition=[>(+($7, $1), 100)]) + : +- LogicalJoin(condition=[=($9, $0)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + : +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8]) + : +- LogicalFilter(condition=[<(*($1, $7), 2000)]) + : +- LogicalJoin(condition=[=($3, $6)], joinType=[inner]) + : :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5]) + : : +- LogicalFilter(condition=[>(*($1, $4), 10)]) + : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[AND(>(+(b2, b4), 100), =(a2, a4))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- NestedLoopJoin(joinType=[InnerJoin], where=[AND(>(*(b1, b2), 10), =(a2, a3))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a5, a3)], 
select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], build=[right]) + : :- Calc(select=[a5, b5, c5], where=[<(b5, 15)]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- Calc(select=[a1, b1, c1, a3, b3, c3], where=[=(a3, a1)]) + : +- HashJoin(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinWithProject"> + <Resource name="sql"> + <![CDATA[ +WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2), + V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3), + V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4) + +SELECT * FROM V3, T5 where a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a3=[$0], b1=[$1], a1=[$2], c2=[$3], c3=[$4], a4=[$5], b4=[$6], a5=[$7], b5=[$8], c5=[$9]) ++- LogicalFilter(condition=[=($5, $7)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalProject(a3=[$3], b1=[$4], a1=[$5], c2=[$6], c3=[$7], a4=[$0], b4=[$1]) + : +- LogicalJoin(condition=[=($5, $0)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + : +- LogicalProject(a3=[$4], b1=[$0], a1=[$1], c2=[$3], c3=[$6]) + : +- LogicalJoin(condition=[=($2, $4)], joinType=[inner]) + : :- LogicalProject(b1=[$1], 
a1=[$0], a2=[$3], c2=[$5]) + : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a3, b1, a1, c2, c3, a4, b4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[broadcast]) + +- HashJoin(joinType=[InnerJoin], where=[=(a1, a4)], select=[a3, b1, a1, c2, c3, a4, b4], isBroadcast=[true], build=[right]) + :- Calc(select=[a3, b1, a1, c2, c3]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[b1, a1, a2, c2, a3, c3], isBroadcast=[true], build=[right]) + : :- Calc(select=[b1, a1, a2, c2]) + : : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, c2], isBroadcast=[true], build=[right]) + : : :- Calc(select=[a1, b1]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[broadcast]) + : : +- Calc(select=[a2, c2]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- Calc(select=[a3, c3]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- 
Exchange(distribution=[broadcast]) + +- Calc(select=[a4, b4]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testStarJoinCondition1"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($0, $3), =($0, $6), =($0, $9), =($0, $12))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(a2, a4)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T2, source: 
[TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a5, a3)], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- HashJoin(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, c1, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testStarJoinCondition2"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($1, $4), =($1, $7), =($1, $10), =($1, $13))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- 
LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(b1, b3)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], isBroadcast=[true], build=[right]) + :- HashJoin(joinType=[InnerJoin], where=[=(b1, b4)], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], build=[right]) + : :- Exchange(distribution=[hash[b1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[b4]]) + : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b4)], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], isBroadcast=[true], build=[right]) + : :- Calc(select=[a5, b5, c5, a2, b2, c2], where=[=(b5, b2)]) + : : +- HashJoin(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], isBroadcast=[true], build=[right]) + : : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : : +- Exchange(distribution=[broadcast]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +]]> + </Resource> + </TestCase> + <TestCase 
name="testWithoutColumnStats"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($2, $5), =($2, $8), =($5, $11), =($2, $14))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- HashJoin(joinType=[InnerJoin], where=[=(c1, c2)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + +- Exchange(distribution=[broadcast]) + +- HashJoin(joinType=[InnerJoin], where=[=(c2, c5)], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + +- 
Exchange(distribution=[broadcast]) + +- HashJoin(joinType=[InnerJoin], where=[=(c3, c4)], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], isBroadcast=[true], build=[right]) + :- Calc(select=[a5, b5, c5, a3, b3, c3], where=[=(c5, c3)]) + : +- HashJoin(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], isBroadcast=[true], build=[right]) + : :- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[broadcast]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[broadcast]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml new file mode 100644 index 0000000..29dd20a --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.xml @@ -0,0 +1,318 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testMultiJoin_FullJoin1"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A FULL OUTER JOIN B ON a1 = b1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[full]) + :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_FullJoin2"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A FULL OUTER JOIN B ON a1 = b1 FULL OUTER JOIN C ON a1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalJoin(condition=[=($0, $4)], joinType=[full]) + :- LogicalJoin(condition=[=($0, $2)], joinType=[full]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- 
LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]]) +:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[true], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]]) +: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_InnerJoin1"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A, B WHERE a1 = b1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3]) ++- LogicalFilter(condition=[=($0, $2)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_InnerJoin2"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A, B, C WHERE a1 = b1 AND a1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ 
+LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalFilter(condition=[AND(=($0, $2), =($0, $4))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[AND(=($0, $4), =($0, $2), =($4, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_InnerJoin3"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A, B, C, D WHERE a1 = b1 AND b1 = c1 AND c1 = d1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5], d1=[$6], d2=[$7]) ++- LogicalFilter(condition=[AND(=($0, $2), =($2, $4), =($4, $6))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, 
c2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, D, source: [TestTableSource(d1, d2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[AND(=($4, $6), =($2, $4), =($0, $2), =($6, $2), =($4, $0), =($6, $0))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +:- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, D, source: [TestTableSource(d1, d2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_InnerJoin4"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A, B, C WHERE a1 = b1 AND a1 > c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalFilter(condition=[AND(=($0, $2), >($0, $4))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[AND(>($0, $4), =($0, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +:- 
LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_InnerJoin5"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A, B, C WHERE a1 + 1 = b1 AND a1 + 1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalFilter(condition=[AND(=(+($0, 1), $2), =(+($0, 1), $4))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[AND(=(+($0, 1), $4), =(+($0, 1), $2), =($4, $2))], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, INNER]], outerJoinConditions=[[NULL, NULL, NULL]], projFields=[[{0, 1}, {0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_LeftJoin1"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A LEFT JOIN B ON a1 = b1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[left]) + :- LogicalTableScan(table=[[default_catalog, default_database, A, source: 
[TestTableSource(a1, a2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[{0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_LeftJoin2"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A JOIN B ON a1 = b1 LEFT JOIN C ON b1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalJoin(condition=[=($2, $4)], joinType=[left]) + :- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER, LEFT]], outerJoinConditions=[[NULL, NULL, =($2, $4)]], projFields=[[{0, 1}, {0, 1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +:- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_LeftJoin3"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A LEFT JOIN B ON a1 = b1 JOIN C 
ON a1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalJoin(condition=[=($0, $4)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $2)], joinType=[left]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]]) +:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[INNER, LEFT]], outerJoinConditions=[[NULL, =($0, $2)]], projFields=[[ALL, ALL]]) +: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_RightJoin1"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A RIGHT JOIN B ON a1 = b1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3]) ++- LogicalJoin(condition=[=($0, $2)], joinType=[right]) + :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[{0, 
1}, {0, 1}]]) +:- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_RightJoin2"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A JOIN B ON a1 = b1 RIGHT JOIN C ON b1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalJoin(condition=[=($2, $4)], joinType=[right]) + :- LogicalJoin(condition=[=($0, $2)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($2, $4), NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]]) +:- MultiJoin(joinFilter=[=($0, $2)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[ALL, ALL]]) +: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> + <TestCase name="testMultiJoin_RightJoin3"> + <Resource name="sql"> + <![CDATA[SELECT * FROM A RIGHT JOIN B ON a1 = b1 JOIN C ON a1 = c1]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], a2=[$1], b1=[$2], b2=[$3], c1=[$4], c2=[$5]) ++- LogicalJoin(condition=[=($0, $4)], 
joinType=[inner]) + :- LogicalJoin(condition=[=($0, $2)], joinType=[right]) + : :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +MultiJoin(joinFilter=[=($0, $4)], isFullOuterJoin=[false], joinTypes=[[INNER, INNER]], outerJoinConditions=[[NULL, NULL]], projFields=[[{0, 1, 2, 3}, {0, 1}]]) +:- MultiJoin(joinFilter=[true], isFullOuterJoin=[false], joinTypes=[[RIGHT, INNER]], outerJoinConditions=[[=($0, $2), NULL]], projFields=[[ALL, ALL]]) +: :- LogicalTableScan(table=[[default_catalog, default_database, A, source: [TestTableSource(a1, a2)]]]) +: +- LogicalTableScan(table=[[default_catalog, default_database, B, source: [TestTableSource(b1, b2)]]]) ++- LogicalTableScan(table=[[default_catalog, default_database, C, source: [TestTableSource(c1, c2)]]]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml new file mode 100644 index 0000000..b05c7a0 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.xml @@ -0,0 +1,635 @@ +<?xml version="1.0" ?> +<!-- +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. 
You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +--> +<Root> + <TestCase name="testAllFullOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + FULL OUTER JOIN T2 ON a1 = a2 + FULL OUTER JOIN T3 ON a1 = a3 + FULL OUTER JOIN T4 ON a1 = a4 + FULL OUTER JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[full]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[full]) + : :- LogicalJoin(condition=[=($0, $6)], joinType=[full]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[full]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Join(joinType=[FullOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +:- Exchange(distribution=[hash[a4]]) +: +- Join(joinType=[FullOuterJoin], where=[=(a1, a4)], 
select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: :- Exchange(distribution=[hash[a1]]) +: : +- Join(joinType=[FullOuterJoin], where=[=(a1, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: : :- Exchange(distribution=[hash[a1]]) +: : : +- Join(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: : : :- Exchange(distribution=[hash[a1]]) +: : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) +: : : +- Exchange(distribution=[hash[a2]]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) +: : +- Exchange(distribution=[hash[a3]]) +: : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +: +- Exchange(distribution=[hash[a4]]) +: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ++- Exchange(distribution=[hash[a5]]) + +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) +]]> + </Resource> + </TestCase> + <TestCase name="testAllLeftOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + LEFT OUTER JOIN T2 ON a1 = a2 + LEFT OUTER JOIN T3 ON a2 = a3 + LEFT OUTER JOIN T4 ON a1 = a4 + LEFT OUTER JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[left]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[left]) + : :- 
LogicalJoin(condition=[=($3, $6)], joinType=[left]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[left]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Join(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +:- Exchange(distribution=[hash[a4]]) +: +- Join(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: :- Exchange(distribution=[hash[a1]]) +: : +- Join(joinType=[LeftOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: : :- Exchange(distribution=[hash[a2]]) +: : : +- Join(joinType=[LeftOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) +: : : :- Exchange(distribution=[hash[a1]]) +: : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) +: : : +- Exchange(distribution=[hash[a2]]) +: : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) +: : +- Exchange(distribution=[hash[a3]]) +: : +- TableSourceScan(table=[[default_catalog, default_database, T3, 
source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +: +- Exchange(distribution=[hash[a4]]) +: +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) ++- Exchange(distribution=[hash[a5]]) + +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) +]]> + </Resource> + </TestCase> + <TestCase name="testAllRightOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + RIGHT OUTER JOIN T2 ON a1 = a2 + RIGHT OUTER JOIN T3 ON a2 = a3 + RIGHT OUTER JOIN T4 ON a1 = a4 + RIGHT OUTER JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[right]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[right]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[right]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[LeftOuterJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a5]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[hash[a4]]) + +- Join(joinType=[RightOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1]]) + : +- Join(joinType=[RightOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2]]) + : : +- Join(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : : :- Exchange(distribution=[hash[a1]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testBushyJoinCondition1"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($0, $3), =($3, $6), =($0, $9), =($6, 
$12))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a4, a2), =(a3, a4), =(a5, a4))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1, a2, a3, a5]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a2, a3), =(a1, a2), =(a5, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2, a2, a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3, a1, a5]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a3, a5), =(a1, a5))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a5, a5]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- 
Exchange(distribution=[hash[a3, a1]]) + : +- Join(joinType=[InnerJoin], where=[=(a3, a1)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4, a4, a4, a4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testBushyJoinCondition2"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($1, $4), =($4, $7), =($1, $10), =($7, $13))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) 
+]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(b3, b5), =(b2, b3), =(b3, b1), =(b4, b3))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[b5, b2, b1, b4]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(b1, b4), =(b1, b2), =(b5, b1))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[b1, b1, b1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[b4, b2, b5]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(b4, b2), =(b5, b4))], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[b2, b5]]) + : : +- Join(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : : :- Exchange(distribution=[hash[b5]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : : +- Exchange(distribution=[hash[b2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[b4, b4]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- Exchange(distribution=[hash[b3, b3, b3, b3]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +]]> + </Resource> + </TestCase> + <TestCase name="testInnerAndFullOuterJoin"> + 
<Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + JOIN T2 ON a1 = a2 + FULL OUTER JOIN T3 ON a2 = a3 + JOIN T4 ON a1 = a4 + JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[inner]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[full]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a1))], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1, a1]]) + : +- Join(joinType=[FullOuterJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2]]) + : : +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : : :- Exchange(distribution=[hash[a1]]) + : : : +- TableSourceScan(table=[[default_catalog, 
default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4, a5]]) + +- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a5]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[hash[a4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testInnerAndLeftOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + JOIN T2 ON a1 = a2 + JOIN T3 ON a2 = a3 + LEFT OUTER JOIN T4 ON a1 = a4 + JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[left]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- 
LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a5]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[hash[a4]]) + +- Join(joinType=[LeftOuterJoin], where=[=(a1, a4)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1]]) + : +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[a2]]) + : +- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, b2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + 
</Resource> + </TestCase> + <TestCase name="testInnerAndRightOuterJoin"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1 + RIGHT OUTER JOIN T2 ON a1 = a2 + JOIN T3 ON a2 = a3 + JOIN T4 ON a1 = a4 + JOIN T5 ON a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalJoin(condition=[=($9, $12)], joinType=[inner]) + :- LogicalJoin(condition=[=($0, $9)], joinType=[inner]) + : :- LogicalJoin(condition=[=($3, $6)], joinType=[inner]) + : : :- LogicalJoin(condition=[=($0, $3)], joinType=[right]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a2]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a1))], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a1, a1]]) + : : +- Join(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, c1, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : : :- 
Exchange(distribution=[hash[a1]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[hash[a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a4, a5]]) + : +- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a5]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[hash[a4]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- Exchange(distribution=[hash[a3]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinWithFilter"> + <Resource name="sql"> + <![CDATA[ +WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10), + V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000), + V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100) + +SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($0, $12), <($13, 15))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalProject(a4=[$0], b4=[$1], c4=[$2], a1=[$3], b1=[$4], c1=[$5], a2=[$6], b2=[$7], c2=[$8], a3=[$9], b3=[$10], c3=[$11]) + : +- LogicalFilter(condition=[>(+($7, $1), 100)]) + : +- LogicalJoin(condition=[=($9, $0)], joinType=[inner]) + : :- 
LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + : +- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8]) + : +- LogicalFilter(condition=[<(*($1, $7), 2000)]) + : +- LogicalJoin(condition=[=($3, $6)], joinType=[inner]) + : :- LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5]) + : : +- LogicalFilter(condition=[>(*($1, $4), 10)]) + : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a4, b4, c4, a1, b1, c1, a2, b2, c2, a3, b3, c3, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(a4, a5), =(a3, a4), =(a4, a2), =(a1, a4), >(+(b2, b4), 100))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a5, a3, a2, a1]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a2, a3), =(a1, a2), =(a2, a5), >(*(b1, b2), 10))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2, a2, a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3, a1, a5]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a5, a3), =(a1, a5))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], 
rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a5, a5]]) + : : +- Calc(select=[a5, b5, c5], where=[<(b5, 15)]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[hash[a3, a1]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a3, a1), <(*(b1, b3), 2000))], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4, a4, a4, a4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testJoinWithProject"> + <Resource name="sql"> + <![CDATA[ +WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2), + V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3), + V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4) + +SELECT * FROM V3, T5 where a4 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a3=[$0], b1=[$1], a1=[$2], c2=[$3], c3=[$4], a4=[$5], b4=[$6], a5=[$7], b5=[$8], c5=[$9]) ++- LogicalFilter(condition=[=($5, $7)]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalProject(a3=[$3], b1=[$4], a1=[$5], c2=[$6], c3=[$7], a4=[$0], b4=[$1]) + : +- LogicalJoin(condition=[=($5, $0)], joinType=[inner]) + : :- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + : +- LogicalProject(a3=[$4], b1=[$0], a1=[$1], c2=[$3], c3=[$6]) + : +- LogicalJoin(condition=[=($2, $4)], joinType=[inner]) + 
: :- LogicalProject(b1=[$1], a1=[$0], a2=[$3], c2=[$5]) + : : +- LogicalJoin(condition=[=($0, $3)], joinType=[inner]) + : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a3, b1, a1, c2, c3, a4, b4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[=(a4, a5)], select=[a5, b5, c5, a3, b1, a1, c2, c3, a4, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a5]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + +- Exchange(distribution=[hash[a4]]) + +- Join(joinType=[InnerJoin], where=[=(a1, a4)], select=[a3, b1, a1, c2, c3, a4, b4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1]]) + : +- Calc(select=[a3, b1, a1, c2, c3]) + : +- Join(joinType=[InnerJoin], where=[=(a2, a3)], select=[b1, a1, a2, c2, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2]]) + : : +- Calc(select=[b1, a1, a2, c2]) + : : +- Join(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : : :- Exchange(distribution=[hash[a1]]) + : : : +- Calc(select=[a1, b1]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : : +- Exchange(distribution=[hash[a2]]) + : : +- Calc(select=[a2, c2]) + : : +- TableSourceScan(table=[[default_catalog, 
default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4]]) + +- Calc(select=[a4, b4]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testStarJoinCondition1"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($0, $3), =($0, $6), =($0, $9), =($0, $12))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(a1, a4), =(a5, a4), =(a4, a3), =(a4, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, 
c1, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[a1, a5, a3, a2]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a1, a2), =(a5, a2), =(a3, a2))], select=[a2, b2, c2, a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a2, a2, a2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[a1, a5, a3]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(a1, a5), =(a5, a3))], select=[a5, b5, c5, a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a5, a5]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[hash[a1, a3]]) + : +- Join(joinType=[InnerJoin], where=[=(a1, a3)], select=[a1, b1, c1, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[a1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[a3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[a4, a4, a4, a4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> + <TestCase name="testStarJoinCondition2"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], 
a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($1, $4), =($1, $7), =($1, $10), =($1, $13))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(b1, b3), =(b5, b3), =(b4, b3), =(b3, b2))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[b1, b5, b4, b2]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(b1, b5), =(b1, b4), =(b1, b2))], select=[a1, b1, c1, a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[b1, b1, b1]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + : +- Exchange(distribution=[hash[b5, b4, b2]]) + : +- Join(joinType=[InnerJoin], where=[AND(=(b5, b4), =(b4, b2))], select=[a5, b5, c5, a2, b2, c2, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[b5, b2]]) + : : +- 
Join(joinType=[InnerJoin], where=[=(b5, b2)], select=[a5, b5, c5, a2, b2, c2], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : : :- Exchange(distribution=[hash[b5]]) + : : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : : +- Exchange(distribution=[hash[b2]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + : +- Exchange(distribution=[hash[b4, b4]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, c4)]]], fields=[a4, b4, c4]) + +- Exchange(distribution=[hash[b3, b3, b3, b3]]) + +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) +]]> + </Resource> + </TestCase> + <TestCase name="testWithoutColumnStats"> + <Resource name="sql"> + <![CDATA[ +SELECT * FROM T1, T2, T3, T4, T5 +WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5 + ]]> + </Resource> + <Resource name="planBefore"> + <![CDATA[ +LogicalProject(a1=[$0], b1=[$1], c1=[$2], a2=[$3], b2=[$4], c2=[$5], a3=[$6], b3=[$7], c3=[$8], a4=[$9], b4=[$10], c4=[$11], a5=[$12], b5=[$13], c5=[$14]) ++- LogicalFilter(condition=[AND(=($2, $5), =($2, $8), =($5, $11), =($2, $14))]) + +- LogicalJoin(condition=[true], joinType=[inner]) + :- LogicalJoin(condition=[true], joinType=[inner]) + : :- LogicalJoin(condition=[true], joinType=[inner]) + : : :- LogicalJoin(condition=[true], joinType=[inner]) + : : : :- LogicalTableScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]]) + : : : +- LogicalTableScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]]) + : : +- LogicalTableScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]]) + : +- LogicalTableScan(table=[[default_catalog, default_database, T4, source: 
[TestTableSource(a4, b4, c4)]]]) + +- LogicalTableScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]]) +]]> + </Resource> + <Resource name="planAfter"> + <![CDATA[ +Calc(select=[a1, b1, c1, a2, b2, c2, a3, b3, c3, a4, b4, c4, a5, b5, c5]) ++- Join(joinType=[InnerJoin], where=[AND(=(c1, c5), =(c1, c3), =(c1, c2), =(c4, c1))], select=[a1, b1, c1, a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[c1, c1, c1, c1]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T1, source: [TestTableSource(a1, b1, c1)]]], fields=[a1, b1, c1]) + +- Exchange(distribution=[hash[c5, c3, c2, c4]]) + +- Join(joinType=[InnerJoin], where=[AND(=(c2, c4), =(c5, c2), =(c3, c2))], select=[a2, b2, c2, a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[c2, c2, c2]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T2, source: [TestTableSource(a2, b2, c2)]]], fields=[a2, b2, c2]) + +- Exchange(distribution=[hash[c4, c5, c3]]) + +- Join(joinType=[InnerJoin], where=[AND(=(c4, c5), =(c4, c3))], select=[a5, b5, c5, a3, b3, c3, a4, b4, c4], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + :- Exchange(distribution=[hash[c5, c3]]) + : +- Join(joinType=[InnerJoin], where=[=(c5, c3)], select=[a5, b5, c5, a3, b3, c3], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) + : :- Exchange(distribution=[hash[c5]]) + : : +- TableSourceScan(table=[[default_catalog, default_database, T5, source: [TestTableSource(a5, b5, c5)]]], fields=[a5, b5, c5]) + : +- Exchange(distribution=[hash[c3]]) + : +- TableSourceScan(table=[[default_catalog, default_database, T3, source: [TestTableSource(a3, b3, c3)]]], fields=[a3, b3, c3]) + +- Exchange(distribution=[hash[c4, c4]]) + +- TableSourceScan(table=[[default_catalog, default_database, T4, source: [TestTableSource(a4, b4, 
c4)]]], fields=[a4, b4, c4]) +]]> + </Resource> + </TestCase> +</Root> diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala new file mode 100644 index 0000000..cb31143 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/JoinReorderTest.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.batch.sql.join + +import org.apache.flink.table.plan.common.JoinReorderTestBase +import org.apache.flink.table.util.TableTestUtil + +/** Batch variant of the join-reorder plan tests: inherits every case from JoinReorderTestBase and supplies the util returned by batchTestUtil(). */ class JoinReorderTest extends JoinReorderTestBase { + /** Test util that the inherited cases run against. */ override protected def getTableTestUtil: TableTestUtil = batchTestUtil() +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala new file mode 100644 index 0000000..88883a7 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/common/JoinReorderTestBase.scala @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.plan.common + +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.api.{PlannerConfigOptions, Types} +import org.apache.flink.table.plan.stats.{ColumnStats, FlinkStatistic, TableStats} +import org.apache.flink.table.util.{TableTestBase, TableTestUtil} + +import org.junit.{Before, Test} + +/* NOTE(review): presumably this implicit-conversion import is what lets the Scala Map literals in setup() be passed to TableStats - confirm. JavaConversions is deprecated in newer Scala versions. */ import scala.collection.JavaConversions._ + +/** Plan tests for join reordering, shared by the batch and stream JoinReorderTest subclasses. Every case builds a SQL query over the tables T1..T5 registered in setup() and hands it to util.verifyPlan. */ abstract class JoinReorderTestBase extends TableTestBase { + + /** Test util, taken from the subclass hook getTableTestUtil when the instance is constructed. */ protected val util: TableTestUtil = getTableTestUtil + + /** Hook implemented by subclasses to choose the test util. */ protected def getTableTestUtil: TableTestUtil + + /** Registers T1..T5, each with an INT, a LONG and a STRING column (aN, bN, cN) plus table statistics, then enables join reordering. Only the aN and bN columns receive ColumnStats; the cN columns get none. NOTE(review): the first TableStats argument is presumably the row count and the first ColumnStats argument the distinct-value count - confirm against those classes. */ @Before + def setup(): Unit = { + val types = Array[TypeInformation[_]](Types.INT, Types.LONG, Types.STRING) + + util.addTableSource("T1", types, Array("a1", "b1", "c1"), FlinkStatistic.builder() + .tableStats(new TableStats(1000000L, Map( + "a1" -> new ColumnStats(1000000L, 0L, 4.0, 4, null, null), + "b1" -> new ColumnStats(10L, 0L, 8.0, 8, null, null) + ))).build()) + + util.addTableSource("T2", types, Array("a2", "b2", "c2"), FlinkStatistic.builder() + .tableStats(new TableStats(10000L, Map( + "a2" -> new ColumnStats(100L, 0L, 4.0, 4, null, null), + "b2" -> new ColumnStats(5000L, 0L, 8.0, 8, null, null) + ))).build()) + + util.addTableSource("T3", types, Array("a3", "b3", "c3"), FlinkStatistic.builder() + .tableStats(new TableStats(10L, Map( + "a3" -> new ColumnStats(5L, 0L, 4.0, 4, null, null), + "b3" -> new ColumnStats(2L, 0L, 8.0, 8, null, null) + ))).build()) + + util.addTableSource("T4", types, Array("a4", "b4", "c4"), FlinkStatistic.builder() + .tableStats(new TableStats(100L, Map( + "a4" -> new ColumnStats(100L, 0L, 4.0, 4, null, null), + "b4" -> new ColumnStats(20L, 0L, 8.0, 8, null, null) + ))).build()) + + util.addTableSource("T5", types, Array("a5", "b5", "c5"), FlinkStatistic.builder() + .tableStats(new TableStats(500000L, Map( + "a5" -> new ColumnStats(200000L, 0L, 4.0, 4, null, null), + "b5" -> new ColumnStats(200L, 0L, 8.0, 8, null, null) + ))).build()) + + 
/* set the SQL_OPTIMIZER_JOIN_REORDER_ENABLED planner option to true */ util.getTableEnv.getConfig.getConf.setBoolean( + PlannerConfigOptions.SQL_OPTIMIZER_JOIN_REORDER_ENABLED, true) + } + + /** Star-shaped join on the a columns: a1 is equated with a2, a3, a4 and a5. */ @Test + def testStarJoinCondition1(): Unit = { + val sql = + s""" + |SELECT * FROM T1, T2, T3, T4, T5 + |WHERE a1 = a2 AND a1 = a3 AND a1 = a4 AND a1 = a5 + """.stripMargin + util.verifyPlan(sql) + } + + /** Star-shaped join on the b columns: b1 is equated with b2, b3, b4 and b5. */ @Test + def testStarJoinCondition2(): Unit = { + val sql = + s""" + |SELECT * FROM T1, T2, T3, T4, T5 + |WHERE b1 = b2 AND b1 = b3 AND b1 = b4 AND b1 = b5 + """.stripMargin + util.verifyPlan(sql) + } + + /** Conditions on the a columns link the tables as T4 - T1 - T2 - T3 - T5 instead of a star. */ @Test + def testBushyJoinCondition1(): Unit = { + val sql = + s""" + |SELECT * FROM T1, T2, T3, T4, T5 + |WHERE a1 = a2 AND a2 = a3 AND a1 = a4 AND a3 = a5 + """.stripMargin + util.verifyPlan(sql) + } + + /** Same linkage as testBushyJoinCondition1, expressed on the b columns. */ @Test + def testBushyJoinCondition2(): Unit = { + val sql = + s""" + |SELECT * FROM T1, T2, T3, T4, T5 + |WHERE b1 = b2 AND b2 = b3 AND b1 = b4 AND b3 = b5 + """.stripMargin + util.verifyPlan(sql) + } + + /** Joins on the c columns, for which setup() registers no ColumnStats. */ @Test + def testWithoutColumnStats(): Unit = { + val sql = + s""" + |SELECT * FROM T1, T2, T3, T4, T5 + |WHERE c1 = c2 AND c1 = c3 AND c2 = c4 AND c1 = c5 + """.stripMargin + util.verifyPlan(sql) + } + + /** Joins wrapped in WITH views that each project only a subset of the joined columns. */ @Test + def testJoinWithProject(): Unit = { + val sql = + s""" + |WITH V1 AS (SELECT b1, a1, a2, c2 FROM T1 JOIN T2 ON a1 = a2), + | V2 AS (SELECT a3, b1, a1, c2, c3 FROM V1 JOIN T3 ON a2 = a3), + | V3 AS (SELECT a3, b1, a1, c2, c3, a4, b4 FROM T4 JOIN V2 ON a1 = a4) + | + |SELECT * FROM V3, T5 where a4 = a5 + """.stripMargin + // cannot reorder now + util.verifyPlan(sql) + } + + /** Joins wrapped in WITH views that add non-equi WHERE filters on the b columns. */ @Test + def testJoinWithFilter(): Unit = { + val sql = + s""" + |WITH V1 AS (SELECT * FROM T1 JOIN T2 ON a1 = a2 WHERE b1 * b2 > 10), + | V2 AS (SELECT * FROM V1 JOIN T3 ON a2 = a3 WHERE b1 * b3 < 2000), + | V3 AS (SELECT * FROM T4 JOIN V2 ON a3 = a4 WHERE b2 + b4 > 100) + | + |SELECT * FROM V3, T5 WHERE a4 = a5 AND b5 < 15 + """.stripMargin + util.verifyPlan(sql) + } + + /** Inner joins with one LEFT OUTER JOIN (on T4) in between. */ @Test + def testInnerAndLeftOuterJoin(): Unit = { + val sql = + s""" + |SELECT * FROM T1 + | JOIN 
T2 ON a1 = a2 + | JOIN T3 ON a2 = a3 + | LEFT OUTER JOIN T4 ON a1 = a4 + | JOIN T5 ON a4 = a5 + """.stripMargin + // T1, T2, T3 can reorder + util.verifyPlan(sql) + } + + /** One RIGHT OUTER JOIN (T1 with T2) followed by inner joins. */ @Test + def testInnerAndRightOuterJoin(): Unit = { + val sql = + s""" + |SELECT * FROM T1 + | RIGHT OUTER JOIN T2 ON a1 = a2 + | JOIN T3 ON a2 = a3 + | JOIN T4 ON a1 = a4 + | JOIN T5 ON a4 = a5 + """.stripMargin + // T3, T4, T5 can reorder + util.verifyPlan(sql) + } + + /** Inner joins with one FULL OUTER JOIN (on T3) in between. */ @Test + def testInnerAndFullOuterJoin(): Unit = { + val sql = + s""" + |SELECT * FROM T1 + | JOIN T2 ON a1 = a2 + | FULL OUTER JOIN T3 ON a2 = a3 + | JOIN T4 ON a1 = a4 + | JOIN T5 ON a4 = a5 + """.stripMargin + util.verifyPlan(sql) + } + + /** Every join in the query is a LEFT OUTER JOIN. */ @Test + def testAllLeftOuterJoin(): Unit = { + val sql = + s""" + |SELECT * FROM T1 + | LEFT OUTER JOIN T2 ON a1 = a2 + | LEFT OUTER JOIN T3 ON a2 = a3 + | LEFT OUTER JOIN T4 ON a1 = a4 + | LEFT OUTER JOIN T5 ON a4 = a5 + """.stripMargin + // cannot reorder + util.verifyPlan(sql) + } + + /** Every join in the query is a RIGHT OUTER JOIN. */ @Test + def testAllRightOuterJoin(): Unit = { + val sql = + s""" + |SELECT * FROM T1 + | RIGHT OUTER JOIN T2 ON a1 = a2 + | RIGHT OUTER JOIN T3 ON a2 = a3 + | RIGHT OUTER JOIN T4 ON a1 = a4 + | RIGHT OUTER JOIN T5 ON a4 = a5 + """.stripMargin + // cannot reorder + util.verifyPlan(sql) + } + + /** Every join in the query is a FULL OUTER JOIN. */ @Test + def testAllFullOuterJoin(): Unit = { + val sql = + s""" + |SELECT * FROM T1 + | FULL OUTER JOIN T2 ON a1 = a2 + | FULL OUTER JOIN T3 ON a1 = a3 + | FULL OUTER JOIN T4 ON a1 = a4 + | FULL OUTER JOIN T5 ON a4 = a5 + """.stripMargin + // cannot reorder + util.verifyPlan(sql) + } +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala index ea096e3..6d3b3b6 100644 --- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkAggregateInnerJoinTransposeRuleTest.scala @@ -50,7 +50,7 @@ class FlinkAggregateInnerJoinTransposeRuleTest extends TableTestBase { .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) .add(RuleSets.ofList(AggregateReduceGroupingRule.INSTANCE - )).build(), "reduce unless grouping") + )).build(), "reduce useless grouping") .addProgram( FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala index 3ad9faa..1fa6622 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/FlinkJoinPushExpressionsRuleTest.scala @@ -25,6 +25,7 @@ import org.apache.flink.table.plan.rules.FlinkBatchRuleSets import org.apache.flink.table.util.TableTestBase import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.tools.RuleSets import org.junit.{Before, Test} /** @@ -38,11 +39,16 @@ class FlinkJoinPushExpressionsRuleTest extends TableTestBase { def setup(): Unit = { val programs = new FlinkChainedProgram[BatchOptimizeContext]() programs.addLast( - "FilterSimplifyExpressions", + "rules", FlinkHepRuleSetProgramBuilder.newBuilder .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) - 
.add(FlinkBatchRuleSets.SEMI_JOIN_RULES) + .add(RuleSets.ofList( + SimplifyFilterConditionRule.EXTENDED, + FlinkRewriteSubQueryRule.FILTER, + FlinkSubQueryRemoveRule.FILTER, + JoinConditionTypeCoerceRule.INSTANCE, + FlinkJoinPushExpressionsRule.INSTANCE)) .build() ) val calciteConfig = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig) diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala new file mode 100644 index 0000000..156c2ba --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/rules/logical/RewriteMultiJoinConditionRuleTest.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.plan.rules.logical + +import org.apache.flink.api.scala._ +import org.apache.flink.table.calcite.CalciteConfig +import org.apache.flink.table.plan.optimize.program.{BatchOptimizeContext, FlinkChainedProgram, FlinkGroupProgramBuilder, FlinkHepRuleSetProgramBuilder, HEP_RULES_EXECUTION_TYPE} +import org.apache.flink.table.util.TableTestBase + +import org.apache.calcite.plan.hep.HepMatchOrder +import org.apache.calcite.rel.rules.{FilterMultiJoinMergeRule, JoinToMultiJoinRule, ProjectMultiJoinMergeRule} +import org.apache.calcite.tools.RuleSets +import org.junit.{Before, Test} + +/** + * Test for [[RewriteMultiJoinConditionRule]]. + * + * Each case passes one SQL query over the tables A, B, C and D to util.verifyPlan, + * using the batch program installed in setup(). + */ +class RewriteMultiJoinConditionRuleTest extends TableTestBase { + private val util = batchTestUtil() + + /** Replaces the batch program with a single group named rules that chains three bottom-up HEP stages: push filters into joins, merge joins into a MultiJoin, then apply RewriteMultiJoinConditionRule. Afterwards registers the tables A, B, C and D, each with an Int and a Long column. */ @Before + def setup(): Unit = { + val program = new FlinkChainedProgram[BatchOptimizeContext]() + program.addLast( + "rules", + FlinkGroupProgramBuilder.newBuilder[BatchOptimizeContext] + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList( + FlinkFilterJoinRule.FILTER_ON_JOIN, + FlinkFilterJoinRule.JOIN)) + .build(), "push filter into join") + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_COLLECTION) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList( + ProjectMultiJoinMergeRule.INSTANCE, + FilterMultiJoinMergeRule.INSTANCE, + JoinToMultiJoinRule.INSTANCE)) + .build(), "merge join to MultiJoin") + .addProgram(FlinkHepRuleSetProgramBuilder.newBuilder + .setHepRulesExecutionType(HEP_RULES_EXECUTION_TYPE.RULE_SEQUENCE) + .setHepMatchOrder(HepMatchOrder.BOTTOM_UP) + .add(RuleSets.ofList(RewriteMultiJoinConditionRule.INSTANCE)) + .build(), "RewriteMultiJoinConditionRule") + .build()) + + val builder = CalciteConfig.createBuilder(util.tableEnv.getConfig.getCalciteConfig) + 
.replaceBatchProgram(program) + util.tableEnv.config.setCalciteConfig(builder.build()) + + util.addTableSource[(Int, Long)]("A", 'a1, 'a2) + util.addTableSource[(Int, Long)]("B", 'b1, 'b2) + util.addTableSource[(Int, Long)]("C", 'c1, 'c2) + util.addTableSource[(Int, Long)]("D", 'd1, 'd2) + } + + /** Two tables, one equality condition. */ @Test + def testMultiJoin_InnerJoin1(): Unit = { + val sqlQuery = "SELECT * FROM A, B WHERE a1 = b1" + util.verifyPlan(sqlQuery) + } + + /** Three tables; a1 is equated with both b1 and c1. */ @Test + def testMultiJoin_InnerJoin2(): Unit = { + val sqlQuery = "SELECT * FROM A, B, C WHERE a1 = b1 AND a1 = c1" + util.verifyPlan(sqlQuery) + } + + /** Four tables linked by a chain of equalities a1 = b1 = c1 = d1. */ @Test + def testMultiJoin_InnerJoin3(): Unit = { + val sqlQuery = "SELECT * FROM A, B, C, D WHERE a1 = b1 AND b1 = c1 AND c1 = d1" + util.verifyPlan(sqlQuery) + } + + @Test + def testMultiJoin_InnerJoin4(): Unit = { + // non-equi join condition + val sqlQuery = "SELECT * FROM A, B, C WHERE a1 = b1 AND a1 > c1" + util.verifyPlan(sqlQuery) + } + + /** The equality operands are expressions (a1 + 1) rather than plain columns. */ @Test + def testMultiJoin_InnerJoin5(): Unit = { + val sqlQuery = "SELECT * FROM A, B, C WHERE a1 + 1 = b1 AND a1 + 1 = c1" + util.verifyPlan(sqlQuery) + } + + /** A single LEFT JOIN. */ @Test + def testMultiJoin_LeftJoin1(): Unit = { + val sqlQuery = "SELECT * FROM A LEFT JOIN B ON a1 = b1" + util.verifyPlan(sqlQuery) + } + + /** Inner join followed by a LEFT JOIN. */ @Test + def testMultiJoin_LeftJoin2(): Unit = { + val sqlQuery = "SELECT * FROM A JOIN B ON a1 = b1 LEFT JOIN C ON b1 = c1" + util.verifyPlan(sqlQuery) + } + + /** LEFT JOIN followed by an inner join. */ @Test + def testMultiJoin_LeftJoin3(): Unit = { + val sqlQuery = "SELECT * FROM A LEFT JOIN B ON a1 = b1 JOIN C ON a1 = c1" + util.verifyPlan(sqlQuery) + } + + /** A single RIGHT JOIN. */ @Test + def testMultiJoin_RightJoin1(): Unit = { + val sqlQuery = "SELECT * FROM A RIGHT JOIN B ON a1 = b1" + util.verifyPlan(sqlQuery) + } + + /** Inner join followed by a RIGHT JOIN. */ @Test + def testMultiJoin_RightJoin2(): Unit = { + val sqlQuery = "SELECT * FROM A JOIN B ON a1 = b1 RIGHT JOIN C ON b1 = c1" + util.verifyPlan(sqlQuery) + } + + /** RIGHT JOIN followed by an inner join. */ @Test + def testMultiJoin_RightJoin3(): Unit = { + val sqlQuery = "SELECT * FROM A RIGHT JOIN B ON a1 = b1 JOIN C ON a1 = c1" + 
util.verifyPlan(sqlQuery) + } + + /** A single FULL OUTER JOIN. */ @Test + def testMultiJoin_FullJoin1(): Unit = { + val sqlQuery = "SELECT * FROM A FULL OUTER JOIN B ON a1 = b1" + util.verifyPlan(sqlQuery) + } + + /** Two consecutive FULL OUTER JOINs. */ @Test + def testMultiJoin_FullJoin2(): Unit = { + val sqlQuery = "SELECT * FROM A FULL OUTER JOIN B ON a1 = b1 FULL OUTER JOIN C ON a1 = c1" + util.verifyPlan(sqlQuery) + } + +} diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala new file mode 100644 index 0000000..2a34791 --- /dev/null +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/JoinReorderTest.scala @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.table.plan.stream.sql.join + +import org.apache.flink.table.plan.common.JoinReorderTestBase +import org.apache.flink.table.util.TableTestUtil + +/** Stream variant of the join-reorder plan tests: inherits every case from JoinReorderTestBase and supplies the util returned by streamTestUtil(). */ class JoinReorderTest extends JoinReorderTestBase { + /** Test util that the inherited cases run against. */ override protected def getTableTestUtil: TableTestUtil = streamTestUtil() +}