Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r195886625

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(

       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // clear out any previous attempts, that did not have a stage completed event
    +      val prevAttemptId = event.stageInfo.attemptNumber() - 1
    +      for (attemptId <- 0 to prevAttemptId) {
    +        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
    +      }
    +
    +      // log the peak executor metrics for the stage, for each live executor,
    +      // whether or not the executor is running tasks for the stage
    +      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
    +      val executorMap = liveStageExecutorMetrics.remove(
    +        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
    +      executorMap.foreach {
    +        executorEntry => {
    +          for ((executorId, peakExecutorMetrics) <- executorEntry) {
    +            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
    --- End diff --

    This is confusing, especially since the current code does not have stage-level metrics, just executor-level. The -1 wouldn't be replaced. PeakExecutorMetrics only tracks the peak metric values for each executor (and later each executor per stage) and doesn't have a timestamp. If there is a local max (500), which is the max between T3 and T5, it would be logged at time T5, even if it actually occurs at T3.5.

    In actual event order, this is what the driver sees while the application is running:

    T1: start of stage 1
    T2: value of 1000 for metric m1
    T3: start of stage 2
    T3.5: peak value of 500 for metric m1
    T4: stage 1 ends
    T5: stage 2 ends

    Suppose that 1000 (seen at T2) is the peak value of m1 between T1 and T4, so it is the peak value seen while stage 1 is running.
    The m1=1000 value will be dumped as the max value in an executorMetricsUpdate event right before the stage 1 end event is logged. Also suppose that 500 (seen at T3.5) is the peak value of m1 between T3 and T5, so it is the peak value seen while stage 2 is running. The m1=500 value will be dumped as the max value in an executorMetricsUpdate event right before the stage 2 end event is logged.

    The generated Spark history log would have the following order of events:

    start of stage 1
    start of stage 2
    executorMetricsUpdate with m1=1000
    end of stage 1
    executorMetricsUpdate with m1=500
    end of stage 2

    When the Spark history server reads the log, it will create the peakExecutorMetrics for stage 2 when stage 2 starts, which is before it sees the executorMetricsUpdate with m1=1000, and so it will store m1=1000 as the current peak value. When it later sees the executorMetricsUpdate with m1=500, it needs to overwrite the m1 value (setting it to 500), not compare and update to the max value (which would leave it at 1000). The -1 indicates that the event comes from the Spark history log, is a peak value for the stage just about to complete, and should overwrite any previous values. If the timestamp is set to a positive value, the reader will instead compare and update to the max value.
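    The overwrite-vs-max distinction above can be sketched as follows. This is a minimal, hypothetical illustration, not the PR's actual code: the class and method names are made up, and the real PeakExecutorMetrics/ExecutorMetrics types differ.

    ```scala
    // Hypothetical sketch (names are illustrative, not the PR's API):
    // how a reader could use the -1 timestamp sentinel to decide between
    // overwriting stored peaks and taking a running maximum.
    class StagePeakMetrics(numMetrics: Int) {
      val metrics: Array[Long] = Array.fill(numMetrics)(Long.MinValue)

      def update(timestamp: Long, values: Array[Long]): Unit = {
        if (timestamp == -1L) {
          // Dumped to the log right before a stage-end event: these are
          // final stage-level peaks, so overwrite any earlier values.
          Array.copy(values, 0, metrics, 0, numMetrics)
        } else {
          // Live update from the driver: keep the running maximum.
          var i = 0
          while (i < numMetrics) {
            metrics(i) = math.max(metrics(i), values(i))
            i += 1
          }
        }
      }
    }

    // Replaying the example log: stage 2's tracker sees the stage-1 dump
    // (m1=1000) first. With compare-and-max, the later stage-2 dump
    // (m1=500) would lose; the -1 sentinel makes it overwrite instead,
    // recovering the correct stage-2 peak.
    val stage2 = new StagePeakMetrics(1)
    stage2.update(-1L, Array(1000L)) // stage 1's logged peak, seen first
    stage2.update(-1L, Array(500L))  // stage 2's logged peak: now 500
    ```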