Github user edwinalu commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r191011834 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,233 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test executor metrics update logging functionality. This checks that a + * SparkListenerExecutorMetricsUpdate event is added to the Spark history + * log if one of the executor metrics is larger than any previously + * recorded value for the metric, per executor per stage. The task metrics + * should not be added. + */ + private def testExecutorMetricsUpdateEventLogging() { + val conf = getLoggingConf(testDirPath, None) + val logName = "executorMetricsUpdated-test" + val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) + val listenerBus = new LiveListenerBus(conf) + + // expected ExecutorMetricsUpdate, for the given stage id and executor id + val expectedMetricsEvents: Map[(Int, String), SparkListenerExecutorMetricsUpdate] = + Map( + ((0, "1"), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(-1L, 5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), + ((0, "2"), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(-1L, 7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), + ((1, "1"), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(-1L, 7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), + ((1, "2"), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(-1L, 7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L)))) + + // Events to post. 
+ val events = Array( + SparkListenerApplicationStart("executionMetrics", None, + 1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(10L, 4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(10L, 1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(15L, 4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(15L, 2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(20L, 2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(20L, 3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + createStageSubmittedEvent(1), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(25L, 5000L, 30L, 50L, 20L, 30L, 10L, 80L, 30L, 50L, 0L)), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(25L, 7000L, 70L, 50L, 20L, 0L, 10L, 50L, 30L, 10L, 40L)), + createStageCompletedEvent(0), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(30L, 6000L, 70L, 20L, 30L, 10L, 0L, 30L, 30L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(30L, 5500L, 30L, 20L, 40L, 10L, 0L, 30L, 40L, 40L, 20L)), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(35L, 7000L, 70L, 5L, 25L, 60L, 30L, 65L, 55L, 30L, 0L)), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(35L, 5500L, 40L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 20L)), + createExecutorMetricsUpdateEvent(1, + new ExecutorMetrics(40L, 5500L, 70L, 15L, 20L, 55L, 20L, 70L, 40L, 20L, 0L)), + createExecutorRemovedEvent(1), + createExecutorMetricsUpdateEvent(2, + new ExecutorMetrics(40L, 4000L, 20L, 25L, 30L, 10L, 30L, 35L, 60L, 0L, 0L)), + createStageCompletedEvent(1), + 
SparkListenerApplicationEnd(1000L)) + + // play the events for the event logger + eventLogger.start() + listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) + listenerBus.addToEventLogQueue(eventLogger) + events.foreach(event => listenerBus.post(event)) + listenerBus.stop() + eventLogger.stop() + + // Verify the log file contains the expected events. + // Posted events should be logged, except for ExecutorMetricsUpdate events -- these + // are consolidated, and the peak values for each stage are logged at stage end. + val logData = EventLoggingListener.openEventLog(new Path(eventLogger.logPath), fileSystem) + try { + val lines = readLines(logData) + val logStart = SparkListenerLogStart(SPARK_VERSION) + assert(lines.size === 14) + assert(lines(0).contains("SparkListenerLogStart")) + assert(lines(1).contains("SparkListenerApplicationStart")) + assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart) + var i = 1 + events.foreach {event => + event match { + case metricsUpdate: SparkListenerExecutorMetricsUpdate => + case stageCompleted: SparkListenerStageCompleted => + for (j <- 1 to 2) { + checkExecutorMetricsUpdate(lines(i), stageCompleted.stageInfo.stageId, + expectedMetricsEvents) + i += 1 + } + checkEvent(lines(i), event) + i += 1 + case _ => + checkEvent(lines(i), event) + i += 1 + } + } + } finally { + logData.close() + } + } + + /** Create a stage submitted event for the specified stage Id. */ + private def createStageSubmittedEvent(stageId: Int) = { + SparkListenerStageSubmitted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + /** Create a stage completed event for the specified stage Id. */ + private def createStageCompletedEvent(stageId: Int) = { + SparkListenerStageCompleted(new StageInfo(stageId, 0, stageId.toString, 0, + Seq.empty, Seq.empty, "details")) + } + + /** Create an executor added event for the specified executor Id. 
*/ + private def createExecutorAddedEvent(executorId: Int) = { + SparkListenerExecutorAdded(0L, executorId.toString, new ExecutorInfo("host1", 1, Map.empty)) + } + + /** Create an executor added event for the specified executor Id. */ + private def createExecutorRemovedEvent(executorId: Int) = { + SparkListenerExecutorRemoved(0L, executorId.toString, "test") + } + + /** Create an executor metrics update event, with the specified executor metrics values. */ + private def createExecutorMetricsUpdateEvent( + executorId: Int, + executorMetrics: ExecutorMetrics): SparkListenerExecutorMetricsUpdate = { + val taskMetrics = TaskMetrics.empty + taskMetrics.incDiskBytesSpilled(111) + taskMetrics.incMemoryBytesSpilled(222) + val accum = Array((333L, 1, 1, taskMetrics.accumulators().map(AccumulatorSuite.makeInfo))) + SparkListenerExecutorMetricsUpdate(executorId.toString, accum, Some(executorMetrics)) + } + + /** Check that the two ExecutorMetrics match */ + private def checkExecutorMetrics( + executorMetrics1: Option[ExecutorMetrics], + executorMetrics2: Option[ExecutorMetrics]) = { + (executorMetrics1, executorMetrics2) match { + case (Some(e1), Some(e2)) => + assert(e1.timestamp === e2.timestamp) + assert(e1.jvmUsedHeapMemory === e2.jvmUsedHeapMemory) + assert(e1.jvmUsedNonHeapMemory === e2.jvmUsedNonHeapMemory) + assert(e1.onHeapExecutionMemory === e2.onHeapExecutionMemory) + assert(e1.offHeapExecutionMemory === e2.offHeapExecutionMemory) + assert(e1.onHeapStorageMemory === e2.onHeapStorageMemory) + assert(e1.offHeapStorageMemory === e2.offHeapStorageMemory) + assert(e1.onHeapUnifiedMemory === e2.onHeapUnifiedMemory) + assert(e1.offHeapUnifiedMemory === e2.offHeapUnifiedMemory) + assert(e1.directMemory === e2.directMemory) + assert(e1.mappedMemory === e2.mappedMemory) + case (None, None) => + case _ => + assert(false) + } + } + + /** Check that the Spark history log line matches the expected event. 
*/ + private def checkEvent(line: String, event: SparkListenerEvent): Unit = { + assert(line.contains(event.getClass.toString.split("\\.").last)) + event match { + case executorMetrics: SparkListenerExecutorMetricsUpdate => --- End diff -- Nope. With the design change to log the executor metrics updates at stage end, this case is never reached (the test no longer passes ExecutorMetricsUpdate events to checkEvent), so I'll remove it.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org