GitHub user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195886625
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
     
       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // clear out any previous attempts that did not have a stage completed event
    +      val prevAttemptId = event.stageInfo.attemptNumber() - 1
    +      for (attemptId <- 0 to prevAttemptId) {
    +        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
    +      }
    +
    +      // log the peak executor metrics for the stage, for each live executor,
    +      // whether or not the executor is running tasks for the stage
    +      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
    +      val executorMap = liveStageExecutorMetrics.remove(
    +        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
    +      executorMap.foreach {
    +        executorEntry => {
    +          for ((executorId, peakExecutorMetrics) <- executorEntry) {
    +            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
    --- End diff ---
    
    This is confusing, especially since the current code does not have
    stage-level metrics, just executor-level ones.
    
    The -1 wouldn't be replaced. PeakExecutorMetrics only tracks the peak
    metric values for each executor (and later, for each executor per stage)
    and doesn't have a timestamp.
    
    If there is a local max (500), which is the max between T3 and T5 in the
    timeline below, it would be logged at time T5, even though it actually
    happens at T3.5.
    
    In actual event order, this is what the driver sees while the
    application is running:
    T1: start of stage 1
    T2: value of 1000 for metric m1
    T3: start of stage 2
    T3.5: peak value of 500 for metric m1
    T4: stage 1 ends
    T5: stage 2 ends
    
    Suppose that 1000 (seen at T2) is the peak value of m1 between T1 and T4,
    so it is the peak value seen while stage 1 is running. The m1=1000 value
    will be dumped as the max value in an executorMetricsUpdate event right
    before the stage 1 end event is logged. Also suppose that 500 (seen at
    T3.5) is the peak value of m1 between T3 and T5, so it is the peak value
    seen while stage 2 is running. The m1=500 value will be dumped as the max
    value in an executorMetricsUpdate event right before the stage 2 end
    event is logged.
    
    The generated Spark history log would have the following order of events:
    
    Start of stage 1
    Start of stage 2
    executorMetricsUpdate with m1=1000
    End of stage 1
    executorMetricsUpdate with m1=500
    End of stage 2
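
    To make this concrete, here is a minimal standalone Scala sketch
    (hypothetical names, not the actual history server code) of what a plain
    compare-and-max replay of the log above would compute for stage 2's peak:

        // Standalone sketch; ReplaySketch and stage2PeakM1 are made up for
        // illustration.
        object ReplaySketch extends App {
          // Stage 2's peak tracker exists from "Start of stage 2" onward,
          // so it sees both executorMetricsUpdate events in the log above.
          var stage2PeakM1 = Long.MinValue
          stage2PeakM1 = math.max(stage2PeakM1, 1000L) // really stage 1's peak
          stage2PeakM1 = math.max(stage2PeakM1, 500L)  // stage 2's true peak
          println(stage2PeakM1) // prints 1000 -- wrong, stage 2's peak is 500
        }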
    
    When the Spark history server is reading the log, it will create the
    peakExecutorMetrics for stage 2 when stage 2 starts, which is before it
    sees the executorMetricsUpdate with m1=1000, and so it will store m1=1000
    as the current peak value. When it later sees the executorMetricsUpdate
    with m1=500, it needs to overwrite the m1 value (and set it to 500), not
    compare and update to the max value (which would leave it at 1000). The
    -1 would indicate that the event is coming from the Spark history log, is
    a peak value for the stage just about to complete, and should overwrite
    any previous values.
    
    If the timestamp is set to a positive value, it will instead do a
    compare-and-update to the max value.
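
    As a minimal sketch of the intended semantics (updatePeak and
    PeakUpdateSketch are hypothetical names for illustration, not the actual
    listener API):

        // Standalone Scala sketch of the overwrite-vs-max handling.
        object PeakUpdateSketch {
          def updatePeak(current: Long, incoming: Long, timestamp: Long): Long =
            if (timestamp == -1) incoming     // replayed stage peak: overwrite
            else math.max(current, incoming)  // live update: compare-and-max

          def main(args: Array[String]): Unit = {
            var peak = Long.MinValue
            peak = updatePeak(peak, 1000L, timestamp = 100L) // earlier update
            peak = updatePeak(peak, 500L, timestamp = -1L)   // stage-end peak
            println(peak) // prints 500, the stage's true peak
          }
        }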

