This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new c07eb181609 [SPARK-41636][SQL] Make sure `selectFilters` returns 
predicates in deterministic order
c07eb181609 is described below

commit c07eb18160952fab18a6d118ce2bb56e18d56741
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Mon Aug 7 09:07:48 2023 +0900

    [SPARK-41636][SQL] Make sure `selectFilters` returns predicates in 
deterministic order
    
    ### What changes were proposed in this pull request?
    Method `DataSourceStrategy#selectFilters`, which is used to determine 
"pushdown-able" filters, does not preserve the order of the input 
Seq[Expression] nor does it return the same order across the same plans. This 
is resulting in CodeGenerator cache misses even when the exact same LogicalPlan 
is executed.
    
    This PR makes sure `selectFilters` returns predicates in a deterministic 
order.
    
    ### Why are the changes needed?
    Make sure `selectFilters` returns predicates in deterministic order, to 
reduce the probability of codegen cache misses.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new test.
    
    Closes #42265 from Hisoka-X/SPARK-41636_selectfilters_order.
    
    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 9462dcd0e996dd940d4970dc75482f7d088ac2ae)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../sql/execution/datasources/DataSourceStrategy.scala     |  6 ++++--
 .../execution/datasources/DataSourceStrategySuite.scala    | 14 ++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 5e6e0ad0392..94c2d2ffaca 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import java.util.Locale
 
+import scala.collection.immutable.ListMap
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -670,9 +671,10 @@ object DataSourceStrategy
     // A map from original Catalyst expressions to corresponding translated 
data source filters.
     // If a predicate is not in this map, it means it cannot be pushed down.
     val supportNestedPredicatePushdown = 
DataSourceUtils.supportNestedPredicatePushdown(relation)
-    val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
+    // SPARK-41636: we keep the order of the predicates to avoid CodeGenerator 
cache misses
+    val translatedMap: Map[Expression, Filter] = ListMap(predicates.flatMap { 
p =>
       translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f)
-    }.toMap
+    }: _*)
 
     val pushedFilters: Seq[Filter] = translatedMap.values.toSeq
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
index a35fb5f6271..2b9ec97bace 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategySuite.scala
@@ -324,4 +324,18 @@ class DataSourceStrategySuite extends PlanTest with 
SharedSparkSession {
       DataSourceStrategy.translateFilter(catalystFilter, true)
     }
   }
+
+  test("SPARK-41636: selectFilters returns predicates in deterministic order") 
{
+
+    val predicates = Seq(EqualTo($"id", 1), EqualTo($"id", 2),
+      EqualTo($"id", 3), EqualTo($"id", 4), EqualTo($"id", 5), EqualTo($"id", 
6))
+
+    val (unhandledPredicates, pushedFilters, handledFilters) =
+      DataSourceStrategy.selectFilters(FakeRelation(), predicates)
+    assert(unhandledPredicates.equals(predicates))
+    assert(pushedFilters.zipWithIndex.forall { case (f, i) =>
+      f.equals(sources.EqualTo("id", i + 1))
+    })
+    assert(handledFilters.isEmpty)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to