This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 0cdbda13b08 [SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates 0cdbda13b08 is described below commit 0cdbda13b08ad79c91d1d7d5912fb1120191dc56 Author: Wenchen Fan <wenc...@databricks.com> AuthorDate: Tue Nov 22 10:40:24 2022 +0800 [SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates ### What changes were proposed in this pull request? This PR fixes a regression caused by https://github.com/apache/spark/pull/38511. For `FROM t WHERE rand() > 0.5 AND col = 1`, we can still push down `col = 1` because we don't guarantee the evaluation order of predicates within a `Filter`. This PR updates `ScanOperation` to handle this case and restore the previous pushdown behavior. ### Why are the changes needed? To fix a performance regression. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? A new test in `FileBasedDataSourceSuite`. Closes #38746 from cloud-fan/filter. 
Lead-authored-by: Wenchen Fan <wenc...@databricks.com> Co-authored-by: Wenchen Fan <cloud0...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/planning/patterns.scala | 21 ++++++++++++++++----- .../spark/sql/FileBasedDataSourceSuite.scala | 22 ++++++++++++++++++++-- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala index 3c35ba9b600..bbda9eb76b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala @@ -137,11 +137,22 @@ object ScanOperation extends OperationHelper { val alwaysInline = SQLConf.get.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE) val (fields, filters, child, _) = collectProjectsAndFilters(plan, alwaysInline) // `collectProjectsAndFilters` transforms the plan bottom-up, so the bottom-most filter are - // placed at the beginning of `filters` list. According to the SQL semantic, we can only - // push down the bottom deterministic filters. - val filtersCanPushDown = filters.takeWhile(_.deterministic).flatMap(splitConjunctivePredicates) - val filtersStayUp = filters.dropWhile(_.deterministic) - Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child)) + // placed at the beginning of `filters` list. According to the SQL semantic, we cannot merge + // Filters if one or more of them are nondeterministic. This means we can only push down the + // bottom-most Filter, or more following deterministic Filters if the bottom-most Filter is + // also deterministic. 
+ if (filters.isEmpty) { + Some((fields.getOrElse(child.output), Nil, Nil, child)) + } else if (filters.head.deterministic) { + val filtersCanPushDown = filters.takeWhile(_.deterministic) + .flatMap(splitConjunctivePredicates) + val filtersStayUp = filters.dropWhile(_.deterministic) + Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child)) + } else { + val filtersCanPushDown = splitConjunctivePredicates(filters.head) + val filtersStayUp = filters.drop(1) + Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child)) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index 98cb54ccbbc..cf5f8d990f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -30,10 +30,10 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path} import org.apache.spark.SparkException import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT} -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal} import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt} import org.apache.spark.sql.catalyst.plans.logical.Filter -import org.apache.spark.sql.execution.SimpleMode +import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan} @@ -1074,6 +1074,24 @@ class FileBasedDataSourceSuite extends QueryTest checkAnswer(df, Row("v1", "v2")) } } + + test("SPARK-41017: filter 
pushdown with nondeterministic predicates") { + withTempPath { path => + val pathStr = path.getCanonicalPath + spark.range(10).write.parquet(pathStr) + Seq("parquet", "").foreach { useV1SourceList => + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceList) { + val scan = spark.read.parquet(pathStr) + val df = scan.where(rand() > 0.5 && $"id" > 5) + val filters = df.queryExecution.executedPlan.collect { + case f: FileSourceScanLike => f.dataFilters + case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters + }.flatten + assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L)))) + } + } + } + } } object TestingUDT { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org