This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee74bd0d4e3 [SPARK-38832][SQL] Remove unnecessary distinct in 
aggregate expression by distinctKeys
ee74bd0d4e3 is described below

commit ee74bd0d4e3d97b33aa57fe523ab4f5537125f68
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Wed Apr 13 18:10:33 2022 +0800

    [SPARK-38832][SQL] Remove unnecessary distinct in aggregate expression by 
distinctKeys
    
    ### What changes were proposed in this pull request?
    
    Make `EliminateDistinct` support eliminating distinct aggregates based on the child's distinct keys.
    
    ### Why are the changes needed?
    
    We can remove the distinct in an aggregate expression if the distinct 
semantics are guaranteed by the child plan.
    
    For example:
    ```sql
    SELECT count(distinct c) FROM (
      SELECT c FROM t GROUP BY c
    )
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    No user-facing change; this only improves performance.
    
    ### How was this patch tested?
    
    add test in `EliminateDistinctSuite`
    
    Closes #36117 from ulysses-you/remove-distinct.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   | 25 ++++++++++++++++------
 .../plans/logical/LogicalPlanDistinctKeys.scala    |  8 ++++++-
 .../optimizer/EliminateDistinctSuite.scala         | 18 ++++++++++++++++
 3 files changed, 44 insertions(+), 7 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 66c2ad84cce..bb788336c6d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -146,7 +146,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
         PushDownPredicates) :: Nil
     }
 
-    val batches = (Batch("Eliminate Distinct", Once, EliminateDistinct) ::
+    val batches = (
     // Technically some of the rules in Finish Analysis are not optimizer 
rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. 
ComputeCurrentTime).
     // However, because we also use the analyzer to canonicalized queries (for 
view definition),
@@ -166,6 +166,7 @@ abstract class Optimizer(catalogManager: CatalogManager)
     
//////////////////////////////////////////////////////////////////////////////////////////
     // Optimizer rules start here
     
//////////////////////////////////////////////////////////////////////////////////////////
+    Batch("Eliminate Distinct", Once, EliminateDistinct) ::
     // - Do the first call of CombineUnions before starting the major 
Optimizer rules,
     //   since it can reduce the number of iteration and the other rules could 
add/move
     //   extra operators between two adjacent Union operators.
@@ -411,14 +412,26 @@ abstract class Optimizer(catalogManager: CatalogManager)
 }
 
 /**
- * Remove useless DISTINCT for MAX and MIN.
+ * Remove useless DISTINCT:
+ *   1. For some aggregate expression, e.g.: MAX and MIN.
+ *   2. If the distinct semantics is guaranteed by child.
+ *
  * This rule should be applied before RewriteDistinctAggregates.
  */
 object EliminateDistinct extends Rule[LogicalPlan] {
-  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformAllExpressionsWithPruning(
-    _.containsPattern(AGGREGATE_EXPRESSION)) {
-    case ae: AggregateExpression if ae.isDistinct && 
isDuplicateAgnostic(ae.aggregateFunction) =>
-      ae.copy(isDistinct = false)
+  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.transformWithPruning(
+    _.containsPattern(AGGREGATE)) {
+    case agg: Aggregate =>
+      
agg.transformExpressionsWithPruning(_.containsPattern(AGGREGATE_EXPRESSION)) {
+        case ae: AggregateExpression if ae.isDistinct &&
+          isDuplicateAgnostic(ae.aggregateFunction) =>
+          ae.copy(isDistinct = false)
+
+        case ae: AggregateExpression if ae.isDistinct &&
+          agg.child.distinctKeys.exists(
+            
_.subsetOf(ExpressionSet(ae.aggregateFunction.children.filterNot(_.foldable)))) 
=>
+          ae.copy(isDistinct = false)
+      }
   }
 
   def isDuplicateAgnostic(af: AggregateFunction): Boolean = af match {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
index 1843c2da478..2ffa5a0e594 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlanDistinctKeys.scala
@@ -29,6 +29,12 @@ import 
org.apache.spark.sql.internal.SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED
  */
 trait LogicalPlanDistinctKeys { self: LogicalPlan =>
   lazy val distinctKeys: Set[ExpressionSet] = {
-    if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) 
DistinctKeyVisitor.visit(self) else Set.empty
+    if (conf.getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)) {
+      val keys = DistinctKeyVisitor.visit(self)
+      require(keys.forall(_.nonEmpty))
+      keys
+    } else {
+      Set.empty
+    }
   }
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
index 9c57ced8492..798cc0a42dd 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateDistinctSuite.scala
@@ -33,6 +33,7 @@ class EliminateDistinctSuite extends PlanTest {
   }
 
   val testRelation = LocalRelation($"a".int)
+  val testRelation2 = LocalRelation($"a".int, $"b".string)
 
   Seq(
     Max(_),
@@ -71,4 +72,21 @@ class EliminateDistinctSuite extends PlanTest {
         comparePlans(Optimize.execute(query), answer)
       }
   }
+
+  test("SPARK-38832: Remove unnecessary distinct in aggregate expression by 
distinctKeys") {
+    val q1 = testRelation2.groupBy($"a")($"a")
+      .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as 
"y").analyze
+    val r1 = testRelation2.groupBy($"a")($"a")
+      .rebalance().groupBy()(count($"a") as "x", sum($"a") as "y").analyze
+    comparePlans(Optimize.execute(q1), r1)
+
+    // not a subset of distinct attr
+    val q2 = testRelation2.groupBy($"a", $"b")($"a", $"b")
+      .rebalance().groupBy()(countDistinct($"a") as "x", sumDistinct($"a") as 
"y").analyze
+    comparePlans(Optimize.execute(q2), q2)
+
+    // child distinct key is empty
+    val q3 = testRelation2.groupBy($"a")(countDistinct($"a") as "x").analyze
+    comparePlans(Optimize.execute(q3), q3)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to