This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4aec9d7  [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col
4aec9d7 is described below

commit 4aec9d7daca7a1a146ff1fb1e7541c9443905725
Author: Huaxin Gao <huaxin_...@apple.com>
AuthorDate: Wed Oct 27 00:14:00 2021 -0700

    [SPARK-36647][SQL][TESTS] Push down Aggregate (Min/Max/Count) for Parquet if filter is on partition col
    
    ### What changes were proposed in this pull request?
    I just realized that with the changes in https://github.com/apache/spark/pull/33650, the restriction against pushing down Min/Max/Count when the filter is on a partition column was already removed. This PR just adds a test to make sure Min/Max/Count in Parquet are pushed down if the filter is on a partition column.
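
    For illustration only, the scenario the new test exercises looks roughly like this (the directory path is hypothetical; `withSQLConf` is the helper already used in the suite):

        // Write a small Parquet table partitioned by p and register it as a temp view.
        spark.range(10).selectExpr("id", "id % 3 as p")
          .write.partitionBy("p").parquet("/tmp/agg_pushdown_demo")
        spark.read.parquet("/tmp/agg_pushdown_demo").createOrReplaceTempView("tmp")
        // The filter is on the partition column p, so MAX/MIN/COUNT can still be pushed down;
        // the optimized plan is expected to show: PushedAggregation: [MAX(id), MIN(id), COUNT(id)]
        withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true") {
          sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0").explain()
        }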
    
    ### Why are the changes needed?
    To complete the work on Aggregate (Min/Max/Count) push down for Parquet.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    new test
    
    Closes #34248 from huaxingao/partitionFilter.
    
    Authored-by: Huaxin Gao <huaxin_...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../v2/parquet/ParquetScanBuilder.scala            |  7 ++--
 .../parquet/ParquetAggregatePushDownSuite.scala    | 40 ++++++++++++++++------
 2 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
index 113438a..da49381 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
@@ -134,10 +134,9 @@ case class ParquetScanBuilder(
       // are combined with filter or group by
       // e.g. SELECT COUNT(col1) FROM t WHERE col2 = 8
       //      SELECT COUNT(col1) FROM t GROUP BY col2
-      // Todo: 1. add support if groupby column is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36646)
-      //       2. add support if filter col is partition col
-      //          (https://issues.apache.org/jira/browse/SPARK-36647)
+      // However, if the filter is on a partition column, max/min/count can still be pushed down
+      // Todo:  add support if groupby column is partition col
+      //        (https://issues.apache.org/jira/browse/SPARK-36646)
       return false
     }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
index 0ae95db..77ecd28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAggregatePushDownSuite.scala
@@ -129,10 +129,9 @@ abstract class ParquetAggregatePushDownSuite
         .write.partitionBy("p").parquet(dir.getCanonicalPath)
       withTempView("tmp") {
         
spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
             val count = sql("SELECT COUNT(p) FROM tmp")
             count.queryExecution.optimizedPlan.collect {
               case _: DataSourceV2ScanRelation =>
@@ -221,7 +220,7 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
-  test("aggregate push down - query with filter not push down") {
+  test("aggregate push down - aggregate with data filter cannot be pushed 
down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
     withParquetTable(data, "t") {
@@ -240,6 +239,29 @@ abstract class ParquetAggregatePushDownSuite
     }
   }
 
+  test("aggregate push down - aggregate with partition filter can be pushed 
down") {
+    withTempPath { dir =>
+      spark.range(10).selectExpr("id", "id % 3 as p")
+        .write.partitionBy("p").parquet(dir.getCanonicalPath)
+      withTempView("tmp") {
+        spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+        Seq("false", "true").foreach { enableVectorizedReader =>
+          withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
+            val max = sql("SELECT max(id), min(id), count(id) FROM tmp WHERE p = 0")
+            max.queryExecution.optimizedPlan.collect {
+              case _: DataSourceV2ScanRelation =>
+                val expected_plan_fragment =
+                  "PushedAggregation: [MAX(id), MIN(id), COUNT(id)]"
+                checkKeywordsExistsInExplain(max, expected_plan_fragment)
+            }
+            checkAnswer(max, Seq(Row(9, 0, 4)))
+          }
+        }
+      }
+    }
+  }
+
   test("aggregate push down - push down only if all the aggregates can be 
pushed down") {
     val data = Seq((-2, "abc", 2), (3, "def", 4), (6, "ghi", 2), (0, null, 19),
       (9, "mno", 7), (2, null, 7))
@@ -356,10 +378,9 @@ abstract class ParquetAggregatePushDownSuite
       spark.createDataFrame(rdd, schema).write.parquet(file.getCanonicalPath)
       withTempView("test") {
         
spark.read.parquet(file.getCanonicalPath).createOrReplaceTempView("test")
-        val enableVectorizedReader = Seq("false", "true")
-        for (testVectorizedReader <- enableVectorizedReader) {
+        Seq("false", "true").foreach { enableVectorizedReader =>
           withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-            vectorizedReaderEnabledKey -> testVectorizedReader) {
+            vectorizedReaderEnabledKey -> enableVectorizedReader) {
 
            val testMinWithAllTypes = sql("SELECT min(StringCol), min(BooleanCol), min(ByteCol), " +
              "min(BinaryCol), min(ShortCol), min(IntegerCol), min(LongCol), min(FloatCol), " +
@@ -474,10 +495,9 @@ abstract class ParquetAggregatePushDownSuite
   }
 
   test("aggregate push down - column name case sensitivity") {
-    val enableVectorizedReader = Seq("false", "true")
-    for (testVectorizedReader <- enableVectorizedReader) {
+    Seq("false", "true").foreach { enableVectorizedReader =>
       withSQLConf(SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED.key -> "true",
-        vectorizedReaderEnabledKey -> testVectorizedReader) {
+        vectorizedReaderEnabledKey -> enableVectorizedReader) {
         withTempPath { dir =>
           spark.range(10).selectExpr("id", "id % 3 as p")
             .write.partitionBy("p").parquet(dir.getCanonicalPath)
