This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ca044e62536 [SPARK-38886][SQL] Remove outer join if aggregate functions are duplicate agnostic on streamed side ca044e62536 is described below commit ca044e62536b4c80acbbbab538a5f61ce074a684 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Mon Apr 18 23:22:00 2022 +0800 [SPARK-38886][SQL] Remove outer join if aggregate functions are duplicate agnostic on streamed side ### What changes were proposed in this pull request? Enhance `EliminateOuterJoin` to remove the outer join when two conditions hold: - all aggregate functions are duplicate agnostic - all references come from the streamed side ### Why are the changes needed? If the aggregate's child is an outer join, all aggregate references come from the streamed side, and all aggregate functions are duplicate agnostic, then we can remove the outer join. An outer join keeps every row of its streamed side and can only duplicate those rows. Duplicate-agnostic aggregates such as `min` therefore produce the same result without the join. For example: ```sql SELECT t1.c1, min(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1 ==> SELECT t1.c1, min(t1.c2) FROM t1 GROUP BY t1.c1 ``` ### Does this PR introduce _any_ user-facing change? No; it only improves performance. ### How was this patch tested? Added tests. Closes #36177 from ulysses-you/SPARK-38886. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/optimizer/joins.scala | 35 +++++++++++++----- .../optimizer/AggregateOptimizeSuite.scala | 42 ++++++++++++++++++++++ 2 files changed, 68 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index 45d8c54ea19..b21594deb70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer import scala.annotation.tailrec import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -126,11 +127,17 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { * - full outer -> left outer if only the left side has such predicates * - full outer -> right outer if only the right side has such predicates * - * 2. Removes outer join if it only has distinct on streamed side + * 2. 
Removes outer join if aggregate is from streamed side and duplicate agnostic + * * {{{ * SELECT DISTINCT f1 FROM t1 LEFT JOIN t2 ON t1.id = t2.id ==> SELECT DISTINCT f1 FROM t1 * }}} * + * {{{ + * SELECT t1.c1, max(t1.c2) FROM t1 LEFT JOIN t2 ON t1.c1 = t2.c1 GROUP BY t1.c1 ==> + * SELECT t1.c1, max(t1.c2) FROM t1 GROUP BY t1.c1 + * }}} + * * This rule should be executed before pushing down the Filter */ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { @@ -166,23 +173,33 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with PredicateHelper { } } + private def allDuplicateAgnostic( + aggregateExpressions: Seq[NamedExpression]): Boolean = { + !aggregateExpressions.exists(_.exists { + case agg: AggregateFunction => !EliminateDistinct.isDuplicateAgnostic(agg) + case _ => false + }) + } + def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( _.containsPattern(OUTER_JOIN), ruleId) { case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _, _)) => val newJoinType = buildNewJoinType(f, j) if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType)) - case a @ Aggregate(_, _, Join(left, _, LeftOuter, _, _)) - if a.groupOnly && a.references.subsetOf(left.outputSet) => + case a @ Aggregate(_, aggExprs, Join(left, _, LeftOuter, _, _)) + if a.references.subsetOf(left.outputSet) && allDuplicateAgnostic(aggExprs) => a.copy(child = left) - case a @ Aggregate(_, _, Join(_, right, RightOuter, _, _)) - if a.groupOnly && a.references.subsetOf(right.outputSet) => + case a @ Aggregate(_, aggExprs, Join(_, right, RightOuter, _, _)) + if a.references.subsetOf(right.outputSet) && allDuplicateAgnostic(aggExprs) => a.copy(child = right) - case a @ Aggregate(_, _, p @ Project(_, Join(left, _, LeftOuter, _, _))) - if a.groupOnly && p.references.subsetOf(left.outputSet) => + case a @ Aggregate(_, aggExprs, p @ Project(projectList, Join(left, _, LeftOuter, _, _))) + if 
projectList.forall(_.deterministic) && p.references.subsetOf(left.outputSet) && + allDuplicateAgnostic(aggExprs) => a.copy(child = p.copy(child = left)) - case a @ Aggregate(_, _, p @ Project(_, Join(_, right, RightOuter, _, _))) - if a.groupOnly && p.references.subsetOf(right.outputSet) => + case a @ Aggregate(_, aggExprs, p @ Project(projectList, Join(_, right, RightOuter, _, _))) + if projectList.forall(_.deterministic) && p.references.subsetOf(right.outputSet) && + allDuplicateAgnostic(aggExprs) => a.copy(child = p.copy(child = right)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala index cef307fcba0..915878f4338 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateOptimizeSuite.scala @@ -164,4 +164,46 @@ class AggregateOptimizeSuite extends AnalysisTest { .groupBy("x.b".attr)("x.b".attr, TrueLiteral, FalseLiteral.as("newAlias")) .analyze) } + + test("SPARK-38886: Remove outer join if aggregate functions are duplicate agnostic on " + + "streamed side") { + val x = testRelation.subquery(Symbol("x")) + val y = testRelation.subquery(Symbol("y")) + + Seq((LeftOuter, "x", x), (RightOuter, "y", y)).foreach { case (joinType, t, streamed) => + comparePlans(Optimize.execute( + x.join(y, joinType, Some($"x.a" === $"y.a")) + .groupBy($"$t.a")($"$t.a", max($"$t.b")).analyze), + streamed.groupBy($"$t.a")($"$t.a", max($"$t.b")).analyze) + + // with project + comparePlans(Optimize.execute( + x.join(y, joinType, Some($"x.a" === $"y.a")).select($"$t.a" as "a1", $"$t.b" as "b1") + .groupBy($"a1")($"a1", max($"b1")).analyze), + streamed.select($"$t.a" as "a1", $"$t.b" as "b1") + .groupBy($"a1")($"a1", max($"b1")).analyze) + + // global aggregate + comparePlans(Optimize.execute( + 
x.join(y, joinType, Some($"x.a" === $"y.a")) + .groupBy()(max($"$t.b"), min($"$t.c")).analyze), + streamed.groupBy()(max($"$t.b"), min($"$t.c")).analyze) + + // negative cases + // with non-deterministic project + val p1 = x.join(y, joinType, Some($"x.a" === $"y.a")).select($"$t.a" as "a1", rand(1) as "b1") + .groupBy($"b1")($"b1", max($"a1")).analyze + comparePlans(Optimize.execute(p1), p1) + + // not from streamed side + val p2 = x.join(y, joinType, Some($"x.a" === $"y.a")) + .groupBy($"x.a", $"y.b")(min($"x.b"), max($"y.a")).analyze + comparePlans(Optimize.execute(p2), p2) + + // not duplicate agnostic + val p3 = x.join(y, joinType, Some($"x.a" === $"y.a")) + .groupBy($"$t.a")(sum($"$t.a")).analyze + comparePlans(Optimize.execute(p3), p3) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org