This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 3b3eb6f  [SPARK-36489][SQL] Aggregate functions over no grouping keys, 
on tables with a single bucket, return multiple rows
3b3eb6f is described below

commit 3b3eb6f8eae8519d107e8ae7c634c61e9a2b5e5c
Author: IonutBoicuAms <ionut.bo...@databricks.com>
AuthorDate: Thu Aug 12 15:22:38 2021 +0800

    [SPARK-36489][SQL] Aggregate functions over no grouping keys, on tables 
with a single bucket, return multiple rows
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a bug in `DisableUnnecessaryBucketedScan`.
    When running any aggregate function, without any grouping keys, on a table 
with a single bucket, multiple rows are returned.
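    This happens because the aggregate requires the `AllTuples` distribution, 
which the single-bucket scan already satisfies, so no `Exchange` is planned; 
the bucketed scan is then disabled because `AllTuples` was not treated as an 
interesting partition by the rule.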
    
    ### Why are the changes needed?
    
    Bug fix. An aggregate with no grouping keys should return a single row.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added new test in `DisableUnnecessaryBucketedScanSuite`.
    
    Closes #33711 from IonutBoicuAms/fix-bug-disableunnecessarybucketedscan.
    
    Authored-by: IonutBoicuAms <ionut.bo...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 2b665751d9c7e4fb07ea18ce6611328e24f3dce9)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../bucketing/DisableUnnecessaryBucketedScan.scala |  4 ++--
 .../DisableUnnecessaryBucketedScanSuite.scala      | 26 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
index 98bcab2..5bd70c6 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/bucketing/DisableUnnecessaryBucketedScan.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.bucketing
 
-import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, 
HashClusteredDistribution}
+import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, 
ClusteredDistribution, HashClusteredDistribution}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{FileSourceScanExec, FilterExec, 
ProjectExec, SortExec, SparkPlan}
 import org.apache.spark.sql.execution.aggregate.BaseAggregateExec
@@ -120,7 +120,7 @@ object DisableUnnecessaryBucketedScan extends 
Rule[SparkPlan] {
 
   private def hasInterestingPartition(plan: SparkPlan): Boolean = {
     plan.requiredChildDistribution.exists {
-      case _: ClusteredDistribution | _: HashClusteredDistribution => true
+      case _: ClusteredDistribution | _: HashClusteredDistribution | AllTuples 
=> true
       case _ => false
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
index 1a19824..737cffc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/DisableUnnecessaryBucketedScanSuite.scala
@@ -258,4 +258,30 @@ abstract class DisableUnnecessaryBucketedScanSuite
       }
     }
   }
+
+  test("Aggregates with no groupby over tables having 1 BUCKET, return 
multiple rows") {
+    withTable("t1") {
+      withSQLConf(SQLConf.AUTO_BUCKETED_SCAN_ENABLED.key -> "true") {
+        sql(
+          """
+            |CREATE TABLE t1 (`id` BIGINT, `event_date` DATE)
+            |USING PARQUET
+            |CLUSTERED BY (id)
+            |INTO 1 BUCKETS
+            |""".stripMargin)
+        sql(
+          """
+            |INSERT INTO TABLE t1 VALUES(1.23, cast("2021-07-07" as date))
+            |""".stripMargin)
+        sql(
+          """
+            |INSERT INTO TABLE t1 VALUES(2.28, cast("2021-08-08" as date))
+            |""".stripMargin)
+        val df = spark.sql("select sum(id) from t1 where id is not null")
+        assert(df.count == 1)
+        checkDisableBucketedScan(query = "SELECT SUM(id) FROM t1 WHERE id is 
not null",
+          expectedNumScanWithAutoScanEnabled = 1, 
expectedNumScanWithAutoScanDisabled = 1)
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to