Repository: spark
Updated Branches:
  refs/heads/master fb219029d -> 667d4ea7b


[SPARK-6320][SQL] Move planLater method into GenericStrategy.

## What changes were proposed in this pull request?

This PR moves the `QueryPlanner.planLater()` method into `GenericStrategy` so that extra strategies can call `planLater` from their own planning logic.
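
With this change, a strategy registered through `spark.experimental.extraStrategies` can hand subtrees back to the planner. A minimal sketch, assuming a Spark shell session (`spark`); `MyProjectStrategy` is hypothetical, and its `Project` case simply mirrors the built-in `BasicOperators` behavior for illustration:

```scala
import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.execution.{ProjectExec, SparkPlan}

object MyProjectStrategy extends Strategy {
  def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    case Project(projectList, child) =>
      // planLater(child), now inherited from GenericStrategy, emits a
      // PlanLater placeholder that QueryPlanner.plan() fills in later
      // using the other available strategies.
      ProjectExec(projectList, planLater(child)) :: Nil
    case _ => Nil
  }
}

spark.experimental.extraStrategies = MyProjectStrategy :: Nil
```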

## How was this patch tested?

Existing tests, plus the new `SparkPlannerSuite` added by this patch.

Author: Takuya UESHIN <ues...@happy-camper.st>

Closes #13147 from ueshin/issues/SPARK-6320.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/667d4ea7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/667d4ea7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/667d4ea7

Branch: refs/heads/master
Commit: 667d4ea7b35f285954ea7cb719b7c80581e31f4d
Parents: fb21902
Author: Takuya UESHIN <ues...@happy-camper.st>
Authored: Fri Jun 10 13:06:18 2016 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Fri Jun 10 13:06:18 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/planning/QueryPlanner.scala    | 58 ++++++++++++++----
 .../spark/sql/execution/QueryExecution.scala    |  2 +
 .../spark/sql/execution/SparkPlanner.scala      | 13 ++++
 .../spark/sql/execution/SparkStrategies.scala   | 23 +++++++
 .../scala/org/apache/spark/sql/package.scala    |  4 +-
 .../spark/sql/execution/SparkPlannerSuite.scala | 63 ++++++++++++++++++++
 6 files changed, 151 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 8b1a34f..5f694f4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode
  * empty list should be returned.
  */
 abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging {
+
+  /**
+   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
+   * filled in automatically by the QueryPlanner using the other execution strategies that are
+   * available.
+   */
+  protected def planLater(plan: LogicalPlan): PhysicalPlan
+
   def apply(plan: LogicalPlan): Seq[PhysicalPlan]
 }
 
@@ -47,17 +55,47 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
   /** A list of execution strategies that can be used by the planner */
   def strategies: Seq[GenericStrategy[PhysicalPlan]]
 
-  /**
-   * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be
-   * filled in automatically by the QueryPlanner using the other execution strategies that are
-   * available.
-   */
-  protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next()
-
   def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
     // Obviously a lot to do here still...
-    val iter = strategies.view.flatMap(_(plan)).toIterator
-    assert(iter.hasNext, s"No plan for $plan")
-    iter
+
+    // Collect physical plan candidates.
+    val candidates = strategies.iterator.flatMap(_(plan))
+
+    // The candidates may contain placeholders marked as [[planLater]],
+    // so try to replace them by their child plans.
+    val plans = candidates.flatMap { candidate =>
+      val placeholders = collectPlaceholders(candidate)
+
+      if (placeholders.isEmpty) {
+        // Take the candidate as is because it does not contain placeholders.
+        Iterator(candidate)
+      } else {
+        // Plan the logical plan marked as [[planLater]] and replace the placeholders.
+        placeholders.iterator.foldLeft(Iterator(candidate)) {
+          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
+            // Plan the logical plan for the placeholder.
+            val childPlans = this.plan(logicalPlan)
+
+            candidatesWithPlaceholders.flatMap { candidateWithPlaceholders =>
+              childPlans.map { childPlan =>
+                // Replace the placeholder by the child plan
+                candidateWithPlaceholders.transformUp {
+                  case p if p == placeholder => childPlan
+                }
+              }
+            }
+        }
+      }
+    }
+
+    val pruned = prunePlans(plans)
+    assert(pruned.hasNext, s"No plan for $plan")
+    pruned
   }
+
+  /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */
+  protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)]
+
+  /** Prunes bad plans to prevent combinatorial explosion. */
+  protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan]
 }
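
For readers of the hunk above: the foldLeft expands each candidate into the cross product of its placeholders' child plans. A minimal self-contained sketch of that expansion, using strings as stand-ins for plans (plain Scala, not Spark code):

    // One candidate with two placeholders; the planner can fill each
    // placeholder in two different ways.
    val placeholders = Seq("p1", "p2")
    def childPlans(p: String): Iterator[String] = Iterator(p + "a", p + "b")

    val expanded = placeholders.foldLeft(Iterator("candidate")) {
      case (candidates, placeholder) =>
        candidates.flatMap { c =>
          childPlans(placeholder).map(cp => s"$c[$placeholder:=$cp]")
        }
    }
    // expanded.toList yields 2 x 2 = 4 fully substituted candidates.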

http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index a2d4502..e6dc50a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -75,6 +75,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
 
   lazy val sparkPlan: SparkPlan = {
     SparkSession.setActiveSession(sparkSession)
+    // TODO: We use next(), i.e. take the first plan returned by the planner, here for now,
+    //       but we will implement to choose the best plan.
     planner.plan(ReturnAnswer(optimizedPlan)).next()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index de832ec..73e2ffd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import org.apache.spark.SparkContext
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy}
 import org.apache.spark.sql.internal.SQLConf
 
@@ -42,6 +43,18 @@ class SparkPlanner(
       InMemoryScans ::
       BasicOperators :: Nil)
 
+  override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = {
+    plan.collect {
+      case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan
+    }
+  }
+
+  override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = {
+    // TODO: We will need to prune bad plans when we improve plan space exploration
+    //       to prevent combinatorial explosion.
+    plans
+  }
+
   /**
   * Used to build table scan operators where complex projection and filtering are done using
   * separate physical operators.  This function returns the given scan operator with Project and
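
The prunePlans hook is a pass-through for now, but it lets a planner subclass bound the search space. A hypothetical sketch (not part of this patch), assuming SparkPlanner's (sparkContext, conf, extraStrategies) constructor; the cap of 10 is an arbitrary placeholder:

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.Strategy
    import org.apache.spark.sql.execution.{SparkPlan, SparkPlanner}
    import org.apache.spark.sql.internal.SQLConf

    class CappedSparkPlanner(sc: SparkContext, conf: SQLConf, extra: Seq[Strategy])
      extends SparkPlanner(sc, conf, extra) {

      // Keep only the first few alternatives to avoid a combinatorial blow-up.
      override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] =
        plans.take(10)
    }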

http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index a36fe78..d1261dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
@@ -35,6 +37,27 @@ import org.apache.spark.sql.execution.streaming.MemoryPlan
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming.ContinuousQuery
 
+/**
+ * Converts a logical plan into zero or more SparkPlans.  This API is exposed for experimenting
+ * with the query planner and is not designed to be stable across spark releases.  Developers
+ * writing libraries should instead consider using the stable APIs provided in
+ * [[org.apache.spark.sql.sources]]
+ */
+@DeveloperApi
+abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
+
+  override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
+}
+
+private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = plan.output
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException()
+  }
+}
+
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SparkPlanner =>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/core/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 97e35bb..28d8bc3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 
 /**
  * Allows the execution of relational queries, including those expressed in SQL using Spark.
@@ -40,7 +40,7 @@ package object sql {
    * [[org.apache.spark.sql.sources]]
    */
   @DeveloperApi
-  type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
+  type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/667d4ea7/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
new file mode 100644
index 0000000..aecfd30
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.Strategy
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class SparkPlannerSuite extends SharedSQLContext {
+  import testImplicits._
+
+  test("Ensure to go down only the first branch, not any other possible 
branches") {
+
+    case object NeverPlanned extends LeafNode {
+      override def output: Seq[Attribute] = Nil
+    }
+
+    var planned = 0
+    object TestStrategy extends Strategy {
+      def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
+        case ReturnAnswer(child) =>
+          planned += 1
+          planLater(child) :: planLater(NeverPlanned) :: Nil
+        case Union(children) =>
+          planned += 1
+          UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil
+        case LocalRelation(output, data) =>
+          planned += 1
+          LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil
+        case NeverPlanned =>
+          fail("QueryPlanner should not go down to this branch.")
+        case _ => Nil
+      }
+    }
+
+    try {
+      spark.experimental.extraStrategies = TestStrategy :: Nil
+
+      val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS())
+
+      assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f"))
+      assert(planned === 4)
+    } finally {
+      spark.experimental.extraStrategies = Nil
+    }
+  }
+}

