This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 79362c4  [SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of `driver` appropriately when `spark.eventLog.logStageExecutorMetrics` is true
79362c4 is described below

commit 79362c4efcb6bd4b575438330a14a6191cca5e4b
Author: Angerszhuuuu <angers....@gmail.com>
AuthorDate: Thu Jun 17 12:08:10 2021 -0500

    [SPARK-34898][CORE] We should log SparkListenerExecutorMetricsUpdateEvent of `driver` appropriately when `spark.eventLog.logStageExecutorMetrics` is true
    
    ### What changes were proposed in this pull request?
    In the current EventLoggingListener, we never write SparkListenerExecutorMetricsUpdate messages to the event log file at all:
    
    ```
    override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
      if (shouldLogStageExecutorMetrics) {
        event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
          liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
            // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
            // so record those peaks for all active stages.
            // Otherwise, record the peaks for the matching stage.
            if (stageKey1 == DRIVER_STAGE_KEY || stageKey1 == stageKey2) {
              val metrics = metricsPerExecutor.getOrElseUpdate(
                event.execId, new ExecutorMetrics())
              metrics.compareAndUpdatePeakValues(newPeaks)
            }
          }
        }
      }
    }
    ```
    
    In the History Server's REST API for executors, we can get each executor's metrics, but we cannot get the driver's metrics. Executors' metrics still reach the SHS because they are also carried by TaskEnd events etc., while the driver's are not.
    
    So this PR adds support for logging the SparkListenerExecutorMetricsUpdate event of the `driver` when `spark.eventLog.logStageExecutorMetrics` is true.
    
    ### Why are the changes needed?
    This lets users get the driver's peakMemoryMetrics in the SHS.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. Users can get the driver's executor metrics from the SHS's REST API.
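    
    For illustration, here is a minimal sketch of checking this through the standard SHS executor-list endpoint, `/api/v1/applications/{appId}/executors`; the host, port, and application ID below are placeholder assumptions:
    
    ```
    import scala.io.Source
    
    object CheckDriverMetrics {
      def main(args: Array[String]): Unit = {
        // Placeholder SHS address and application ID.
        val appId = "app-20210617120810-0000"
        val url = s"http://localhost:18080/api/v1/applications/$appId/executors"
        val json = Source.fromURL(url).mkString
        // Naive string check (a real client would parse the JSON): with this
        // change, the "driver" entry should also carry peakMemoryMetrics.
        val ok = json.contains("\"driver\"") && json.contains("peakMemoryMetrics")
        println(s"driver peakMemoryMetrics present: $ok")
      }
    }
    ```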
    
    ### How was this patch tested?
    Manual test.
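    
    For reference, a rough sketch of such a manual check (the master, paths, and job are assumptions; it also assumes an uncompressed event log named after the application ID):
    
    ```
    import scala.io.Source
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ManualCheck {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("SPARK-34898-manual-check")
          .set("spark.eventLog.enabled", "true")
          .set("spark.eventLog.logStageExecutorMetrics", "true")
          .set("spark.eventLog.dir", "/tmp/spark-events")  // assumed log dir, must exist
        val sc = new SparkContext(conf)
        sc.parallelize(1 to 1000, 4).map(_ * 2).count()  // any small job
        // Driver metrics updates are posted periodically, so wait past the
        // default 10s heartbeat interval before stopping.
        Thread.sleep(15000)
        val appId = sc.applicationId
        sc.stop()  // stop() flushes and closes the event log
    
        // With this change, the log should contain a metrics update whose
        // executor ID is "driver"; previously no such line was written.
        val found = Source.fromFile(s"/tmp/spark-events/$appId").getLines().exists { line =>
          line.contains("SparkListenerExecutorMetricsUpdate") && line.contains("\"driver\"")
        }
        println(s"driver metrics update logged: $found")
      }
    }
    ```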
    
    Closes #31992 from AngersZhuuuu/SPARK-34898.
    
    Lead-authored-by: Angerszhuuuu <angers....@gmail.com>
    Co-authored-by: AngersZhuuuu <angers....@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/scheduler/EventLoggingListener.scala  | 5 ++++-
 .../org/apache/spark/scheduler/EventLoggingListenerSuite.scala   | 9 +++++++--
 2 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index c57894b..cfbaa46 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.conf.Configuration
 import org.json4s.JsonAST.JValue
 import org.json4s.jackson.JsonMethods._
 
-import org.apache.spark.{SPARK_VERSION, SparkConf}
+import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.history.EventLogFileWriter
 import org.apache.spark.executor.ExecutorMetrics
@@ -250,6 +250,9 @@ private[spark] class EventLoggingListener(
 
   override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = {
     if (shouldLogStageExecutorMetrics) {
+      if (event.execId == SparkContext.DRIVER_IDENTIFIER) {
+        logEvent(event)
+      }
       event.executorUpdates.foreach { case (stageKey1, newPeaks) =>
         liveStageExecutorMetrics.foreach { case (stageKey2, metricsPerExecutor) =>
           // If the update came from the driver, stageKey1 will be the dummy key (-1, -1),
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index 240774d..09ad223 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -515,14 +515,15 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     try {
       val lines = readLines(logData)
       val logStart = SparkListenerLogStart(SPARK_VERSION)
-      assert(lines.size === 22)
+      assert(lines.size === 25)
       assert(lines(0).contains("SparkListenerLogStart"))
       assert(lines(1).contains("SparkListenerApplicationStart"))
       assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
       var logIdx = 1
       events.foreach { event =>
         event match {
-          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+          case metricsUpdate: SparkListenerExecutorMetricsUpdate
+            if metricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER =>
           case stageCompleted: SparkListenerStageCompleted =>
             val execIds = Set[String]()
             (1 to 3).foreach { _ =>
@@ -618,6 +619,10 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         assert(expected.stageInfo.stageId === actual.stageInfo.stageId)
       case (expected: SparkListenerTaskEnd, actual: SparkListenerTaskEnd) =>
         assert(expected.stageId === actual.stageId)
+      case (expected: SparkListenerExecutorMetricsUpdate,
+          actual: SparkListenerExecutorMetricsUpdate) =>
+        assert(expected.execId == actual.execId)
+        assert(expected.execId == SparkContext.DRIVER_IDENTIFIER)
       case (expected: SparkListenerEvent, actual: SparkListenerEvent) =>
         assert(expected === actual)
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
