This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c4cd193b4d4 [SPARK-40565][SQL] Don't push non-deterministic filters to V2 file sources
c4cd193b4d4 is described below

commit c4cd193b4d43d951255d9914f9b6f099d140fe1c
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Thu Oct 6 22:57:08 2022 -0700

    [SPARK-40565][SQL] Don't push non-deterministic filters to V2 file sources
    
    ### What changes were proposed in this pull request?
    
    Prevent non-deterministic filters from being pushed to V2 file sources, matching the V1 file source behavior and preventing certain issues.
    
    ### Why are the changes needed?
    
    Pushing non-deterministic filters can cause exceptions (in the case of expressions that need to be initialized, like `rand`), or behavior that differs from V1 when non-deterministic UDFs are used for collecting metrics.
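
    For illustration, the failure mode looks roughly like the sketch below (a hypothetical repro, not part of the patch; the path is assumed, and the config key is `SQLConf.USE_V1_SOURCE_LIST`, i.e. `spark.sql.sources.useV1SourceList`):

    ```scala
    // Hypothetical sketch: with V1 sources disabled, a rand() filter could
    // previously be pushed into the V2 parquet scan and throw when evaluated
    // before initialization. Path and data are illustrative.
    import org.apache.spark.sql.functions.rand

    spark.conf.set("spark.sql.sources.useV1SourceList", "")  // force DSv2 parquet
    spark.read.parquet("/tmp/example")
      .filter(rand() > 0.5)
      .count()
    ```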
    
    ### Does this PR introduce _any_ user-facing change?
    
    Fixes some cases where queries could fail with filters involving functions like `rand`, and changes behavior to match V1 sources.
    
    ### How was this patch tested?
    
    New unit test.
    
    Closes #38003 from Kimahriman/v2-nondeterminstic-filter-pushdown.
    
    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../execution/datasources/v2/FileScanBuilder.scala |  5 +++--
 .../PruneFileSourcePartitionsSuite.scala           | 23 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index ae82eecd313..447a36fe622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -70,8 +70,9 @@ abstract class FileScanBuilder(
   }
 
   override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
+    val (deterministicFilters, nonDeterminsticFilters) = filters.partition(_.deterministic)
     val (partitionFilters, dataFilters) =
-      DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filters)
+      DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, deterministicFilters)
     this.partitionFilters = partitionFilters
     this.dataFilters = dataFilters
     val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
@@ -82,7 +83,7 @@ abstract class FileScanBuilder(
       }
     }
     pushedDataFilters = pushDataFilters(translatedFilters.toArray)
-    dataFilters
+    dataFilters ++ nonDeterminsticFilters
   }
 
   override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
index b67a574421e..b1234aae151 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions.broadcast
 import org.apache.spark.sql.internal.SQLConf
@@ -140,6 +140,24 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
     }
   }
 
+  test("SPARK-40565: don't push down non-deterministic filters for V2 file sources") {
+    // Force datasource v2 for parquet
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) {
+      withTempPath { dir =>
+        spark.range(10).coalesce(1).selectExpr("id", "id % 3 as p")
+          .write.partitionBy("p").parquet(dir.getCanonicalPath)
+        withTempView("tmp") {
+          spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE rand() > 0.5", 3, "")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE p > rand()", 3, "")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE p = 0 AND rand() > 
0.5",
+            1,
+            "(tmp.p = 0)")
+        }
+      }
+    }
+  }
+
   protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
     case scan: FileSourceScanExec => scan.partitionFilters
   }
@@ -147,7 +165,8 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
   override def getScanExecPartitionSize(plan: SparkPlan): Long = {
     plan.collectFirst {
       case p: FileSourceScanExec => p.selectedPartitions.length
-      case b: BatchScanExec => b.partitions.size
+      case BatchScanExec(_, scan: FileScan, _, _, _, _) =>
+        scan.fileIndex.listFiles(scan.partitionFilters, scan.dataFilters).length
     }.get
   }
 }
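
For reference, the core of the change is the `partition(_.deterministic)` split in `pushFilters`: deterministic filters continue through translation and pushdown, while non-deterministic ones are returned unevaluated so Spark applies them after the scan. A minimal standalone sketch of that split (illustrative, not the builder code itself; it only needs the Spark SQL dependency on the classpath):

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.functions._

    // Build two catalyst expressions via the Column API, then split them the
    // way FileScanBuilder.pushFilters now does: on Expression.deterministic.
    val filters: Seq[Expression] = Seq(col("p") === 0, rand() > 0.5).map(_.expr)
    val (deterministicFilters, nonDeterministicFilters) =
      filters.partition(_.deterministic)
    // deterministicFilters    -> Seq(p = 0)             eligible for pushdown
    // nonDeterministicFilters -> Seq(rand(seed) > 0.5)  kept as post-scan filters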


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
