Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r196236364

    --- Diff: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala ---
    @@ -169,6 +182,31 @@ private[spark] class EventLoggingListener(

       // Events that trigger a flush
       override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    +    if (shouldLogExecutorMetricsUpdates) {
    +      // clear out any previous attempts, that did not have a stage completed event
    +      val prevAttemptId = event.stageInfo.attemptNumber() - 1
    +      for (attemptId <- 0 to prevAttemptId) {
    +        liveStageExecutorMetrics.remove((event.stageInfo.stageId, attemptId))
    +      }
    +
    +      // log the peak executor metrics for the stage, for each live executor,
    +      // whether or not the executor is running tasks for the stage
    +      val accumUpdates = new ArrayBuffer[(Long, Int, Int, Seq[AccumulableInfo])]()
    +      val executorMap = liveStageExecutorMetrics.remove(
    +        (event.stageInfo.stageId, event.stageInfo.attemptNumber()))
    +      executorMap.foreach {
    +        executorEntry => {
    +          for ((executorId, peakExecutorMetrics) <- executorEntry) {
    +            val executorMetrics = new ExecutorMetrics(-1, peakExecutorMetrics.metrics)
    --- End diff --

    Just discussed with @squito -- thanks! Logging -1 for the timestamp is confusing and hacky. Some items discussed:

    - For ExecutorMetrics, the timestamp can be made optional, or removed entirely and replaced by an Array[Long], with comments explaining how the metrics work.
    - For logging, the stage ID could be added as part of the executorMetrics in SparkListenerExecutorMetricsUpdate, but this is awkward, since this information isn't used as part of the Heartbeat, only for logging. While the application is running, multiple stages can be active when the metrics are gathered, so specifying a single stage ID doesn't make sense. For logging, the metrics are the peak values for a particular stage, so they are associated with a stage.
    - Another option is to add the information to SparkListenerStageCompleted, but this would bloat the event if there are many executors.
    - A third option is to create a new event, SparkListenerStageExecutorMetrics, which would have the executor ID, stage ID and attempt, and peak metrics.

    I'll give the third option a try, and will add details to the design doc once this is more finalized.
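    To make the third option concrete, here is a minimal, self-contained sketch of what the new event and the stage-completion bookkeeping might look like. This is illustrative only: the case class fields, the StageMetricsSketch object, and the recordHeartbeat/onStageCompleted helpers are hypothetical names, not Spark's actual API, and the metrics are simplified to a bare Array[Long].

    ```scala
    import scala.collection.mutable

    // Hypothetical shape of the proposed per-stage event (option 3 above):
    // one event per (executor, stage attempt), carrying peak metric values.
    case class SparkListenerStageExecutorMetrics(
        execId: String,
        stageId: Int,
        stageAttemptId: Int,
        executorMetrics: Array[Long]) // peak values, indexed by metric

    object StageMetricsSketch {
      // (stageId, attemptId) -> executorId -> running peak metrics,
      // mirroring the liveStageExecutorMetrics map in the diff above.
      private val liveStageExecutorMetrics =
        mutable.Map.empty[(Int, Int), mutable.Map[String, Array[Long]]]

      // Fold a heartbeat's metric values into the stored peaks.
      def recordHeartbeat(
          stageId: Int, attemptId: Int, execId: String, metrics: Array[Long]): Unit = {
        val execMap = liveStageExecutorMetrics
          .getOrElseUpdate((stageId, attemptId), mutable.Map.empty)
        val peaks = execMap
          .getOrElseUpdate(execId, Array.fill(metrics.length)(Long.MinValue))
        for (i <- metrics.indices) {
          peaks(i) = math.max(peaks(i), metrics(i))
        }
      }

      // On stage completion, emit one event per live executor and drop the entry,
      // also clearing earlier attempts that never saw a completion event.
      def onStageCompleted(stageId: Int, attemptId: Int): Seq[SparkListenerStageExecutorMetrics] = {
        for (prev <- 0 until attemptId) {
          liveStageExecutorMetrics.remove((stageId, prev))
        }
        liveStageExecutorMetrics.remove((stageId, attemptId)).toSeq.flatMap { execMap =>
          execMap.map { case (execId, peaks) =>
            SparkListenerStageExecutorMetrics(execId, stageId, attemptId, peaks)
          }
        }
      }
    }
    ```

    Because the event carries only peaks per executor per stage attempt, it avoids both the stage-ID ambiguity in SparkListenerExecutorMetricsUpdate and the bloat of attaching all executors' metrics to SparkListenerStageCompleted.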