Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20013#discussion_r160017859
  
    --- Diff: core/src/main/scala/org/apache/spark/status/LiveEntity.scala ---
    @@ -119,118 +121,115 @@ private class LiveTask(
     
       import LiveEntityHelpers._
     
    -  private var recordedMetrics: v1.TaskMetrics = null
    +  private var metrics: MetricsTracker = new MetricsTracker()
     
       var errorMessage: Option[String] = None
     
       /**
        * Update the metrics for the task and return the difference between the previous and new
        * values.
        */
    -  def updateMetrics(metrics: TaskMetrics): v1.TaskMetrics = {
    +  def updateMetrics(metrics: TaskMetrics): MetricsTracker = {
         if (metrics != null) {
    -      val old = recordedMetrics
    -      recordedMetrics = new v1.TaskMetrics(
    -        metrics.executorDeserializeTime,
    -        metrics.executorDeserializeCpuTime,
    -        metrics.executorRunTime,
    -        metrics.executorCpuTime,
    -        metrics.resultSize,
    -        metrics.jvmGCTime,
    -        metrics.resultSerializationTime,
    -        metrics.memoryBytesSpilled,
    -        metrics.diskBytesSpilled,
    -        metrics.peakExecutionMemory,
    -        new v1.InputMetrics(
    -          metrics.inputMetrics.bytesRead,
    -          metrics.inputMetrics.recordsRead),
    -        new v1.OutputMetrics(
    -          metrics.outputMetrics.bytesWritten,
    -          metrics.outputMetrics.recordsWritten),
    -        new v1.ShuffleReadMetrics(
    -          metrics.shuffleReadMetrics.remoteBlocksFetched,
    -          metrics.shuffleReadMetrics.localBlocksFetched,
    -          metrics.shuffleReadMetrics.fetchWaitTime,
    -          metrics.shuffleReadMetrics.remoteBytesRead,
    -          metrics.shuffleReadMetrics.remoteBytesReadToDisk,
    -          metrics.shuffleReadMetrics.localBytesRead,
    -          metrics.shuffleReadMetrics.recordsRead),
    -        new v1.ShuffleWriteMetrics(
    -          metrics.shuffleWriteMetrics.bytesWritten,
    -          metrics.shuffleWriteMetrics.writeTime,
    -          metrics.shuffleWriteMetrics.recordsWritten))
    -      if (old != null) calculateMetricsDelta(recordedMetrics, old) else recordedMetrics
    +      val old = this.metrics
    +      val newMetrics = new MetricsTracker()
    +      newMetrics.executorDeserializeTime = metrics.executorDeserializeTime
    +      newMetrics.executorDeserializeCpuTime = metrics.executorDeserializeCpuTime
    +      newMetrics.executorRunTime = metrics.executorRunTime
    +      newMetrics.executorCpuTime = metrics.executorCpuTime
    +      newMetrics.resultSize = metrics.resultSize
    +      newMetrics.jvmGcTime = metrics.jvmGCTime
    +      newMetrics.resultSerializationTime = metrics.resultSerializationTime
    +      newMetrics.memoryBytesSpilled = metrics.memoryBytesSpilled
    +      newMetrics.diskBytesSpilled = metrics.diskBytesSpilled
    +      newMetrics.peakExecutionMemory = metrics.peakExecutionMemory
    +      newMetrics.inputBytesRead = metrics.inputMetrics.bytesRead
    +      newMetrics.inputRecordsRead = metrics.inputMetrics.recordsRead
    +      newMetrics.outputBytesWritten = metrics.outputMetrics.bytesWritten
    +      newMetrics.outputRecordsWritten = metrics.outputMetrics.recordsWritten
    +      newMetrics.shuffleRemoteBlocksFetched = metrics.shuffleReadMetrics.remoteBlocksFetched
    +      newMetrics.shuffleLocalBlocksFetched = metrics.shuffleReadMetrics.localBlocksFetched
    +      newMetrics.shuffleFetchWaitTime = metrics.shuffleReadMetrics.fetchWaitTime
    +      newMetrics.shuffleRemoteBytesRead = metrics.shuffleReadMetrics.remoteBytesRead
    +      newMetrics.shuffleRemoteBytesReadToDisk = metrics.shuffleReadMetrics.remoteBytesReadToDisk
    +      newMetrics.shuffleLocalBytesRead = metrics.shuffleReadMetrics.localBytesRead
    +      newMetrics.shuffleRecordsRead = metrics.shuffleReadMetrics.recordsRead
    +      newMetrics.shuffleBytesWritten = metrics.shuffleWriteMetrics.bytesWritten
    +      newMetrics.shuffleWriteTime = metrics.shuffleWriteMetrics.writeTime
    +      newMetrics.shuffleRecordsWritten = metrics.shuffleWriteMetrics.recordsWritten
    +
    +      this.metrics = newMetrics
    +      if (old.executorDeserializeTime >= 0L) {
    +        old.subtract(newMetrics)
    +        old
    +      } else {
    +        newMetrics
    +      }
         } else {
           null
         }
       }
     
    -  /**
    -   * Return a new TaskMetrics object containing the delta of the various fields of the given
    -   * metrics objects. This is currently targeted at updating stage data, so it does not
    -   * necessarily calculate deltas for all the fields.
    -   */
    -  private def calculateMetricsDelta(
    -      metrics: v1.TaskMetrics,
    -      old: v1.TaskMetrics): v1.TaskMetrics = {
    -    val shuffleWriteDelta = new v1.ShuffleWriteMetrics(
    -      metrics.shuffleWriteMetrics.bytesWritten - old.shuffleWriteMetrics.bytesWritten,
    -      0L,
    -      metrics.shuffleWriteMetrics.recordsWritten - old.shuffleWriteMetrics.recordsWritten)
    -
    -    val shuffleReadDelta = new v1.ShuffleReadMetrics(
    -      0L, 0L, 0L,
    -      metrics.shuffleReadMetrics.remoteBytesRead - old.shuffleReadMetrics.remoteBytesRead,
    -      metrics.shuffleReadMetrics.remoteBytesReadToDisk -
    -        old.shuffleReadMetrics.remoteBytesReadToDisk,
    -      metrics.shuffleReadMetrics.localBytesRead - old.shuffleReadMetrics.localBytesRead,
    -      metrics.shuffleReadMetrics.recordsRead - old.shuffleReadMetrics.recordsRead)
    -
    -    val inputDelta = new v1.InputMetrics(
    -      metrics.inputMetrics.bytesRead - old.inputMetrics.bytesRead,
    -      metrics.inputMetrics.recordsRead - old.inputMetrics.recordsRead)
    -
    -    val outputDelta = new v1.OutputMetrics(
    -      metrics.outputMetrics.bytesWritten - old.outputMetrics.bytesWritten,
    -      metrics.outputMetrics.recordsWritten - old.outputMetrics.recordsWritten)
    -
    -    new v1.TaskMetrics(
    -      0L, 0L,
    -      metrics.executorRunTime - old.executorRunTime,
    -      metrics.executorCpuTime - old.executorCpuTime,
    -      0L, 0L, 0L,
    -      metrics.memoryBytesSpilled - old.memoryBytesSpilled,
    -      metrics.diskBytesSpilled - old.diskBytesSpilled,
    -      0L,
    -      inputDelta,
    -      outputDelta,
    -      shuffleReadDelta,
    -      shuffleWriteDelta)
    -  }
    -
    -  override protected def doUpdate(): Any = {
    +  private def buildUpdate(): TaskDataWrapper = {
         val duration = if (info.finished) {
           info.duration
         } else {
           info.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis()))
         }
     
    -    val task = new v1.TaskData(
    +    new TaskDataWrapper(
           info.taskId,
           info.index,
           info.attemptNumber,
    -      new Date(info.launchTime),
    -      if (info.gettingResult) Some(new Date(info.gettingResultTime)) else None,
    -      Some(duration),
    -      info.executorId,
    -      info.host,
    -      info.status,
    -      info.taskLocality.toString(),
    +      info.launchTime,
    +      if (info.gettingResult) info.gettingResultTime else -1L,
    +      duration,
    +      weakIntern(info.executorId),
    +      weakIntern(info.host),
    +      weakIntern(info.status),
    +      weakIntern(info.taskLocality.toString()),
    --- End diff ---
    
    I wasn't familiar with WeakInterner before; it's neat. Not super important,
    but for most of these, wouldn't strong interning be fine? I expect the same
    values to come up over and over again across apps for these:
    
    executorId
    host
    status
    taskLocality
    storageLevel
    
    though weak interning makes sense for
    
    exec.hostPort
    accumulators
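
    For what it's worth, here's a minimal sketch of the two Guava interner
    flavors in play. `Interners.newWeakInterner` is what the diff's
    `weakIntern` helper wraps; the `strongIntern` helper and the
    `InternExample` object are hypothetical names, just to illustrate the
    trade-off:

    ```scala
    import com.google.common.collect.Interners

    object InternExample {
      // Weak interner: the canonical string can be garbage-collected once
      // nothing else references it, so the pool never pins memory.
      private val weakPool = Interners.newWeakInterner[String]()
      def weakIntern(s: String): String = weakPool.intern(s)

      // Strong interner: canonical strings live as long as the pool does.
      // No weak-reference overhead, but entries are never reclaimed; fine
      // for small recurring domains like status or taskLocality.
      private val strongPool = Interners.newStrongInterner[String]()
      def strongIntern(s: String): String = strongPool.intern(s)

      def main(args: Array[String]): Unit = {
        val a = weakIntern(new String("PROCESS_LOCAL"))
        val b = weakIntern(new String("PROCESS_LOCAL"))
        println(a eq b)  // true: both point at the same canonical instance
      }
    }
    ```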


---
