Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160486870

    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -119,118 +121,115 @@ private class LiveTask(
     
       import LiveEntityHelpers._
     
    -  private var recordedMetrics: v1.TaskMetrics = null
    +  private var metrics: MetricsTracker = new MetricsTracker()
     
       var errorMessage: Option[String] = None
     
       /**
        * Update the metrics for the task and return the difference between the previous and new
        * values.
        */
    -  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
    +  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
         if (metrics != null) {
    -      val old = recordedMetrics
    -      recordedMetrics = new v1.TaskMetrics(
    -        metrics.executorDeserializeTime,
    -        metrics.executorDeserializeCpuTime,
    -        metrics.executorRunTime,
    -        metrics.executorCpuTime,
    -        metrics.resultSize,
    -        metrics.jvmGCTime,
    -        metrics.resultSerializationTime,
    -        metrics.memoryBytesSpilled,
    -        metrics.diskBytesSpilled,
    -        metrics.peakExecutionMemory,
    -        new v1.InputMetrics(
    -          metrics.inputMetrics.bytesRead,
    -          metrics.inputMetrics.recordsRead),
    -        new v1.OutputMetrics(
    -          metrics.outputMetrics.bytesWritten,
    -          metrics.outputMetrics.recordsWritten),
    -        new v1.ShuffleReadMetrics(
    -          metrics.shuffleReadMetrics.remoteBlocksFetched,
    -          metrics.shuffleReadMetrics.localBlocksFetched,
    -          metrics.shuffleReadMetrics.fetchWaitTime,
    -          metrics.shuffleReadMetrics.remoteBytesRead,
    -          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    -          metrics.shuffleReadMetrics.localBytesRead,
    -          metrics.shuffleReadMetrics.recordsRead),
    -        new v1.ShuffleWriteMetrics(
    -          metrics.shuffleWriteMetrics.bytesWritten,
    -          metrics.shuffleWriteMetrics.writeTime,
    -          metrics.shuffleWriteMetrics.recordsWritten))
    -      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
    +      val old = this.metrics
    +      val newMetrics = new MetricsTracker()
    +      newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
    +      newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime
    +      newMetrics.executorRunTime = metrics.executorRunTime
    +      newMetrics.executorCpuTime = metrics.executorCpuTime
    +      newMetrics.resultSize = metrics.resultSize
    +      newMetrics.jvmGcTime = metrics.jvmGCTime
    +      newMetrics.resultSerializationTime = metrics.resultSerializationTime
    +      newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
    +      newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
    +      newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
    +      newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
    +      newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
    +      newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
    +      newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten
    +      newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched
    +      newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched
    +      newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime
    +      newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead
    +      newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk
    +      newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead
    +      newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead
    +      newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten
    +      newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
    +      newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten
    +
    +      this.metrics = newMetrics
    +      if (old.executorDeserializeTime >= 0L) {
    +        old.subtract(newMetrics)
    --- End diff --
    
    Actually, sorry: the parameter is not the delta, the return value is. This code is not right, but the solution is different from what you suggest. I'll also add a test for it.
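    To be concrete, here is a rough sketch of what I mean, not the final code: the tracker is trimmed to two fields, and the `subtract` helper shown here is illustrative. The point is that the delta is the return value, computed as new minus old, as the doc comment on `updateMetrics` says.
    
        private class MetricsTracker {
          // -1 marks "no metrics recorded yet", matching the `>= 0L` guard above.
          var executorDeserializeTime: Long = -1L
          var executorRunTime: Long = -1L
        
          // Return this minus `old`: what has accumulated since the previous update.
          def subtract(old: MetricsTracker): MetricsTracker = {
            val delta = new MetricsTracker()
            delta.executorDeserializeTime = executorDeserializeTime - old.executorDeserializeTime
            delta.executorRunTime = executorRunTime - old.executorRunTime
            delta
          }
        }
        
        // Then in updateMetrics: return the delta of the new values over the
        // previous ones; on the first update there is nothing to diff against.
        this.metrics = newMetrics
        if (old.executorDeserializeTime >= 0L) {
          newMetrics.subtract(old)
        } else {
          newMetrics
        }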