Github user rezasafi commented on a diff in the pull request: https://github.com/apache/spark/pull/21221#discussion_r204976606 --- Diff: core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala --- @@ -251,6 +261,215 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit } } + /** + * Test stage executor metrics logging functionality. This checks that peak + * values from SparkListenerExecutorMetricsUpdate events during a stage are + * logged in a StageExecutorMetrics event for each executor at stage completion. + */ + private def testStageExecutorMetricsEventLogging() { + val conf = getLoggingConf(testDirPath, None) + val logName = "stageExecutorMetrics-test" + val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) + val listenerBus = new LiveListenerBus(conf) + + // expected StageExecutorMetrics, for the given stage id and executor id + val expectedMetricsEvents: Map[(Int, String), SparkListenerStageExecutorMetrics] = + Map( + ((0, "1"), + new SparkListenerStageExecutorMetrics("1", 0, 0, + Array(5000L, 50L, 50L, 20L, 50L, 10L, 100L, 30L, 70L, 20L))), + ((0, "2"), + new SparkListenerStageExecutorMetrics("2", 0, 0, + Array(7000L, 70L, 50L, 20L, 10L, 10L, 50L, 30L, 80L, 40L))), + ((1, "1"), + new SparkListenerStageExecutorMetrics("1", 1, 0, + Array(7000L, 70L, 50L, 30L, 60L, 30L, 80L, 55L, 50L, 0L))), + ((1, "2"), + new SparkListenerStageExecutorMetrics("2", 1, 0, + Array(7000L, 70L, 50L, 40L, 10L, 30L, 50L, 60L, 40L, 40L)))) + + // Events to post. 
+ val events = Array( + SparkListenerApplicationStart("executionMetrics", None, + 1L, "update", None), + createExecutorAddedEvent(1), + createExecutorAddedEvent(2), + createStageSubmittedEvent(0), + // receive 3 metric updates from each executor with just stage 0 running, + // with different peak updates for each executor + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 20L, 0L, 40L, 0L, 60L, 0L, 70L, 20L)), + createExecutorMetricsUpdateEvent(2, + Array(1500L, 50L, 20L, 0L, 0L, 0L, 20L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 2, 4, 6 + createExecutorMetricsUpdateEvent(1, + Array(4000L, 50L, 50L, 0L, 50L, 0L, 100L, 0L, 70L, 20L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 4, 6 + createExecutorMetricsUpdateEvent(2, + Array(2000L, 50L, 10L, 0L, 10L, 0L, 30L, 0L, 70L, 0L)), + // exec 1: new stage 0 peaks for metrics at indexes: 5, 7 + createExecutorMetricsUpdateEvent(1, + Array(2000L, 40L, 50L, 0L, 40L, 10L, 90L, 10L, 50L, 0L)), + // exec 2: new stage 0 peaks for metrics at indexes: 0, 5, 6, 7, 8 + createExecutorMetricsUpdateEvent(2, + Array(3500L, 50L, 15L, 0L, 10L, 10L, 35L, 10L, 80L, 0L)), + // now start stage 1, one more metric update for each executor, and new + // peaks for some stage 1 metrics (as listed), initialize stage 1 peaks + createStageSubmittedEvent(1), + // exec 1: new stage 0 peaks for metrics at indexes: 0, 3, 7 --- End diff -- Are this comment and the one on line 322 correct? Shouldn't they say stage 1?
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org