Github user edwinalu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21221#discussion_r191011834
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---
    @@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
         }
       }
     
    +  /**
    +   * Test executor metrics update logging functionality. This checks that a
    +   * SparkListenerExecutorMetricsUpdate event is added to the Spark history
    +   * log if one of the executor metrics is larger than any previously
    +   * recorded value for the metric, per executor per stage. The task
    +   * metrics should not be added.
    +   */
    +  private def testExecutorMetricsUpdateEventLogging() {
    +    val conf = getLoggingConf(testDirPath, None)
    +    val logName = "executorMetricsUpdated-test"
    +    val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
    +    val listenerBus = new LiveListenerBus(conf)
    +
    +    // Expected ExecutorMetricsUpdate events, for the given stage id and executor id.
    +    // Note that stages 0 and 1 overlap: updates posted after stage 1 is submitted but
    +    // before stage 0 completes count toward the peak values for both stages.
    +    val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] =
    +      Map(
    +        ((0, "1"),
    +          createExecutorMetricsUpdateEvent(1,
    +            new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))),
    +        ((0, "2"),
    +          createExecutorMetricsUpdateEvent(2,
    +            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))),
    +        ((1, "1"),
    +          createExecutorMetricsUpdateEvent(1,
    +            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))),
    +        ((1, "2"),
    +          createExecutorMetricsUpdateEvent(2,
    +            new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L))))
    +
    +    // Events to post.
    +    val events = Array(
    +      SparkListenerApplicationStart("executionMetrics", None, 1L, "update", None),
    +      createExecutorAddedEvent(1),
    +      createExecutorAddedEvent(2),
    +      createStageSubmittedEvent(0),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)),
    +      createStageSubmittedEvent(1),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)),
    +      createStageCompletedEvent(0),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)),
    +      createExecutorMetricsUpdateEvent(1,
    +        new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)),
    +      createExecutorRemovedEvent(1),
    +      createExecutorMetricsUpdateEvent(2,
    +        new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)),
    +      createStageCompletedEvent(1),
    +      SparkListenerApplicationEnd(1000L))
    +
    +    // Play the events for the event logger.
    +    eventLogger.start()
    +    listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem]))
    +    listenerBus.addToEventLogQueue(eventLogger)
    +    events.foreach(event => listenerBus.post(event))
    +    listenerBus.stop()
    +    eventLogger.stop()
    +
    +    // Verify the log file contains the expected events.
    +    // Posted events should be logged, except for ExecutorMetricsUpdate events -- these
    +    // are consolidated, and the peak values for each stage are logged at stage end.
    +    val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem)
    +    try {
    +      val lines = readLines(logData)
    +      val logStart = SparkListenerLogStart(SPARK_VERSION)
    +      assert(lines.size === 14)
    +      assert(lines(0).contains("SparkListenerLogStart"))
    +      assert(lines(1).contains("SparkListenerApplicationStart"))
    +      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
    +      var i = 1
    +      events.foreach { event =>
    +        event match {
    +          case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
    +            // metrics updates are not logged directly; the peaks are logged at stage end
    +          case stageCompleted: SparkListenerStageCompleted =>
    +            // the consolidated metrics for both executors are logged before the stage end event
    +            for (j <- 1 to 2) {
    +              checkExecutorMetricsUpdate(lines(i), stageCompleted.stageInfo.stageId,
    +                expectedMetricsEvents)
    +              i += 1
    +            }
    +            checkEvent(lines(i), event)
    +            i += 1
    +          case _ =>
    +            checkEvent(lines(i), event)
    +            i += 1
    +        }
    +      }
    +    } finally {
    +      logData.close()
    +    }
    +  }
    +
    +  /** Create a stage submitted event for the specified stage Id. */
    +  private def createStageSubmittedEvent(stageId: Int) = {
    +    SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0,
    +      Seq.empty, Seq.empty, "details"))
    +  }
    +
    +  /** Create a stage completed event for the specified stage Id. */
    +  private def createStageCompletedEvent(stageId: Int) = {
    +    SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0,
    +      Seq.empty, Seq.empty, "details"))
    +  }
    +
    +  /** Create an executor added event for the specified executor Id. */
    +  private def createExecutorAddedEvent(executorId: Int) = {
    +    SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty))
    +  }
    +
    +  /** Create an executor removed event for the specified executor Id. */
    +  private def createExecutorRemovedEvent(executorId: Int) = {
    +    SparkListenerExecutorRemoved(0L, executorId.toString, "test")
    +  }
    +
    +  /** Create an executor metrics update event, with the specified executor metrics values. */
    +  private def createExecutorMetricsUpdateEvent(
    +      executorId: Int,
    +      executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = {
    +    val taskMetrics = TaskMetrics.empty
    +    taskMetrics.incDiskBytesSpilled(111)
    +    taskMetrics.incMemoryBytesSpilled(222)
    +    val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo)))
    +    SparkListenerExecutorMetricsUpdate(executorId.toString, accum, Some(executorMetrics))
    +  }
    +
    +  /** Check that the two ExecutorMetrics match. */
    +  private def checkExecutorMetrics(
    +      executorMetrics1: Option[ExecutorMetrics],
    +      executorMetrics2: Option[ExecutorMetrics]) = {
    +    (executorMetrics1, executorMetrics2) match {
    +      case (Some(e1), Some(e2)) =>
    +        assert(e1.timestamp === e2.timestamp)
    +        assert(e1.jvmUsedHeapMemory === e2.jvmUsedHeapMemory)
    +        assert(e1.jvmUsedNonHeapMemory === e2.jvmUsedNonHeapMemory)
    +        assert(e1.onHeapExecutionMemory === e2.onHeapExecutionMemory)
    +        assert(e1.offHeapExecutionMemory === e2.offHeapExecutionMemory)
    +        assert(e1.onHeapStorageMemory === e2.onHeapStorageMemory)
    +        assert(e1.offHeapStorageMemory === e2.offHeapStorageMemory)
    +        assert(e1.onHeapUnifiedMemory === e2.onHeapUnifiedMemory)
    +        assert(e1.offHeapUnifiedMemory === e2.offHeapUnifiedMemory)
    +        assert(e1.directMemory === e2.directMemory)
    +        assert(e1.mappedMemory === e2.mappedMemory)
    +      case (None, None) =>
    +      case _ =>
    +        assert(false)
    +    }
    +  }
    +
    +  /** Check that the Spark history log line matches the expected event. */
    +  private def checkEvent(line: String, event: SparkListenerEvent): Unit = {
    +    assert(line.contains(event.getClass.toString.split("\\.").last))
    +    event match {
    +      case executorMetrics: SparkListenerExecutorMetricsUpdate =>
    --- End diff --
    
    Nope -- with the design change to log the executor metrics updates at stage
    end, this part is skipped. I'll remove it.
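    
    For context, a rough sketch of the stage-end consolidation the test above
    exercises (hypothetical names, not the actual `EventLoggingListener` code):
    keep the component-wise peak per `(stageId, executorId)` as updates arrive,
    applying each update to every stage currently running, and emit one
    consolidated record per executor when the stage completes.
    
    ```scala
    import scala.collection.mutable
    
    // Two fields stand in for the full ExecutorMetrics set.
    case class MetricsSnapshot(jvmUsedHeapMemory: Long, jvmUsedNonHeapMemory: Long)
    
    class PeakMetricsTracker {
      // peak metrics seen so far, per (stageId, executorId)
      private val peaks = mutable.Map[(Int, String), MetricsSnapshot]()
    
      // Record an update against a running stage, keeping the component-wise maximum.
      // The caller invokes this once per currently running stage.
      def onMetricsUpdate(stageId: Int, executorId: String, update: MetricsSnapshot): Unit = {
        val merged = peaks.get((stageId, executorId)) match {
          case Some(prev) => MetricsSnapshot(
            math.max(prev.jvmUsedHeapMemory, update.jvmUsedHeapMemory),
            math.max(prev.jvmUsedNonHeapMemory, update.jvmUsedNonHeapMemory))
          case None => update
        }
        peaks((stageId, executorId)) = merged
      }
    
      // At stage end, log one consolidated record per executor, then drop the state.
      def onStageCompleted(stageId: Int, log: (String, MetricsSnapshot) => Unit): Unit = {
        val forStage = peaks.filter { case ((sid, _), _) => sid == stageId }
        forStage.foreach { case ((_, executorId), peak) => log(executorId, peak) }
        forStage.keys.foreach(peaks.remove)
      }
    }
    ```
    
    Applying each update to all running stages is also why, in the test's expected
    map, an update posted while stages 0 and 1 overlap can feed the peaks of both
    stages.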

