Repository: spark
Updated Branches:
  refs/heads/master d9ca9fd3e -> 61e0bdcff
[SPARK-14476][SQL] Improve the physical plan visualization by adding meta info like table name and file path for data source.

## What changes were proposed in this pull request?

Improve the physical plan visualization by adding meta info, such as the table name and file path, for the data source. The meta info entries InputPaths and TableName are newly added.

Example:

```
scala> spark.range(10).write.saveAsTable("tt")
scala> spark.sql("select * from tt").explain()
== Physical Plan ==
WholeStageCodegen
:  +- BatchedScan HadoopFiles[id#13L] Format: ParquetFormat, InputPaths: file:/home/xzhong10/spark-linux/assembly/spark-warehouse/tt, PushedFilters: [], ReadSchema: struct<id:bigint>, TableName: default.tt
```

## How was this patch tested?

Manual tests.

Changes for UI:

Before: (screenshot not preserved in this message)

After: (screenshots not preserved in this message)

Author: Sean Zhong <clock...@gmail.com>

Closes #12947 from clockfly/spark-14476.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/61e0bdcf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/61e0bdcf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/61e0bdcf

Branch: refs/heads/master
Commit: 61e0bdcff2ed57b22541fb3c03146d6eec2bb70f
Parents: d9ca9fd
Author: Sean Zhong <clock...@gmail.com>
Authored: Tue May 10 21:50:53 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue May 10 21:50:53 2016 -0700

----------------------------------------------------------------------
 .../sql/execution/ui/static/spark-sql-viz.css  |  5 +++
 .../spark/sql/execution/ExistingRDD.scala      | 37 ++++++++++++++------
 .../datasources/DataSourceStrategy.scala       |  9 ++---
 .../datasources/FileSourceStrategy.scala       | 19 ++++++----
 .../datasources/fileSourceInterfaces.scala     | 10 ++++--
 5 files changed, 53 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
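As a small aside (illustration only, not part of the patch text): the `default.tt` shown in the example plan above comes from the scan node's optional metastore table identifier, rendered in unquoted form. A minimal sketch of just that rendering step:

```
import org.apache.spark.sql.catalyst.TableIdentifier

// Illustration only: the scan node now carries an Option[TableIdentifier] and,
// when it is present, its unquoted form (database.table) is shown in the plan.
val table = TableIdentifier("tt", Some("default"))
println(table.unquotedString)   // prints "default.tt"
```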

http://git-wip-us.apache.org/repos/asf/spark/blob/61e0bdcf/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
----------------------------------------------------------------------
diff --git a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
index 303f8eb..594e747 100644
--- a/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
+++ b/sql/core/src/main/resources/org/apache/spark/sql/execution/ui/static/spark-sql-viz.css
@@ -41,3 +41,8 @@
   stroke: #444;
   stroke-width: 1.5px;
 }
+
+/* Breaks the long string like file path when showing tooltips */
+.tooltip-inner {
+  word-wrap:break-word;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/61e0bdcf/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index d6516f2..85af4fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution
 
+import org.apache.commons.lang.StringUtils
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession, SQLContext}
-import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -127,8 +129,11 @@ private[sql] case class RDDScanExec(
 private[sql] trait DataSourceScanExec extends LeafExecNode {
   val rdd: RDD[InternalRow]
   val relation: BaseRelation
+  val metastoreTableIdentifier: Option[TableIdentifier]
 
-  override val nodeName: String = relation.toString
+  override val nodeName: String = {
+    s"Scan $relation ${metastoreTableIdentifier.map(_.unquotedString).getOrElse("")}"
+  }
 
   // Ignore rdd when checking results
   override def sameResult(plan: SparkPlan): Boolean = plan match {
@@ -143,7 +148,8 @@ private[sql] case class RowDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String] = Map.empty)
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
@@ -174,8 +180,11 @@ private[sql] case class RowDataSourceScanExec(
   }
 
   override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
-    s"Scan $nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
+
+    s"$nodeName${output.mkString("[", ",", "]")}${metadataEntries.mkString(" ", ", ", "")}"
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -212,7 +221,8 @@ private[sql] case class BatchedDataSourceScanExec(
     rdd: RDD[InternalRow],
     @transient relation: BaseRelation,
     override val outputPartitioning: Partitioning,
-    override val metadata: Map[String, String] = Map.empty)
+    override val metadata: Map[String, String],
+    override val metastoreTableIdentifier: Option[TableIdentifier])
   extends DataSourceScanExec with CodegenSupport {
 
   private[sql] override lazy val metrics =
@@ -224,9 +234,11 @@ private[sql] case class BatchedDataSourceScanExec(
   }
 
   override def simpleString: String = {
-    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+    val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield {
+      key + ": " + StringUtils.abbreviate(value, 100)
+    }
     val metadataStr = metadataEntries.mkString(" ", ", ", "")
-    s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+    s"Batched$nodeName${output.mkString("[", ",", "]")}$metadataStr"
   }
 
   override def inputRDDs(): Seq[RDD[InternalRow]] = {
@@ -325,7 +337,8 @@ private[sql] object DataSourceScanExec {
       output: Seq[Attribute],
       rdd: RDD[InternalRow],
       relation: BaseRelation,
-      metadata: Map[String, String] = Map.empty): DataSourceScanExec = {
+      metadata: Map[String, String] = Map.empty,
+      metastoreTableIdentifier: Option[TableIdentifier] = None): DataSourceScanExec = {
     val outputPartitioning = {
       val bucketSpec = relation match {
         // TODO: this should be closer to bucket planning.
@@ -351,9 +364,11 @@ private[sql] object DataSourceScanExec {
     relation match {
       case r: HadoopFsRelation
         if r.fileFormat.supportBatch(r.sparkSession, StructType.fromAttributes(output)) =>
-        BatchedDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+        BatchedDataSourceScanExec(
+          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
       case _ =>
-        RowDataSourceScanExec(output, rdd, relation, outputPartitioning, metadata)
+        RowDataSourceScanExec(
+          output, rdd, relation, outputPartitioning, metadata, metastoreTableIdentifier)
     }
   }
 }
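One detail in the ExistingRDD.scala changes above worth spelling out: metadata values (which now include potentially very long input paths) are capped at 100 characters with Commons Lang's `StringUtils.abbreviate` before being rendered into `simpleString`. A small sketch of that behaviour, using a made-up path purely for illustration:

```
import org.apache.commons.lang.StringUtils

// Illustration only (made-up path): abbreviate returns the value unchanged if
// it fits in the given width, otherwise it truncates and appends "...", so a
// huge InputPaths entry cannot blow up the plan string or the UI node label.
val longPath = "file:/warehouse/" + ("x" * 200) + "/tt"
val shown = StringUtils.abbreviate(longPath, 100)
println(shown.length)                            // 100
println(shown.endsWith("..."))                   // true
println(StringUtils.abbreviate("short", 100))    // short (unchanged)
```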
http://git-wip-us.apache.org/repos/asf/spark/blob/61e0bdcf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 9bebd74..bc249f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -192,11 +192,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       pairs += (PUSHED_FILTERS -> pushedFilters.mkString("[", ", ", "]"))
     }
 
-    relation.relation match {
-      case r: HadoopFsRelation => pairs += INPUT_PATHS -> r.location.paths.mkString(", ")
-      case _ =>
-    }
-
     pairs.toMap
   }
 
@@ -217,7 +212,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.DataSourceScanExec.create(
         projects.map(_.toAttribute),
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, metadata)
+        relation.relation, metadata, relation.metastoreTableIdentifier)
       filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
     } else {
       // Don't request columns that are only referenced by pushed filters.
@@ -227,7 +222,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       val scan = execution.DataSourceScanExec.create(
         requestedColumns,
         scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
-        relation.relation, metadata)
+        relation.relation, metadata, relation.metastoreTableIdentifier)
       execution.ProjectExec(
         projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/61e0bdcf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 8a93c6f..350508c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -27,7 +27,9 @@ import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS}
+import org.apache.spark.sql.execution.SparkPlan
 
 /**
  * A strategy for planning scans over collections of files that might be partitioned or bucketed
@@ -54,7 +56,8 @@ import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
  */
 private[sql] object FileSourceStrategy extends Strategy with Logging {
   def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, _)) =>
+    case PhysicalOperation(projects, filters,
+      l @ LogicalRelation(files: HadoopFsRelation, _, table)) =>
       // Filters on this relation fall into four categories based on where we can use them to avoid
       // reading unneeded data:
       //  - partition keys only - used to prune directories to read
@@ -192,6 +195,12 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
         partitions
       }
 
+      val meta = Map(
+        "Format" -> files.fileFormat.toString,
+        "ReadSchema" -> prunedDataSchema.simpleString,
+        PUSHED_FILTERS -> pushedDownFilters.mkString("[", ", ", "]"),
+        INPUT_PATHS -> files.location.paths.mkString(", "))
+
       val scan =
         DataSourceScanExec.create(
           readDataColumns ++ partitionColumns,
@@ -200,10 +209,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
             readFile,
             plannedPartitions),
           files,
-          Map(
-            "Format" -> files.fileFormat.toString,
-            "PushedFilters" -> pushedDownFilters.mkString("[", ", ", "]"),
-            "ReadSchema" -> prunedDataSchema.simpleString))
+          meta,
+          table)
 
       val afterScanFilter = afterScanFilters.toSeq.reduceOption(expressions.And)
       val withFilter = afterScanFilter.map(execution.FilterExec(_, scan)).getOrElse(scan)

http://git-wip-us.apache.org/repos/asf/spark/blob/61e0bdcf/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index c87e672..b516297 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, Inter
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.execution.FileRelation
-import org.apache.spark.sql.sources.{BaseRelation, Filter}
+import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, Filter}
 import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
@@ -157,8 +157,12 @@ case class HadoopFsRelation(
 
   def refresh(): Unit = location.refresh()
 
-  override def toString: String =
-    s"HadoopFiles"
+  override def toString: String = {
+    fileFormat match {
+      case source: DataSourceRegister => source.shortName()
+      case _ => "HadoopFiles"
+    }
+  }
 
   /** Returns the list of files that will be read when scanning this relation. */
   override def inputFiles: Array[String] =
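As a closing note (illustration only, not part of the patch): the `toString` change above is what lets a scan's relation be displayed under a registered short name rather than the generic "HadoopFiles" label. A minimal standalone sketch of that display-name rule:

```
import org.apache.spark.sql.sources.DataSourceRegister

// Sketch of the display-name rule introduced above (illustration only):
// file formats that register a short name (e.g. "parquet", "csv", "json")
// are labelled by that name in the plan and the SQL UI, while anything else
// falls back to the generic "HadoopFiles" label.
def formatDisplayName(fileFormat: AnyRef): String = fileFormat match {
  case source: DataSourceRegister => source.shortName()
  case _ => "HadoopFiles"
}
```

Combined with the nodeName change in ExistingRDD.scala, a format that registers, say, the short name "parquet" would surface that name in the scan node's label alongside the table identifier.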