Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a780848af -> 71e8aaeaa


[SPARK-6320][SQL] Move planLater method into GenericStrategy.

## What changes were proposed in this pull request?

This PR is the minimal version of #13147 for `branch-2.0`.

## How was this patch tested?

Picked `SparkPlannerSuite` from #13147.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #13426 from ueshin/issues/SPARK-6320_2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/71e8aaea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/71e8aaea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/71e8aaea

Branch: refs/heads/branch-2.0
Commit: 71e8aaeaa9c2983d6f2ab8c512e59f5b13e8844e
Parents: a780848
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Wed Jun 1 10:28:48 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Wed Jun 1 10:28:48 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/planning/QueryPlanner.scala    | 15 ++---
 .../spark/sql/execution/SparkPlanner.scala      | 11 ++++
 .../spark/sql/execution/SparkStrategies.scala   | 23 +++++++
 .../scala/org/apache/spark/sql/package.scala    |  4 +-
 .../spark/sql/execution/SparkPlannerSuite.scala | 63 ++++++++++++++++++++
 5 files changed, 107 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 8b1a34f..327a048 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * empty list should be returned.
  */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends 
Logging {
+
+  /**
+   * Returns a placeholder for a physical plan that executes `plan`. This 
placeholder will be
+   * filled in automatically by the QueryPlanner using the other execution 
strategies that are
+   * available.
+   */
+  protected def planLater(plan: LogicalPlan): PhysicalPlan
+
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
 }
 
@@ -47,13 +55,6 @@ abstract class QueryPlanner[PhysicalPlan <: 
TreeNode[PhysicalPlan]] {
   /** A list of execution strategies that can be used by the planner */
   def strategies: Seq[GenericStrategy[PhysicalPlan]]
 
-  /**
-   * Returns a placeholder for a physical plan that executes `plan`. This 
placeholder will be
-   * filled in automatically by the QueryPlanner using the other execution 
strategies that are
-   * available.
-   */
-  protected def planLater(plan: LogicalPlan): PhysicalPlan = 
this.plan(plan).next()
-
   def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
     // Obviously a lot to do here still...
     val iter = strategies.view.flatMap(_(plan)).toIterator

http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index de832ec..319fff1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, 
FileSourceStrategy}
 import org.apache.spark.sql.internal.SQLConf
 
@@ -42,6 +43,16 @@ class SparkPlanner(
       InMemoryScans ::
       BasicOperators :: Nil)
 
+  override def plan(plan: LogicalPlan): Iterator[SparkPlan] = {
+    super.plan(plan).map {
+      _.transformUp {
+        case PlanLater(p) =>
+          // TODO: use the first plan for now, but we will implement plan 
space exploaration later.
+          this.plan(p).next()
+      }
+    }
+  }
+
   /**
    * Used to build table scan operators where complex projection and filtering 
are done using
    * separate physical operators.  This function returns the given scan 
operator with Project and

http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7e3e45e..5a069f2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -35,6 +37,27 @@ import org.apache.spark.sql.execution.streaming.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.ContinuousQuery
 
+/**
+ * Converts a logical plan into zero or more SparkPlans.  This API is exposed 
for experimenting
+ * with the query planner and is not designed to be stable across spark 
releases.  Developers
+ * writing libraries should instead consider using the stable APIs provided in
+ * [[org.apache.spark.sql.sources]]
+ */
+@DeveloperApi
+abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
+
+  override protected def planLater(plan: LogicalPlan): SparkPlan = 
PlanLater(plan)
+}
+
+private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = plan.output
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException()
+  }
+}
+
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SparkPlanner =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 97e35bb..28d8bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 
 /**
  * Allows the execution of relational queries, including those expressed in 
SQL using Spark.
@@ -40,7 +40,7 @@ package object sql {
    * [[org.apache.spark.sql.sources]]
    */
   @DeveloperApi
-  type Strategy = 
org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
+  type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/71e8aaea/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
new file mode 100644
index 0000000..aecfd30
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, 
LogicalPlan, ReturnAnswer, Union}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SparkPlannerSuite extends SharedSQLContext {
+  import testImplicits._
+
+  test("Ensure to go down only the first branch, not any other possible 
branches") {
+
+    case object NeverPlanned extends LeafNode {
+      override def output: Seq[Attribute] = Nil
+    }
+
+    var planned = 0
+    object TestStrategy extends Strategy {
+      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+        case ReturnAnswer(child) =>
+          planned += 1
+          planLater(child) :: planLater(NeverPlanned) :: Nil
+        case Union(children) =>
+          planned += 1
+          UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil
+        case LocalRelation(output, data) =>
+          planned += 1
+          LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil
+        case NeverPlanned =>
+          fail("QueryPlanner should not go down to this branch.")
+        case _ => Nil
+      }
+    }
+
+    try {
+      spark.experimental.extraStrategies = TestStrategy :: Nil
+
+      val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS())
+
+      assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f"))
+      assert(planned === 4)
+    } finally {
+      spark.experimental.extraStrategies = Nil
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to