This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new c07eb181609 [SPARK-41636][SQL] Make sure `selectFilters` returns predicates in deterministic order c07eb181609 is described below commit c07eb18160952fab18a6d118ce2bb56e18d56741 Author: Jia Fan <fanjiaemi...@qq.com> AuthorDate: Mon Aug 7 09:07:48 2023 +0900 [SPARK-41636][SQL] Make sure `selectFilters` returns predicates in deterministic order ### What changes were proposed in this pull request? Method `DataSourceStrategy#selectFilters`, which is used to determine "pushdown-able" filters, does not preserve the order of the input Seq[Expression] nor does it return the same order across the same plans. This is resulting in CodeGenerator cache misses even when the exact same LogicalPlan is executed. This PR makes sure `selectFilters` returns predicates in deterministic order. ### Why are the changes needed? Make sure `selectFilters` returns predicates in deterministic order, to reduce the probability of codegen cache misses. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new test. Closes #42265 from Hisoka-X/SPARK-41636_selectfilters_order. 
Authored-by: Jia Fan <fanjiaemi...@qq.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 9462dcd0e996dd940d4970dc75482f7d088ac2ae) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../sql/execution/datasources/DataSourceStrategy.scala | 6 ++++-- .../execution/datasources/DataSourceStrategySuite.scala | 14 ++++++++++++++ 2 files changed, 18 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5e6e0ad0392..94c2d2ffaca 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.Locale +import scala.collection.immutable.ListMap import scala.collection.mutable import org.apache.hadoop.fs.Path @@ -670,9 +671,10 @@ object DataSourceStrategy // A map from original Catalyst expressions to corresponding translated data source filters. // If a predicate is not in this map, it means it cannot be pushed down. 
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation) - val translatedMap: Map[Expression, Filter] = predicates.flatMap { p => + // SPARK-41636: we keep the order of the predicates to avoid CodeGenerator cache misses + val translatedMap: Map[Expression, Filter] = ListMap(predicates.flatMap { p => translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f) - }.toMap + }: _*) val pushedFilters: Seq[Filter] = translatedMap.values.toSeq diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala index a35fb5f6271..2b9ec97bace 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala @@ -324,4 +324,18 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession { DataSourceStrategy.translateFilter(catalystFilter, true) } } + + test("SPARK-41636: selectFilters returns predicates in deterministic order") { + + val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2), + EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 6)) + + val (unhandledPredicates, pushedFilters, handledFilters) = + DataSourceStrategy.selectFilters(FakeRelation(), predicates) + assert(unhandledPredicates.equals(predicates)) + assert(pushedFilters.zipWithIndex.forall { case (f, i) => + f.equals(sources.EqualTo("id", i + 1)) + }) + assert(handledFilters.isEmpty) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org