Repository: spark
Updated Branches:
  refs/heads/master 22f07fefe -> 60977889e


[SPARK-20136][SQL] Add num files and metadata operation timing to scan operator metrics

## What changes were proposed in this pull request?
This patch adds explicit metadata operation timing and the number of files read to the data source scan metrics. Both are useful for performance profiling.
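
For reference, the metric pattern introduced here is: time the driver-side file listing with `System.nanoTime()`, convert to milliseconds, and post the values explicitly, since driver-side metric updates do not flow through task accumulators. A minimal sketch of that pattern, where `doMetadataWork` is a hypothetical stand-in for `relation.location.listFiles`:

```scala
// Minimal sketch of the driver-side metric pattern used by this patch.
// `doMetadataWork` is a hypothetical stand-in for relation.location.listFiles.
val startTime = System.nanoTime()
val partitions = doMetadataWork()
val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000

metrics("numFiles").add(partitions.map(_.files.size.toLong).sum)
metrics("metadataTime").add(timeTakenMs)

// Driver-side updates are not reported via task accumulators, so they must be
// posted to the SQL listener explicitly to show up in the web UI.
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
  metrics("numFiles") :: metrics("metadataTime") :: Nil)
```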

Screenshot of the UI with this change (num files and metadata time are the new metrics):

<img width="321" alt="screen shot 2017-03-29 at 12 29 28 am" 
src="https://cloud.githubusercontent.com/assets/323388/24443272/d4ea58c0-1416-11e7-8940-ecb69375554a.png";>

## How was this patch tested?
N/A

Author: Reynold Xin <r...@databricks.com>

Closes #17465 from rxin/SPARK-20136.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60977889
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60977889
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60977889

Branch: refs/heads/master
Commit: 60977889eaecdf28adc6164310eaa5afed488fa1
Parents: 22f07fe
Author: Reynold Xin <r...@databricks.com>
Authored: Wed Mar 29 19:06:51 2017 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Mar 29 19:06:51 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/DataSourceScanExec.scala  | 18 ++++++++++++++++--
 1 file changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/60977889/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 28156b2..2391514 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -171,8 +171,20 @@ case class FileSourceScanExec(
     false
   }
 
-  @transient private lazy val selectedPartitions =
-    relation.location.listFiles(partitionFilters, dataFilters)
+  @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+    val startTime = System.nanoTime()
+    val ret = relation.location.listFiles(partitionFilters, dataFilters)
+    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+
+    metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
+    metrics("metadataTime").add(timeTaken)
+
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
+      metrics("numFiles") :: metrics("metadataTime") :: Nil)
+
+    ret
+  }
 
   override val (outputPartitioning, outputOrdering): (Partitioning, Seq[SortOrder]) = {
     val bucketSpec = if (relation.sparkSession.sessionState.conf.bucketingEnabled) {
@@ -293,6 +305,8 @@ case class FileSourceScanExec(
 
   override lazy val metrics =
     Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of 
output rows"),
+      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files"),
+      "metadataTime" -> SQLMetrics.createMetric(sparkContext, "metadata time 
(ms)"),
       "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
 
   protected override def doExecute(): RDD[InternalRow] = {
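
A hedged usage note (not part of this commit): after a query runs, the new metric values can be read back from the executed plan. The sketch below assumes Scala, an active SparkSession named `spark`, and a hypothetical Parquet path.

```scala
// Hypothetical usage sketch: inspect the new scan metrics after an action.
val df = spark.read.parquet("/tmp/events")  // hypothetical path
df.collect()

// FileSourceScanExec is a leaf of the executed plan; its metrics map now
// includes "numFiles" and "metadataTime" alongside the existing entries.
df.queryExecution.executedPlan.collectLeaves().foreach { scan =>
  scan.metrics.get("numFiles").foreach(m => println(s"number of files: ${m.value}"))
  scan.metrics.get("metadataTime").foreach(m => println(s"metadata time (ms): ${m.value}"))
}
```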

