Repository: spark Updated Branches: refs/heads/branch-1.3 11dbf7137 -> 9fa29a629
[SPARK-4874] [CORE] Collect record count metrics Collects record counts for both Input/Output and Shuffle Metrics. For the input/output metrics, it just appends the counter every time the iterators get accessed. For shuffle on the write side, we count the metrics post aggregation (after a map side combine) and on the read side we count the metrics pre aggregation. This allows both the bytes read/written metrics and the records read/written to line up. For backwards compatibility, if we deserialize an older event that doesn't have record metrics, we set the metric to -1. Author: Kostas Sakellis <kos...@cloudera.com> Closes #4067 from ksakellis/kostas-spark-4874 and squashes the following commits: bd919be [Kostas Sakellis] Changed 'Records Read' in shuffleReadMetrics json output to 'Total Records Read' dad4d57 [Kostas Sakellis] Add a comment and check to BlockObjectWriter so that it cannot be reopend. 6f236a1 [Kostas Sakellis] Renamed _recordsWritten in ShuffleWriteMetrics to be more consistent 70620a0 [Kostas Sakellis] CR Feedback 17faa3a [Kostas Sakellis] Removed AtomicLong in favour of using Long b6f9923 [Kostas Sakellis] Merge AfterNextInterceptingIterator with InterruptableIterator to save a function call 46c8186 [Kostas Sakellis] Combined Bytes and # records into one column 57551c1 [Kostas Sakellis] Conforms to SPARK-3288 6cdb44e [Kostas Sakellis] Removed the generic InterceptingIterator and repalced it with specific implementation 1aa273c [Kostas Sakellis] CR Feedback 1bb78b1 [Kostas Sakellis] [SPARK-4874] [CORE] Collect record count metrics (cherry picked from commit dcd1e42d6b6ac08d2c0736bf61a15f515a1f222b) Signed-off-by: Patrick Wendell <patr...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fa29a62 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fa29a62 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fa29a62 Branch: refs/heads/branch-1.3 Commit: 9fa29a629a03893c226c4d42228daaf13ee41e5c Parents: 11dbf71 Author: Kostas Sakellis <kos...@cloudera.com> Authored: Fri Feb 6 14:31:20 2015 -0800 Committer: Patrick Wendell <patr...@databricks.com> Committed: Fri Feb 6 14:31:29 2015 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/CacheManager.scala | 11 +- .../org/apache/spark/executor/TaskMetrics.scala | 54 ++++-- .../scala/org/apache/spark/rdd/HadoopRDD.scala | 6 +- .../org/apache/spark/rdd/NewHadoopRDD.scala | 6 +- .../org/apache/spark/rdd/PairRDDFunctions.scala | 12 +- .../shuffle/hash/BlockStoreShuffleFetcher.scala | 8 +- .../org/apache/spark/storage/BlockManager.scala | 2 +- .../spark/storage/BlockObjectWriter.scala | 24 ++- .../scala/org/apache/spark/ui/ToolTips.scala | 9 +- .../org/apache/spark/ui/exec/ExecutorsTab.scala | 6 + .../apache/spark/ui/jobs/ExecutorTable.scala | 86 +++++++-- .../spark/ui/jobs/JobProgressListener.scala | 24 +++ .../org/apache/spark/ui/jobs/StagePage.scala | 148 +++++++++------ .../scala/org/apache/spark/ui/jobs/UIData.scala | 14 ++ .../org/apache/spark/util/JsonProtocol.scala | 19 +- .../spark/util/collection/ExternalSorter.scala | 1 + .../spark/metrics/InputOutputMetricsSuite.scala | 186 ++++++++++++++++--- .../spark/storage/BlockObjectWriterSuite.scala | 21 +++ .../ui/jobs/JobProgressListenerSuite.scala | 2 +- .../apache/spark/util/JsonProtocolSuite.scala | 55 +++++- 20 files changed, 548 insertions(+), 146 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/CacheManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala index a0c0372..a96d754 100644 --- a/core/src/main/scala/org/apache/spark/CacheManager.scala +++ b/core/src/main/scala/org/apache/spark/CacheManager.scala @@ -47,10 +47,15 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging { val inputMetrics = blockResult.inputMetrics val existingMetrics = context.taskMetrics .getInputMetricsForReadMethod(inputMetrics.readMethod) - existingMetrics.addBytesRead(inputMetrics.bytesRead) - - new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]]) + existingMetrics.incBytesRead(inputMetrics.bytesRead) + val iter = blockResult.data.asInstanceOf[Iterator[T]] + new InterruptibleIterator[T](context, iter) { + override def next(): T = { + existingMetrics.incRecordsRead(1) + delegate.next() + } + } case None => // Acquire a lock for loading this partition // If another thread already holds the lock, wait for it to finish return its results http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 97912c6..d056591 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -194,18 +194,19 @@ class TaskMetrics extends Serializable { /** * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. */ - private[spark] def updateShuffleReadMetrics() = synchronized { + private[spark] def updateShuffleReadMetrics(): Unit = synchronized { val merged = new ShuffleReadMetrics() for (depMetrics <- depsShuffleReadMetrics) { merged.incFetchWaitTime(depMetrics.fetchWaitTime) merged.incLocalBlocksFetched(depMetrics.localBlocksFetched) merged.incRemoteBlocksFetched(depMetrics.remoteBlocksFetched) merged.incRemoteBytesRead(depMetrics.remoteBytesRead) + merged.incRecordsRead(depMetrics.recordsRead) } _shuffleReadMetrics = Some(merged) } - private[spark] def updateInputMetrics() = synchronized { + private[spark] def updateInputMetrics(): Unit = synchronized { inputMetrics.foreach(_.updateBytesRead()) } } @@ -242,27 +243,31 @@ object DataWriteMethod extends Enumeration with Serializable { @DeveloperApi case class InputMetrics(readMethod: DataReadMethod.Value) { - private val _bytesRead: AtomicLong = new AtomicLong() + /** + * This is volatile so that it is visible to the updater thread. + */ + @volatile @transient var bytesReadCallback: Option[() => Long] = None /** * Total bytes read. */ - def bytesRead: Long = _bytesRead.get() - @volatile @transient var bytesReadCallback: Option[() => Long] = None + private var _bytesRead: Long = _ + def bytesRead: Long = _bytesRead + def incBytesRead(bytes: Long) = _bytesRead += bytes /** - * Adds additional bytes read for this read method. + * Total records read. */ - def addBytesRead(bytes: Long) = { - _bytesRead.addAndGet(bytes) - } + private var _recordsRead: Long = _ + def recordsRead: Long = _recordsRead + def incRecordsRead(records: Long) = _recordsRead += records /** * Invoke the bytesReadCallback and mutate bytesRead. */ def updateBytesRead() { bytesReadCallback.foreach { c => - _bytesRead.set(c()) + _bytesRead = c() } } @@ -287,6 +292,13 @@ case class OutputMetrics(writeMethod: DataWriteMethod.Value) { private var _bytesWritten: Long = _ def bytesWritten = _bytesWritten private[spark] def setBytesWritten(value : Long) = _bytesWritten = value + + /** + * Total records written + */ + private var _recordsWritten: Long = 0L + def recordsWritten = _recordsWritten + private[spark] def setRecordsWritten(value: Long) = _recordsWritten = value } /** @@ -301,7 +313,7 @@ class ShuffleReadMetrics extends Serializable { private var _remoteBlocksFetched: Int = _ def remoteBlocksFetched = _remoteBlocksFetched private[spark] def incRemoteBlocksFetched(value: Int) = _remoteBlocksFetched += value - private[spark] def defRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value + private[spark] def decRemoteBlocksFetched(value: Int) = _remoteBlocksFetched -= value /** * Number of local blocks fetched in this shuffle by this task @@ -309,8 +321,7 @@ class ShuffleReadMetrics extends Serializable { private var _localBlocksFetched: Int = _ def localBlocksFetched = _localBlocksFetched private[spark] def incLocalBlocksFetched(value: Int) = _localBlocksFetched += value - private[spark] def defLocalBlocksFetched(value: Int) = _localBlocksFetched -= value - + private[spark] def decLocalBlocksFetched(value: Int) = _localBlocksFetched -= value /** * Time the task spent waiting for remote shuffle blocks. This only includes the time @@ -334,6 +345,14 @@ class ShuffleReadMetrics extends Serializable { * Number of blocks fetched in this shuffle by this task (remote or local) */ def totalBlocksFetched = _remoteBlocksFetched + _localBlocksFetched + + /** + * Total number of records read from the shuffle by this task + */ + private var _recordsRead: Long = _ + def recordsRead = _recordsRead + private[spark] def incRecordsRead(value: Long) = _recordsRead += value + private[spark] def decRecordsRead(value: Long) = _recordsRead -= value } /** @@ -358,5 +377,12 @@ class ShuffleWriteMetrics extends Serializable { private[spark] def incShuffleWriteTime(value: Long) = _shuffleWriteTime += value private[spark] def decShuffleWriteTime(value: Long) = _shuffleWriteTime -= value - + /** + * Total number of records written to the shuffle by this task + */ + @volatile private var _shuffleRecordsWritten: Long = _ + def shuffleRecordsWritten = _shuffleRecordsWritten + private[spark] def incShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten += value + private[spark] def decShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten -= value + private[spark] def setShuffleRecordsWritten(value: Long) = _shuffleRecordsWritten = value } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 89adddc..486e86c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -247,7 +247,9 @@ class HadoopRDD[K, V]( case eof: EOFException => finished = true } - + if (!finished) { + inputMetrics.incRecordsRead(1) + } (key, value) } @@ -261,7 +263,7 @@ class HadoopRDD[K, V]( // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.addBytesRead(split.inputSplit.value.getLength) + inputMetrics.incBytesRead(split.inputSplit.value.getLength) } catch { case e: java.io.IOException => logWarning("Unable to get input size to set InputMetrics for task", e) http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 44b9ffd..7fb9484 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -151,7 +151,9 @@ class NewHadoopRDD[K, V]( throw new java.util.NoSuchElementException("End of stream") } havePair = false - + if (!finished) { + inputMetrics.incRecordsRead(1) + } (reader.getCurrentKey, reader.getCurrentValue) } @@ -165,7 +167,7 @@ class NewHadoopRDD[K, V]( // If we can't get the bytes read from the FS stats, fall back to the split size, // which may be inaccurate. try { - inputMetrics.addBytesRead(split.serializableHadoopSplit.value.getLength) + inputMetrics.incBytesRead(split.serializableHadoopSplit.value.getLength) } catch { case e: java.io.IOException => logWarning("Unable to get input size to set InputMetrics for task", e) http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 49b88a9..955b42c 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.io.SequenceFile.CompressionType import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob, OutputFormat => NewOutputFormat, -RecordWriter => NewRecordWriter} + RecordWriter => NewRecordWriter} import org.apache.spark._ import org.apache.spark.Partitioner.defaultPartitioner @@ -993,8 +993,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context) val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] + var recordsWritten = 0L try { - var recordsWritten = 0L while (iter.hasNext) { val pair = iter.next() writer.write(pair._1, pair._2) @@ -1008,6 +1008,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } committer.commitTask(hadoopContext) bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } + outputMetrics.setRecordsWritten(recordsWritten) 1 } : Int @@ -1065,8 +1066,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) writer.setup(context.stageId, context.partitionId, taskAttemptId) writer.open() + var recordsWritten = 0L try { - var recordsWritten = 0L while (iter.hasNext) { val record = iter.next() writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) @@ -1080,6 +1081,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } writer.commit() bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } + outputMetrics.setRecordsWritten(recordsWritten) } self.context.runJob(self, writeToFile) @@ -1097,9 +1099,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) private def maybeUpdateOutputMetrics(bytesWrittenCallback: Option[() => Long], outputMetrics: OutputMetrics, recordsWritten: Long): Unit = { - if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0 - && bytesWrittenCallback.isDefined) { + if (recordsWritten % PairRDDFunctions.RECORDS_BETWEEN_BYTES_WRITTEN_METRIC_UPDATES == 0) { bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) } + outputMetrics.setRecordsWritten(recordsWritten) } } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index e3e7434..7a2c5ae 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -86,6 +86,12 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { context.taskMetrics.updateShuffleReadMetrics() }) - new InterruptibleIterator[T](context, completionIter) + new InterruptibleIterator[T](context, completionIter) { + val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency() + override def next(): T = { + readMetrics.incRecordsRead(1) + delegate.next() + } + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 8bc5a1c..86dbd89 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -53,7 +53,7 @@ private[spark] class BlockResult( readMethod: DataReadMethod.Value, bytes: Long) { val inputMetrics = new InputMetrics(readMethod) - inputMetrics.addBytesRead(bytes) + inputMetrics.incBytesRead(bytes) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 3198d76..8116417 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -29,7 +29,8 @@ import org.apache.spark.executor.ShuffleWriteMetrics * appending data to an existing block, and can guarantee atomicity in the case of faults * as it allows the caller to revert partial writes. * - * This interface does not support concurrent writes. + * This interface does not support concurrent writes. Also, once the writer has + * been opened, it cannot be reopened again. */ private[spark] abstract class BlockObjectWriter(val blockId: BlockId) { @@ -95,6 +96,7 @@ private[spark] class DiskBlockObjectWriter( private var ts: TimeTrackingOutputStream = null private var objOut: SerializationStream = null private var initialized = false + private var hasBeenClosed = false /** * Cursors used to represent positions in the file. @@ -115,11 +117,16 @@ private[spark] class DiskBlockObjectWriter( private var finalPosition: Long = -1 private var reportedPosition = initialPosition - /** Calling channel.position() to update the write metrics can be a little bit expensive, so we - * only call it every N writes */ - private var writesSinceMetricsUpdate = 0 + /** + * Keep track of number of records written and also use this to periodically + * output bytes written since the latter is expensive to do for each record. + */ + private var numRecordsWritten = 0 override def open(): BlockObjectWriter = { + if (hasBeenClosed) { + throw new IllegalStateException("Writer already closed. Cannot be reopened.") + } fos = new FileOutputStream(file, true) ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() @@ -145,6 +152,7 @@ private[spark] class DiskBlockObjectWriter( ts = null objOut = null initialized = false + hasBeenClosed = true } } @@ -168,6 +176,7 @@ private[spark] class DiskBlockObjectWriter( override def revertPartialWritesAndClose() { try { writeMetrics.decShuffleBytesWritten(reportedPosition - initialPosition) + writeMetrics.decShuffleRecordsWritten(numRecordsWritten) if (initialized) { objOut.flush() @@ -193,12 +202,11 @@ private[spark] class DiskBlockObjectWriter( } objOut.writeObject(value) + numRecordsWritten += 1 + writeMetrics.incShuffleRecordsWritten(1) - if (writesSinceMetricsUpdate == 32) { - writesSinceMetricsUpdate = 0 + if (numRecordsWritten % 32 == 0) { updateBytesWritten() - } else { - writesSinceMetricsUpdate += 1 } } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/ui/ToolTips.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala index 4307029..3a15e60 100644 --- a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala +++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala @@ -29,14 +29,15 @@ private[spark] object ToolTips { val SHUFFLE_READ_BLOCKED_TIME = "Time that the task spent blocked waiting for shuffle data to be read from remote machines." - val INPUT = "Bytes read from Hadoop or from Spark storage." + val INPUT = "Bytes and records read from Hadoop or from Spark storage." - val OUTPUT = "Bytes written to Hadoop." + val OUTPUT = "Bytes and records written to Hadoop." - val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage." + val SHUFFLE_WRITE = + "Bytes and records written to disk in order to be read by a shuffle in a future stage." val SHUFFLE_READ = - """Bytes read from remote executors. Typically less than shuffle write bytes + """Bytes and records read from remote executors. Typically less than shuffle write bytes because this does not include shuffle data read locally.""" val GETTING_RESULT_TIME = http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index a38cb75..3afd7ef 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -48,7 +48,9 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp val executorToTasksFailed = HashMap[String, Int]() val executorToDuration = HashMap[String, Long]() val executorToInputBytes = HashMap[String, Long]() + val executorToInputRecords = HashMap[String, Long]() val executorToOutputBytes = HashMap[String, Long]() + val executorToOutputRecords = HashMap[String, Long]() val executorToShuffleRead = HashMap[String, Long]() val executorToShuffleWrite = HashMap[String, Long]() val executorToLogUrls = HashMap[String, Map[String, String]]() @@ -84,10 +86,14 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener) extends Sp metrics.inputMetrics.foreach { inputMetrics => executorToInputBytes(eid) = executorToInputBytes.getOrElse(eid, 0L) + inputMetrics.bytesRead + executorToInputRecords(eid) = + executorToInputRecords.getOrElse(eid, 0L) + inputMetrics.recordsRead } metrics.outputMetrics.foreach { outputMetrics => executorToOutputBytes(eid) = executorToOutputBytes.getOrElse(eid, 0L) + outputMetrics.bytesWritten + executorToOutputRecords(eid) = + executorToOutputRecords.getOrElse(eid, 0L) + outputMetrics.recordsWritten } metrics.shuffleReadMetrics.foreach { shuffleRead => executorToShuffleRead(eid) = http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala index 9836d11..1f8536d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala @@ -36,6 +36,20 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage /** Special table which merges two header cells. */ private def executorTable[T](): Seq[Node] = { + val stageData = listener.stageIdToData.get((stageId, stageAttemptId)) + var hasInput = false + var hasOutput = false + var hasShuffleWrite = false + var hasShuffleRead = false + var hasBytesSpilled = false + stageData.foreach(data => { + hasInput = data.hasInput + hasOutput = data.hasOutput + hasShuffleRead = data.hasShuffleRead + hasShuffleWrite = data.hasShuffleWrite + hasBytesSpilled = data.hasBytesSpilled + }) + <table class={UIUtils.TABLE_CLASS_STRIPED}> <thead> <th>Executor ID</th> @@ -44,12 +58,32 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage <th>Total Tasks</th> <th>Failed Tasks</th> <th>Succeeded Tasks</th> - <th><span data-toggle="tooltip" title={ToolTips.INPUT}>Input</span></th> - <th><span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output</span></th> - <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}>Shuffle Read</span></th> - <th><span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}>Shuffle Write</span></th> - <th>Shuffle Spill (Memory)</th> - <th>Shuffle Spill (Disk)</th> + {if (hasInput) { + <th> + <span data-toggle="tooltip" title={ToolTips.INPUT}>Input Size / Records</span> + </th> + }} + {if (hasOutput) { + <th> + <span data-toggle="tooltip" title={ToolTips.OUTPUT}>Output Size / Records</span> + </th> + }} + {if (hasShuffleRead) { + <th> + <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ}> + Shuffle Read Size / Records</span> + </th> + }} + {if (hasShuffleWrite) { + <th> + <span data-toggle="tooltip" title={ToolTips.SHUFFLE_WRITE}> + Shuffle Write Size / Records</span> + </th> + }} + {if (hasBytesSpilled) { + <th>Shuffle Spill (Memory)</th> + <th>Shuffle Spill (Disk)</th> + }} </thead> <tbody> {createExecutorTable()} @@ -76,18 +110,34 @@ private[ui] class ExecutorTable(stageId: Int, stageAttemptId: Int, parent: Stage <td>{v.failedTasks + v.succeededTasks}</td> <td>{v.failedTasks}</td> <td>{v.succeededTasks}</td> - <td sorttable_customkey={v.inputBytes.toString}> - {Utils.bytesToString(v.inputBytes)}</td> - <td sorttable_customkey={v.outputBytes.toString}> - {Utils.bytesToString(v.outputBytes)}</td> - <td sorttable_customkey={v.shuffleRead.toString}> - {Utils.bytesToString(v.shuffleRead)}</td> - <td sorttable_customkey={v.shuffleWrite.toString}> - {Utils.bytesToString(v.shuffleWrite)}</td> - <td sorttable_customkey={v.memoryBytesSpilled.toString}> - {Utils.bytesToString(v.memoryBytesSpilled)}</td> - <td sorttable_customkey={v.diskBytesSpilled.toString}> - {Utils.bytesToString(v.diskBytesSpilled)}</td> + {if (stageData.hasInput) { + <td sorttable_customkey={v.inputBytes.toString}> + {s"${Utils.bytesToString(v.inputBytes)} / ${v.inputRecords}"} + </td> + }} + {if (stageData.hasOutput) { + <td sorttable_customkey={v.outputBytes.toString}> + {s"${Utils.bytesToString(v.outputBytes)} / ${v.outputRecords}"} + </td> + }} + {if (stageData.hasShuffleRead) { + <td sorttable_customkey={v.shuffleRead.toString}> + {s"${Utils.bytesToString(v.shuffleRead)} / ${v.shuffleReadRecords}"} + </td> + }} + {if (stageData.hasShuffleWrite) { + <td sorttable_customkey={v.shuffleWrite.toString}> + {s"${Utils.bytesToString(v.shuffleWrite)} / ${v.shuffleWriteRecords}"} + </td> + }} + {if (stageData.hasBytesSpilled) { + <td sorttable_customkey={v.memoryBytesSpilled.toString}> + {Utils.bytesToString(v.memoryBytesSpilled)} + </td> + <td sorttable_customkey={v.diskBytesSpilled.toString}> + {Utils.bytesToString(v.diskBytesSpilled)} + </td> + }} </tr> } case None => http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index 4d200ee..f463f8d 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -394,24 +394,48 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging { stageData.shuffleWriteBytes += shuffleWriteDelta execSummary.shuffleWrite += shuffleWriteDelta + val shuffleWriteRecordsDelta = + (taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleRecordsWritten).getOrElse(0L)) + stageData.shuffleWriteRecords += shuffleWriteRecordsDelta + execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta + val shuffleReadDelta = (taskMetrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L) - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead).getOrElse(0L)) stageData.shuffleReadBytes += shuffleReadDelta execSummary.shuffleRead += shuffleReadDelta + val shuffleReadRecordsDelta = + (taskMetrics.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L) + - oldMetrics.flatMap(_.shuffleReadMetrics).map(_.recordsRead).getOrElse(0L)) + stageData.shuffleReadRecords += shuffleReadRecordsDelta + execSummary.shuffleReadRecords += shuffleReadRecordsDelta + val inputBytesDelta = (taskMetrics.inputMetrics.map(_.bytesRead).getOrElse(0L) - oldMetrics.flatMap(_.inputMetrics).map(_.bytesRead).getOrElse(0L)) stageData.inputBytes += inputBytesDelta execSummary.inputBytes += inputBytesDelta + val inputRecordsDelta = + (taskMetrics.inputMetrics.map(_.recordsRead).getOrElse(0L) + - oldMetrics.flatMap(_.inputMetrics).map(_.recordsRead).getOrElse(0L)) + stageData.inputRecords += inputRecordsDelta + execSummary.inputRecords += inputRecordsDelta + val outputBytesDelta = (taskMetrics.outputMetrics.map(_.bytesWritten).getOrElse(0L) - oldMetrics.flatMap(_.outputMetrics).map(_.bytesWritten).getOrElse(0L)) stageData.outputBytes += outputBytesDelta execSummary.outputBytes += outputBytesDelta + val outputRecordsDelta = + (taskMetrics.outputMetrics.map(_.recordsWritten).getOrElse(0L) + - oldMetrics.flatMap(_.outputMetrics).map(_.recordsWritten).getOrElse(0L)) + stageData.outputRecords += outputRecordsDelta + execSummary.outputRecords += outputRecordsDelta + val diskSpillDelta = taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L) stageData.diskBytesSpilled += diskSpillDelta http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index d8be1b2..02a3cc3 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -56,11 +56,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val numCompleted = tasks.count(_.taskInfo.finished) val accumulables = listener.stageIdToData((stageId, stageAttemptId)).accumulables val hasAccumulators = accumulables.size > 0 - val hasInput = stageData.inputBytes > 0 - val hasOutput = stageData.outputBytes > 0 - val hasShuffleRead = stageData.shuffleReadBytes > 0 - val hasShuffleWrite = stageData.shuffleWriteBytes > 0 - val hasBytesSpilled = stageData.memoryBytesSpilled > 0 && stageData.diskBytesSpilled > 0 val summary = <div> @@ -69,31 +64,33 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { <strong>Total task time across all tasks: </strong> {UIUtils.formatDuration(stageData.executorRunTime)} </li> - {if (hasInput) { + {if (stageData.hasInput) { <li> - <strong>Input: </strong> - {Utils.bytesToString(stageData.inputBytes)} + <strong>Input Size / Records: </strong> + {s"${Utils.bytesToString(stageData.inputBytes)} / ${stageData.inputRecords}"} </li> }} - {if (hasOutput) { + {if (stageData.hasOutput) { <li> <strong>Output: </strong> - {Utils.bytesToString(stageData.outputBytes)} + {s"${Utils.bytesToString(stageData.outputBytes)} / ${stageData.outputRecords}"} </li> }} - {if (hasShuffleRead) { + {if (stageData.hasShuffleRead) { <li> <strong>Shuffle read: </strong> - {Utils.bytesToString(stageData.shuffleReadBytes)} + {s"${Utils.bytesToString(stageData.shuffleReadBytes)} / " + + s"${stageData.shuffleReadRecords}"} </li> }} - {if (hasShuffleWrite) { + {if (stageData.hasShuffleWrite) { <li> <strong>Shuffle write: </strong> - {Utils.bytesToString(stageData.shuffleWriteBytes)} + {s"${Utils.bytesToString(stageData.shuffleWriteBytes)} / " + + s"${stageData.shuffleWriteRecords}"} </li> }} - {if (hasBytesSpilled) { + {if (stageData.hasBytesSpilled) { <li> <strong>Shuffle spill (memory): </strong> {Utils.bytesToString(stageData.memoryBytesSpilled)} @@ -132,7 +129,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { <span class="additional-metric-title">Task Deserialization Time</span> </span> </li> - {if (hasShuffleRead) { + {if (stageData.hasShuffleRead) { <li> <span data-toggle="tooltip" title={ToolTips.SHUFFLE_READ_BLOCKED_TIME} data-placement="right"> @@ -174,25 +171,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { ("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME), ("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++ {if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++ - {if (hasInput) Seq(("Input", "")) else Nil} ++ - {if (hasOutput) Seq(("Output", "")) else Nil} ++ - {if (hasShuffleRead) { + {if (stageData.hasInput) Seq(("Input Size / Records", "")) else Nil} ++ + {if (stageData.hasOutput) Seq(("Output Size / Records", "")) else Nil} ++ + {if (stageData.hasShuffleRead) { Seq(("Shuffle Read Blocked Time", TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME), - ("Shuffle Read", "")) + ("Shuffle Read Size / Records", "")) + } else { + Nil + }} ++ + {if (stageData.hasShuffleWrite) { + Seq(("Write Time", ""), ("Shuffle Write Size / Records", "")) + } else { + Nil + }} ++ + {if (stageData.hasBytesSpilled) { + Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) } else { Nil }} ++ - {if (hasShuffleWrite) Seq(("Write Time", ""), ("Shuffle Write", "")) else Nil} ++ - {if (hasBytesSpilled) Seq(("Shuffle Spill (Memory)", ""), ("Shuffle Spill (Disk)", "")) - else Nil} ++ Seq(("Errors", "")) val unzipped = taskHeadersAndCssClasses.unzip val taskTable = UIUtils.listingTable( unzipped._1, - taskRow(hasAccumulators, hasInput, hasOutput, hasShuffleRead, hasShuffleWrite, - hasBytesSpilled), + taskRow(hasAccumulators, stageData.hasInput, stageData.hasOutput, + stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled), tasks, headerClasses = unzipped._2) // Excludes tasks which failed and have incomplete metrics @@ -203,8 +207,11 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { None } else { + def getDistributionQuantiles(data: Seq[Double]): IndexedSeq[Double] = + Distribution(data).get.getQuantiles() + def getFormattedTimeQuantiles(times: Seq[Double]): Seq[Node] = { - Distribution(times).get.getQuantiles().map { millis => + getDistributionQuantiles(times).map { millis => <td>{UIUtils.formatDuration(millis.toLong)}</td> } } @@ -273,17 +280,36 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { getFormattedTimeQuantiles(schedulerDelays) def getFormattedSizeQuantiles(data: Seq[Double]) = - Distribution(data).get.getQuantiles().map(d => <td>{Utils.bytesToString(d.toLong)}</td>) + getDistributionQuantiles(data).map(d => <td>{Utils.bytesToString(d.toLong)}</td>) + + def getFormattedSizeQuantilesWithRecords(data: Seq[Double], records: Seq[Double]) = { + val recordDist = getDistributionQuantiles(records).iterator + getDistributionQuantiles(data).map(d => + <td>{s"${Utils.bytesToString(d.toLong)} / ${recordDist.next().toLong}"}</td> + ) + } val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble } - val inputQuantiles = <td>Input</td> +: getFormattedSizeQuantiles(inputSizes) + + val inputRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.inputMetrics.map(_.recordsRead).getOrElse(0L).toDouble + } + + val inputQuantiles = <td>Input Size / Records</td> +: + getFormattedSizeQuantilesWithRecords(inputSizes, inputRecords) val outputSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.outputMetrics.map(_.bytesWritten).getOrElse(0L).toDouble } - val outputQuantiles = <td>Output</td> +: getFormattedSizeQuantiles(outputSizes) + + val outputRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.outputMetrics.map(_.recordsWritten).getOrElse(0L).toDouble + } + + val outputQuantiles = <td>Output Size / Records</td> +: + getFormattedSizeQuantilesWithRecords(outputSizes, outputRecords) val shuffleReadBlockedTimes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.fetchWaitTime).getOrElse(0L).toDouble @@ -294,14 +320,24 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble } - val shuffleReadQuantiles = <td>Shuffle Read (Remote)</td> +: - getFormattedSizeQuantiles(shuffleReadSizes) + + val shuffleReadRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleReadMetrics.map(_.recordsRead).getOrElse(0L).toDouble + } + + val shuffleReadQuantiles = <td>Shuffle Read Size / Records (Remote)</td> +: + getFormattedSizeQuantilesWithRecords(shuffleReadSizes, shuffleReadRecords) val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble } - val shuffleWriteQuantiles = <td>Shuffle Write</td> +: - getFormattedSizeQuantiles(shuffleWriteSizes) + + val shuffleWriteRecords = validTasks.map { case TaskUIData(_, metrics, _) => + metrics.get.shuffleWriteMetrics.map(_.shuffleRecordsWritten).getOrElse(0L).toDouble + } + + val shuffleWriteQuantiles = <td>Shuffle Write Size / Records</td> +: + getFormattedSizeQuantilesWithRecords(shuffleWriteSizes, shuffleWriteRecords) val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) => metrics.get.memoryBytesSpilled.toDouble @@ -326,9 +362,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {serializationQuantiles} </tr>, <tr class={TaskDetailsClassNames.GETTING_RESULT_TIME}>{gettingResultQuantiles}</tr>, - if (hasInput) <tr>{inputQuantiles}</tr> else Nil, - if (hasOutput) <tr>{outputQuantiles}</tr> else Nil, - if (hasShuffleRead) { + if (stageData.hasInput) <tr>{inputQuantiles}</tr> else Nil, + if (stageData.hasOutput) <tr>{outputQuantiles}</tr> else Nil, + if (stageData.hasShuffleRead) { <tr class={TaskDetailsClassNames.SHUFFLE_READ_BLOCKED_TIME}> {shuffleReadBlockedQuantiles} </tr> @@ -336,9 +372,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { } else { Nil }, - if (hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil, - if (hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil, - if (hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil) + if (stageData.hasShuffleWrite) <tr>{shuffleWriteQuantiles}</tr> else Nil, + if (stageData.hasBytesSpilled) <tr>{memoryBytesSpilledQuantiles}</tr> else Nil, + if (stageData.hasBytesSpilled) <tr>{diskBytesSpilledQuantiles}</tr> else Nil) val quantileHeaders = Seq("Metric", "Min", "25th percentile", "Median", "75th percentile", "Max") @@ -397,26 +433,32 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val inputReadable = maybeInput .map(m => s"${Utils.bytesToString(m.bytesRead)} (${m.readMethod.toString.toLowerCase()})") .getOrElse("") + val inputRecords = maybeInput.map(_.recordsRead.toString).getOrElse("") val maybeOutput = metrics.flatMap(_.outputMetrics) val outputSortable = maybeOutput.map(_.bytesWritten.toString).getOrElse("") val outputReadable = maybeOutput .map(m => s"${Utils.bytesToString(m.bytesWritten)}") .getOrElse("") + val outputRecords = maybeOutput.map(_.recordsWritten.toString).getOrElse("") - val maybeShuffleReadBlockedTime = metrics.flatMap(_.shuffleReadMetrics).map(_.fetchWaitTime) - val shuffleReadBlockedTimeSortable = maybeShuffleReadBlockedTime.map(_.toString).getOrElse("") + val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics) + val shuffleReadBlockedTimeSortable = maybeShuffleRead + .map(_.fetchWaitTime.toString).getOrElse("") val shuffleReadBlockedTimeReadable = - maybeShuffleReadBlockedTime.map(ms => UIUtils.formatDuration(ms)).getOrElse("") + maybeShuffleRead.map(ms => UIUtils.formatDuration(ms.fetchWaitTime)).getOrElse("") - val maybeShuffleRead = metrics.flatMap(_.shuffleReadMetrics).map(_.remoteBytesRead) - val shuffleReadSortable = maybeShuffleRead.map(_.toString).getOrElse("") - val shuffleReadReadable = maybeShuffleRead.map(Utils.bytesToString).getOrElse("") + val shuffleReadSortable = maybeShuffleRead.map(_.remoteBytesRead.toString).getOrElse("") + val shuffleReadReadable = maybeShuffleRead + .map(m => s"${Utils.bytesToString(m.remoteBytesRead)}").getOrElse("") + val shuffleReadRecords = maybeShuffleRead.map(_.recordsRead.toString).getOrElse("") - val maybeShuffleWrite = - metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleBytesWritten) - val shuffleWriteSortable = maybeShuffleWrite.map(_.toString).getOrElse("") - val shuffleWriteReadable = maybeShuffleWrite.map(Utils.bytesToString).getOrElse("") + val maybeShuffleWrite = metrics.flatMap(_.shuffleWriteMetrics) + val shuffleWriteSortable = maybeShuffleWrite.map(_.shuffleBytesWritten.toString).getOrElse("") + val shuffleWriteReadable = maybeShuffleWrite + .map(m => s"${Utils.bytesToString(m.shuffleBytesWritten)}").getOrElse("") + val shuffleWriteRecords = maybeShuffleWrite + .map(_.shuffleRecordsWritten.toString).getOrElse("") val maybeWriteTime = metrics.flatMap(_.shuffleWriteMetrics).map(_.shuffleWriteTime) val writeTimeSortable = maybeWriteTime.map(_.toString).getOrElse("") @@ -472,12 +514,12 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { }} {if (hasInput) { <td sorttable_customkey={inputSortable}> - {inputReadable} + {s"$inputReadable / $inputRecords"} </td> }} {if (hasOutput) { <td sorttable_customkey={outputSortable}> - {outputReadable} + {s"$outputReadable / $outputRecords"} </td> }} {if (hasShuffleRead) { @@ -486,7 +528,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {shuffleReadBlockedTimeReadable} </td> <td sorttable_customkey={shuffleReadSortable}> - {shuffleReadReadable} + {s"$shuffleReadReadable / $shuffleReadRecords"} </td> }} {if (hasShuffleWrite) { @@ -494,7 +536,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { {writeTimeReadable} </td> <td sorttable_customkey={shuffleWriteSortable}> - {shuffleWriteReadable} + {s"$shuffleWriteReadable / $shuffleWriteRecords"} </td> }} {if (hasBytesSpilled) { http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 01f7e23..69aac6c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -31,9 +31,13 @@ private[jobs] object UIData { var failedTasks : Int = 0 var succeededTasks : Int = 0 var inputBytes : Long = 0 + var inputRecords : Long = 0 var outputBytes : Long = 0 + var outputRecords : Long = 0 var shuffleRead : Long = 0 + var shuffleReadRecords : Long = 0 var shuffleWrite : Long = 0 + var shuffleWriteRecords : Long = 0 var memoryBytesSpilled : Long = 0 var diskBytesSpilled : Long = 0 } @@ -73,9 +77,13 @@ private[jobs] object UIData { var executorRunTime: Long = _ var inputBytes: Long = _ + var inputRecords: Long = _ var outputBytes: Long = _ + var outputRecords: Long = _ var shuffleReadBytes: Long = _ + var shuffleReadRecords : Long = _ var shuffleWriteBytes: Long = _ + var shuffleWriteRecords: Long = _ var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ @@ -85,6 +93,12 @@ private[jobs] object UIData { var accumulables = new HashMap[Long, AccumulableInfo] var taskData = new HashMap[Long, TaskUIData] var executorSummary = new HashMap[String, ExecutorSummary] + + def hasInput = inputBytes > 0 + def hasOutput = outputBytes > 0 + def hasShuffleRead = shuffleReadBytes > 0 + def hasShuffleWrite = shuffleWriteBytes > 0 + def hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0 } /** http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index c8407bb..b0b5456 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -293,22 +293,26 @@ private[spark] object JsonProtocol { ("Remote Blocks Fetched" -> shuffleReadMetrics.remoteBlocksFetched) ~ ("Local Blocks Fetched" -> shuffleReadMetrics.localBlocksFetched) ~ ("Fetch Wait Time" -> shuffleReadMetrics.fetchWaitTime) ~ - ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) + ("Remote Bytes Read" -> shuffleReadMetrics.remoteBytesRead) ~ + ("Total Records Read" -> shuffleReadMetrics.recordsRead) } def shuffleWriteMetricsToJson(shuffleWriteMetrics: ShuffleWriteMetrics): JValue = { ("Shuffle Bytes Written" -> shuffleWriteMetrics.shuffleBytesWritten) ~ - ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) + ("Shuffle Write Time" -> shuffleWriteMetrics.shuffleWriteTime) ~ + ("Shuffle Records Written" -> shuffleWriteMetrics.shuffleRecordsWritten) } def inputMetricsToJson(inputMetrics: InputMetrics): JValue = { ("Data Read Method" -> inputMetrics.readMethod.toString) ~ - ("Bytes Read" -> inputMetrics.bytesRead) + ("Bytes Read" -> inputMetrics.bytesRead) ~ + ("Records Read" -> inputMetrics.recordsRead) } def outputMetricsToJson(outputMetrics: OutputMetrics): JValue = { ("Data Write Method" -> outputMetrics.writeMethod.toString) ~ - ("Bytes Written" -> outputMetrics.bytesWritten) + ("Bytes Written" -> outputMetrics.bytesWritten) ~ + ("Records Written" -> outputMetrics.recordsWritten) } def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = { @@ -670,6 +674,7 @@ private[spark] object JsonProtocol { metrics.incLocalBlocksFetched((json \ "Local Blocks Fetched").extract[Int]) metrics.incFetchWaitTime((json \ "Fetch Wait Time").extract[Long]) metrics.incRemoteBytesRead((json \ "Remote Bytes Read").extract[Long]) + metrics.incRecordsRead((json \ "Total Records Read").extractOpt[Long].getOrElse(0)) metrics } @@ -677,13 +682,16 @@ private[spark] object JsonProtocol { val metrics = new ShuffleWriteMetrics metrics.incShuffleBytesWritten((json \ "Shuffle Bytes Written").extract[Long]) metrics.incShuffleWriteTime((json \ "Shuffle Write Time").extract[Long]) + metrics.setShuffleRecordsWritten((json \ "Shuffle Records Written") + .extractOpt[Long].getOrElse(0)) metrics } def inputMetricsFromJson(json: JValue): InputMetrics = { val metrics = new InputMetrics( DataReadMethod.withName((json \ "Data Read Method").extract[String])) - metrics.addBytesRead((json \ "Bytes Read").extract[Long]) + metrics.incBytesRead((json \ "Bytes Read").extract[Long]) + metrics.incRecordsRead((json \ "Records Read").extractOpt[Long].getOrElse(0)) metrics } @@ -691,6 +699,7 @@ private[spark] object JsonProtocol { val metrics = new OutputMetrics( DataWriteMethod.withName((json \ "Data Write Method").extract[String])) metrics.setBytesWritten((json \ "Bytes Written").extract[Long]) + metrics.setRecordsWritten((json \ "Records Written").extractOpt[Long].getOrElse(0)) metrics } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index 6ba0384..eaec5a7 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -763,6 +763,7 @@ private[spark] class ExternalSorter[K, V, C]( if (curWriteMetrics != null) { m.incShuffleBytesWritten(curWriteMetrics.shuffleBytesWritten) m.incShuffleWriteTime(curWriteMetrics.shuffleWriteTime) + m.incShuffleRecordsWritten(curWriteMetrics.shuffleRecordsWritten) } } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index 81db66a..78fa98a 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -21,44 +21,46 @@ import java.io.{File, FileWriter, PrintWriter} import scala.collection.mutable.ArrayBuffer -import org.scalatest.FunSuite - +import org.apache.commons.lang.math.RandomUtils import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.io.{LongWritable, Text} -import org.apache.hadoop.mapred.{FileSplit => OldFileSplit, InputSplit => OldInputSplit, JobConf, - LineRecordReader => OldLineRecordReader, RecordReader => OldRecordReader, Reporter, - TextInputFormat => OldTextInputFormat} import org.apache.hadoop.mapred.lib.{CombineFileInputFormat => OldCombineFileInputFormat, - CombineFileSplit => OldCombineFileSplit, CombineFileRecordReader => OldCombineFileRecordReader} -import org.apache.hadoop.mapreduce.{InputSplit => NewInputSplit, RecordReader => NewRecordReader, - TaskAttemptContext} + CombineFileRecordReader => OldCombineFileRecordReader, CombineFileSplit => OldCombineFileSplit} +import org.apache.hadoop.mapred.{JobConf, Reporter, FileSplit => OldFileSplit, + InputSplit => OldInputSplit, LineRecordReader => OldLineRecordReader, + RecordReader => OldRecordReader, TextInputFormat => OldTextInputFormat} import org.apache.hadoop.mapreduce.lib.input.{CombineFileInputFormat => NewCombineFileInputFormat, CombineFileRecordReader => NewCombineFileRecordReader, CombineFileSplit => NewCombineFileSplit, FileSplit => NewFileSplit, TextInputFormat => NewTextInputFormat} +import org.apache.hadoop.mapreduce.lib.output.{TextOutputFormat => NewTextOutputFormat} +import org.apache.hadoop.mapreduce.{TaskAttemptContext, InputSplit => NewInputSplit, + RecordReader => NewRecordReader} +import org.scalatest.{BeforeAndAfter, FunSuite} import org.apache.spark.SharedSparkContext import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} import org.apache.spark.util.Utils -class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { +class InputOutputMetricsSuite extends FunSuite with SharedSparkContext + with BeforeAndAfter { @transient var tmpDir: File = _ @transient var tmpFile: File = _ @transient var tmpFilePath: String = _ + @transient val numRecords: Int = 100000 + @transient val numBuckets: Int = 10 - override def beforeAll() { - super.beforeAll() - + before { tmpDir = Utils.createTempDir() val testTempDir = new File(tmpDir, "test") testTempDir.mkdir() tmpFile = new File(testTempDir, getClass.getSimpleName + ".txt") val pw = new PrintWriter(new FileWriter(tmpFile)) - for (x <- 1 to 1000000) { - pw.println("s") + for (x <- 1 to numRecords) { + pw.println(RandomUtils.nextInt(numBuckets)) } pw.close() @@ -66,8 +68,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { tmpFilePath = "file://" + tmpFile.getAbsolutePath } - override def afterAll() { - super.afterAll() + after { Utils.deleteRecursively(tmpDir) } @@ -155,6 +156,101 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { assert(bytesRead >= tmpFile.length()) } + test("input metrics on records read - simple") { + val records = runAndReturnRecordsRead { + sc.textFile(tmpFilePath, 4).count() + } + assert(records == numRecords) + } + + test("input metrics on records read - more stages") { + val records = runAndReturnRecordsRead { + sc.textFile(tmpFilePath, 4) + .map(key => (key.length, 1)) + .reduceByKey(_ + _) + .count() + } + assert(records == numRecords) + } + + test("input metrics on records - New Hadoop API") { + val records = runAndReturnRecordsRead { + sc.newAPIHadoopFile(tmpFilePath, classOf[NewTextInputFormat], classOf[LongWritable], + classOf[Text]).count() + } + assert(records == numRecords) + } + + test("input metrics on recordsd read with cache") { + // prime the cache manager + val rdd = sc.textFile(tmpFilePath, 4).cache() + rdd.collect() + + val records = runAndReturnRecordsRead { + rdd.count() + } + + assert(records == numRecords) + } + + test("shuffle records read metrics") { + val recordsRead = runAndReturnShuffleRecordsRead { + sc.textFile(tmpFilePath, 4) + .map(key => (key, 1)) + .groupByKey() + .collect() + } + assert(recordsRead == numRecords) + } + + test("shuffle records written metrics") { + val recordsWritten = runAndReturnShuffleRecordsWritten { + sc.textFile(tmpFilePath, 4) + .map(key => (key, 1)) + .groupByKey() + .collect() + } + assert(recordsWritten == numRecords) + } + + /** + * Tests the metrics from end to end. + * 1) reading a hadoop file + * 2) shuffle and writing to a hadoop file. + * 3) writing to hadoop file. + */ + test("input read/write and shuffle read/write metrics all line up") { + var inputRead = 0L + var outputWritten = 0L + var shuffleRead = 0L + var shuffleWritten = 0L + sc.addSparkListener(new SparkListener() { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val metrics = taskEnd.taskMetrics + metrics.inputMetrics.foreach(inputRead += _.recordsRead) + metrics.outputMetrics.foreach(outputWritten += _.recordsWritten) + metrics.shuffleReadMetrics.foreach(shuffleRead += _.recordsRead) + metrics.shuffleWriteMetrics.foreach(shuffleWritten += _.shuffleRecordsWritten) + } + }) + + val tmpFile = new File(tmpDir, getClass.getSimpleName) + + sc.textFile(tmpFilePath, 4) + .map(key => (key, 1)) + .reduceByKey(_+_) + .saveAsTextFile("file://" + tmpFile.getAbsolutePath) + + sc.listenerBus.waitUntilEmpty(500) + assert(inputRead == numRecords) + + // Only supported on newer Hadoop + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { + assert(outputWritten == numBuckets) + } + assert(shuffleRead == shuffleWritten) + } + test("input metrics with interleaved reads") { val numPartitions = 2 val cartVector = 0 to 9 @@ -193,18 +289,66 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext { assert(cartesianBytes == firstSize * numPartitions + (cartVector.length * secondSize)) } - private def runAndReturnBytesRead(job : => Unit): Long = { - val taskBytesRead = new ArrayBuffer[Long]() + private def runAndReturnBytesRead(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.bytesRead)) + } + + private def runAndReturnRecordsRead(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.inputMetrics.map(_.recordsRead)) + } + + private def runAndReturnRecordsWritten(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.outputMetrics.map(_.recordsWritten)) + } + + private def runAndReturnShuffleRecordsRead(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.shuffleReadMetrics.map(_.recordsRead)) + } + + private def runAndReturnShuffleRecordsWritten(job: => Unit): Long = { + runAndReturnMetrics(job, _.taskMetrics.shuffleWriteMetrics.map(_.shuffleRecordsWritten)) + } + + private def runAndReturnMetrics(job: => Unit, + collector: (SparkListenerTaskEnd) => Option[Long]): Long = { + val taskMetrics = new ArrayBuffer[Long]() sc.addSparkListener(new SparkListener() { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { - taskBytesRead += taskEnd.taskMetrics.inputMetrics.get.bytesRead + collector(taskEnd).foreach(taskMetrics += _) } }) job sc.listenerBus.waitUntilEmpty(500) - taskBytesRead.sum + taskMetrics.sum + } + + test("output metrics on records written") { + // Only supported on newer Hadoop + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath + + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).saveAsTextFile(filePath) + } + assert(records == numRecords) + } + } + + test("output metrics on records written - new Hadoop API") { + // Only supported on newer Hadoop + if (SparkHadoopUtil.get.getFSBytesWrittenOnThreadCallback().isDefined) { + val file = new File(tmpDir, getClass.getSimpleName) + val filePath = "file://" + file.getAbsolutePath + + val records = runAndReturnRecordsWritten { + sc.parallelize(1 to numRecords).map(key => (key.toString, key.toString)) + .saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](filePath) + } + assert(records == numRecords) + } } test("output metrics when writing text file") { @@ -318,4 +462,4 @@ class NewCombineTextRecordReaderWrapper( override def getCurrentValue(): Text = delegate.getCurrentValue override def getProgress(): Float = delegate.getProgress override def close(): Unit = delegate.close() -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala index bbc7e13..c21c92b 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockObjectWriterSuite.scala @@ -31,6 +31,8 @@ class BlockObjectWriterSuite extends FunSuite { new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) writer.write(Long.box(20)) + // Record metrics update on every write + assert(writeMetrics.shuffleRecordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -39,6 +41,7 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) + assert(writeMetrics.shuffleRecordsWritten === 33) writer.commitAndClose() assert(file.length() == writeMetrics.shuffleBytesWritten) } @@ -51,6 +54,8 @@ class BlockObjectWriterSuite extends FunSuite { new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) writer.write(Long.box(20)) + // Record metrics update on every write + assert(writeMetrics.shuffleRecordsWritten === 1) // Metrics don't update on every write assert(writeMetrics.shuffleBytesWritten == 0) // After 32 writes, metrics should update @@ -59,7 +64,23 @@ class BlockObjectWriterSuite extends FunSuite { writer.write(Long.box(i)) } assert(writeMetrics.shuffleBytesWritten > 0) + assert(writeMetrics.shuffleRecordsWritten === 33) writer.revertPartialWritesAndClose() assert(writeMetrics.shuffleBytesWritten == 0) + assert(writeMetrics.shuffleRecordsWritten == 0) + } + + test("Reopening a closed block writer") { + val file = new File("somefile") + file.deleteOnExit() + val writeMetrics = new ShuffleWriteMetrics() + val writer = new DiskBlockObjectWriter(new TestBlockId("0"), file, + new JavaSerializer(new SparkConf()), 1024, os => os, true, writeMetrics) + + writer.open() + writer.close() + intercept[IllegalStateException] { + writer.open() + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index 68074ae..e8405ba 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -234,7 +234,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc taskMetrics.incMemoryBytesSpilled(base + 6) val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) taskMetrics.setInputMetrics(Some(inputMetrics)) - inputMetrics.addBytesRead(base + 7) + inputMetrics.incBytesRead(base + 7) val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) taskMetrics.outputMetrics = Some(outputMetrics) outputMetrics.setBytesWritten(base + 8) http://git-wip-us.apache.org/repos/asf/spark/blob/9fa29a62/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 842f545..f3017dc 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -189,6 +189,34 @@ class JsonProtocolSuite extends FunSuite { assert(newMetrics.inputMetrics.isEmpty) } + test("Input/Output records backwards compatibility") { + // records read were added after 1.2 + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, + hasHadoopInput = true, hasOutput = true, hasRecords = false) + assert(metrics.inputMetrics.nonEmpty) + assert(metrics.outputMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Records Read" } + .removeField { case (field, _) => field == "Records Written" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.inputMetrics.get.recordsRead == 0) + assert(newMetrics.outputMetrics.get.recordsWritten == 0) + } + + test("Shuffle Read/Write records backwards compatibility") { + // records read were added after 1.2 + val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, + hasHadoopInput = false, hasOutput = false, hasRecords = false) + assert(metrics.shuffleReadMetrics.nonEmpty) + assert(metrics.shuffleWriteMetrics.nonEmpty) + val newJson = JsonProtocol.taskMetricsToJson(metrics) + val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" } + .removeField { case (field, _) => field == "Shuffle Records Written" } + val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson) + assert(newMetrics.shuffleReadMetrics.get.recordsRead == 0) + assert(newMetrics.shuffleWriteMetrics.get.shuffleRecordsWritten == 0) + } + test("OutputMetrics backward compatibility") { // OutputMetrics were added after 1.1 val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true) @@ -644,7 +672,8 @@ class JsonProtocolSuite extends FunSuite { e: Int, f: Int, hasHadoopInput: Boolean, - hasOutput: Boolean) = { + hasOutput: Boolean, + hasRecords: Boolean = true) = { val t = new TaskMetrics t.setHostname("localhost") t.setExecutorDeserializeTime(a) @@ -656,7 +685,8 @@ class JsonProtocolSuite extends FunSuite { if (hasHadoopInput) { val inputMetrics = new InputMetrics(DataReadMethod.Hadoop) - inputMetrics.addBytesRead(d + e + f) + inputMetrics.incBytesRead(d + e + f) + inputMetrics.incRecordsRead(if (hasRecords) (d + e + f) / 100 else -1) t.setInputMetrics(Some(inputMetrics)) } else { val sr = new ShuffleReadMetrics @@ -664,16 +694,19 @@ class JsonProtocolSuite extends FunSuite { sr.incLocalBlocksFetched(e) sr.incFetchWaitTime(a + d) sr.incRemoteBlocksFetched(f) + sr.incRecordsRead(if (hasRecords) (b + d) / 100 else -1) t.setShuffleReadMetrics(Some(sr)) } if (hasOutput) { val outputMetrics = new OutputMetrics(DataWriteMethod.Hadoop) outputMetrics.setBytesWritten(a + b + c) + outputMetrics.setRecordsWritten(if (hasRecords) (a + b + c)/100 else -1) t.outputMetrics = Some(outputMetrics) } else { val sw = new ShuffleWriteMetrics sw.incShuffleBytesWritten(a + b + c) sw.incShuffleWriteTime(b + c + d) + sw.setShuffleRecordsWritten(if (hasRecords) (a + b + c) / 100 else -1) t.shuffleWriteMetrics = Some(sw) } // Make at most 6 blocks @@ -907,11 +940,13 @@ class JsonProtocolSuite extends FunSuite { | "Remote Blocks Fetched": 800, | "Local Blocks Fetched": 700, | "Fetch Wait Time": 900, - | "Remote Bytes Read": 1000 + | "Remote Bytes Read": 1000, + | "Total Records Read" : 10 | }, | "Shuffle Write Metrics": { | "Shuffle Bytes Written": 1200, - | "Shuffle Write Time": 1500 + | "Shuffle Write Time": 1500, + | "Shuffle Records Written": 12 | }, | "Updated Blocks": [ | { @@ -988,11 +1023,13 @@ class JsonProtocolSuite extends FunSuite { | "Disk Bytes Spilled": 0, | "Shuffle Write Metrics": { | "Shuffle Bytes Written": 1200, - | "Shuffle Write Time": 1500 + | "Shuffle Write Time": 1500, + | "Shuffle Records Written": 12 | }, | "Input Metrics": { | "Data Read Method": "Hadoop", - | "Bytes Read": 2100 + | "Bytes Read": 2100, + | "Records Read": 21 | }, | "Updated Blocks": [ | { @@ -1069,11 +1106,13 @@ class JsonProtocolSuite extends FunSuite { | "Disk Bytes Spilled": 0, | "Input Metrics": { | "Data Read Method": "Hadoop", - | "Bytes Read": 2100 + | "Bytes Read": 2100, + | "Records Read": 21 | }, | "Output Metrics": { | "Data Write Method": "Hadoop", - | "Bytes Written": 1200 + | "Bytes Written": 1200, + | "Records Written": 12 | }, | "Updated Blocks": [ | { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org