Repository: spark
Updated Branches:
  refs/heads/master f37398699 -> d563c8fa0


Revert "[SPARK-13383][SQL] Keep broadcast hint after column pruning"

This reverts commit f3739869973ba4285196a61775d891292b8e282b.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/382b27ba
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/382b27ba
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/382b27ba

Branch: refs/heads/master
Commit: 382b27babf7771b724f7abff78195a858631d138
Parents: f373986
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Feb 24 11:58:12 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Feb 24 11:58:12 2016 -0800

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala |   4 -
 .../optimizer/JoinOptimizationSuite.scala       | 122 -------------------
 .../sql/catalyst/optimizer/JoinOrderSuite.scala |  95 +++++++++++++++
 .../spark/sql/execution/SparkStrategies.scala   |  12 +-
 4 files changed, 100 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 5d2a65b..af43cb3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -332,10 +332,6 @@ case class Join(
  */
 case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
-
-  // We manually set statistics of BroadcastHint to smallest value to make sure
-  // the plan wrapped by BroadcastHint will be considered to broadcast later.
-  override def statistics: Statistics = Statistics(sizeInBytes = 1)
 }
 
 case class InsertIntoTable(

http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
deleted file mode 100644
index d482519..0000000
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.optimizer
-
-import org.apache.spark.sql.catalyst.analysis
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.Expression
-import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
-import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
-import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.rules.RuleExecutor
-
-
-class JoinOptimizationSuite extends PlanTest {
-
-  object Optimize extends RuleExecutor[LogicalPlan] {
-    val batches =
-      Batch("Subqueries", Once,
-        EliminateSubqueryAliases) ::
-      Batch("Filter Pushdown", FixedPoint(100),
-        CombineFilters,
-        PushPredicateThroughProject,
-        BooleanSimplification,
-        ReorderJoin,
-        PushPredicateThroughJoin,
-        PushPredicateThroughGenerate,
-        PushPredicateThroughAggregate,
-        ColumnPruning,
-        CollapseProject) :: Nil
-
-  }
-
-  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
-  val testRelation1 = LocalRelation('d.int)
-
-  test("extract filters and joins") {
-    val x = testRelation.subquery('x)
-    val y = testRelation1.subquery('y)
-    val z = testRelation.subquery('z)
-
-    def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
-      assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
-    }
-
-    testExtract(x, None)
-    testExtract(x.where("x.b".attr === 1), None)
-    testExtract(x.join(y), Some(Seq(x, y), Seq()))
-    testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
-      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
-    testExtract(x.join(y).where("x.b".attr === "y.d".attr),
-      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
-    testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
-    testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
-      Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
-    testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
-    testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
-      Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
-  }
-
-  test("reorder inner joins") {
-    val x = testRelation.subquery('x)
-    val y = testRelation1.subquery('y)
-    val z = testRelation.subquery('z)
-
-    val originalQuery = {
-      x.join(y).join(z)
-        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
-    }
-
-    val optimized = Optimize.execute(originalQuery.analyze)
-    val correctAnswer =
-      x.join(z, condition = Some("x.b".attr === "z.b".attr))
-        .join(y, condition = Some("y.d".attr === "z.a".attr))
-        .analyze
-
-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
-  }
-
-  test("broadcasthint sets relation statistics to smallest value") {
-    val input = LocalRelation('key.int, 'value.string)
-
-    val query =
-      Project(Seq($"x.key", $"y.key"),
-        Join(
-          SubqueryAlias("x", input),
-          BroadcastHint(SubqueryAlias("y", input)), Inner, None)).analyze
-
-    val optimized = Optimize.execute(query)
-
-    val expected =
-      Project(Seq($"x.key", $"y.key"),
-        Join(
-          Project(Seq($"x.key"), SubqueryAlias("x", input)),
-          BroadcastHint(
-            Project(Seq($"y.key"), SubqueryAlias("y", input))),
-          Inner, None)).analyze
-
-    comparePlans(optimized, expected)
-
-    val broadcastChildren = optimized.collect {
-      case Join(_, r, _, _) if r.statistics.sizeInBytes == 1 => r
-    }
-    assert(broadcastChildren.size == 1)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
new file mode 100644
index 0000000..a5b487b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOrderSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.analysis
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.planning.ExtractFiltersAndInnerJoins
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
+
+class JoinOrderSuite extends PlanTest {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("Subqueries", Once,
+        EliminateSubqueryAliases) ::
+      Batch("Filter Pushdown", Once,
+        CombineFilters,
+        PushPredicateThroughProject,
+        BooleanSimplification,
+        ReorderJoin,
+        PushPredicateThroughJoin,
+        PushPredicateThroughGenerate,
+        PushPredicateThroughAggregate,
+        ColumnPruning,
+        CollapseProject) :: Nil
+
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+  val testRelation1 = LocalRelation('d.int)
+
+  test("extract filters and joins") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+
+    def testExtract(plan: LogicalPlan, expected: Option[(Seq[LogicalPlan], Seq[Expression])]) {
+      assert(ExtractFiltersAndInnerJoins.unapply(plan) === expected)
+    }
+
+    testExtract(x, None)
+    testExtract(x.where("x.b".attr === 1), None)
+    testExtract(x.join(y), Some(Seq(x, y), Seq()))
+    testExtract(x.join(y, condition = Some("x.b".attr === "y.d".attr)),
+      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+    testExtract(x.join(y).where("x.b".attr === "y.d".attr),
+      Some(Seq(x, y), Seq("x.b".attr === "y.d".attr)))
+    testExtract(x.join(y).join(z), Some(Seq(x, y, z), Seq()))
+    testExtract(x.join(y).where("x.b".attr === "y.d".attr).join(z),
+      Some(Seq(x, y, z), Seq("x.b".attr === "y.d".attr)))
+    testExtract(x.join(y).join(x.join(z)), Some(Seq(x, y, x.join(z)), Seq()))
+    testExtract(x.join(y).join(x.join(z)).where("x.b".attr === "y.d".attr),
+      Some(Seq(x, y, x.join(z)), Seq("x.b".attr === "y.d".attr)))
+  }
+
+  test("reorder inner joins") {
+    val x = testRelation.subquery('x)
+    val y = testRelation1.subquery('y)
+    val z = testRelation.subquery('z)
+
+    val originalQuery = {
+      x.join(y).join(z)
+        .where(("x.b".attr === "z.b".attr) && ("y.d".attr === "z.a".attr))
+    }
+
+    val optimized = Optimize.execute(originalQuery.analyze)
+    val correctAnswer =
+      x.join(z, condition = Some("x.b".attr === "z.b".attr))
+        .join(y, condition = Some("y.d".attr === "z.a".attr))
+        .analyze
+
+    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/382b27ba/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 247eb05..7347156 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -81,13 +81,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Matches a plan whose output should be small enough to be used in broadcast join.
    */
   object CanBroadcast {
-    def unapply(plan: LogicalPlan): Option[LogicalPlan] = {
-      if (sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
-          plan.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold) {
-        Some(plan)
-      } else {
-        None
-      }
+    def unapply(plan: LogicalPlan): Option[LogicalPlan] = plan match {
+      case BroadcastHint(p) => Some(p)
+      case p if sqlContext.conf.autoBroadcastJoinThreshold > 0 &&
+        p.statistics.sizeInBytes <= sqlContext.conf.autoBroadcastJoinThreshold => Some(p)
+      case _ => None
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to