Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195538848
  
    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
     
       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
     +    if (shouldLogExecutorMetricsUpdates) {
     +      // clear out any previous attempts that did not have a stage completed event
     +      val prevAttemptId = event.stageInfo.attemptNumber() - 1
     +      for (attemptId <- 0 to prevAttemptId) {
     +        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
     +      }
     +
     +      // log the peak executor metrics for the stage, for each live executor,
     +      // whether or not the executor is running tasks for the stage
     +      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
     +      val executorMap = liveStageExecutorMetrics.remove(
     +        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
     +      executorMap.foreach { executorEntry =>
     +        for ((executorId, peakExecutorMetrics) <- executorEntry) {
     +          val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
    --- End diff --
    
    We need to pass in a value for the timestamp, but there isn't really a single one for the peak metrics, since each metric may have peaked at a different time.
    
    When processing, -1 indicates that the event comes from the history log and contains the peak values for the stage that is just ending. When updating the stage executor peaks (the peak executor values stored for each active stage), we can then replace all of the peak executor metric values, instead of updating each metric with the max of the current and new values.
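
    To make the replace-vs-max rule concrete, here is a rough sketch; the names stagePeaks, updateStagePeaks, and metricCount are hypothetical, for illustration only, and are not the actual replay code in this PR:

        import scala.collection.mutable

        // hypothetical: peak metric values per (stageId, stageAttemptId, executorId)
        val metricCount = 10  // placeholder for the number of tracked metrics
        val stagePeaks = mutable.Map[(Int, Int, String), Array[Long]]()

        def updateStagePeaks(
            stageId: Int,
            attempt: Int,
            executorId: String,
            timestamp: Long,
            metrics: Array[Long]): Unit = {
          val peaks = stagePeaks.getOrElseUpdate(
            (stageId, attempt, executorId), Array.fill(metricCount)(Long.MinValue))
          if (timestamp == -1) {
            // stage-end record read from the history log: the values are already
            // the peaks for the ending stage, so replace rather than take the max
            metrics.copyToArray(peaks)
          } else {
            // live update: keep a running max per metric
            for (i <- metrics.indices) {
              peaks(i) = math.max(peaks(i), metrics(i))
            }
          }
        }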
    
    As an example, suppose there is the following scenario:
    T1: start of stage 1
    T2: peak value of 1000 for metric m1
    T3: start of stage 2
    T4: stage 1 ends, and peak metric values for stage 1 are logged, including m1=1000
    T5: stage 2 ends, and peak metric values for stage 2 are logged.
    
    If values for m1 stay below 1000 between T3 (start of stage 2) and T5 (end of stage 2), and the highest value for m1 during that period is, say, 500, then we want the peak value of m1 for stage 2 to show as 500.
    
    There would be an ExecutorMetricsUpdate event logged (and later read) at T4 (end of stage 1) with m1=1000, which is after T3 (start of stage 2). If, when reading the history log, we set the stage 2 peakExecutorMetrics to the max of the current and new values from each ExecutorMetricsUpdate, then the stage 2 value would remain at 1000. However, we want it to be replaced by 500 when the ExecutorMetricsUpdate logged at T5 (end of stage 2) is read. So, when processing an ExecutorMetricsUpdate for the stage level metrics, we replace all of the peakExecutorMetrics if the timestamp is -1.
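
    Replaying the scenario above through the earlier sketch, for stage 2, executor "1", and m1 at metric index 0 (this assumes, as described above, that an ExecutorMetricsUpdate read from the log is applied to every active stage):

        updateStagePeaks(2, 0, "1", timestamp = -1L, metrics = Array(1000L)) // T4: stage 1's peaks
        updateStagePeaks(2, 0, "1", timestamp = -1L, metrics = Array(500L))  // T5: stage 2's peaks
        assert(stagePeaks((2, 0, "1"))(0) == 500) // replaced at T5, not max'd up to 1000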
    
    I can add some comments for this.


---
