This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c0dc38  [SPARK-28654][SQL] Move "Extract Python UDFs" to the last in optimizer
8c0dc38 is described below

commit 8c0dc386401060feedffb40389c37ea74e903faa
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Thu Aug 8 20:21:07 2019 +0800

    [SPARK-28654][SQL] Move "Extract Python UDFs" to the last in optimizer
    
    ## What changes were proposed in this pull request?
    
    Plans produced after the "Extract Python UDFs" batch are fragile: other rules that run afterwards can easily break them.
    
    For instance, if we add a rule such as `PushDownPredicates` to `postHocOptimizationBatches`, the following test in `BatchEvalPythonExecSuite` fails:
    
    ```scala
    test("Python UDF refers to the attributes from more than one child") {
      val df = Seq(("Hello", 4)).toDF("a", "b")
      val df2 = Seq(("Hello", 4)).toDF("c", "d")
      val joinDF = df.crossJoin(df2).where("dummyPythonUDF(a, c) == dummyPythonUDF(d, c)")
      val qualifiedPlanNodes = joinDF.queryExecution.executedPlan.collect {
        case b: BatchEvalPythonExec => b
      }
      assert(qualifiedPlanNodes.size == 1)
    }
    ```
    
    ```
    Invalid PythonUDF dummyUDF(a#63, c#74), requires attributes from more than one child.
    ```
    
    This is because the Python UDF extraction is effectively rolled back, as shown below:
    
    ```
    === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownPredicates ===
    !Filter (dummyUDF(a#7, c#18) = dummyUDF(d#19, c#18))   Join Cross, (dummyUDF(a#7, c#18) = dummyUDF(d#19, c#18))
    !+- Join Cross                                         :- Project [_1#2 AS a#7, _2#3 AS b#8]
    !   :- Project [_1#2 AS a#7, _2#3 AS b#8]              :  +- LocalRelation [_1#2, _2#3]
    !   :  +- LocalRelation [_1#2, _2#3]                   +- Project [_1#13 AS c#18, _2#14 AS d#19]
    !   +- Project [_1#13 AS c#18, _2#14 AS d#19]             +- LocalRelation [_1#13, _2#14]
    !      +- LocalRelation [_1#13, _2#14]
    ```
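    
    To make the rollback concrete, here is a minimal sketch that uses a toy plan model rather than Catalyst's classes. Every name in it (`Plan`, `Relation`, `Udf`, `pushIntoJoin`, `udfSpansBothChildren`) is hypothetical, and the predicate is simplified to a single UDF call. It only illustrates that moving the filter back into the join condition leaves a UDF that needs attributes from both children:
    
    ```scala
    // Toy model only: these are NOT Spark's Catalyst classes.
    sealed trait Plan { def output: Set[String] }
    case class Relation(output: Set[String]) extends Plan
    case class Join(left: Plan, right: Plan, condition: Option[Udf]) extends Plan {
      def output: Set[String] = left.output ++ right.output
    }
    case class Filter(condition: Udf, child: Plan) extends Plan {
      def output: Set[String] = child.output
    }
    // Stand-in for a Python UDF call and the attribute names it references.
    case class Udf(name: String, refs: Set[String])

    // Mimics the effect seen above: a Filter over a condition-less join
    // is merged back into the join as its condition.
    def pushIntoJoin(plan: Plan): Plan = plan match {
      case Filter(cond, Join(l, r, None)) => Join(l, r, Some(cond))
      case other => other
    }

    // True when a UDF sits in a join condition and references attributes
    // from both the left and the right child.
    def udfSpansBothChildren(plan: Plan): Boolean = plan match {
      case Join(l, r, Some(udf)) =>
        udf.refs.exists(l.output) && udf.refs.exists(r.output)
      case _ => false
    }

    // Shape of the plan after extraction: the UDF lives in a Filter above the join.
    val extracted: Plan = Filter(
      Udf("dummyUDF", Set("a", "c")),
      Join(Relation(Set("a", "b")), Relation(Set("c", "d")), None))

    // Shape of the plan after the later rule runs: the UDF is a join condition again.
    val rolledBack: Plan = pushIntoJoin(extracted)
    ```
    
    In this toy model, `udfSpansBothChildren(extracted)` is false while `udfSpansBothChildren(rolledBack)` is true, which corresponds to the situation the error message above complains about.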
    
    It seems the Python UDF extraction should run last, even after the post hoc rules.
    
    Note that this follows the approach of previous versions, in which these rules were applied to physical plans (see SPARK-24721 and SPARK-12981). These optimization rules were meant to be placed at the end.
    
    Note that I intentionally didn't move the `ExperimentalMethods` batch (`spark.experimental.extraOptimizations`). This is an explicitly experimental API, and I wanted to keep it as a just-in-case workaround after this change for now.
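    
    The resulting order can be sketched as follows. This is a toy model of the ordering only: `Batch` here is a hypothetical stand-in that carries just a name, not Spark's `Batch`, and the function parameters are placeholders for `preOptimizationBatches`, `super.defaultBatches` and `postHocOptimizationBatches`:
    
    ```scala
    // Toy stand-in for an optimizer batch; only the name matters here.
    case class Batch(name: String)

    // Mirrors the shape of `defaultBatches` after this change:
    // post hoc batches run before "Extract Python UDFs", and the
    // user-provided optimizers still run last.
    def defaultBatches(
        pre: Seq[Batch],
        parentDefaults: Seq[Batch],
        postHoc: Seq[Batch]): Seq[Batch] =
      (pre ++ parentDefaults :+
        Batch("Optimize Metadata Only Query") :+
        Batch("Prune File Source Table Partitions") :+
        Batch("Schema Pruning")) ++
        postHoc :+
        Batch("Extract Python UDFs") :+
        Batch("User Provided Optimizers")

    // Placeholder inputs, chosen only for illustration.
    val order: Seq[String] = defaultBatches(
      pre = Nil,
      parentDefaults = Seq(Batch("Some Default Batch")),
      postHoc = Seq(Batch("My Post Hoc Batch"))).map(_.name)
    ```
    
    With these placeholder inputs, "My Post Hoc Batch" comes before "Extract Python UDFs" in `order`, and "User Provided Optimizers" stays at the end.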
    
    ## How was this patch tested?
    
    Existing tests should cover this change.
    
    Closes #25386 from HyukjinKwon/SPARK-28654.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../scala/org/apache/spark/sql/execution/SparkOptimizer.scala     | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index a1135f7..98060e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -31,6 +31,9 @@ class SparkOptimizer(
 
   override def defaultBatches: Seq[Batch] = (preOptimizationBatches ++ super.defaultBatches :+
     Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+
+    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
+    Batch("Schema Pruning", Once, SchemaPruning)) ++
+    postHocOptimizationBatches :+
     Batch("Extract Python UDFs", Once,
       ExtractPythonUDFFromJoinCondition,
       // `ExtractPythonUDFFromJoinCondition` can convert a join to a cartesian product.
@@ -45,9 +48,6 @@ class SparkOptimizer(
       ColumnPruning,
       PushPredicateThroughNonJoin,
       RemoveNoopOperators) :+
-    Batch("Prune File Source Table Partitions", Once, PruneFileSourcePartitions) :+
-    Batch("Schema Pruning", Once, SchemaPruning)) ++
-    postHocOptimizationBatches :+
     Batch("User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
 
   override def nonExcludableRules: Seq[String] = super.nonExcludableRules :+
@@ -65,6 +65,8 @@ class SparkOptimizer(
    * Optimization batches that are executed after the regular optimization batches, but before the
    * batch executing the [[ExperimentalMethods]] optimizer rules. This hook can be used to add
    * custom optimizer batches to the Spark optimizer.
+   *
+   * Note that 'Extract Python UDFs' batch is an exception and ran after the batches defined here.
    */
    def postHocOptimizationBatches: Seq[Batch] = Nil
 }

