GitHub user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160222252
  
    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -119,118 +121,115 @@ private class LiveTask(
     
       import LiveEntityHelpers._
     
    -  private var recordedMetrics: v1.TaskMetrics = null
    +  private var metrics: MetricsTracker = new MetricsTracker()
     
       var errorMessage: Option[String] = None
     
       /**
     * Update the metrics for the task and return the difference between the previous and new
        * values.
        */
    -  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
    +  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
         if (metrics != null) {
    -      val old = recordedMetrics
    -      recordedMetrics = new v1.TaskMetrics(
    -        metrics.executorDeserializeTime,
    -        metrics.executorDeserializeCpuTime,
    -        metrics.executorRunTime,
    -        metrics.executorCpuTime,
    -        metrics.resultSize,
    -        metrics.jvmGCTime,
    -        metrics.resultSerializationTime,
    -        metrics.memoryBytesSpilled,
    -        metrics.diskBytesSpilled,
    -        metrics.peakExecutionMemory,
    -        new v1.InputMetrics(
    -          metrics.inputMetrics.bytesRead,
    -          metrics.inputMetrics.recordsRead),
    -        new v1.OutputMetrics(
    -          metrics.outputMetrics.bytesWritten,
    -          metrics.outputMetrics.recordsWritten),
    -        new v1.ShuffleReadMetrics(
    -          metrics.shuffleReadMetrics.remoteBlocksFetched,
    -          metrics.shuffleReadMetrics.localBlocksFetched,
    -          metrics.shuffleReadMetrics.fetchWaitTime,
    -          metrics.shuffleReadMetrics.remoteBytesRead,
    -          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    -          metrics.shuffleReadMetrics.localBytesRead,
    -          metrics.shuffleReadMetrics.recordsRead),
    -        new v1.ShuffleWriteMetrics(
    -          metrics.shuffleWriteMetrics.bytesWritten,
    -          metrics.shuffleWriteMetrics.writeTime,
    -          metrics.shuffleWriteMetrics.recordsWritten))
    -      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
    +      val old = this.metrics
    +      val newMetrics = new MetricsTracker()
    +      newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
    +      newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime
    +      newMetrics.executorRunTime = metrics.executorRunTime
    +      newMetrics.executorCpuTime = metrics.executorCpuTime
    +      newMetrics.resultSize = metrics.resultSize
    +      newMetrics.jvmGcTime = metrics.jvmGCTime
    +      newMetrics.resultSerializationTime = metrics.resultSerializationTime
    +      newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
    +      newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
    +      newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
    +      newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
    +      newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
    +      newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
    +      newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten
    +      newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched
    +      newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched
    +      newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime
    +      newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead
    +      newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk
    +      newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead
    +      newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead
    +      newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten
    +      newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
    +      newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten
    +
    +      this.metrics = newMetrics
    +      if (old.executorDeserializeTime >= 0L) {
    +        old.subtract(newMetrics)
    +        old
    +      } else {
    +        newMetrics
    +      }
         } else {
           null
         }
       }
     
    -  /**
    -   * Return a new TaskMetrics object containing the delta of the various fields of the given
    -   * metrics objects. This is currently targeted at updating stage data, so it does not
    -   * necessarily calculate deltas for all the fields.
    -   */
    -  private def calculateMetricsDelta(
    -      metrics: v1.TaskMetrics,
    -      old: v1.TaskMetrics): v1.TaskMetrics = {
    -    val shuffleWriteDelta = new v1.ShuffleWriteMetrics(
    -      metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten,
    -      0L,
    -      metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten)
    -
    -    val shuffleReadDelta = new v1.ShuffleReadMetrics(
    -      0L, 0L, 0L,
    -      metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead,
    -      metrics.shuffleReadMetrics.remoteBytesReadToDisk -
    -        old.shuffleReadMetrics.remoteBytesReadToDisk,
    -      metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead,
    -      metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead)
    -
    -    val inputDelta = new v1.InputMetrics(
    -      metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead,
    -      metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead)
    -
    -    val outputDelta = new v1.OutputMetrics(
    -      metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten,
    -      metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten)
    -
    -    new v1.TaskMetrics(
    -      0L, 0L,
    -      metrics.executorRunTime - old.executorRunTime,
    -      metrics.executorCpuTime - old.executorCpuTime,
    -      0L, 0L, 0L,
    -      metrics.memoryBytesSpilled - old.memoryBytesSpilled,
    -      metrics.diskBytesSpilled - old.diskBytesSpilled,
    -      0L,
    -      inputDelta,
    -      outputDelta,
    -      shuffleReadDelta,
    -      shuffleWriteDelta)
    -  }
    -
    -  override protected def doUpdate(): Any = {
    +  private def buildUpdate(): TaskDataWrapper = {
         val duration = if (info.finished) {
           info.duration
        } else {
          info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
         }
     
    -    val task = new v1.TaskData(
    +    new TaskDataWrapper(
           info.taskId,
           info.index,
           info.attemptNumber,
    -      new Date(info.launchTime),
    -      if (info.gettingResult) Some(new Date(info.gettingResultTime)) else None,
    -      Some(duration),
    -      info.executorId,
    -      info.host,
    -      info.status,
    -      info.taskLocality.toString(),
    +      info.launchTime,
    +      if (info.gettingResult) info.gettingResultTime else -1L,
    +      duration,
    +      weakIntern(info.executorId),
    +      weakIntern(info.host),
    +      weakIntern(info.status),
    +      weakIntern(info.taskLocality.toString()),
    --- End diff --
    
    I don't think weak vs. strong interning makes a whole lot of difference with Java 8 (= no perm gen). I used the weak one because that's what the old code used.
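    
    For reference, a minimal sketch of what such a `weakIntern` helper can look like when built on Guava's `Interners` (Guava is already on Spark's classpath; the object name here is made up for the example):
    
    ```scala
    import com.google.common.collect.Interners
    
    object StringInterning {
      // A weak interner deduplicates equal strings to a single instance but
      // holds them through weak references, so interned strings can still be
      // garbage-collected once no live entity references them.
      private val strings = Interners.newWeakInterner[String]()
    
      def weakIntern(s: String): String = strings.intern(s)
    }
    
    // Usage, e.g. when building a TaskDataWrapper:
    //   val host = StringInterning.weakIntern(info.host)
    ```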
    
    It can probably be extended to more things, but tasks are the bulk of an app's data, and that's what I was focusing on in this PR.
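    
    For anyone reading along, here is a minimal sketch of the tracker/delta pattern the new `updateMetrics` relies on, trimmed to two fields (the real `MetricsTracker` has many more). The assumption, inferred from the `old.subtract(newMetrics); old` call site, is that `subtract(other)` overwrites each field with `other.field - this.field`, so the returned tracker holds new-minus-old deltas:
    
    ```scala
    class MetricsTracker {
      // -1 doubles as a sentinel for "no metrics recorded yet"; a task that
      // has reported metrics always has executorDeserializeTime >= 0, which
      // is exactly the check updateMetrics makes before computing a delta.
      var executorDeserializeTime: Long = -1L
      var executorRunTime: Long = -1L
    
      // Overwrite this tracker's fields with (other - this); assumed
      // semantics, chosen so the call site above yields new-minus-old.
      def subtract(other: MetricsTracker): Unit = {
        executorDeserializeTime = other.executorDeserializeTime - executorDeserializeTime
        executorRunTime = other.executorRunTime - executorRunTime
      }
    }
    ```
    
    On a task's first update the old tracker still holds its -1 sentinel, so `updateMetrics` returns the new values as-is rather than a delta.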

