Repository: spark
Updated Branches:
  refs/heads/master 9634e17d0 -> 7791d0c3a


Revert "[SPARK-13668][SQL] Reorder filter/join predicates to short-circuit isNotNull checks"

This reverts commit e430614eae53c8864b31a1dc64db83e27100d1d9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7791d0c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7791d0c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7791d0c3

Branch: refs/heads/master
Commit: 7791d0c3a9bdfe73e071266846f9ab1491fce50c
Parents: 9634e17
Author: Davies Liu <davies....@gmail.com>
Authored: Wed Mar 9 10:05:57 2016 -0800
Committer: Davies Liu <davies....@gmail.com>
Committed: Wed Mar 9 10:05:57 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/planning/QueryPlanner.scala    |  24 +----
 .../spark/sql/execution/SparkStrategies.scala   |  37 +++----
 .../sql/execution/ReorderedPredicateSuite.scala | 103 -------------------
 3 files changed, 14 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7791d0c3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 1e4523e..56a3dd0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.sql.catalyst.planning
 
 import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.expressions.{And, Expression, IsNotNull, 
PredicateHelper}
-import org.apache.spark.sql.catalyst.plans
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
@@ -28,28 +26,8 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * be used for execution. If this strategy does not apply to the give logical 
operation then an
  * empty list should be returned.
  */
-abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]]
-  extends PredicateHelper with Logging {
-
+abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends 
Logging {
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
-
-  // Attempts to re-order the individual conjunctive predicates in an 
expression to short circuit
-  // the evaluation of relatively cheaper checks (e.g., checking for 
nullability) before others.
-  protected def reorderPredicates(expr: Expression): Expression = {
-    splitConjunctivePredicates(expr)
-      .sortWith((x, _) => x.isInstanceOf[IsNotNull])
-      .reduce(And)
-  }
-
-  // Wrapper around reorderPredicates(expr: Expression) to reorder optional 
conditions in joins
-  protected def reorderPredicates(exprOpt: Option[Expression]): 
Option[Expression] = {
-    exprOpt match {
-      case Some(expr) =>
-        Option(reorderPredicates(expr))
-      case None =>
-        exprOpt
-    }
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7791d0c3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 36fea4d..debd04a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -66,13 +66,11 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case ExtractEquiJoinKeys(
              LeftSemi, leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
         joins.BroadcastLeftSemiJoinHash(
-          leftKeys, rightKeys, planLater(left), planLater(right),
-          reorderPredicates(condition)) :: Nil
+          leftKeys, rightKeys, planLater(left), planLater(right), condition) 
:: Nil
       // Find left semi joins where at least some predicates can be evaluated 
by matching join keys
       case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, 
right) =>
         joins.LeftSemiJoinHash(
-          leftKeys, rightKeys, planLater(left), planLater(right),
-          reorderPredicates(condition)) :: Nil
+          leftKeys, rightKeys, planLater(left), planLater(right), condition) 
:: Nil
       case _ => Nil
     }
   }
@@ -113,39 +111,33 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, Inner, BuildRight, reorderPredicates(condition),
-          planLater(left), planLater(right)))
+          leftKeys, rightKeys, Inner, BuildRight, condition, planLater(left), 
planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, 
CanBroadcast(left), right) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, Inner, BuildLeft, reorderPredicates(condition), 
planLater(left),
-          planLater(right)))
+          leftKeys, rightKeys, Inner, BuildLeft, condition, planLater(left), 
planLater(right)))
 
       case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, condition, left, 
right)
         if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeJoin(
-          leftKeys, rightKeys, reorderPredicates(condition), planLater(left),
-          planLater(right)) :: Nil
+          leftKeys, rightKeys, condition, planLater(left), planLater(right)) 
:: Nil
 
       // --- Outer joins 
--------------------------------------------------------------------------
 
       case ExtractEquiJoinKeys(
           LeftOuter, leftKeys, rightKeys, condition, left, 
CanBroadcast(right)) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, LeftOuter, BuildRight, 
reorderPredicates(condition),
-          planLater(left), planLater(right)))
+          leftKeys, rightKeys, LeftOuter, BuildRight, condition, 
planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(
           RightOuter, leftKeys, rightKeys, condition, CanBroadcast(left), 
right) =>
         Seq(joins.BroadcastHashJoin(
-          leftKeys, rightKeys, RightOuter, BuildLeft, 
reorderPredicates(condition),
-          planLater(left), planLater(right)))
+          leftKeys, rightKeys, RightOuter, BuildLeft, condition, 
planLater(left), planLater(right)))
 
       case ExtractEquiJoinKeys(joinType, leftKeys, rightKeys, condition, left, 
right)
         if RowOrdering.isOrderable(leftKeys) =>
         joins.SortMergeOuterJoin(
-          leftKeys, rightKeys, joinType, reorderPredicates(condition), 
planLater(left),
-          planLater(right)) :: Nil
+          leftKeys, rightKeys, joinType, condition, planLater(left), 
planLater(right)) :: Nil
 
       // --- Cases where this strategy does not apply 
---------------------------------------------
 
@@ -260,12 +252,10 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case j @ logical.Join(CanBroadcast(left), right, Inner | RightOuter, 
condition) =>
         execution.joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), joins.BuildLeft, j.joinType,
-          reorderPredicates(condition)) :: Nil
+          planLater(left), planLater(right), joins.BuildLeft, j.joinType, 
condition) :: Nil
       case j @ logical.Join(left, CanBroadcast(right), Inner | LeftOuter | 
LeftSemi, condition) =>
         execution.joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), joins.BuildRight, j.joinType,
-          reorderPredicates(condition)) :: Nil
+          planLater(left), planLater(right), joins.BuildRight, j.joinType, 
condition) :: Nil
       case _ => Nil
     }
   }
@@ -275,7 +265,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case logical.Join(left, right, Inner, None) =>
         execution.joins.CartesianProduct(planLater(left), planLater(right)) :: 
Nil
       case logical.Join(left, right, Inner, Some(condition)) =>
-        execution.Filter(reorderPredicates(condition),
+        execution.Filter(condition,
           execution.joins.CartesianProduct(planLater(left), planLater(right))) 
:: Nil
       case _ => Nil
     }
@@ -292,8 +282,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
           }
         // This join could be very slow or even hang forever
         joins.BroadcastNestedLoopJoin(
-          planLater(left), planLater(right), buildSide, joinType,
-          reorderPredicates(condition)) :: Nil
+          planLater(left), planLater(right), buildSide, joinType, condition) 
:: Nil
       case _ => Nil
     }
   }
@@ -352,7 +341,7 @@ private[sql] abstract class SparkStrategies extends 
QueryPlanner[SparkPlan] {
       case logical.Project(projectList, child) =>
         execution.Project(projectList, planLater(child)) :: Nil
       case logical.Filter(condition, child) =>
-        execution.Filter(reorderPredicates(condition), planLater(child)) :: Nil
+        execution.Filter(condition, planLater(child)) :: Nil
       case e @ logical.Expand(_, _, child) =>
         execution.Expand(e.projections, e.output, planLater(child)) :: Nil
       case logical.Window(projectList, windowExprs, partitionSpec, orderSpec, 
child) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/7791d0c3/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
deleted file mode 100644
index dd0e438..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReorderedPredicateSuite.scala
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-import org.apache.spark.sql.catalyst.expressions.{Expression, IsNotNull, 
PredicateHelper}
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.Join
-import org.apache.spark.sql.execution
-import org.apache.spark.sql.execution.joins.LeftSemiJoinHash
-import org.apache.spark.sql.test.SharedSQLContext
-
-
-class ReorderedPredicateSuite extends SharedSQLContext with PredicateHelper {
-
-  setupTestData()
-
-  // Verifies that (a) In the new condition, the IsNotNull operators precede 
rest of the operators
-  // and (b) The relative sort order of IsNotNull and !IsNotNull operators is 
still maintained
-  private def verifyStableOrder(before: Expression, after: Expression): Unit = 
{
-    val oldPredicates = splitConjunctivePredicates(before)
-    splitConjunctivePredicates(after).sliding(2).foreach { case Seq(x, y) =>
-      // Verify IsNotNull operator ordering
-      assert(x.isInstanceOf[IsNotNull] || !y.isInstanceOf[IsNotNull])
-
-      // Verify stable sort order
-      if ((x.isInstanceOf[IsNotNull] && y.isInstanceOf[IsNotNull]) ||
-        (!x.isInstanceOf[IsNotNull] && !y.isInstanceOf[IsNotNull])) {
-        assert(oldPredicates.indexOf(x) <= oldPredicates.indexOf(y))
-      }
-    }
-  }
-
-  test("null ordering in filter predicates") {
-    val query = sql(
-      """
-        |SELECT * from testData
-        |WHERE value != '5' AND value != '4' AND value IS NOT NULL AND key != 5
-      """.stripMargin)
-      .queryExecution
-
-    val logicalPlan = query.optimizedPlan
-    val physicalPlan = query.sparkPlan
-    assert(logicalPlan.find(_.isInstanceOf[logical.Filter]).isDefined)
-    assert(physicalPlan.find(_.isInstanceOf[execution.Filter]).isDefined)
-
-    val logicalCondition = logicalPlan.collect {
-      case logical.Filter(condition, _) =>
-        condition
-    }.head
-
-    val physicalCondition = physicalPlan.collect {
-      case Filter(condition, _) =>
-        condition
-    }.head
-
-    verifyStableOrder(logicalCondition, physicalCondition)
-  }
-
-  test("null ordering in join predicates") {
-    sqlContext.cacheManager.clearCache()
-    val query = sql(
-      """
-        |SELECT * FROM testData t1
-        |LEFT SEMI JOIN testData t2
-        |ON t1.key = t2.key
-        |AND t1.key + t2.key != 5
-        |AND CONCAT(t1.value, t2.value) IS NOT NULL
-      """.stripMargin)
-      .queryExecution
-
-    val logicalPlan = query.optimizedPlan
-    val physicalPlan = query.sparkPlan
-    assert(logicalPlan.find(_.isInstanceOf[Join]).isDefined)
-    assert(physicalPlan.find(_.isInstanceOf[LeftSemiJoinHash]).isDefined)
-
-    val logicalCondition = logicalPlan.collect {
-      case Join(_, _, _, condition) =>
-        condition.get
-    }.head
-
-    val physicalCondition = physicalPlan.collect {
-      case LeftSemiJoinHash(_, _, _, _, conditionOpt) =>
-        conditionOpt.get
-    }.head
-
-    verifyStableOrder(logicalCondition, physicalCondition)
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to