Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20940#discussion_r179978186
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
    @@ -251,6 +260,163 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         }
       }
     
    +  /**
    +   * Test executor metrics update logging functionality. This checks that a
    +   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
    +   * log if one of the executor metrics is larger than any previously
    +   * recorded value for the metric, per executor per stage. The task metrics
    +   * should not be added.
    +   */
    +  private def testExecutorMetricsUpdateEventLogging() {
    +    val conf = getLoggingConf(testDirPath, None)
    +    val logName = "executorMetricsUpdated-test"
    +    val eventLogger = new EventLoggingListener(logName, None, 
testDirPath.toUri(), conf)
    +    val listenerBus = new LiveListenerBus(conf)
    +
    +    // list of events and if they should be logged
    +    val events = Array(
    +      (SparkListenerApplicationStart("executionMetrics", None,
    +        1L, "update", None), true),
    +      (createExecutorAddedEvent(1), true),
    +      (createExecutorAddedEvent(2), true),
    +      (createStageSubmittedEvent(0), true),
    +      (createExecutorMetricsUpdateEvent(1, 10L, 5000L, 50L, 0L, 0L, 0L), 
true), // new stage
    +      (createExecutorMetricsUpdateEvent(2, 10L, 3500L, 20L, 0L, 0L, 0L), 
true), // new stage
    +      (createExecutorMetricsUpdateEvent(1, 15L, 4000L, 50L, 0L, 0L, 0L), 
false),
    +      (createExecutorMetricsUpdateEvent(2, 15L, 3500L, 10L, 0L, 20L, 0L), 
true), // onheap storage
    +      (createExecutorMetricsUpdateEvent(1, 20L, 6000L, 50L, 0L, 30L, 0L), 
true), // JVM used
    +      (createExecutorMetricsUpdateEvent(2, 20L, 3500L, 15L, 0L, 20L, 0L), 
true), // onheap unified
    +      (createStageSubmittedEvent(1), true),
    +      (createExecutorMetricsUpdateEvent(1, 25L, 3000L, 15L, 0L, 0L, 0L), 
true), // new stage
    +      (createExecutorMetricsUpdateEvent(2, 25L, 6000L, 50L, 0L, 0L, 0L), 
true), // new stage
    +      (createStageCompletedEvent(0), true),
    +      (createExecutorMetricsUpdateEvent(1, 30L, 3000L, 20L, 0L, 0L, 0L), 
true), // onheap execution
    +      (createExecutorMetricsUpdateEvent(2, 30L, 5500L, 20L, 0L, 0L, 0L), 
false),
    +      (createExecutorMetricsUpdateEvent(1, 35L, 3000L, 5L, 25L, 0L, 0L), 
true), // offheap execution
    +      (createExecutorMetricsUpdateEvent(2, 35L, 5500L, 25L, 0L, 0L, 30L), 
true), // offheap storage
    +      (createExecutorMetricsUpdateEvent(1, 40L, 3000L, 8L, 20L, 0L, 0L), 
false),
    +      (createExecutorMetricsUpdateEvent(2, 40L, 5500L, 25L, 0L, 0L, 30L), 
false),
    +      (createStageCompletedEvent(1), true),
    +      (SparkListenerApplicationEnd(1000L), true))
    +
    +    // play the events for the event logger
    +    eventLogger.start()
    +    listenerBus.start(Mockito.mock(classOf[SparkContext]), 
Mockito.mock(classOf[MetricsSystem]))
    +    listenerBus.addToEventLogQueue(eventLogger)
    +    for ((event, included) <- events) {
    +      listenerBus.post(event)
    +    }
    +    listenerBus.stop()
    +    eventLogger.stop()
    +
    +    // Verify the log file contains the expected events
    +    val logData = EventLoggingListener.openEventLog(new 
Path(eventLogger.logPath), fileSystem)
    +    try {
    +      val lines = readLines(logData)
    +      val logStart = SparkListenerLogStart(SPARK_VERSION)
    +      assert(lines.size === 19)
    +      assert(lines(0).contains("SparkListenerLogStart"))
    +      assert(lines(1).contains("SparkListenerApplicationStart"))
    +      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
    +      var i = 1
    +      for ((event, included) <- events) {
    +        if (included) {
    +          checkEvent(lines(i), event)
    +          i += 1
    +        }
    +      }
    +    } finally {
    +      logData.close()
    +    }
    +  }
    +
    +  /** Create a stage submitted event for the specified stage Id. */
    +  private def createStageSubmittedEvent(stageId: Int) =
    +    SparkListenerStageSubmitted(new StageInfo(stageId, 0, 
stageId.toString, 0,
    +      Seq.empty, Seq.empty, "details"))
    +
    +  /** Create a stage completed event for the specified stage Id. */
    +  private def createStageCompletedEvent(stageId: Int) =
    +    SparkListenerStageCompleted(new StageInfo(stageId, 0, 
stageId.toString, 0,
    +      Seq.empty, Seq.empty, "details"))
    +
    +  /** Create an executor added event for the specified executor Id. */
    +  private def createExecutorAddedEvent(executorId: Int) =
    +    SparkListenerExecutorAdded(0L, executorId.toString, new 
ExecutorInfo("host1", 1, Map.empty))
    +
    +  /** Create an executor metrics update event, with the specified executor metrics values. */
    +  private def createExecutorMetricsUpdateEvent(
    +    executorId: Int, time: Long,
    --- End diff --
    
    Fixed.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to