Repository: spark
Updated Branches:
  refs/heads/branch-2.0 199bac8fa -> 9bfb16a6b


[SPARK-15459][SQL] Make Range logical and physical explain consistent

## What changes were proposed in this pull request?
This patch simplifies the implementation of the Range operator and makes the explain 
string consistent between the logical plan and the physical plan. To do this, I changed 
RangeExec to embed a Range logical plan in it.

Before this patch (note that the logical Range and physical Range actually 
output different information):
```
== Optimized Logical Plan ==
Range 0, 100, 2, 2, [id#8L]

== Physical Plan ==
*Range 0, 2, 2, 50, [id#8L]
```

After this patch:
If step size is 1:
```
== Optimized Logical Plan ==
Range (0, 100, splits=2)

== Physical Plan ==
*Range (0, 100, splits=2)
```

If step size is not 1:
```
== Optimized Logical Plan ==
Range (0, 100, step=2, splits=2)

== Physical Plan ==
*Range (0, 100, step=2, splits=2)
```
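
For illustration, here is a minimal, self-contained Scala sketch of the delegation idea (hypothetical LogicalRange / PhysicalRange / RangeExplainDemo names, not the actual Spark classes): the physical node embeds its logical counterpart and reuses its explain string, which is why the two plans can no longer drift apart.
```scala
// Sketch only: simplified stand-ins for the logical and physical Range nodes.
case class LogicalRange(start: Long, end: Long, step: Long, numSlices: Int) {
  require(step != 0, s"step ($step) cannot be 0")

  // Same formatting rule as the patch: omit the step when it is 1.
  def simpleString: String =
    if (step == 1) s"Range ($start, $end, splits=$numSlices)"
    else s"Range ($start, $end, step=$step, splits=$numSlices)"
}

case class PhysicalRange(range: LogicalRange) {
  // Delegation keeps logical and physical explain output identical by construction.
  def simpleString: String = range.simpleString
}

object RangeExplainDemo extends App {
  val logical = LogicalRange(0, 100, step = 2, numSlices = 2)
  println(logical.simpleString)                // Range (0, 100, step=2, splits=2)
  println(PhysicalRange(logical).simpleString) // prints the same string
}
```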

## How was this patch tested?
N/A

Author: Reynold Xin <r...@databricks.com>

Closes #13239 from rxin/SPARK-15459.

(cherry picked from commit 845e447fa03bf0a53ed79fa7e240af94dc152d2c)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bfb16a6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bfb16a6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bfb16a6

Branch: refs/heads/branch-2.0
Commit: 9bfb16a6b042cdf8217df2e335027fa6c67366cf
Parents: 199bac8
Author: Reynold Xin <r...@databricks.com>
Authored: Sun May 22 00:03:37 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun May 22 00:03:43 2016 -0700

----------------------------------------------------------------------
 .../plans/logical/basicLogicalOperators.scala   | 18 +++++++++++---
 .../catalyst/catalog/SessionCatalogSuite.scala  | 16 ++++++------
 .../spark/sql/execution/SparkStrategies.scala   |  4 +--
 .../sql/execution/basicPhysicalOperators.scala  | 26 +++++++++-----------
 .../spark/sql/internal/CatalogSuite.scala       |  2 +-
 5 files changed, 37 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index bed48b6..b1b3e00 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -431,8 +431,11 @@ case class Range(
     end: Long,
     step: Long,
     numSlices: Int,
-    output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation {
-  require(step != 0, "step cannot be 0")
+    output: Seq[Attribute])
+  extends LeafNode with MultiInstanceRelation {
+
+  require(step != 0, s"step ($step) cannot be 0")
+
   val numElements: BigInt = {
     val safeStart = BigInt(start)
     val safeEnd = BigInt(end)
@@ -444,13 +447,20 @@ case class Range(
     }
   }
 
-  override def newInstance(): Range =
-    Range(start, end, step, numSlices, output.map(_.newInstance()))
+  override def newInstance(): Range = copy(output = output.map(_.newInstance()))
 
   override def statistics: Statistics = {
     val sizeInBytes = LongType.defaultSize * numElements
     Statistics( sizeInBytes = sizeInBytes )
   }
+
+  override def simpleString: String = {
+    if (step == 1) {
+      s"Range ($start, $end, splits=$numSlices)"
+    } else {
+      s"Range ($start, $end, step=$step, splits=$numSlices)"
+    }
+  }
 }
 
 case class Aggregate(

http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 91e2e07..a4dc03c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -197,8 +197,8 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("create temp table") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable1 = Range(1, 10, 1, 10, Seq())
-    val tempTable2 = Range(1, 20, 2, 10, Seq())
+    val tempTable1 = Range(1, 10, 1, 10)
+    val tempTable2 = Range(1, 20, 2, 10)
     catalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     catalog.createTempView("tbl2", tempTable2, overrideIfExists = false)
     assert(catalog.getTempTable("tbl1") == Option(tempTable1))
@@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("drop temp table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable))
@@ -304,7 +304,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("rename temp table") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
     assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable))
@@ -383,7 +383,7 @@ class SessionCatalogSuite extends SparkFunSuite {
   test("lookup table relation") {
     val externalCatalog = newBasicCatalog()
     val sessionCatalog = new SessionCatalog(externalCatalog)
-    val tempTable1 = Range(1, 10, 1, 10, Seq())
+    val tempTable1 = Range(1, 10, 1, 10)
     val metastoreTable1 = externalCatalog.getTable("db2", "tbl1")
     sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false)
     sessionCatalog.setCurrentDatabase("db2")
@@ -422,7 +422,7 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1"))))
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
-    val tempTable = Range(1, 10, 1, 10, Seq())
+    val tempTable = Range(1, 10, 1, 10)
     catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
@@ -434,7 +434,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list tables without pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
     assert(catalog.listTables("db1").toSet ==
@@ -451,7 +451,7 @@ class SessionCatalogSuite extends SparkFunSuite {
 
   test("list tables with pattern") {
     val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10, Seq())
+    val tempTable = Range(1, 10, 2, 10)
     catalog.createTempView("tbl1", tempTable, overrideIfExists = false)
     catalog.createTempView("tbl4", tempTable, overrideIfExists = false)
     assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet)

http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 664e7f5..555a2f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -360,8 +360,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
         execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil
-      case r @ logical.Range(start, end, step, numSlices, output) =>
-        execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil
+      case r : logical.Range =>
+        execution.RangeExec(r) :: Nil
       case logical.RepartitionByExpression(expressions, child, nPartitions) =>
         exchange.ShuffleExchange(HashPartitioning(
           expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index d492fa7..89bde6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.SQLMetrics
-import org.apache.spark.sql.types.LongType
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
 import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
 /** Physical plan for Project. */
@@ -305,22 +305,18 @@ case class SampleExec(
 
 
 /**
- * Physical plan for range (generating a range of 64 bit numbers.
- *
- * @param start first number in the range, inclusive.
- * @param step size of the step increment.
- * @param numSlices number of partitions.
- * @param numElements total number of elements to output.
- * @param output output attributes.
+ * Physical plan for range (generating a range of 64 bit numbers).
  */
-case class RangeExec(
-    start: Long,
-    step: Long,
-    numSlices: Int,
-    numElements: BigInt,
-    output: Seq[Attribute])
+case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range)
   extends LeafExecNode with CodegenSupport {
 
+  def start: Long = range.start
+  def step: Long = range.step
+  def numSlices: Int = range.numSlices
+  def numElements: BigInt = range.numElements
+
+  override val output: Seq[Attribute] = range.output
+
   private[sql] override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
@@ -458,6 +454,8 @@ case class RangeExec(
         }
       }
   }
+
+  override def simpleString: String = range.simpleString
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index e4d4cec..cd434f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -58,7 +58,7 @@ class CatalogSuite
   }
 
   private def createTempTable(name: String): Unit = {
-    sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), overrideIfExists = true)
+    sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true)
   }
 
   private def dropTable(name: String, db: Option[String] = None): Unit = {

