This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a3b8420e5eec [SPARK-48431][SQL] Do not forward predicates on collated 
columns to file readers
a3b8420e5eec is described below

commit a3b8420e5eecc3ce33528bc7c73967a64b1f670e
Author: Ole Sasse <ole.sa...@databricks.com>
AuthorDate: Wed May 29 13:52:33 2024 -0700

    [SPARK-48431][SQL] Do not forward predicates on collated columns to file 
readers
    
    ### What changes were proposed in this pull request?
    
    [SPARK-47657](https://issues.apache.org/jira/browse/SPARK-47657) allows to 
push filters on collated columns to file sources that support it. If such 
filters are pushed to file sources, those file sources must not push those 
filters to the actual file readers (i.e. parquet or csv readers), because there 
is no guarantee that those support collations.
    
    In this PR we are widening filters on collations to be AlwaysTrue when we 
translate filters for file sources.
    
    ### Why are the changes needed?
    
    Without this, no file source can implement filter pushdown
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Added unit tests. No component tests are possible because there is no file 
source with filter pushdown yet.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #46760 from olaky/filter-translation-for-collations.
    
    Authored-by: Ole Sasse <ole.sa...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../execution/datasources/DataSourceStrategy.scala | 31 +++++++++---
 .../datasources/DataSourceStrategySuite.scala      | 55 +++++++++++++++++++++-
 2 files changed, 78 insertions(+), 8 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 22b60caf2669..7cda347ce581 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -54,7 +54,7 @@ import 
org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.{PartitioningUtils => 
CatalystPartitioningUtils}
-import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils}
 import org.apache.spark.unsafe.types.UTF8String
 
 /**
@@ -595,6 +595,16 @@ object DataSourceStrategy
       translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, 
Expression]],
       nestedPredicatePushdownEnabled: Boolean)
     : Option[Filter] = {
+
+    def translateAndRecordLeafNodeFilter(filter: Expression): Option[Filter] = 
{
+      val translatedFilter =
+        translateLeafNodeFilter(filter, 
PushableColumn(nestedPredicatePushdownEnabled))
+      if (translatedFilter.isDefined && translatedFilterToExpr.isDefined) {
+        translatedFilterToExpr.get(translatedFilter.get) = predicate
+      }
+      translatedFilter
+    }
+
     predicate match {
       case expressions.And(left, right) =>
         // See SPARK-12218 for detailed discussion
@@ -621,16 +631,25 @@ object DataSourceStrategy
             right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
         } yield sources.Or(leftFilter, rightFilter)
 
+      case notNull @ expressions.IsNotNull(_: AttributeReference) =>
+        // Not null filters on attribute references can always be pushed, also 
for collated columns.
+        translateAndRecordLeafNodeFilter(notNull)
+
+      case isNull @ expressions.IsNull(_: AttributeReference) =>
+        // Is null filters on attribute references can always be pushed, also 
for collated columns.
+        translateAndRecordLeafNodeFilter(isNull)
+
+      case p if p.references.exists(ref => 
SchemaUtils.hasNonUTF8BinaryCollation(ref.dataType)) =>
+        // The filter cannot be pushed and we widen it to be AlwaysTrue(). 
This is only valid if
+        // the result of the filter is not negated by a Not expression it is 
wrapped in.
+        translateAndRecordLeafNodeFilter(Literal.TrueLiteral)
+
       case expressions.Not(child) =>
         translateFilterWithMapping(child, translatedFilterToExpr, 
nestedPredicatePushdownEnabled)
           .map(sources.Not)
 
       case other =>
-        val filter = translateLeafNodeFilter(other, 
PushableColumn(nestedPredicatePushdownEnabled))
-        if (filter.isDefined && translatedFilterToExpr.isDefined) {
-          translatedFilterToExpr.get(filter.get) = predicate
-        }
-        filter
+        translateAndRecordLeafNodeFilter(other)
     }
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
index 2b9ec97bace1..3c09dee990eb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
@@ -327,8 +327,10 @@ class DataSourceStrategySuite extends PlanTest with 
SharedSparkSession {
 
   test("SPARK-41636: selectFilters returns predicates in deterministic order") 
{
 
-    val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
-      EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 
6))
+    val idColAttribute = AttributeReference("id", IntegerType)()
+    val predicates = Seq(EqualTo(idColAttribute, 1), EqualTo(idColAttribute, 
2),
+      EqualTo(idColAttribute, 3), EqualTo(idColAttribute, 4), 
EqualTo(idColAttribute, 5),
+      EqualTo(idColAttribute, 6))
 
     val (unhandledPredicates, pushedFilters, handledFilters) =
       DataSourceStrategy.selectFilters(FakeRelation(), predicates)
@@ -338,4 +340,53 @@ class DataSourceStrategySuite extends PlanTest with 
SharedSparkSession {
     })
     assert(handledFilters.isEmpty)
   }
+
+  test("SPARK-48431: Push filters on columns with UTF8_BINARY collation") {
+    val colAttr = $"col".string("UTF8_BINARY")
+    testTranslateFilter(EqualTo(colAttr, Literal("value")), 
Some(sources.EqualTo("col", "value")))
+    testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))),
+      Some(sources.Not(sources.EqualTo("col", "value"))))
+    testTranslateFilter(LessThan(colAttr, Literal("value")),
+      Some(sources.LessThan("col", "value")))
+    testTranslateFilter(LessThan(colAttr, Literal("value")), 
Some(sources.LessThan("col", "value")))
+    testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")),
+      Some(sources.LessThanOrEqual("col", "value")))
+    testTranslateFilter(GreaterThan(colAttr, Literal("value")),
+      Some(sources.GreaterThan("col", "value")))
+    testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")),
+      Some(sources.GreaterThanOrEqual("col", "value")))
+    testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
+  }
+
+  for (collation <- Seq("UTF8_BINARY_LCASE", "UNICODE")) {
+    test(s"SPARK-48431: Filter pushdown on columns with $collation collation") 
{
+      val colAttr = $"col".string(collation)
+
+      // No pushdown for all comparison based filters.
+      testTranslateFilter(EqualTo(colAttr, Literal("value")), 
Some(sources.AlwaysTrue))
+      testTranslateFilter(LessThan(colAttr, Literal("value")), 
Some(sources.AlwaysTrue))
+      testTranslateFilter(LessThan(colAttr, Literal("value")), 
Some(sources.AlwaysTrue))
+      testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), 
Some(sources.AlwaysTrue))
+      testTranslateFilter(GreaterThan(colAttr, Literal("value")), 
Some(sources.AlwaysTrue))
+      testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), 
Some(sources.AlwaysTrue))
+
+      // Allow pushdown of Is(Not)Null filter.
+      testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col")))
+      testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col")))
+
+      // Top level filter splitting at And and Or.
+      testTranslateFilter(And(EqualTo(colAttr, Literal("value")), 
IsNotNull(colAttr)),
+        Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
+      testTranslateFilter(Or(EqualTo(colAttr, Literal("value")), 
IsNotNull(colAttr)),
+        Some(sources.Or(sources.AlwaysTrue, sources.IsNotNull("col"))))
+
+      // Different cases involving Not.
+      testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), 
Some(sources.AlwaysTrue))
+      testTranslateFilter(And(Not(EqualTo(colAttr, Literal("value"))), 
IsNotNull(colAttr)),
+        Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col"))))
+      // This filter would work, but we want to keep the translation logic 
simple.
+      testTranslateFilter(And(EqualTo(colAttr, Literal("value")), 
Not(IsNotNull(colAttr))),
+        Some(sources.And(sources.AlwaysTrue, sources.AlwaysTrue)))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to