Repository: spark
Updated Branches:
  refs/heads/master c734fc504 -> a8a765b3f


[SPARK-20151][SQL] Account for partition pruning in scan metadataTime metrics

## What changes were proposed in this pull request?
After SPARK-20136, we report metadata timing metrics in scan operator. However, 
that timing metric doesn't include one of the most important parts of metadata, 
which is partition pruning. This patch adds that time measurement to the scan 
metrics.

## How was this patch tested?
N/A - I tried adding a test in SQLMetricsSuite but it was extremely convoluted 
to the point that I'm not sure if this is worth it.

Author: Reynold Xin <r...@databricks.com>

Closes #17476 from rxin/SPARK-20151.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8a765b3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8a765b3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8a765b3

Branch: refs/heads/master
Commit: a8a765b3f302c078cb9519c4a17912cd38b9680c
Parents: c734fc5
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Mar 30 23:09:33 2017 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Mar 30 23:09:33 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/execution/DataSourceScanExec.scala   |  5 +++--
 .../sql/execution/datasources/CatalogFileIndex.scala      |  7 +++++--
 .../spark/sql/execution/datasources/FileIndex.scala       | 10 ++++++++++
 3 files changed, 18 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a8a765b3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 2391514..2fa660c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -172,12 +172,13 @@ case class FileSourceScanExec(
   }
 
   @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = {
+    val optimizerMetadataTimeNs = 
relation.location.metadataOpsTimeNs.getOrElse(0L)
     val startTime = System.nanoTime()
     val ret = relation.location.listFiles(partitionFilters, dataFilters)
-    val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000
+    val timeTakenMs = ((System.nanoTime() - startTime) + 
optimizerMetadataTimeNs) / 1000 / 1000
 
     metrics("numFiles").add(ret.map(_.files.size.toLong).sum)
-    metrics("metadataTime").add(timeTaken)
+    metrics("metadataTime").add(timeTakenMs)
 
     val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
     SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,

http://git-wip-us.apache.org/repos/asf/spark/blob/a8a765b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index db0254f..4046396 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -69,6 +69,7 @@ class CatalogFileIndex(
    */
   def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = {
     if (table.partitionColumnNames.nonEmpty) {
+      val startTime = System.nanoTime()
       val selectedPartitions = 
sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
@@ -79,8 +80,9 @@ class CatalogFileIndex(
           path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      val timeNs = System.nanoTime() - startTime
       new PrunedInMemoryFileIndex(
-        sparkSession, new Path(baseLocation.get), fileStatusCache, 
partitionSpec)
+        sparkSession, new Path(baseLocation.get), fileStatusCache, 
partitionSpec, Option(timeNs))
     } else {
       new InMemoryFileIndex(
         sparkSession, rootPaths, table.storage.properties, partitionSchema = 
None)
@@ -111,7 +113,8 @@ private class PrunedInMemoryFileIndex(
     sparkSession: SparkSession,
     tableBasePath: Path,
     fileStatusCache: FileStatusCache,
-    override val partitionSpec: PartitionSpec)
+    override val partitionSpec: PartitionSpec,
+    override val metadataOpsTimeNs: Option[Long])
   extends InMemoryFileIndex(
     sparkSession,
     partitionSpec.partitions.map(_.path),

http://git-wip-us.apache.org/repos/asf/spark/blob/a8a765b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
index 6b99d38..094a66a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala
@@ -72,4 +72,14 @@ trait FileIndex {
 
   /** Schema of the partitioning columns, or the empty schema if the table is 
not partitioned. */
   def partitionSchema: StructType
+
+  /**
+   * Returns an optional metadata operation time, in nanoseconds, for listing 
files.
+   *
+   * We do file listing in query optimization (in order to get the proper 
statistics) and we want
+   * to account for file listing time in physical execution (as metrics). To 
do that, we save the
+   * file listing time in some implementations and physical execution calls it 
in this method
+   * to update the metrics.
+   */
+  def metadataOpsTimeNs: Option[Long] = None
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to