This is an automated email from the ASF dual-hosted git repository. mridulm80 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 79362c4 [SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of `driver` appropriately when `spark.eventLog.logStageExecutorMetrics` is true 79362c4 is described below commit 79362c4efcb6bd4b575438330a14a6191cca5e4b Author: Angerszhuuuu <angers....@gmail.com> AuthorDate: Thu Jun 17 12:08:10 2021 -0500 [SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of `driver` appropriately when `spark.eventLog.logStageExecutorMetrics` is true ### What changes were proposed in this pull request? In the current EventLoggingListener, we never write SparkListenerExecutorMetricsUpdate messages to the event log file at all ``` override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { if (shouldLogStageExecutorMetrics) { event.executorUpdates.foreach { case (stageKey1, newPeaks) => liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) => // If the update came from the driver, stageKey1 will be the dummy key (-1, -1), // so record those peaks for all active stages. // Otherwise, record the peaks for the matching stage. if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) { val metrics = metricsPerExecutor.getOrElseUpdate( event.execId, new ExecutorMetrics()) metrics.compareAndUpdatePeakValues(newPeaks) } } } } } ``` In the History Server's RESTful executor API, we can get executors' metrics but cannot get the driver's metrics. Executors' metrics can be updated via TaskEnd events, etc. So in this PR, I added support for logging the SparkListenerExecutorMetricsUpdateEvent of the `driver` when `spark.eventLog.logStageExecutorMetrics` is true. ### Why are the changes needed? This makes the driver's peakMemoryMetrics available to users in the SHS. ### Does this PR introduce _any_ user-facing change? Yes: users can get the driver's executor metrics from the SHS's RESTful API. ### How was this patch tested? Manual test. Closes #31992 from AngersZhuuuu/SPARK-34898. 
Lead-authored-by: Angerszhuuuu <angers....@gmail.com> Co-authored-by: AngersZhuuuu <angers....@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> --- .../scala/org/apache/spark/scheduler/EventLoggingListener.scala | 5 ++++- .../org/apache/spark/scheduler/EventLoggingListenerSuite.scala | 9 +++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index c57894b..cfbaa46 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ -import org.apache.spark.{SPARK_VERSION, SparkConf} +import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.EventLogFileWriter import org.apache.spark.executor.ExecutorMetrics @@ -250,6 +250,9 @@ private[spark] class EventLoggingListener( override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { if (shouldLogStageExecutorMetrics) { + if (event.execId == SparkContext.DRIVER_IDENTIFIER) { + logEvent(event) + } event.executorUpdates.foreach { case (stageKey1, newPeaks) => liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) => // If the update came from the driver, stageKey1 will be the dummy key (-1, -1), diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 240774d..09ad223 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -515,14 
+515,15 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit try { val lines = readLines(logData) val logStart = SparkListenerLogStart(SPARK_VERSION) - assert(lines.size === 22) + assert(lines.size === 25) assert(lines(0).contains("SparkListenerLogStart")) assert(lines(1).contains("SparkListenerApplicationStart")) assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) var logIdx = 1 events.foreach { event => event match { - case metricsUpdate: SparkListenerExecutorMetricsUpdate => + case metricsUpdate: SparkListenerExecutorMetricsUpdate + if metricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER => case stageCompleted: SparkListenerStageCompleted => val execIds = Set[String]() (1 to 3).foreach { _ => @@ -618,6 +619,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit assert(expected.stageInfo.stageId === actual.stageInfo.stageId) case (expected: SparkListenerTaskEnd, actual: SparkListenerTaskEnd) => assert(expected.stageId === actual.stageId) + case (expected: SparkListenerExecutorMetricsUpdate, + actual: SparkListenerExecutorMetricsUpdate) => + assert(expected.execId == actual.execId) + assert(expected.execId == SparkContext.DRIVER_IDENTIFIER) case (expected: SparkListenerEvent, actual: SparkListenerEvent) => assert(expected === actual) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org