This is an automated email from the ASF dual-hosted git repository. yamamuro pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 67ec4f7 [SPARK-33971][SQL] Eliminate distinct from more aggregates 67ec4f7 is described below commit 67ec4f7f67dc494c2619b7faf1b1145f2200b65c Author: tanel.k...@gmail.com <tanel.k...@gmail.com> AuthorDate: Fri Feb 26 21:59:02 2021 +0900 [SPARK-33971][SQL] Eliminate distinct from more aggregates ### What changes were proposed in this pull request? Add more aggregate expressions to `EliminateDistinct` rule. ### Why are the changes needed? Distinct aggregation can add a significant overhead. It's better to remove distinct whenever possible. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #30999 from tanelk/SPARK-33971_eliminate_distinct. Authored-by: tanel.k...@gmail.com <tanel.k...@gmail.com> Signed-off-by: Takeshi Yamamuro <yamam...@apache.org> --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 16 ++++++--- .../optimizer/EliminateDistinctSuite.scala | 41 +++++++++++----------- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 717770f..cb24180 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -352,11 +352,17 @@ abstract class Optimizer(catalogManager: CatalogManager) */ object EliminateDistinct extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformExpressions { - case ae: AggregateExpression if ae.isDistinct => - ae.aggregateFunction match { - case _: Max | _: Min => ae.copy(isDistinct = false) - case _ => ae - } + case ae: AggregateExpression if ae.isDistinct && isDuplicateAgnostic(ae.aggregateFunction) => + ae.copy(isDistinct = false) + } + + private def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match { + case _: Max => true + case _: Min => true + case _: BitAndAgg => true + case _: BitOrAgg => true + case _: CollectSet => true + case _ => false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala index 51c7519..0848d56 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala @@ -18,6 +18,8 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor @@ -32,25 +34,24 @@ class EliminateDistinctSuite extends PlanTest { val testRelation = LocalRelation('a.int) - test("Eliminate Distinct in Max") { - val query = testRelation - .select(maxDistinct('a).as('result)) - .analyze - val answer = testRelation - .select(max('a).as('result)) - .analyze - assert(query != answer) - comparePlans(Optimize.execute(query), answer) - } - - test("Eliminate Distinct in Min") { - val query = testRelation - .select(minDistinct('a).as('result)) - .analyze - val answer = testRelation - .select(min('a).as('result)) - .analyze - assert(query != answer) - comparePlans(Optimize.execute(query), answer) + Seq( + Max(_), + Min(_), + BitAndAgg(_), + BitOrAgg(_), + CollectSet(_: Expression) + ).foreach { + aggBuilder => + val agg = aggBuilder('a) + test(s"Eliminate Distinct in ${agg.prettyName}") { + val query = testRelation + .select(agg.toAggregateExpression(isDistinct = true).as('result)) + .analyze + val answer = testRelation + .select(agg.toAggregateExpression(isDistinct = false).as('result)) + .analyze + assert(query != answer) + comparePlans(Optimize.execute(query), answer) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org