Repository: spark Updated Branches: refs/heads/branch-2.0 199bac8fa -> 9bfb16a6b
[SPARK-15459][SQL] Make Range logical and physical explain consistent ## What changes were proposed in this pull request? This patch simplifies the implementation of the Range operator and makes the explain string consistent between the logical plan and the physical plan. To do this, I changed RangeExec to embed a Range logical plan in it. Before this patch (note that the logical Range and physical Range actually output different information): ``` == Optimized Logical Plan == Range 0, 100, 2, 2, [id#8L] == Physical Plan == *Range 0, 2, 2, 50, [id#8L] ``` After this patch: If step size is 1: ``` == Optimized Logical Plan == Range (0, 100, splits=2) == Physical Plan == *Range (0, 100, splits=2) ``` If step size is not 1: ``` == Optimized Logical Plan == Range (0, 100, step=2, splits=2) == Physical Plan == *Range (0, 100, step=2, splits=2) ``` ## How was this patch tested? N/A Author: Reynold Xin <r...@databricks.com> Closes #13239 from rxin/SPARK-15459. (cherry picked from commit 845e447fa03bf0a53ed79fa7e240af94dc152d2c) Signed-off-by: Reynold Xin <r...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bfb16a6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bfb16a6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bfb16a6 Branch: refs/heads/branch-2.0 Commit: 9bfb16a6b042cdf8217df2e335027fa6c67366cf Parents: 199bac8 Author: Reynold Xin <r...@databricks.com> Authored: Sun May 22 00:03:37 2016 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Sun May 22 00:03:43 2016 -0700 ---------------------------------------------------------------------- .../plans/logical/basicLogicalOperators.scala | 18 +++++++++++--- .../catalyst/catalog/SessionCatalogSuite.scala | 16 ++++++------ .../spark/sql/execution/SparkStrategies.scala | 4 +-- .../sql/execution/basicPhysicalOperators.scala | 26 +++++++++----------- .../spark/sql/internal/CatalogSuite.scala | 2 +- 5 files changed, 37 
insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index bed48b6..b1b3e00 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -431,8 +431,11 @@ case class Range( end: Long, step: Long, numSlices: Int, - output: Seq[Attribute]) extends LeafNode with MultiInstanceRelation { - require(step != 0, "step cannot be 0") + output: Seq[Attribute]) + extends LeafNode with MultiInstanceRelation { + + require(step != 0, s"step ($step) cannot be 0") + val numElements: BigInt = { val safeStart = BigInt(start) val safeEnd = BigInt(end) @@ -444,13 +447,20 @@ case class Range( } } - override def newInstance(): Range = - Range(start, end, step, numSlices, output.map(_.newInstance())) + override def newInstance(): Range = copy(output = output.map(_.newInstance())) override def statistics: Statistics = { val sizeInBytes = LongType.defaultSize * numElements Statistics( sizeInBytes = sizeInBytes ) } + + override def simpleString: String = { + if (step == 1) { + s"Range ($start, $end, splits=$numSlices)" + } else { + s"Range ($start, $end, step=$step, splits=$numSlices)" + } + } } case class Aggregate( http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala index 91e2e07..a4dc03c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -197,8 +197,8 @@ class SessionCatalogSuite extends SparkFunSuite { test("create temp table") { val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable1 = Range(1, 10, 1, 10, Seq()) - val tempTable2 = Range(1, 20, 2, 10, Seq()) + val tempTable1 = Range(1, 10, 1, 10) + val tempTable2 = Range(1, 20, 2, 10) catalog.createTempView("tbl1", tempTable1, overrideIfExists = false) catalog.createTempView("tbl2", tempTable2, overrideIfExists = false) assert(catalog.getTempTable("tbl1") == Option(tempTable1)) @@ -243,7 +243,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("drop temp table") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) @@ -304,7 +304,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("rename temp table") { val externalCatalog = newBasicCatalog() val sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) sessionCatalog.createTempView("tbl1", tempTable, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") assert(sessionCatalog.getTempTable("tbl1") == Option(tempTable)) @@ -383,7 +383,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("lookup table relation") { val externalCatalog = newBasicCatalog() val 
sessionCatalog = new SessionCatalog(externalCatalog) - val tempTable1 = Range(1, 10, 1, 10, Seq()) + val tempTable1 = Range(1, 10, 1, 10) val metastoreTable1 = externalCatalog.getTable("db2", "tbl1") sessionCatalog.createTempView("tbl1", tempTable1, overrideIfExists = false) sessionCatalog.setCurrentDatabase("db2") @@ -422,7 +422,7 @@ class SessionCatalogSuite extends SparkFunSuite { assert(!catalog.tableExists(TableIdentifier("tbl1", Some("db1")))) assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1")))) // If database is explicitly specified, do not check temporary tables - val tempTable = Range(1, 10, 1, 10, Seq()) + val tempTable = Range(1, 10, 1, 10) catalog.createTempView("tbl3", tempTable, overrideIfExists = false) assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2")))) // If database is not explicitly specified, check the current database @@ -434,7 +434,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables without pattern") { val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) catalog.createTempView("tbl1", tempTable, overrideIfExists = false) catalog.createTempView("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1").toSet == @@ -451,7 +451,7 @@ class SessionCatalogSuite extends SparkFunSuite { test("list tables with pattern") { val catalog = new SessionCatalog(newBasicCatalog()) - val tempTable = Range(1, 10, 2, 10, Seq()) + val tempTable = Range(1, 10, 2, 10) catalog.createTempView("tbl1", tempTable, overrideIfExists = false) catalog.createTempView("tbl4", tempTable, overrideIfExists = false) assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 664e7f5..555a2f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -360,8 +360,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { generator, join = join, outer = outer, g.output, planLater(child)) :: Nil case logical.OneRowRelation => execution.RDDScanExec(Nil, singleRowRdd, "OneRowRelation") :: Nil - case r @ logical.Range(start, end, step, numSlices, output) => - execution.RangeExec(start, step, numSlices, r.numElements, output) :: Nil + case r : logical.Range => + execution.RangeExec(r) :: Nil case logical.RepartitionByExpression(expressions, child, nPartitions) => exchange.ShuffleExchange(HashPartitioning( expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index d492fa7..89bde6a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics -import org.apache.spark.sql.types.LongType +import 
org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} /** Physical plan for Project. */ @@ -305,22 +305,18 @@ case class SampleExec( /** - * Physical plan for range (generating a range of 64 bit numbers. - * - * @param start first number in the range, inclusive. - * @param step size of the step increment. - * @param numSlices number of partitions. - * @param numElements total number of elements to output. - * @param output output attributes. + * Physical plan for range (generating a range of 64 bit numbers). */ -case class RangeExec( - start: Long, - step: Long, - numSlices: Int, - numElements: BigInt, - output: Seq[Attribute]) +case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) extends LeafExecNode with CodegenSupport { + def start: Long = range.start + def step: Long = range.step + def numSlices: Int = range.numSlices + def numElements: BigInt = range.numElements + + override val output: Seq[Attribute] = range.output + private[sql] override lazy val metrics = Map( "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")) @@ -458,6 +454,8 @@ case class RangeExec( } } } + + override def simpleString: String = range.simpleString } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9bfb16a6/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index e4d4cec..cd434f7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -58,7 +58,7 @@ class CatalogSuite } private def createTempTable(name: String): Unit = { - sessionCatalog.createTempView(name, Range(1, 2, 3, 4, Seq()), 
overrideIfExists = true) + sessionCatalog.createTempView(name, Range(1, 2, 3, 4), overrideIfExists = true) } private def dropTable(name: String, db: Option[String] = None): Unit = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org