squito commented on a change in pull request #23767: [SPARK-26329][CORE] Faster polling of executor memory metrics. URL: https://github.com/apache/spark/pull/23767#discussion_r304109127
##########
File path: core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
##########
@@ -268,12 +278,16 @@ private[spark] class EventLoggingListener(
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
     if (shouldLogStageExecutorMetrics) {
-      // For the active stages, record any new peak values for the memory metrics for the executor
-      event.executorUpdates.foreach { executorUpdates =>
-        liveStageExecutorMetrics.values.foreach { peakExecutorMetrics =>
-          val peakMetrics = peakExecutorMetrics.getOrElseUpdate(
-            event.execId, new ExecutorMetrics())
-          peakMetrics.compareAndUpdatePeakValues(executorUpdates)
+      event.executorUpdates.foreach { case (stageKey1, peaks) =>
+        liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
+          // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
+          // so record those peaks for all active stages.
+          // Otherwise, record the peaks for the matching stage.
+          if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
+            val metrics = metricsPerExecutor.getOrElseUpdate(
+              event.execId, new ExecutorMetrics())
+            metrics.compareAndUpdatePeakValues(peaks)

Review comment:
minor, can you rename `peaks` -> `newPeaks`?
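
For context, the matching rule described in the hunk's comments can be sketched in isolation, with the suggested `newPeaks` name applied. This is a minimal standalone sketch, not the code in the PR: `Peaks`, `StageKey`, `DriverStageKey`, `liveStages`, and `onUpdate` are hypothetical stand-ins for `ExecutorMetrics`, the stage key tuple, `DRIVER_STAGE_KEY`, `liveStageExecutorMetrics`, and the listener callback, and the metrics are assumed to be a plain array of `Long` values merged by element-wise maximum.

```scala
import scala.collection.mutable

object PeakMergeSketch {
  // Hypothetical stand-in for the (stageId, stageAttemptId) key.
  type StageKey = (Int, Int)
  // Dummy key assumed to mark updates that come from the driver.
  val DriverStageKey: StageKey = (-1, -1)

  // Simplified stand-in for ExecutorMetrics: keeps the element-wise maximum.
  final class Peaks(n: Int) {
    val values: Array[Long] = Array.fill(n)(0L)

    // Returns true if at least one stored peak was raised.
    def compareAndUpdate(newPeaks: Array[Long]): Boolean = {
      var updated = false
      var i = 0
      while (i < n && i < newPeaks.length) {
        if (newPeaks(i) > values(i)) {
          values(i) = newPeaks(i)
          updated = true
        }
        i += 1
      }
      updated
    }
  }

  // Active stage -> (executor id -> peaks recorded for that stage).
  val liveStages = mutable.HashMap.empty[StageKey, mutable.HashMap[String, Peaks]]

  def onUpdate(execId: String, updates: Map[StageKey, Array[Long]]): Unit = {
    updates.foreach { case (updateKey, newPeaks) =>
      liveStages.foreach { case (liveKey, metricsPerExecutor) =>
        // Driver updates apply to every active stage;
        // executor updates apply only to the stage they were reported for.
        if (updateKey == DriverStageKey || updateKey == liveKey) {
          val metrics =
            metricsPerExecutor.getOrElseUpdate(execId, new Peaks(newPeaks.length))
          metrics.compareAndUpdate(newPeaks)
        }
      }
    }
  }

  def main(args: Array[String]): Unit = {
    liveStages((0, 0)) = mutable.HashMap.empty
    liveStages((1, 0)) = mutable.HashMap.empty
    // Executor "1" reports peaks for stage (0, 0) only.
    onUpdate("1", Map((0, 0) -> Array(10L, 5L)))
    // The driver reports under the dummy key, so both stages record it.
    onUpdate("driver", Map(DriverStageKey -> Array(3L, 7L)))
    liveStages.foreach { case (key, perExec) =>
      perExec.foreach { case (id, p) =>
        println(s"stage=$key exec=$id peaks=${p.values.mkString(",")}")
      }
    }
  }
}
```

In this sketch the incoming values are named `newPeaks` to keep them visually distinct from the stored `metrics` they are compared against, which is the readability point behind the rename.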