This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 74f1176  [SPARK-27815][SQL] Predicate pushdown in one pass for 
cascading joins
74f1176 is described below

commit 74f1176311676145d5d8669daacf67e5308f68b5
Author: Yesheng Ma <kimi.y...@gmail.com>
AuthorDate: Wed Jul 3 09:01:16 2019 -0700

    [SPARK-27815][SQL] Predicate pushdown in one pass for cascading joins
    
    ## What changes were proposed in this pull request?
    
    This PR makes the predicate pushdown logic in catalyst optimizer more 
efficient by unifying two existing rules `PushdownPredicates` and 
`PushPredicateThroughJoin`. Previously pushing down a predicate for queries 
such as `Filter(Join(Join(Join)))` requires n steps. This patch essentially 
reduces this to a single pass.
    
    To make this actually work, we need to unify a few rules such as 
`CombineFilters`, `PushDownPredicate` and `PushDownPrdicateThroughJoin`. 
Otherwise cases such as `Filter(Join(Filter(Join)))` still requires several 
passes to fully push down predicates. This unification is done by composing 
several partial functions, which makes a minimal code change and can reuse 
existing UTs.
    
    Results show that this optimization can improve the catalyst optimization 
time by 16.5%. For queries with more joins, the performance is even better. 
E.g., for TPC-DS q64, the performance boost is 49.2%.
    
    ## How was this patch tested?
    Existing UTs + new a UT for the new rule.
    
    Closes #24956 from yeshengm/fixed-point-opt.
    
    Authored-by: Yesheng Ma <kimi.y...@gmail.com>
    Signed-off-by: gatorsmile <gatorsm...@gmail.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  30 +++-
 .../optimizer/PushDownLeftSemiAntiJoin.scala       |  10 +-
 .../catalyst/optimizer/ColumnPruningSuite.scala    |   2 +-
 .../optimizer/FilterPushdownOnePassSuite.scala     | 183 +++++++++++++++++++++
 .../catalyst/optimizer/FilterPushdownSuite.scala   |   2 +-
 .../InferFiltersFromConstraintsSuite.scala         |   2 +-
 .../catalyst/optimizer/JoinOptimizationSuite.scala |   2 +-
 .../sql/catalyst/optimizer/JoinReorderSuite.scala  |   2 +-
 .../optimizer/LeftSemiAntiJoinPushDownSuite.scala  |   2 +-
 .../catalyst/optimizer/OptimizerLoggingSuite.scala |  12 +-
 .../optimizer/OptimizerRuleExclusionSuite.scala    |   6 +-
 .../optimizer/PropagateEmptyRelationSuite.scala    |   4 +-
 .../sql/catalyst/optimizer/PruneFiltersSuite.scala |   2 +-
 .../sql/catalyst/optimizer/SetOperationSuite.scala |   2 +-
 .../optimizer/StarJoinCostBasedReorderSuite.scala  |   2 +-
 .../catalyst/optimizer/StarJoinReorderSuite.scala  |   2 +-
 .../spark/sql/execution/SparkOptimizer.scala       |   4 +-
 17 files changed, 235 insertions(+), 34 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 17b4ff7..c99d2c0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -63,8 +63,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
         PushProjectionThroughUnion,
         ReorderJoin,
         EliminateOuterJoin,
-        PushPredicateThroughJoin,
-        PushDownPredicate,
+        PushDownPredicates,
         PushDownLeftSemiAntiJoin,
         PushLeftSemiLeftAntiThroughJoin,
         LimitPushDown,
@@ -911,7 +910,9 @@ object CombineUnions extends Rule[LogicalPlan] {
  * one conjunctive predicate.
  */
 object CombineFilters extends Rule[LogicalPlan] with PredicateHelper {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+  val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
     // The query execution/optimization does not guarantee the expressions are 
evaluated in order.
     // We only can combine them if and only if both are deterministic.
     case Filter(fc, nf @ Filter(nc, grandChild)) if fc.deterministic && 
nc.deterministic =>
@@ -997,14 +998,29 @@ object PruneFilters extends Rule[LogicalPlan] with 
PredicateHelper {
 }
 
 /**
+ * The unified version for predicate pushdown of normal operators and joins.
+ * This rule improves performance of predicate pushdown for cascading joins 
such as:
+ *  Filter-Join-Join-Join. Most predicates can be pushed down in a single pass.
+ */
+object PushDownPredicates extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    CombineFilters.applyLocally
+      .orElse(PushPredicateThroughNonJoin.applyLocally)
+      .orElse(PushPredicateThroughJoin.applyLocally)
+  }
+}
+
+/**
  * Pushes [[Filter]] operators through many operators iff:
  * 1) the operator is deterministic
  * 2) the predicate is deterministic and the operator will not change any of 
rows.
  *
  * This heuristic is valid assuming the expression evaluation cost is minimal.
  */
-object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+object PushPredicateThroughNonJoin extends Rule[LogicalPlan] with 
PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+  val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
     // SPARK-13473: We can't push the predicate down when the underlying 
projection output non-
     // deterministic field(s).  Non-deterministic expressions are essentially 
stateful. This
     // implies that, for a given input row, the output are determined by the 
expression's initial
@@ -1221,7 +1237,9 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] 
with PredicateHelper {
     (leftEvaluateCondition, rightEvaluateCondition, commonCondition ++ 
nonDeterministic)
   }
 
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform applyLocally
+
+  val applyLocally: PartialFunction[LogicalPlan, LogicalPlan] = {
     // push the where condition down into join filter
     case f @ Filter(filterCondition, Join(left, right, joinType, 
joinCondition, hint)) =>
       val (leftFilterConditions, rightFilterConditions, commonFilterCondition) 
=
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
index 0c38900..606db85 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushDownLeftSemiAntiJoin.scala
@@ -23,13 +23,13 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 
 /**
- * This rule is a variant of [[PushDownPredicate]] which can handle
+ * This rule is a variant of [[PushPredicateThroughNonJoin]] which can handle
  * pushing down Left semi and Left Anti joins below the following operators.
  *  1) Project
  *  2) Window
  *  3) Union
  *  4) Aggregate
- *  5) Other permissible unary operators. please see 
[[PushDownPredicate.canPushThrough]].
+ *  5) Other permissible unary operators. please see 
[[PushPredicateThroughNonJoin.canPushThrough]].
  */
 object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] with PredicateHelper 
{
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
@@ -42,7 +42,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] 
with PredicateHelper {
         // No join condition, just push down the Join below Project
         p.copy(child = Join(gChild, rightOp, joinType, joinCond, hint))
       } else {
-        val aliasMap = PushDownPredicate.getAliasMap(p)
+        val aliasMap = PushPredicateThroughNonJoin.getAliasMap(p)
         val newJoinCond = if (aliasMap.nonEmpty) {
           Option(replaceAlias(joinCond.get, aliasMap))
         } else {
@@ -55,7 +55,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] 
with PredicateHelper {
     case join @ Join(agg: Aggregate, rightOp, LeftSemiOrAnti(_), _, _)
         if agg.aggregateExpressions.forall(_.deterministic) && 
agg.groupingExpressions.nonEmpty &&
         
!agg.aggregateExpressions.exists(ScalarSubquery.hasCorrelatedScalarSubquery) =>
-      val aliasMap = PushDownPredicate.getAliasMap(agg)
+      val aliasMap = PushPredicateThroughNonJoin.getAliasMap(agg)
       val canPushDownPredicate = (predicate: Expression) => {
         val replaced = replaceAlias(predicate, aliasMap)
         predicate.references.nonEmpty &&
@@ -94,7 +94,7 @@ object PushDownLeftSemiAntiJoin extends Rule[LogicalPlan] 
with PredicateHelper {
 
     // LeftSemi/LeftAnti over UnaryNode
     case join @ Join(u: UnaryNode, rightOp, LeftSemiOrAnti(_), _, _)
-        if PushDownPredicate.canPushThrough(u) && 
u.expressions.forall(_.deterministic) =>
+        if PushPredicateThroughNonJoin.canPushThrough(u) && 
u.expressions.forall(_.deterministic) =>
       val validAttrs = u.child.outputSet ++ rightOp.outputSet
       pushDownJoin(join, _.references.subsetOf(validAttrs), _.reduce(And))
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
index b738f30..78ae131 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala
@@ -32,7 +32,7 @@ class ColumnPruningSuite extends PlanTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches = Batch("Column pruning", FixedPoint(100),
-      PushDownPredicate,
+      PushPredicateThroughNonJoin,
       ColumnPruning,
       RemoveNoopOperators,
       CollapseProject) :: Nil
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownOnePassSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownOnePassSuite.scala
new file mode 100644
index 0000000..6f1280c
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownOnePassSuite.scala
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules._
+
+/**
+ * This test suite ensures that the [[PushDownPredicates]] actually does 
predicate pushdown in
+ * an efficient manner. This is enforced by asserting that a single predicate 
pushdown can push
+ * all predicate to bottom as much as possible.
+ */
+class FilterPushdownOnePassSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubqueryAliases) ::
+      // this batch must reach expected state in one pass
+      Batch("Filter Pushdown One Pass", Once,
+        ReorderJoin,
+        PushDownPredicates
+      ) :: Nil
+  }
+
+  val testRelation1 = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation2 = LocalRelation('a.int, 'd.int, 'e.int)
+
+  test("really simple predicate push down") {
+    val x = testRelation1.subquery('x)
+    val y = testRelation2.subquery('y)
+
+    val originalQuery = x.join(y).where("x.a".attr === 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer = x.where("x.a".attr === 1).join(y).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down conjunctive predicates") {
+    val x = testRelation1.subquery('x)
+    val y = testRelation2.subquery('y)
+
+    val originalQuery = x.join(y).where("x.a".attr === 1 && "y.d".attr < 1)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer = x.where("x.a".attr === 1).join(y.where("y.d".attr < 
1)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down predicates for simple joins") {
+    val x = testRelation1.subquery('x)
+    val y = testRelation2.subquery('y)
+
+    val originalQuery =
+      x.where("x.c".attr < 0)
+        .join(y.where("y.d".attr > 1))
+        .where("x.a".attr === 1 && "y.d".attr < 2)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.where("x.c".attr < 0 && "x.a".attr === 1)
+        .join(y.where("y.d".attr > 1 && "y.d".attr < 2)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down top-level filters for cascading joins") {
+    val x = testRelation1.subquery('x)
+    val y = testRelation2.subquery('y)
+
+    val originalQuery =
+      y.join(x).join(x).join(x).join(x).join(x).where("y.d".attr === 0)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer = y.where("y.d".attr === 
0).join(x).join(x).join(x).join(x).join(x).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down predicates for tree-like joins") {
+    val x = testRelation1.subquery('x)
+    val y1 = testRelation2.subquery('y1)
+    val y2 = testRelation2.subquery('y2)
+
+    val originalQuery =
+      y1.join(x).join(x)
+        .join(y2.join(x).join(x))
+        .where("y1.d".attr === 0 && "y2.d".attr === 3)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      y1.where("y1.d".attr === 0).join(x).join(x)
+        .join(y2.where("y2.d".attr === 3).join(x).join(x)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down through join and project") {
+    val x = testRelation1.subquery('x)
+    val y = testRelation2.subquery('y)
+
+    val originalQuery =
+      x.where('a > 0).select('a, 'b)
+        .join(y.where('d < 100).select('e))
+        .where("x.a".attr < 100)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.where('a > 0 && 'a < 100).select('a, 'b)
+        .join(y.where('d < 100).select('e)).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down through deep projects") {
+    val x = testRelation1.subquery('x)
+
+    val originalQuery =
+      x.select(('a + 1) as 'a1, 'b)
+        .select(('a1 + 1) as 'a2, 'b)
+        .select(('a2 + 1) as 'a3, 'b)
+        .select(('a3 + 1) as 'a4, 'b)
+        .select('b)
+        .where('b > 0)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.where('b > 0)
+        .select(('a + 1) as 'a1, 'b)
+        .select(('a1 + 1) as 'a2, 'b)
+        .select(('a2 + 1) as 'a3, 'b)
+        .select(('a3 + 1) as 'a4, 'b)
+        .select('b).analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+
+  test("push down through aggregate and join") {
+    val x = testRelation1.subquery('x)
+    val y = testRelation2.subquery('y)
+
+    val left = x
+      .where('c > 0)
+      .groupBy('a)('a, count('b))
+      .subquery('left)
+    val right = y
+      .where('d < 0)
+      .groupBy('a)('a, count('d))
+      .subquery('right)
+    val originalQuery = left
+      .join(right).where("left.a".attr < 100 && "right.a".attr < 100)
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.where('c > 0 && 'a < 100).groupBy('a)('a, count('b))
+        .join(y.where('d < 0 && 'a < 100).groupBy('a)('a, count('d)))
+        .analyze
+
+    comparePlans(optimized, correctAnswer)
+  }
+}
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index cf4e9fc..2db4667f 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -35,7 +35,7 @@ class FilterPushdownSuite extends PlanTest {
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(10),
         CombineFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         BooleanSimplification,
         PushPredicateThroughJoin,
         CollapseProject) :: Nil
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
index a40ba2d..974bc78 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala
@@ -31,7 +31,7 @@ class InferFiltersFromConstraintsSuite extends PlanTest {
     val batches =
       Batch("InferAndPushDownFilters", FixedPoint(100),
         PushPredicateThroughJoin,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         InferFiltersFromConstraints,
         CombineFilters,
         SimplifyBinaryComparison,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index c570643..0f93305 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -34,7 +34,7 @@ class JoinOptimizationSuite extends PlanTest {
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(100),
         CombineFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         BooleanSimplification,
         ReorderJoin,
         PushPredicateThroughJoin,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 18516ee..43e5bad 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -35,7 +35,7 @@ class JoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
         EliminateResolvedHint) ::
       Batch("Operator Optimizations", FixedPoint(100),
         CombineFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         ReorderJoin,
         PushPredicateThroughJoin,
         ColumnPruning,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
index 00709ad..f6d1898 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LeftSemiAntiJoinPushDownSuite.scala
@@ -35,7 +35,7 @@ class LeftSemiPushdownSuite extends PlanTest {
         EliminateSubqueryAliases) ::
       Batch("Filter Pushdown", FixedPoint(10),
         CombineFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         PushDownLeftSemiAntiJoin,
         PushLeftSemiLeftAntiThroughJoin,
         BooleanSimplification,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
index dd7e29d..7a432d2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerLoggingSuite.scala
@@ -34,7 +34,7 @@ class OptimizerLoggingSuite extends PlanTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Optimizer Batch", FixedPoint(100),
-        PushDownPredicate, ColumnPruning, CollapseProject) ::
+        PushPredicateThroughNonJoin, ColumnPruning, CollapseProject) ::
       Batch("Batch Has No Effect", Once,
         ColumnPruning) :: Nil
   }
@@ -99,7 +99,7 @@ class OptimizerLoggingSuite extends PlanTest {
         verifyLog(
           level._2,
           Seq(
-            PushDownPredicate.ruleName,
+            PushPredicateThroughNonJoin.ruleName,
             ColumnPruning.ruleName,
             CollapseProject.ruleName))
       }
@@ -123,15 +123,15 @@ class OptimizerLoggingSuite extends PlanTest {
 
   test("test log rules") {
     val rulesSeq = Seq(
-      Seq(PushDownPredicate.ruleName,
+      Seq(PushPredicateThroughNonJoin.ruleName,
         ColumnPruning.ruleName,
         CollapseProject.ruleName).reduce(_ + "," + _) ->
-        Seq(PushDownPredicate.ruleName,
+        Seq(PushPredicateThroughNonJoin.ruleName,
           ColumnPruning.ruleName,
           CollapseProject.ruleName),
-      Seq(PushDownPredicate.ruleName,
+      Seq(PushPredicateThroughNonJoin.ruleName,
         ColumnPruning.ruleName).reduce(_ + "," + _) ->
-        Seq(PushDownPredicate.ruleName,
+        Seq(PushPredicateThroughNonJoin.ruleName,
           ColumnPruning.ruleName),
       CollapseProject.ruleName ->
         Seq(CollapseProject.ruleName),
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
index 7587776..2a87803 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerRuleExclusionSuite.scala
@@ -96,21 +96,21 @@ class OptimizerRuleExclusionSuite extends PlanTest {
     val optimizer = new SimpleTestOptimizer() {
       override def defaultBatches: Seq[Batch] =
         Batch("push", Once,
-          PushDownPredicate,
+          PushPredicateThroughNonJoin,
           PushPredicateThroughJoin,
           PushProjectionThroughUnion) ::
         Batch("pull", Once,
           PullupCorrelatedPredicates) :: Nil
 
       override def nonExcludableRules: Seq[String] =
-        PushDownPredicate.ruleName ::
+        PushPredicateThroughNonJoin.ruleName ::
           PullupCorrelatedPredicates.ruleName :: Nil
     }
 
     verifyExcludedRules(
       optimizer,
       Seq(
-        PushDownPredicate.ruleName,
+        PushPredicateThroughNonJoin.ruleName,
         PushProjectionThroughUnion.ruleName,
         PullupCorrelatedPredicates.ruleName))
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
index d395bba..9c7d4c7 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PropagateEmptyRelationSuite.scala
@@ -35,7 +35,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
         ReplaceDistinctWithAggregate,
         ReplaceExceptWithAntiJoin,
         ReplaceIntersectWithSemiJoin,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         PruneFilters,
         PropagateEmptyRelation,
         CollapseProject) :: Nil
@@ -48,7 +48,7 @@ class PropagateEmptyRelationSuite extends PlanTest {
         ReplaceDistinctWithAggregate,
         ReplaceExceptWithAntiJoin,
         ReplaceIntersectWithSemiJoin,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         PruneFilters,
         CollapseProject) :: Nil
   }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
index 6d1a05f..526a5b0 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PruneFiltersSuite.scala
@@ -35,7 +35,7 @@ class PruneFiltersSuite extends PlanTest {
       Batch("Filter Pushdown and Pruning", Once,
         CombineFilters,
         PruneFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         PushPredicateThroughJoin) :: Nil
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
index 3d3e361..ccc30b1 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationSuite.scala
@@ -34,7 +34,7 @@ class SetOperationSuite extends PlanTest {
       Batch("Union Pushdown", FixedPoint(5),
         CombineUnions,
         PushProjectionThroughUnion,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         PruneFilters) :: Nil
   }
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index baae934..f8c48d5 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -33,7 +33,7 @@ class StarJoinCostBasedReorderSuite extends PlanTest with 
StatsEstimationTestBas
     val batches =
       Batch("Operator Optimizations", FixedPoint(100),
         CombineFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         ReorderJoin,
         PushPredicateThroughJoin,
         ColumnPruning,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 9dc653b..10e970d 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -52,7 +52,7 @@ class StarJoinReorderSuite extends PlanTest with 
StatsEstimationTestBase {
     val batches =
       Batch("Operator Optimizations", FixedPoint(100),
         CombineFilters,
-        PushDownPredicate,
+        PushPredicateThroughNonJoin,
         ReorderJoin,
         PushPredicateThroughJoin,
         ColumnPruning,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
index c35e5de..4ae2194 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.sql.ExperimentalMethods
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
-import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, Optimizer, 
PushDownPredicate, RemoveNoopOperators}
+import org.apache.spark.sql.catalyst.optimizer.{ColumnPruning, Optimizer, 
PushPredicateThroughNonJoin, RemoveNoopOperators}
 import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions
 import org.apache.spark.sql.execution.datasources.SchemaPruning
 import org.apache.spark.sql.execution.python.{ExtractPythonUDFFromAggregate, 
ExtractPythonUDFs}
@@ -37,7 +37,7 @@ class SparkOptimizer(
       // The eval-python node may be between Project/Filter and the scan node, 
which breaks
       // column pruning and filter push-down. Here we rerun the related 
optimizer rules.
       ColumnPruning,
-      PushDownPredicate,
+      PushPredicateThroughNonJoin,
       RemoveNoopOperators) :+
     Batch("Prune File Source Table Partitions", Once, 
PruneFileSourcePartitions) :+
     Batch("Schema Pruning", Once, SchemaPruning)) ++


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to