This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c4cd193b4d4  [SPARK-40565][SQL] Don't push non-deterministic filters to V2 file sources
c4cd193b4d4 is described below

commit c4cd193b4d43d951255d9914f9b6f099d140fe1c
Author: Adam Binford <adam...@gmail.com>
AuthorDate: Thu Oct 6 22:57:08 2022 -0700

    [SPARK-40565][SQL] Don't push non-deterministic filters to V2 file sources

    ### What changes were proposed in this pull request?

    Prevent non-deterministic filters from being pushed to V2 file sources, matching the V1 file source behavior and avoiding certain issues.

    ### Why are the changes needed?

    Pushing non-deterministic filters can cause exceptions (for expressions that must be initialized before evaluation, such as `rand`), or behavior that differs from V1 sources when non-deterministic UDFs are used to collect metrics.

    ### Does this PR introduce _any_ user-facing change?

    Fixes some cases where queries could fail with filters involving functions like `rand`, and changes behavior to match V1 sources.

    ### How was this patch tested?

    New unit test.

    Closes #38003 from Kimahriman/v2-nondeterminstic-filter-pushdown.

    Authored-by: Adam Binford <adam...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
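For context, a minimal spark-shell sketch of the failure mode this patch addresses (not part of the patch; the output path is illustrative, and clearing `spark.sql.sources.useV1SourceList` is what routes parquet reads through the V2 `FileScanBuilder` path, as the new test below also does):

```scala
// Hypothetical repro sketch: with the V1 fallback list cleared, parquet reads
// go through the V2 FileScanBuilder, where (before this patch) the rand()
// predicate could be pushed into the scan and fail, since Rand must be
// initialized before it is evaluated.
spark.conf.set("spark.sql.sources.useV1SourceList", "")

spark.range(10).write.mode("overwrite").parquet("/tmp/spark-40565")  // illustrative path
spark.read.parquet("/tmp/spark-40565")
  .where("rand() > 0.5")  // non-deterministic filter; now kept out of the pushed filters
  .show()
```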
---
 .../execution/datasources/v2/FileScanBuilder.scala |  5 +++--
 .../PruneFileSourcePartitionsSuite.scala           | 23 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
index ae82eecd313..447a36fe622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
@@ -70,8 +70,9 @@ abstract class FileScanBuilder(
   }
 
   override def pushFilters(filters: Seq[Expression]): Seq[Expression] = {
+    val (deterministicFilters, nonDeterminsticFilters) = filters.partition(_.deterministic)
     val (partitionFilters, dataFilters) =
-      DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, filters)
+      DataSourceUtils.getPartitionFiltersAndDataFilters(partitionSchema, deterministicFilters)
     this.partitionFilters = partitionFilters
     this.dataFilters = dataFilters
     val translatedFilters = mutable.ArrayBuffer.empty[sources.Filter]
@@ -82,7 +83,7 @@ abstract class FileScanBuilder(
       }
     }
     pushedDataFilters = pushDataFilters(translatedFilters.toArray)
-    dataFilters
+    dataFilters ++ nonDeterminsticFilters
   }
 
   override def pushedFilters: Array[Predicate] = pushedDataFilters.map(_.toV2)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
index b67a574421e..b1234aae151 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitionsSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
-import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
+import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
 import org.apache.spark.sql.functions.broadcast
 import org.apache.spark.sql.internal.SQLConf
@@ -140,6 +140,24 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
     }
   }
 
+  test("SPARK-40565: don't push down non-deterministic filters for V2 file sources") {
+    // Force datasource v2 for parquet
+    withSQLConf((SQLConf.USE_V1_SOURCE_LIST.key, "")) {
+      withTempPath { dir =>
+        spark.range(10).coalesce(1).selectExpr("id", "id % 3 as p")
+          .write.partitionBy("p").parquet(dir.getCanonicalPath)
+        withTempView("tmp") {
+          spark.read.parquet(dir.getCanonicalPath).createOrReplaceTempView("tmp")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE rand() > 0.5", 3, "")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE p > rand()", 3, "")
+          assertPrunedPartitions("SELECT * FROM tmp WHERE p = 0 AND rand() > 0.5",
+            1,
+            "(tmp.p = 0)")
+        }
+      }
+    }
+  }
+
   protected def collectPartitionFiltersFn(): PartialFunction[SparkPlan, Seq[Expression]] = {
     case scan: FileSourceScanExec => scan.partitionFilters
   }
@@ -147,7 +165,8 @@ class PruneFileSourcePartitionsSuite extends PrunePartitionSuiteBase with Shared
   override def getScanExecPartitionSize(plan: SparkPlan): Long = {
     plan.collectFirst {
       case p: FileSourceScanExec => p.selectedPartitions.length
-      case b: BatchScanExec => b.partitions.size
+      case BatchScanExec(_, scan: FileScan, _, _, _, _) =>
+        scan.fileIndex.listFiles(scan.partitionFilters, scan.dataFilters).length
     }.get
   }
 }
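As a standalone illustration of the split that `pushFilters` now performs (a sketch, runnable in spark-shell; it only uses the public `Column.expr` accessor and Catalyst's existing `deterministic` flag):

```scala
import org.apache.spark.sql.functions.{col, rand}

// Build a mix of deterministic and non-deterministic Catalyst expressions.
val filters = Seq(col("p") > 0, rand() > 0.5).map(_.expr)

// The same partition(_.deterministic) split used in FileScanBuilder.pushFilters:
// deterministic filters remain candidates for partition pruning and data-filter
// pushdown; the rest are returned for Spark to evaluate after the scan.
val (deterministicFilters, nonDeterministicFilters) = filters.partition(_.deterministic)
// deterministicFilters    -> Seq(('p > 0))
// nonDeterministicFilters -> Seq((rand(seed) > 0.5))
```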