Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195538848

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(
     
       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // clear out any previous attempts, that did not have a stage completed event
    +      val prevAttemptId = event.stageInfo.attemptNumber() - 1
    +      for (attemptId <- 0 to prevAttemptId) {
    +        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
    +      }
    +
    +      // log the peak executor metrics for the stage, for each live executor,
    +      // whether or not the executor is running tasks for the stage
    +      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
    +      val executorMap = liveStageExecutorMetrics.remove(
    +        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
    +      executorMap.foreach {
    +        executorEntry => {
    +          for ((executorId, peakExecutorMetrics) <- executorEntry) {
    +            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
    --- End diff --

    We need to pass in a value for the timestamp, but there isn't really a single one for the peak metrics, since the time of each peak could be different. When processing, -1 indicates that the event comes from the history log and contains the peak values for the stage that is just ending.

    When updating the stage executor peaks (the peak executor metric values stored for each active stage), we can replace all of the peak executor metric values instead of taking the max of the current and new values for each metric. As an example, suppose the following scenario:

    T1: stage 1 starts
    T2: metric m1 peaks at 1000
    T3: stage 2 starts
    T4: stage 1 ends, and the peak metric values for stage 1 are logged, including m1=1000
    T5: stage 2 ends, and the peak metric values for stage 2 are logged

    If m1 stays below 1000 between T3 (start of stage 2) and T5 (end of stage 2), with a highest value during that period of, say, 500, then we want the peak value of m1 for stage 2 to show as 500. However, the ExecutorMetricsUpdate event logged (and then read) at T4 (end of stage 1), with m1=1000, comes after T3 (start of stage 2). If, when reading the history log, we set the stage 2 peakExecutorMetrics to the max of the current and new values from each ExecutorMetricsUpdate, the value for stage 2 would remain at 1000. Instead, we want it to be replaced by 500 when the ExecutorMetricsUpdate logged at T5 (end of stage 2) is read. So when processing an ExecutorMetricsUpdate for the stage-level metrics, all of the peakExecutorMetrics are replaced if the timestamp is -1. I can add some comments explaining this.
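    To illustrate the replace-vs-max behavior on replay, here is a minimal sketch in Scala. It assumes a per-stage map from executor id to an Array[Long] of peak metric values; the class and method names are illustrative only and are not the actual AppStatusListener code.

    ```scala
    import scala.collection.mutable

    // Hypothetical holder for the peak executor metrics of one active stage attempt.
    class StagePeaks(numMetrics: Int) {
      private val peaks = mutable.Map[String, Array[Long]]()

      def update(executorId: String, timestamp: Long, metrics: Array[Long]): Unit = {
        if (timestamp == -1) {
          // Stage-end event read from the history log: it already carries the
          // stage-level peaks, so replace the stored values outright.
          peaks(executorId) = metrics.clone()
        } else {
          // Live heartbeat: keep the running maximum per metric.
          val current = peaks.getOrElseUpdate(executorId, Array.fill(numMetrics)(0L))
          for (i <- metrics.indices) {
            current(i) = math.max(current(i), metrics(i))
          }
        }
      }

      def get(executorId: String): Option[Array[Long]] = peaks.get(executorId)
    }
    ```

    With this behavior, the m1=1000 carried by the stage 1 end-of-stage event at T4 does not permanently stick to stage 2: when stage 2's own end-of-stage event (timestamp -1, m1=500) is read at T5, it replaces the stored values, giving the intended 500.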