This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ee74bd0d4e3 [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys ee74bd0d4e3 is described below commit ee74bd0d4e3d97b33aa57fe523ab4f5537125f68 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Wed Apr 13 18:10:33 2022 +0800 [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by distinctKeys ### What changes were proposed in this pull request? Make `EliminateDistinct` also eliminate DISTINCT in an aggregate expression when the child's distinct keys already guarantee it. ### Why are the changes needed? We can remove the DISTINCT in an aggregate expression if the distinct semantics are already guaranteed by the child plan. For example: ```sql SELECT count(distinct c) FROM ( SELECT c FROM t GROUP BY c ) ``` ### Does this PR introduce _any_ user-facing change? It improves performance. ### How was this patch tested? Added a test in `EliminateDistinctSuite`. Closes #36117 from ulysses-you/remove-distinct. Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 25 ++++++++++++++++------ .../plans/logical/LogicalPlanDistinctKeys.scala | 8 ++++++- .../optimizer/EliminateDistinctSuite.scala | 18 ++++++++++++++++ 3 files changed, 44 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 66c2ad84cce..bb788336c6d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -146,7 +146,7 @@ abstract class Optimizer(catalogManager: CatalogManager) PushDownPredicates) :: Nil } - val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) :: + val batches = ( // Technically some of the rules in Finish Analysis are not optimizer rules and 
belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). // However, because we also use the analyzer to canonicalized queries (for view definition), @@ -166,6 +166,7 @@ abstract class Optimizer(catalogManager: CatalogManager) ////////////////////////////////////////////////////////////////////////////////////////// // Optimizer rules start here ////////////////////////////////////////////////////////////////////////////////////////// + Batch("Eliminate Distinct", Once, EliminateDistinct) :: // - Do the first call of CombineUnions before starting the major Optimizer rules, // since it can reduce the number of iteration and the other rules could add/move // extra operators between two adjacent Union operators. @@ -411,14 +412,26 @@ abstract class Optimizer(catalogManager: CatalogManager) } /** - * Remove useless DISTINCT for MAX and MIN. + * Remove useless DISTINCT: + * 1. For some aggregate expression, e.g.: MAX and MIN. + * 2. If the distinct semantics is guaranteed by child. + * * This rule should be applied before RewriteDistinctAggregates. 
*/ object EliminateDistinct extends Rule[LogicalPlan] { - override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressionsWithPruning( - _.containsPattern(AGGREGATE_EXPRESSION)) { - case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) => - ae.copy(isDistinct = false) + override def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( + _.containsPattern(AGGREGATE)) { + case agg: Aggregate => + agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) { + case ae: AggregateExpression if ae.isDistinct && + isDuplicateAgnostic(ae.aggregateFunction) => + ae.copy(isDistinct = false) + + case ae: AggregateExpression if ae.isDistinct && + agg.child.distinctKeys.exists( + _.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) => + ae.copy(isDistinct = false) + } } def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala index 1843c2da478..2ffa5a0e594 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala @@ -29,6 +29,12 @@ import org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED */ trait LogicalPlanDistinctKeys { self: LogicalPlan => lazy val distinctKeys: Set[ExpressionSet] = { - if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) DistinctKeyVisitor.visit(self) else Set.empty + if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) { + val keys = DistinctKeyVisitor.visit(self) + require(keys.forall(_.nonEmpty)) + keys + } else { + Set.empty + } } } diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 9c57ced8492..798cc0a42dd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -33,6 +33,7 @@ class EliminateDistinctSuite extends PlanTest { } val testRelation = LocalRelation($"a".int) + val testRelation2 = LocalRelation($"a".int, $"b".string) Seq( Max(_), @@ -71,4 +72,21 @@ class EliminateDistinctSuite extends PlanTest { comparePlans(Optimize.execute(query), answer) } } + + test("SPARK-38832: Remove unnecessary distinct in aggregate expression by distinctKeys") { + val q1 = testRelation2.groupBy($"a")($"a") + .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze + val r1 = testRelation2.groupBy($"a")($"a") + .rebalance().groupBy()(count($"a") as "x", sum($"a") as "y").analyze + comparePlans(Optimize.execute(q1), r1) + + // not a subset of distinct attr + val q2 = testRelation2.groupBy($"a", $"b")($"a", $"b") + .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as "y").analyze + comparePlans(Optimize.execute(q2), q2) + + // child distinct key is empty + val q3 = testRelation2.groupBy($"a")(countDistinct($"a") as "x").analyze + comparePlans(Optimize.execute(q3), q3) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org