Repository: spark
Updated Branches:
  refs/heads/master aeddeafc0 -> 05d04e10a


[SPARK-9733][SQL] Improve physical plan explain for data sources

All data sources currently show up as "PhysicalRDD" in the physical plan explain output. It would be better to show the name of the data source instead.

Without this patch:
```
== Physical Plan ==
NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Final,isDistinct=false))
 Exchange hashpartitioning(date#0,cat#1)
  NewAggregate with UnsafeHybridAggregationIterator ArrayBuffer(date#0, cat#1) ArrayBuffer((sum(CAST((CAST(count#2, IntegerType) + 1), LongType))2,mode=Partial,isDistinct=false))
   PhysicalRDD [date#0,cat#1,count#2], MapPartitionsRDD[3] at
```

With this patch:
```
== Physical Plan ==
TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Final,isDistinct=false)]
 Exchange hashpartitioning(date#0,cat#1)
  TungstenAggregate(key=[date#0,cat#1], value=[(sum(CAST((CAST(count#2, IntegerType) + 1), LongType)),mode=Partial,isDistinct=false)]
   ConvertToUnsafe
    Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```
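
A query of roughly this shape reproduces the plans above. This is a hedged sketch: the query is inferred from the plan output, and the `sqlContext` handle and Parquet path are assumptions for illustration.

```scala
// Sketch only: the query shape is inferred from the sample plan above,
// not taken verbatim from the patch. Assumes a SQLContext named `sqlContext`
// and the example Parquet directory.
import org.apache.spark.sql.functions._

val df = sqlContext.read.parquet("file:/scratch/rxin/spark/sales4")
df.groupBy("date", "cat")
  .agg(sum(df("count").cast("int") + 1))
  .explain()  // prints the "== Physical Plan ==" section shown above
```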

Author: Reynold Xin <r...@databricks.com>

Closes #8024 from rxin/SPARK-9733 and squashes the following commits:

811b90e [Reynold Xin] Fixed Python test case.
52cab77 [Reynold Xin] Cast.
eea9ccc [Reynold Xin] Fix test case.
fcecb22 [Reynold Xin] [SPARK-9733][SQL] Improve explain message for data source scan node.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/05d04e10
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/05d04e10
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/05d04e10

Branch: refs/heads/master
Commit: 05d04e10a8ea030bea840c3c5ba93ecac479a039
Parents: aeddeaf
Author: Reynold Xin <r...@databricks.com>
Authored: Fri Aug 7 13:41:45 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Aug 7 13:41:45 2015 -0700

----------------------------------------------------------------------
 python/pyspark/sql/dataframe.py                 |  4 +---
 .../spark/sql/catalyst/expressions/Cast.scala   |  4 ++--
 .../expressions/aggregate/interfaces.scala      |  2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  4 ----
 .../spark/sql/execution/ExistingRDD.scala       | 15 ++++++++++++-
 .../spark/sql/execution/SparkStrategies.scala   |  4 ++--
 .../execution/aggregate/TungstenAggregate.scala |  9 +++++---
 .../datasources/DataSourceStrategy.scala        | 22 +++++++++++++-------
 .../apache/spark/sql/sources/interfaces.scala   |  2 +-
 .../execution/RowFormatConvertersSuite.scala    |  4 ++--
 .../sql/hive/execution/HiveExplainSuite.scala   |  2 +-
 11 files changed, 45 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 0f3480c..47d5a6a 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -212,8 +212,7 @@ class DataFrame(object):
         :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.
 
         >>> df.explain()
-        PhysicalRDD [age#0,name#1], MapPartitionsRDD[...] at applySchemaToPythonRDD at\
-          NativeMethodAccessorImpl.java:...
+        Scan PhysicalRDD[age#0,name#1]
 
         >>> df.explain(True)
         == Parsed Logical Plan ==
@@ -224,7 +223,6 @@ class DataFrame(object):
         ...
         == Physical Plan ==
         ...
-        == RDD ==
         """
         if extended:
             print(self._jdf.queryExecution().toString())

http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 39f9970..946c5a9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -107,6 +107,8 @@ object Cast {
 case class Cast(child: Expression, dataType: DataType)
   extends UnaryExpression with CodegenFallback {
 
+  override def toString: String = s"cast($child as ${dataType.simpleString})"
+
   override def checkInputDataTypes(): TypeCheckResult = {
     if (Cast.canCast(child.dataType, dataType)) {
       TypeCheckResult.TypeCheckSuccess
@@ -118,8 +120,6 @@ case class Cast(child: Expression, dataType: DataType)
 
   override def nullable: Boolean = Cast.forceNullable(child.dataType, dataType) || child.nullable
 
-  override def toString: String = s"CAST($child, $dataType)"
-
   // [[func]] assumes the input is no longer null because eval already does the null check.
   @inline private[this] def buildCast[T](a: Any, func: T => Any): Any = func(a.asInstanceOf[T])
 
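
The net effect of the `Cast.toString` change is a friendlier rendering in plan output. A minimal stand-in sketch, assuming nothing beyond what the hunk shows (`SimpleCast` is illustrative, not Spark's class):

```scala
// Illustrative stand-in for Cast's new toString; not Spark's actual class.
case class SimpleCast(child: String, dataTypeSimpleString: String) {
  override def toString: String = s"cast($child as $dataTypeSimpleString)"
}

println(SimpleCast("count#2", "int"))
// old format: CAST(count#2, IntegerType)
// new format: cast(count#2 as int)
```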

http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 4abfdfe..576d8c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -93,7 +93,7 @@ private[sql] case class AggregateExpression2(
     AttributeSet(childReferences)
   }
 
-  override def toString: String = s"(${aggregateFunction}2,mode=$mode,isDistinct=$isDistinct)"
+  override def toString: String = s"(${aggregateFunction},mode=$mode,isDistinct=$isDistinct)"
 }
 
 abstract class AggregateFunction2

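This hunk drops the stray "2" suffix that leaked into plan strings (visible as `LongType))2` in the "without this patch" output above). A stand-in sketch of the corrected rendering, where `AggExpr` is a hypothetical illustration rather than Spark's `AggregateExpression2`:

```scala
// Illustrative stand-in for AggregateExpression2's corrected toString.
case class AggExpr(aggregateFunction: String, mode: String, isDistinct: Boolean) {
  override def toString: String = s"($aggregateFunction,mode=$mode,isDistinct=$isDistinct)"
}

println(AggExpr("sum(cast(count#2 as bigint))", "Final", isDistinct = false))
// (sum(cast(count#2 as bigint)),mode=Final,isDistinct=false)
```
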
http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 075c0ea..8325725 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -1011,9 +1011,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
       def output =
         analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}").mkString(", ")
 
-      // TODO previously will output RDD details by run (${stringOrError(toRdd.toDebugString)})
-      // however, the `toRdd` will cause the real execution, which is not what we want.
-      // We need to think about how to avoid the side effect.
       s"""== Parsed Logical Plan ==
          |${stringOrError(logical)}
          |== Analyzed Logical Plan ==
@@ -1024,7 +1021,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
          |== Physical Plan ==
          |${stringOrError(executedPlan)}
          |Code Generation: ${stringOrError(executedPlan.codegenEnabled)}
-         |== RDD ==
       """.stripMargin.trim
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index fbaa8e2..cae7ca5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.{InternalRow, CatalystTypeConverters}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericMutableRow}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.sql.{Row, SQLContext}
 
@@ -95,11 +96,23 @@ private[sql] case class LogicalRDD(
 /** Physical plan node for scanning data from an RDD. */
 private[sql] case class PhysicalRDD(
     output: Seq[Attribute],
-    rdd: RDD[InternalRow]) extends LeafNode {
+    rdd: RDD[InternalRow],
+    extraInformation: String) extends LeafNode {
 
   override protected[sql] val trackNumOfRowsEnabled = true
 
   protected override def doExecute(): RDD[InternalRow] = rdd
+
+  override def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
+}
+
+private[sql] object PhysicalRDD {
+  def createFromDataSource(
+      output: Seq[Attribute],
+      rdd: RDD[InternalRow],
+      relation: BaseRelation): PhysicalRDD = {
+    PhysicalRDD(output, rdd, relation.toString)
+  }
 }
 
 /** Logical plan node for scanning data from a local collection. */

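The new `simpleString` is exactly what shows up as the scan line in explain output. A self-contained sketch of how it is assembled; the relation string and attribute names below are taken from the commit-message example:

```scala
// Illustrative values; in Spark these come from the relation's toString and
// the plan node's output attributes.
val extraInformation = "ParquetRelation[file:/scratch/rxin/spark/sales4]"
val output = Seq("date#0", "cat#1", "count#2")

println("Scan " + extraInformation + output.mkString("[", ",", "]"))
// Scan ParquetRelation[file:/scratch/rxin/spark/sales4][date#0,cat#1,count#2]
```
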
http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c5aaebe..c4b9b5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -363,12 +363,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         execution.Generate(
           generator, join = join, outer = outer, g.output, planLater(child)) :: Nil
       case logical.OneRowRelation =>
-        execution.PhysicalRDD(Nil, singleRowRdd) :: Nil
+        execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil
       case logical.RepartitionByExpression(expressions, child) =>
         execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil
       case e @ EvaluatePython(udf, child, _) =>
         BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil
-      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil
+      case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil
       case BroadcastHint(child) => apply(child)
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
index 5a0b4d4..c3dcbd2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -93,10 +93,13 @@ case class TungstenAggregate(
     val allAggregateExpressions = nonCompleteAggregateExpressions ++ completeAggregateExpressions
 
     testFallbackStartsAt match {
-      case None => s"TungstenAggregate ${groupingExpressions} 
${allAggregateExpressions}"
+      case None =>
+        val keyString = groupingExpressions.mkString("[", ",", "]")
+        val valueString = allAggregateExpressions.mkString("[", ",", "]")
+        s"TungstenAggregate(key=$keyString, value=$valueString"
       case Some(fallbackStartsAt) =>
-        s"TungstenAggregateWithControlledFallback ${groupingExpressions} " +
-          s"${allAggregateExpressions} fallbackStartsAt=$fallbackStartsAt"
+        s"TungstenAggregateWithControlledFallback $groupingExpressions " +
+          s"$allAggregateExpressions fallbackStartsAt=$fallbackStartsAt"
     }
   }
 }

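The key and value strings are plain `mkString`-bracketed lists. A quick illustration with made-up expression strings; note that the committed format string ends without a closing parenthesis after `$valueString`, which matches the sample output in the commit message:

```scala
// Made-up expression strings standing in for groupingExpressions and
// allAggregateExpressions.
val groupingExpressions = Seq("date#0", "cat#1")
val allAggregateExpressions = Seq("(sum(...),mode=Final,isDistinct=false)")

val keyString = groupingExpressions.mkString("[", ",", "]")
val valueString = allAggregateExpressions.mkString("[", ",", "]")
println(s"TungstenAggregate(key=$keyString, value=$valueString")
// TungstenAggregate(key=[date#0,cat#1], value=[(sum(...),mode=Final,isDistinct=false)]
```
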
http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e5dc676..5b5fa8c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -101,8 +101,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         (a, f) =>
           toCatalystRDD(l, a, t.buildScan(a.map(_.name).toArray, f, t.paths, confBroadcast))) :: Nil
 
-    case l @ LogicalRelation(t: TableScan) =>
-      execution.PhysicalRDD(l.output, toCatalystRDD(l, t.buildScan())) :: Nil
+    case l @ LogicalRelation(baseRelation: TableScan) =>
+      execution.PhysicalRDD.createFromDataSource(
+        l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
 
     case i @ logical.InsertIntoTable(
       l @ LogicalRelation(t: InsertableRelation), part, query, overwrite, false) if part.isEmpty =>
@@ -169,7 +170,10 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         new UnionRDD(relation.sqlContext.sparkContext, perPartitionRows)
       }
 
-    execution.PhysicalRDD(projections.map(_.toAttribute), unionedRows)
+    execution.PhysicalRDD.createFromDataSource(
+      projections.map(_.toAttribute),
+      unionedRows,
+      logicalRelation.relation)
   }
 
   // TODO: refactor this thing. It is very complicated because it does projection internally.
@@ -299,14 +303,18 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
         projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
           .map(relation.attributeMap)            // Match original case of attributes.
 
-      val scan = execution.PhysicalRDD(projects.map(_.toAttribute),
-        scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD.createFromDataSource(
+        projects.map(_.toAttribute),
+        scanBuilder(requestedColumns, pushedFilters),
+        relation.relation)
       filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
     } else {
       val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
 
-      val scan = execution.PhysicalRDD(requestedColumns,
-        scanBuilder(requestedColumns, pushedFilters))
+      val scan = execution.PhysicalRDD.createFromDataSource(
+        requestedColumns,
+        scanBuilder(requestedColumns, pushedFilters),
+        relation.relation)
       execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
     }
   }

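All three scan-building call sites now route through the same factory, so a relation's `toString` becomes the scan node's label. A simplified sketch of the pattern, with types reduced to strings (not Spark's real signatures, which use Seq[Attribute], RDD[InternalRow], and BaseRelation):

```scala
// Simplified stand-ins for the factory pattern added in this hunk.
case class ScanNode(output: Seq[String], extraInformation: String) {
  def simpleString: String = "Scan " + extraInformation + output.mkString("[", ",", "]")
}

object ScanNode {
  // Mirrors PhysicalRDD.createFromDataSource: the relation's toString is the label.
  def createFromDataSource(output: Seq[String], relation: AnyRef): ScanNode =
    ScanNode(output, relation.toString)
}

println(ScanNode.createFromDataSource(Seq("date#0"), "ParquetRelation[/path]").simpleString)
// Scan ParquetRelation[/path][date#0]
```
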
http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index c04557e..0b29296 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -383,7 +383,7 @@ private[sql] abstract class OutputWriterInternal extends OutputWriter {
 abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[PartitionSpec])
   extends BaseRelation with Logging {
 
-  logInfo("Constructing HadoopFsRelation")
+  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
 
   def this() = this(None)
 

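With this override, every `HadoopFsRelation` subclass is labeled as `ClassName[path1,path2,...]` in scans, as seen in the `Scan ParquetRelation[...]` line above. An illustrative stand-in:

```scala
// Illustrative stand-in mirroring the new HadoopFsRelation.toString;
// FakeParquetRelation is not a Spark class.
class FakeParquetRelation(val paths: Seq[String]) {
  override def toString: String = getClass.getSimpleName + paths.mkString("[", ",", "]")
}

println(new FakeParquetRelation(Seq("file:/scratch/rxin/spark/sales4")))
// FakeParquetRelation[file:/scratch/rxin/spark/sales4]
```
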
http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 8208b25..322966f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -32,9 +32,9 @@ class RowFormatConvertersSuite extends SparkPlanTest {
     case c: ConvertToSafe => c
   }
 
-  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
-  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null))
+  private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)
 
   test("planner should insert unsafe->safe conversions when required") {

http://git-wip-us.apache.org/repos/asf/spark/blob/05d04e10/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 6972112..8215dd6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -36,7 +36,7 @@ class HiveExplainSuite extends QueryTest {
                    "== Analyzed Logical Plan ==",
                    "== Optimized Logical Plan ==",
                    "== Physical Plan ==",
-                   "Code Generation", "== RDD ==")
+                   "Code Generation")
   }
 
   test("explain create table command") {

