This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ee3c1c7  [SPARK-28375][SQL] Make pullupCorrelatedPredicate idempotent
ee3c1c7 is described below

commit ee3c1c777ddb8034d62213a5d8e064b97cc067e5
Author: Dilip Biswal <dbis...@us.ibm.com>
AuthorDate: Tue Jul 30 16:29:24 2019 -0700

    [SPARK-28375][SQL] Make pullupCorrelatedPredicate idempotent
    
    ## What changes were proposed in this pull request?
    
    This PR makes the optimizer rule PullupCorrelatedPredicates idempotent.
    
    ## How was this patch tested?
    
    New idempotency tests (IN, EXISTS and scalar subqueries) added to PullupCorrelatedPredicatesSuite.
    
    Closes #25268 from dilipbiswal/pr-25164.
    
    Authored-by: Dilip Biswal <dbis...@us.ibm.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 +-
 .../spark/sql/catalyst/optimizer/subquery.scala    | 20 +++++--
 .../PullupCorrelatedPredicatesSuite.scala          | 62 +++++++++++++++++++---
 3 files changed, 72 insertions(+), 14 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 670fc92..346b2e6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -48,9 +48,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
   }
 
   override protected val blacklistedOnceBatches: Set[String] =
-    Set("Pullup Correlated Expressions",
-      "Extract Python UDFs"
-    )
+    Set("Extract Python UDFs")
 
   protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
index 4f7333c..32dbd389 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala
@@ -273,16 +273,28 @@ object PullupCorrelatedPredicates extends 
Rule[LogicalPlan] with PredicateHelper
   }
 
   private def rewriteSubQueries(plan: LogicalPlan, outerPlans: 
Seq[LogicalPlan]): LogicalPlan = {
+    /**
+     * This function is used as an aid to enforce idempotency of the 
PullupCorrelatedPredicates rule.
+     * In the first call to rewriteSubQueries, all the outer references from 
the subplan are
+     * pulled up and join predicates are recorded as children of the enclosing 
subquery expression.
+     * A subsequent call to rewriteSubQueries simply re-records the 
`children`, which
+     * contain the pulled-up correlated predicates (from the previous call), 
in the enclosing
+     * subquery expression.
+     */
+    def getJoinCondition(newCond: Seq[Expression], oldCond: Seq[Expression]): 
Seq[Expression] = {
+      if (newCond.isEmpty) oldCond else newCond
+    }
+
     plan transformExpressions {
       case ScalarSubquery(sub, children, exprId) if children.nonEmpty =>
         val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
-        ScalarSubquery(newPlan, newCond, exprId)
+        ScalarSubquery(newPlan, getJoinCondition(newCond, children), exprId)
       case Exists(sub, children, exprId) if children.nonEmpty =>
         val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
-        Exists(newPlan, newCond, exprId)
-      case ListQuery(sub, _, exprId, childOutputs) =>
+        Exists(newPlan, getJoinCondition(newCond, children), exprId)
+      case ListQuery(sub, children, exprId, childOutputs) if children.nonEmpty 
=>
         val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
-        ListQuery(newPlan, newCond, exprId, childOutputs)
+        ListQuery(newPlan, getJoinCondition(newCond, children), exprId, 
childOutputs)
     }
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
index 960162a..2d86d5a 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
+import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, 
LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 
 class PullupCorrelatedPredicatesSuite extends PlanTest {
@@ -38,17 +38,65 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
   val testRelation2 = LocalRelation('c.int, 'd.double)
 
   test("PullupCorrelatedPredicates should not produce unresolved plan") {
-    val correlatedSubquery =
+    val subPlan =
       testRelation2
         .where('b < 'd)
         .select('c)
-    val outerQuery =
+    val inSubquery =
       testRelation
-        .where(InSubquery(Seq('a), ListQuery(correlatedSubquery)))
+        .where(InSubquery(Seq('a), ListQuery(subPlan)))
         .select('a).analyze
-    assert(outerQuery.resolved)
+    assert(inSubquery.resolved)
 
-    val optimized = Optimize.execute(outerQuery)
+    val optimized = Optimize.execute(inSubquery)
     assert(optimized.resolved)
   }
+
+  test("PullupCorrelatedPredicates in correlated subquery idempotency check") {
+    val subPlan =
+      testRelation2
+      .where('b < 'd)
+      .select('c)
+    val inSubquery =
+      testRelation
+      .where(InSubquery(Seq('a), ListQuery(subPlan)))
+      .select('a).analyze
+    assert(inSubquery.resolved)
+
+    val optimized = Optimize.execute(inSubquery)
+    val doubleOptimized = Optimize.execute(optimized)
+    comparePlans(optimized, doubleOptimized)
+  }
+
+  test("PullupCorrelatedPredicates exists correlated subquery idempotency 
check") {
+    val subPlan =
+      testRelation2
+        .where('b === 'd && 'd === 1)
+        .select(Literal(1))
+    val existsSubquery =
+      testRelation
+        .where(Exists(subPlan))
+        .select('a).analyze
+    assert(existsSubquery.resolved)
+
+    val optimized = Optimize.execute(existsSubquery)
+    val doubleOptimized = Optimize.execute(optimized)
+    comparePlans(optimized, doubleOptimized)
+  }
+
+  test("PullupCorrelatedPredicates scalar correlated subquery idempotency 
check") {
+    val subPlan =
+      testRelation2
+        .where('b === 'd && 'd === 1)
+        .select(max('d))
+    val scalarSubquery =
+      testRelation
+        .where(ScalarSubquery(subPlan))
+        .select('a).analyze
+    assert(scalarSubquery.resolved)
+
+    val optimized = Optimize.execute(scalarSubquery)
+    val doubleOptimized = Optimize.execute(optimized)
+    comparePlans(optimized, doubleOptimized, false)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to