This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0cdbda13b08 [SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates
0cdbda13b08 is described below

commit 0cdbda13b08ad79c91d1d7d5912fb1120191dc56
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Tue Nov 22 10:40:24 2022 +0800

    [SPARK-41017][SQL][FOLLOWUP] Push Filter with both deterministic and nondeterministic predicates
    
    ### What changes were proposed in this pull request?
    
    This PR fixes a regression caused by https://github.com/apache/spark/pull/38511 .
    For `FROM t WHERE rand() > 0.5 AND col = 1`, we can still push down `col = 1`,
    because we don't guarantee the evaluation order of predicates within a `Filter`.
    
    This PR updates `ScanOperation` to consider this case and bring back the
    previous pushdown behavior.
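
    As a rough illustration (not part of this patch; the Parquet path `/tmp/t`
    and the column name `col` below are made up), the expected behavior can be
    checked from a spark-shell:

    ```scala
    // Hedged sketch: after this fix, `col = 1` should appear as a pushed data
    // filter on the scan, while `rand() > 0.5` stays in the Filter above it.
    import org.apache.spark.sql.functions.rand
    import spark.implicits._

    val df = spark.read.parquet("/tmp/t").where(rand() > 0.5 && $"col" === 1)
    df.explain()
    ```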
    
    ### Why are the changes needed?
    
    Fix a performance regression.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    new tests
    
    Closes #38746 from cloud-fan/filter.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Wenchen Fan <cloud0...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/catalyst/planning/patterns.scala     | 21 ++++++++++++++++-----
 .../spark/sql/FileBasedDataSourceSuite.scala       | 22 ++++++++++++++++++++--
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 3c35ba9b600..bbda9eb76b1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -137,11 +137,22 @@ object ScanOperation extends OperationHelper {
     val alwaysInline = SQLConf.get.getConf(SQLConf.COLLAPSE_PROJECT_ALWAYS_INLINE)
     val (fields, filters, child, _) = collectProjectsAndFilters(plan, alwaysInline)
     // `collectProjectsAndFilters` transforms the plan bottom-up, so the bottom-most filter are
-    // placed at the beginning of `filters` list. According to the SQL semantic, we can only
-    // push down the bottom deterministic filters.
-    val filtersCanPushDown = filters.takeWhile(_.deterministic).flatMap(splitConjunctivePredicates)
-    val filtersStayUp = filters.dropWhile(_.deterministic)
-    Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child))
+    // placed at the beginning of `filters` list. According to the SQL semantic, we cannot merge
+    // Filters if one or more of them are nondeterministic. This means we can only push down the
+    // bottom-most Filter, or more following deterministic Filters if the bottom-most Filter is
+    // also deterministic.
+    if (filters.isEmpty) {
+      Some((fields.getOrElse(child.output), Nil, Nil, child))
+    } else if (filters.head.deterministic) {
+      val filtersCanPushDown = filters.takeWhile(_.deterministic)
+        .flatMap(splitConjunctivePredicates)
+      val filtersStayUp = filters.dropWhile(_.deterministic)
+      Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child))
+    } else {
+      val filtersCanPushDown = splitConjunctivePredicates(filters.head)
+      val filtersStayUp = filters.drop(1)
+      Some((fields.getOrElse(child.output), filtersStayUp, filtersCanPushDown, child))
+    }
   }
 }
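
For readers skimming the hunk above, here is a hedged, standalone sketch of the new
branching (a toy model with made-up names `ScanOperationSketch`, `Flt` and `split`;
not the actual Catalyst types):

    // Toy model of the new ScanOperation branching: each Filter is a list of
    // conjuncts plus a "deterministic" flag; filters.head is the bottom-most one.
    object ScanOperationSketch extends App {
      case class Flt(conjuncts: Seq[String], deterministic: Boolean)

      def split(filters: Seq[Flt]): (Seq[String], Seq[Flt]) =
        if (filters.isEmpty) {
          (Nil, Nil)
        } else if (filters.head.deterministic) {
          // Bottom-most Filter is deterministic: push down the conjuncts of every
          // leading deterministic Filter; the remaining Filters stay above the scan.
          (filters.takeWhile(_.deterministic).flatMap(_.conjuncts),
            filters.dropWhile(_.deterministic))
        } else {
          // Bottom-most Filter is nondeterministic: only its own conjuncts are
          // offered for pushdown (the pushdown rules then decide which of them the
          // data source can actually accept); everything above it stays up.
          (filters.head.conjuncts, filters.drop(1))
        }

      // `WHERE rand() > 0.5 AND col = 1` is one nondeterministic Filter with two
      // conjuncts, so `col = 1` is still offered to the data source.
      println(split(Seq(Flt(Seq("rand() > 0.5", "col = 1"), deterministic = false))))
    }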
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index 98cb54ccbbc..cf5f8d990f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -30,10 +30,10 @@ import org.apache.hadoop.fs.{LocalFileSystem, Path}
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
 import org.apache.spark.sql.TestingUDT.{IntervalUDT, NullData, NullUDT}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
 import org.apache.spark.sql.catalyst.expressions.IntegralLiteralTestUtils.{negativeInt, positiveInt}
 import org.apache.spark.sql.catalyst.plans.logical.Filter
-import org.apache.spark.sql.execution.SimpleMode
+import org.apache.spark.sql.execution.{FileSourceScanLike, SimpleMode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.datasources.FilePartition
 import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, FileScan}
@@ -1074,6 +1074,24 @@ class FileBasedDataSourceSuite extends QueryTest
       checkAnswer(df, Row("v1", "v2"))
     }
   }
+
+  test("SPARK-41017: filter pushdown with nondeterministic predicates") {
+    withTempPath { path =>
+      val pathStr = path.getCanonicalPath
+      spark.range(10).write.parquet(pathStr)
+      Seq("parquet", "").foreach { useV1SourceList =>
+        withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> useV1SourceList) {
+          val scan = spark.read.parquet(pathStr)
+          val df = scan.where(rand() > 0.5 && $"id" > 5)
+          val filters = df.queryExecution.executedPlan.collect {
+            case f: FileSourceScanLike => f.dataFilters
+            case b: BatchScanExec => b.scan.asInstanceOf[FileScan].dataFilters
+          }.flatten
+          assert(filters.contains(GreaterThan(scan.logicalPlan.output.head, Literal(5L))))
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
