Repository: spark Updated Branches: refs/heads/master c09b31eb8 -> 5df99bd36
[SPARK-20703][SQL][FOLLOW-UP] Associate metrics with data writes onto DataFrameWriter operations ## What changes were proposed in this pull request? Remove the time metrics, since there seems to be no way to measure them without per-row tracking. ## How was this patch tested? Existing tests. Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Liang-Chi Hsieh <vii...@gmail.com> Closes #18558 from viirya/SPARK-20703-followup. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5df99bd3 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5df99bd3 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5df99bd3 Branch: refs/heads/master Commit: 5df99bd364561c6f4c02308149ba5eb71f89247e Parents: c09b31e Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Fri Jul 7 13:12:20 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Fri Jul 7 13:12:20 2017 +0800 ---------------------------------------------------------------------- .../execution/command/DataWritingCommand.scala | 10 --------- .../datasources/FileFormatWriter.scala | 22 +++----------------- .../sql/hive/execution/SQLMetricsSuite.scala | 3 --- 3 files changed, 3 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala index 0c381a2..700f7f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala @@ -30,7 +30,6 @@ trait 
DataWritingCommand extends RunnableCommand { override lazy val metrics: Map[String, SQLMetric] = { val sparkContext = SparkContext.getActive.get Map( - "avgTime" -> SQLMetrics.createMetric(sparkContext, "average writing time (ms)"), "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of written files"), "numOutputBytes" -> SQLMetrics.createMetric(sparkContext, "bytes of written output"), "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"), @@ -47,23 +46,14 @@ trait DataWritingCommand extends RunnableCommand { var numFiles = 0 var totalNumBytes: Long = 0L var totalNumOutput: Long = 0L - var totalWritingTime: Long = 0L writeSummaries.foreach { summary => numPartitions += summary.updatedPartitions.size numFiles += summary.numOutputFile totalNumBytes += summary.numOutputBytes totalNumOutput += summary.numOutputRows - totalWritingTime += summary.totalWritingTime } - val avgWritingTime = if (numFiles > 0) { - (totalWritingTime / numFiles).toLong - } else { - 0L - } - - metrics("avgTime").add(avgWritingTime) metrics("numFiles").add(numFiles) metrics("numOutputBytes").add(totalNumBytes) metrics("numOutputRows").add(totalNumOutput) http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 6486663..9eb9eae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -275,8 +275,6 @@ object FileFormatWriter extends Logging { /** * The data structures used to measure metrics during writing. 
*/ - protected var totalWritingTime: Long = 0L - protected var timeOnCurrentFile: Long = 0L protected var numOutputRows: Long = 0L protected var numOutputBytes: Long = 0L @@ -343,9 +341,7 @@ object FileFormatWriter extends Logging { } val internalRow = iter.next() - val startTime = System.nanoTime() currentWriter.write(internalRow) - timeOnCurrentFile += (System.nanoTime() - startTime) recordsInFile += 1 } releaseResources() @@ -355,17 +351,13 @@ object FileFormatWriter extends Logging { updatedPartitions = Set.empty, numOutputFile = fileCounter + 1, numOutputBytes = numOutputBytes, - numOutputRows = numOutputRows, - totalWritingTime = totalWritingTime) + numOutputRows = numOutputRows) } override def releaseResources(): Unit = { if (currentWriter != null) { try { - val startTime = System.nanoTime() currentWriter.close() - totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000 - timeOnCurrentFile = 0 numOutputBytes += getFileSize(taskAttemptContext.getConfiguration, currentPath) } finally { currentWriter = null @@ -504,9 +496,7 @@ object FileFormatWriter extends Logging { releaseResources() newOutputWriter(currentPartColsAndBucketId, getPartPath, fileCounter, updatedPartitions) } - val startTime = System.nanoTime() currentWriter.write(getOutputRow(row)) - timeOnCurrentFile += (System.nanoTime() - startTime) recordsInFile += 1 } if (currentPartColsAndBucketId != null) { @@ -519,17 +509,13 @@ object FileFormatWriter extends Logging { updatedPartitions = updatedPartitions.toSet, numOutputFile = totalFileCounter, numOutputBytes = numOutputBytes, - numOutputRows = numOutputRows, - totalWritingTime = totalWritingTime) + numOutputRows = numOutputRows) } override def releaseResources(): Unit = { if (currentWriter != null) { try { - val startTime = System.nanoTime() currentWriter.close() - totalWritingTime += (timeOnCurrentFile + System.nanoTime() - startTime) / 1000 / 1000 - timeOnCurrentFile = 0 numOutputBytes += 
getFileSize(taskAttemptContext.getConfiguration, currentPath) } finally { currentWriter = null @@ -547,11 +533,9 @@ object FileFormatWriter extends Logging { * @param numOutputFile the total number of files. * @param numOutputRows the number of output rows. * @param numOutputBytes the bytes of output data. - * @param totalWritingTime the total writing time in ms. */ case class ExecutedWriteSummary( updatedPartitions: Set[String], numOutputFile: Int, numOutputRows: Long, - numOutputBytes: Long, - totalWritingTime: Long) + numOutputBytes: Long) http://git-wip-us.apache.org/repos/asf/spark/blob/5df99bd3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala index 1ef1988..24c0385 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLMetricsSuite.scala @@ -65,9 +65,6 @@ class SQLMetricsSuite extends SQLTestUtils with TestHiveSingleton { val totalNumBytesMetric = executedNode.metrics.find(_.name == "bytes of written output").get val totalNumBytes = metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "").toInt assert(totalNumBytes > 0) - val writingTimeMetric = executedNode.metrics.find(_.name == "average writing time (ms)").get - val writingTime = metrics(writingTimeMetric.accumulatorId).replaceAll(",", "").toInt - assert(writingTime >= 0) } private def testMetricsNonDynamicPartition( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org