Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20013#discussion_r160017859 --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala --- @@ -119,118 +121,115 @@ private class LiveTask( import LiveEntityHelpers._ - private var recordedMetrics: v1.TaskMetrics = null + private var metrics: MetricsTracker = new MetricsTracker() var errorMessage: Option[String] = None /** * Update the metrics for the task and return the difference between the previous and new * values. */ - def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = { + def updateMetrics(metrics: TaskMetrics): MetricsTracker = { if (metrics != null) { - val old = recordedMetrics - recordedMetrics = new v1.TaskMetrics( - metrics.executorDeserializeTime, - metrics.executorDeserializeCpuTime, - metrics.executorRunTime, - metrics.executorCpuTime, - metrics.resultSize, - metrics.jvmGCTime, - metrics.resultSerializationTime, - metrics.memoryBytesSpilled, - metrics.diskBytesSpilled, - metrics.peakExecutionMemory, - new v1.InputMetrics( - metrics.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead), - new v1.OutputMetrics( - metrics.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten), - new v1.ShuffleReadMetrics( - metrics.shuffleReadMetrics.remoteBlocksFetched, - metrics.shuffleReadMetrics.localBlocksFetched, - metrics.shuffleReadMetrics.fetchWaitTime, - metrics.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead), - new v1.ShuffleWriteMetrics( - metrics.shuffleWriteMetrics.bytesWritten, - metrics.shuffleWriteMetrics.writeTime, - metrics.shuffleWriteMetrics.recordsWritten)) - if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics + val old = this.metrics + val newMetrics = new MetricsTracker() + newMetrics.executorDeserializeTime = metrics.executorDeserializeTime + 
newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime + newMetrics.executorRunTime = metrics.executorRunTime + newMetrics.executorCpuTime = metrics.executorCpuTime + newMetrics.resultSize = metrics.resultSize + newMetrics.jvmGcTime = metrics.jvmGCTime + newMetrics.resultSerializationTime = metrics.resultSerializationTime + newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled + newMetrics.diskBytesSpilled = metrics.diskBytesSpilled + newMetrics.peakExecutionMemory = metrics.peakExecutionMemory + newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead + newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead + newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten + newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten + newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched + newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched + newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime + newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead + newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk + newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead + newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead + newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten + newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime + newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten + + this.metrics = newMetrics + if (old.executorDeserializeTime >= 0L) { + old.subtract(newMetrics) + old + } else { + newMetrics + } } else { null } } - /** - * Return a new TaskMetrics object containing the delta of the various fields of the given - * metrics objects. This is currently targeted at updating stage data, so it does not - * necessarily calculate deltas for all the fields. 
- */ - private def calculateMetricsDelta( - metrics: v1.TaskMetrics, - old: v1.TaskMetrics): v1.TaskMetrics = { - val shuffleWriteDelta = new v1.ShuffleWriteMetrics( - metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten, - 0L, - metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten) - - val shuffleReadDelta = new v1.ShuffleReadMetrics( - 0L, 0L, 0L, - metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead, - metrics.shuffleReadMetrics.remoteBytesReadToDisk - - old.shuffleReadMetrics.remoteBytesReadToDisk, - metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead, - metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead) - - val inputDelta = new v1.InputMetrics( - metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead, - metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead) - - val outputDelta = new v1.OutputMetrics( - metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten, - metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten) - - new v1.TaskMetrics( - 0L, 0L, - metrics.executorRunTime - old.executorRunTime, - metrics.executorCpuTime - old.executorCpuTime, - 0L, 0L, 0L, - metrics.memoryBytesSpilled - old.memoryBytesSpilled, - metrics.diskBytesSpilled - old.diskBytesSpilled, - 0L, - inputDelta, - outputDelta, - shuffleReadDelta, - shuffleWriteDelta) - } - - override protected def doUpdate(): Any = { + private def buildUpdate(): TaskDataWrapper = { val duration = if (info.finished) { info.duration } else { info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis())) } - val task = new v1.TaskData( + new TaskDataWrapper( info.taskId, info.index, info.attemptNumber, - new Date(info.launchTime), - if (info.gettingResult) Some(new Date(info.gettingResultTime)) else None, - Some(duration), - info.executorId, - info.host, - info.status, - info.taskLocality.toString(), + 
info.launchTime, + if (info.gettingResult) info.gettingResultTime else -1L, + duration, + weakIntern(info.executorId), + weakIntern(info.host), + weakIntern(info.status), + weakIntern(info.taskLocality.toString()), --- End diff -- I wasn't familiar with WeakInterner before, it's neat. Not super important, but for most of these, wouldn't strong interning be fine? I expect the same values to come up over and over again across apps for these: executorId, host, status, taskLocality, storageLevel — though weak interning makes sense for exec.hostPort and accumulators
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org