Repository: spark Updated Branches: refs/heads/master c734fc504 -> a8a765b3f
[SPARK-20151][SQL] Account for partition pruning in scan metadataTime metrics ## What changes were proposed in this pull request? After SPARK-20136, we report metadata timing metrics in the scan operator. However, that timing metric doesn't include one of the most important parts of metadata, which is partition pruning. This patch adds that time measurement to the scan metrics. ## How was this patch tested? N/A - I tried adding a test in SQLMetricsSuite but it was extremely convoluted to the point that I'm not sure if this is worth it. Author: Reynold Xin <r...@databricks.com> Closes #17476 from rxin/SPARK-20151. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a8a765b3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a8a765b3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a8a765b3 Branch: refs/heads/master Commit: a8a765b3f302c078cb9519c4a17912cd38b9680c Parents: c734fc5 Author: Reynold Xin <r...@databricks.com> Authored: Thu Mar 30 23:09:33 2017 -0700 Committer: Reynold Xin <r...@databricks.com> Committed: Thu Mar 30 23:09:33 2017 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/execution/DataSourceScanExec.scala | 5 +++-- .../sql/execution/datasources/CatalogFileIndex.scala | 7 +++++-- .../spark/sql/execution/datasources/FileIndex.scala | 10 ++++++++++ 3 files changed, 18 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a8a765b3/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 2391514..2fa660c 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -172,12 +172,13 @@ case class FileSourceScanExec( } @transient private lazy val selectedPartitions: Seq[PartitionDirectory] = { + val optimizerMetadataTimeNs = relation.location.metadataOpsTimeNs.getOrElse(0L) val startTime = System.nanoTime() val ret = relation.location.listFiles(partitionFilters, dataFilters) - val timeTaken = (System.nanoTime() - startTime) / 1000 / 1000 + val timeTakenMs = ((System.nanoTime() - startTime) + optimizerMetadataTimeNs) / 1000 / 1000 metrics("numFiles").add(ret.map(_.files.size.toLong).sum) - metrics("metadataTime").add(timeTaken) + metrics("metadataTime").add(timeTakenMs) val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY) SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, http://git-wip-us.apache.org/repos/asf/spark/blob/a8a765b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala index db0254f..4046396 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala @@ -69,6 +69,7 @@ class CatalogFileIndex( */ def filterPartitions(filters: Seq[Expression]): InMemoryFileIndex = { if (table.partitionColumnNames.nonEmpty) { + val startTime = System.nanoTime() val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter( table.identifier, filters) val partitions = selectedPartitions.map { p => @@ -79,8 +80,9 @@ class CatalogFileIndex( path.makeQualified(fs.getUri, 
fs.getWorkingDirectory)) } val partitionSpec = PartitionSpec(partitionSchema, partitions) + val timeNs = System.nanoTime() - startTime new PrunedInMemoryFileIndex( - sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec) + sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec, Option(timeNs)) } else { new InMemoryFileIndex( sparkSession, rootPaths, table.storage.properties, partitionSchema = None) @@ -111,7 +113,8 @@ private class PrunedInMemoryFileIndex( sparkSession: SparkSession, tableBasePath: Path, fileStatusCache: FileStatusCache, - override val partitionSpec: PartitionSpec) + override val partitionSpec: PartitionSpec, + override val metadataOpsTimeNs: Option[Long]) extends InMemoryFileIndex( sparkSession, partitionSpec.partitions.map(_.path), http://git-wip-us.apache.org/repos/asf/spark/blob/a8a765b3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala index 6b99d38..094a66a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileIndex.scala @@ -72,4 +72,14 @@ trait FileIndex { /** Schema of the partitioning columns, or the empty schema if the table is not partitioned. */ def partitionSchema: StructType + + /** + * Returns an optional metadata operation time, in nanoseconds, for listing files. + * + * We do file listing in query optimization (in order to get the proper statistics) and we want + * to account for file listing time in physical execution (as metrics). To do that, we save the + * file listing time in some implementations and physical execution calls it in this method + * to update the metrics. 
+ */ + def metadataOpsTimeNs: Option[Long] = None } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org