This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new a3b8420e5eec [SPARK-48431][SQL] Do not forward predicates on collated columns to file readers a3b8420e5eec is described below commit a3b8420e5eecc3ce33528bc7c73967a64b1f670e Author: Ole Sasse <ole.sa...@databricks.com> AuthorDate: Wed May 29 13:52:33 2024 -0700 [SPARK-48431][SQL] Do not forward predicates on collated columns to file readers ### What changes were proposed in this pull request? [SPARK-47657](https://issues.apache.org/jira/browse/SPARK-47657) allows pushing filters on collated columns to file sources that support it. If such filters are pushed to file sources, those file sources must not push those filters to the actual file readers (i.e. Parquet or CSV readers), because there is no guarantee that those support collations. In this PR we are widening filters on collated columns to be AlwaysTrue when we translate filters for file sources. ### Why are the changes needed? Without this, no file source can implement filter pushdown. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests. No component tests are possible because there is no file source with filter pushdown yet. ### Was this patch authored or co-authored using generative AI tooling? No Closes #46760 from olaky/filter-translation-for-collations. 
Authored-by: Ole Sasse <ole.sa...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../execution/datasources/DataSourceStrategy.scala | 31 +++++++++--- .../datasources/DataSourceStrategySuite.scala | 55 +++++++++++++++++++++- 2 files changed, 78 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 22b60caf2669..7cda347ce581 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -54,7 +54,7 @@ import org.apache.spark.sql.execution.streaming.StreamingRelation import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.{PartitioningUtils => CatalystPartitioningUtils} -import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.sql.util.{CaseInsensitiveStringMap, SchemaUtils} import org.apache.spark.unsafe.types.UTF8String /** @@ -595,6 +595,16 @@ object DataSourceStrategy translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]], nestedPredicatePushdownEnabled: Boolean) : Option[Filter] = { + + def translateAndRecordLeafNodeFilter(filter: Expression): Option[Filter] = { + val translatedFilter = + translateLeafNodeFilter(filter, PushableColumn(nestedPredicatePushdownEnabled)) + if (translatedFilter.isDefined && translatedFilterToExpr.isDefined) { + translatedFilterToExpr.get(translatedFilter.get) = predicate + } + translatedFilter + } + predicate match { case expressions.And(left, right) => // See SPARK-12218 for detailed discussion @@ -621,16 +631,25 @@ object DataSourceStrategy right, translatedFilterToExpr, nestedPredicatePushdownEnabled) } yield sources.Or(leftFilter, rightFilter) + case notNull @ expressions.IsNotNull(_: 
AttributeReference) => + // Not null filters on attribute references can always be pushed, also for collated columns. + translateAndRecordLeafNodeFilter(notNull) + + case isNull @ expressions.IsNull(_: AttributeReference) => + // Is null filters on attribute references can always be pushed, also for collated columns. + translateAndRecordLeafNodeFilter(isNull) + + case p if p.references.exists(ref => SchemaUtils.hasNonUTF8BinaryCollation(ref.dataType)) => + // The filter cannot be pushed and we widen it to be AlwaysTrue(). This is only valid if + // the result of the filter is not negated by a Not expression it is wrapped in. + translateAndRecordLeafNodeFilter(Literal.TrueLiteral) + case expressions.Not(child) => translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled) .map(sources.Not) case other => - val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled)) - if (filter.isDefined && translatedFilterToExpr.isDefined) { - translatedFilterToExpr.get(filter.get) = predicate - } - filter + translateAndRecordLeafNodeFilter(other) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index 2b9ec97bace1..3c09dee990eb 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -327,8 +327,10 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { test("SPARK-41636: selectFilters returns predicates in deterministic order") { - val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2), - EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6)) + val idColAttribute = AttributeReference("id", IntegerType)() + val predicates = Seq(EqualTo(idColAttribute, 1), 
EqualTo(idColAttribute, 2), + EqualTo(idColAttribute, 3), EqualTo(idColAttribute, 4), EqualTo(idColAttribute, 5), + EqualTo(idColAttribute, 6)) val (unhandledPredicates, pushedFilters, handledFilters) = DataSourceStrategy.selectFilters(FakeRelation(), predicates) @@ -338,4 +340,53 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { }) assert(handledFilters.isEmpty) } + + test("SPARK-48431: Push filters on columns with UTF8_BINARY collation") { + val colAttr = $"col".string("UTF8_BINARY") + testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.EqualTo("col", "value"))) + testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), + Some(sources.Not(sources.EqualTo("col", "value")))) + testTranslateFilter(LessThan(colAttr, Literal("value")), + Some(sources.LessThan("col", "value"))) + testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.LessThan("col", "value"))) + testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), + Some(sources.LessThanOrEqual("col", "value"))) + testTranslateFilter(GreaterThan(colAttr, Literal("value")), + Some(sources.GreaterThan("col", "value"))) + testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), + Some(sources.GreaterThanOrEqual("col", "value"))) + testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col"))) + } + + for (collation <- Seq("UTF8_BINARY_LCASE", "UNICODE")) { + test(s"SPARK-48431: Filter pushdown on columns with $collation collation") { + val colAttr = $"col".string(collation) + + // No pushdown for all comparison based filters. 
+ testTranslateFilter(EqualTo(colAttr, Literal("value")), Some(sources.AlwaysTrue)) + testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue)) + testTranslateFilter(LessThan(colAttr, Literal("value")), Some(sources.AlwaysTrue)) + testTranslateFilter(LessThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue)) + testTranslateFilter(GreaterThan(colAttr, Literal("value")), Some(sources.AlwaysTrue)) + testTranslateFilter(GreaterThanOrEqual(colAttr, Literal("value")), Some(sources.AlwaysTrue)) + + // Allow pushdown of Is(Not)Null filter. + testTranslateFilter(IsNotNull(colAttr), Some(sources.IsNotNull("col"))) + testTranslateFilter(IsNull(colAttr), Some(sources.IsNull("col"))) + + // Top level filter splitting at And and Or. + testTranslateFilter(And(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)), + Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col")))) + testTranslateFilter(Or(EqualTo(colAttr, Literal("value")), IsNotNull(colAttr)), + Some(sources.Or(sources.AlwaysTrue, sources.IsNotNull("col")))) + + // Different cases involving Not. + testTranslateFilter(Not(EqualTo(colAttr, Literal("value"))), Some(sources.AlwaysTrue)) + testTranslateFilter(And(Not(EqualTo(colAttr, Literal("value"))), IsNotNull(colAttr)), + Some(sources.And(sources.AlwaysTrue, sources.IsNotNull("col")))) + // This filter would work, but we want to keep the translation logic simple. + testTranslateFilter(And(EqualTo(colAttr, Literal("value")), Not(IsNotNull(colAttr))), + Some(sources.And(sources.AlwaysTrue, sources.AlwaysTrue))) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org