This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 37d6b3c  [SPARK-32761][SQL][3.0] Allow aggregating multiple foldable 
distinct expressions
37d6b3c is described below

commit 37d6b3c0fafa98922ed1ecf4f8634d962f5bb9d9
Author: Linhong Liu <linhong....@databricks.com>
AuthorDate: Fri Oct 16 03:36:21 2020 +0000

    [SPARK-32761][SQL][3.0] Allow aggregating multiple foldable distinct 
expressions
    
    ### What changes were proposed in this pull request?
    For queries with multiple foldable distinct columns, since they will be 
eliminated during
    execution, it's not mandatory to let `RewriteDistinctAggregates` handle 
this case. And
    in the current code, `RewriteDistinctAggregates` *does* miss some 
"aggregating with
    multiple foldable distinct expressions" cases.
    For example: `select count(distinct 2), count(distinct 2, 3)` will be 
missed.
    
    But in the planner, this will trigger an error that "multiple distinct 
expressions" are not allowed.
    As the foldable distinct columns can be eliminated finally, we can allow 
this in the aggregation
    planner check.
    
    ### Why are the changes needed?
    bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    added test case
    
    Authored-by: Linhong Liu <linhong.liudatabricks.com>
    Signed-off-by: Wenchen Fan <wenchendatabricks.com>
    (cherry picked from commit a410658c9bc244e325702dc926075bd835b669ff)
    
    Closes #30052 from linhongliu-db/SPARK-32761-3.0.
    
    Authored-by: Linhong Liu <linhong....@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../main/scala/org/apache/spark/sql/execution/SparkStrategies.scala | 6 ++++--
 sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala   | 4 ++++
 2 files changed, 8 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index f836deb..689d1eb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -517,7 +517,8 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
         val (functionsWithDistinct, functionsWithoutDistinct) =
           aggregateExpressions.partition(_.isDistinct)
-        if 
(functionsWithDistinct.map(_.aggregateFunction.children.toSet).distinct.length 
> 1) {
+        if (functionsWithDistinct.map(
+          
_.aggregateFunction.children.filterNot(_.foldable).toSet).distinct.length > 1) {
           // This is a sanity check. We should not reach here when we have 
multiple distinct
           // column sets. Our `RewriteDistinctAggregates` should take care 
this case.
           sys.error("You hit a query analyzer bug. Please report your query to 
" +
@@ -548,7 +549,8 @@ abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
             // to be [COUNT(DISTINCT foo), MAX(DISTINCT foo)], but
             // [COUNT(DISTINCT bar), COUNT(DISTINCT foo)] is disallowed 
because those two distinct
             // aggregates have different column expressions.
-            val distinctExpressions = 
functionsWithDistinct.head.aggregateFunction.children
+            val distinctExpressions =
+              
functionsWithDistinct.head.aggregateFunction.children.filterNot(_.foldable)
             val normalizedNamedDistinctExpressions = distinctExpressions.map { 
e =>
               // Ideally this should be done in `NormalizeFloatingNumbers`, 
but we do it here
               // because `distinctExpressions` is not extracted during logical 
phase.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7869005..85cbe45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2467,6 +2467,10 @@ class DataFrameSuite extends QueryTest
     val df = l.join(r, $"col2" === $"col4", "LeftOuter")
     checkAnswer(df, Row("2", "2"))
   }
+
+  test("SPARK-32761: aggregating multiple distinct CONSTANT columns") {
+     checkAnswer(sql("select count(distinct 2), count(distinct 2,3)"), Row(1, 
1))
+  }
 }
 
 case class GroupByKey(a: Int, b: Int)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to