This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 109247f  [SPARK-35985][SQL] push partitionFilters for empty 
readDataSchema
109247f is described below

commit 109247f02e9c8d4da8457a8fa490d5091a730d14
Author: Steven Aerts <steven.ae...@airties.com>
AuthorDate: Fri Jul 16 04:52:46 2021 +0000

    [SPARK-35985][SQL] push partitionFilters for empty readDataSchema
    
    this commit makes sure that for File Source V2 partition filters are
    also taken into account when the readDataSchema is empty.
    This is the case for queries like:
    
        SELECT count(*) FROM tbl WHERE partition=foo
        SELECT input_file_name() FROM tbl WHERE partition=foo
    
    ### What changes were proposed in this pull request?
    
    As described in SPARK-35985 there is bug in the File Datasource V2 which 
prevents it to push down to the FileScanner for queries like the ones listed 
above.
    
    ### Why are the changes needed?
    
    If partitions filters are not pushed down, the whole dataset will be 
scanned while only one partition is interesting.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    
    An extra test was added which relies on the output of explain, as is done 
in other places.
    
    Closes #33191 from steven-aerts/SPARK-35985.
    
    Authored-by: Steven Aerts <steven.ae...@airties.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit f06aa4a3f34d89e7821ffee19a43a02773dc52e1)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../datasources/PruneFileSourcePartitions.scala     |  2 +-
 .../execution/PruneFileSourcePartitionsSuite.scala  | 21 +++++++++++++++++++--
 .../hive/execution/PrunePartitionSuiteBase.scala    |  4 +++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index a88dfb2..0927027 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -120,7 +120,7 @@ private[sql] object PruneFileSourcePartitions
 
     case op @ PhysicalOperation(projects, filters,
         v2Relation @ DataSourceV2ScanRelation(_, scan: FileScan, output))
-        if filters.nonEmpty && scan.readDataSchema.nonEmpty =>
+        if filters.nonEmpty =>
       val (partitionKeyFilters, dataFilters) =
         getPartitionKeyFiltersAndDataFilters(scan.sparkSession, v2Relation,
           scan.readPartitionSchema, filters, output)
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index ab37645b..a16545a 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.{CatalogFileIndex, 
HadoopFsRelation, LogicalRelation, PruneFileSourcePartitions}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
+import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions.broadcast
 import org.apache.spark.sql.internal.SQLConf
@@ -109,9 +110,25 @@ class PruneFileSourcePartitionsSuite extends 
PrunePartitionSuiteBase {
     }
   }
 
+  test("SPARK-35985 push filters for empty read schema") {
+    // Force datasource v2 for parquet
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) {
+      withTempPath { dir =>
+        spark.range(10).selectExpr("id", "id % 3 as p")
+          .write.partitionBy("p").parquet(dir.getCanonicalPath)
+        withTempView("tmp") {
+          
spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp");
+          assertPrunedPartitions("SELECT COUNT(*) FROM tmp WHERE p = 0", 1, 
"(tmp.p = 0)")
+          assertPrunedPartitions("SELECT input_file_name() FROM tmp WHERE p = 
0", 1, "(tmp.p = 0)")
+        }
+      }
+    }
+  }
+
   override def getScanExecPartitionSize(plan: SparkPlan): Long = {
     plan.collectFirst {
-      case p: FileSourceScanExec => p
-    }.get.selectedPartitions.length
+      case p: FileSourceScanExec => p.selectedPartitions.length
+      case b: BatchScanExec => b.partitions.size
+    }.get
   }
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
index 3b13cee..2a690a8 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PrunePartitionSuiteBase.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.sql.StatisticsCollectionTestBase
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
BinaryOperator, Expression, IsNotNull, Literal}
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf.ADAPTIVE_EXECUTION_ENABLED
 
@@ -94,9 +95,10 @@ abstract class PrunePartitionSuiteBase extends 
StatisticsCollectionTestBase with
     val plan = qe.sparkPlan
     assert(getScanExecPartitionSize(plan) == expectedPartitionCount)
 
-    val pushedDownPartitionFilters = qe.executedPlan.collectFirst {
+    val pushedDownPartitionFilters = plan.collectFirst {
       case scan: FileSourceScanExec => scan.partitionFilters
       case scan: HiveTableScanExec => scan.partitionPruningPred
+      case BatchScanExec(_, scan: FileScan, _) => scan.partitionFilters
     }.map(exps => exps.filterNot(e => e.isInstanceOf[IsNotNull]))
     val pushedFilters = pushedDownPartitionFilters.map(filters => {
       filters.foldLeft("")((currentStr, exp) => {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to