Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19981#discussion_r157135580 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala --- @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest (id, accumulatorValue) }.toMap - bus.postToAll(SparkListenerSQLExecutionStart( + listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - bus.postToAll(SparkListenerJobStart( + listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq( createStageInfo(0, 0), createStageInfo(1, 0) ), createProperties(executionId))) - bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0))) - assert(store.executionMetrics(0).isEmpty) + assert(statusStore.executionMetrics(executionId).isEmpty) - bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. - bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) + listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) - bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2))) ))) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3)) // Retrying a stage should reset the metrics - bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) - bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)), (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Ignore the task end for the first attempt - bus.postToAll(SparkListenerTaskEnd( + listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 0, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)), null)) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2)) // Finish two tasks - bus.postToAll(SparkListenerTaskEnd( + listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 1, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)), null)) - bus.postToAll(SparkListenerTaskEnd( + listener.onTaskEnd(SparkListenerTaskEnd( stageId = 0, stageAttemptId = 1, taskType = "", reason = null, createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), null)) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5)) // Summit a new stage - bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0))) + listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) - bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq( + listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)), (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates)) ))) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7)) // Finish two tasks - bus.postToAll(SparkListenerTaskEnd( + listener.onTaskEnd(SparkListenerTaskEnd( stageId = 1, stageAttemptId = 0, taskType = "", reason = null, createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)), null)) - bus.postToAll(SparkListenerTaskEnd( + listener.onTaskEnd(SparkListenerTaskEnd( stageId = 1, stageAttemptId = 0, taskType = "", reason = null, createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)), null)) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11)) - assertJobs(store.execution(0), running = Seq(0)) + assertJobs(statusStore.execution(executionId), running = Seq(0)) - bus.postToAll(SparkListenerJobEnd( + listener.onJobEnd(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobSucceeded )) - bus.postToAll(SparkListenerSQLExecutionEnd( + listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - assertJobs(store.execution(0), completed = Seq(0)) - checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11)) + assertJobs(statusStore.execution(executionId), completed = Seq(0)) + + checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11)) } - sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { (store, bus) => + test("onExecutionEnd happens before onJobEnd(JobSucceeded)") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + val executionId = 0 val df = createTestDataFrame - bus.postToAll(SparkListenerSQLExecutionStart( + listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - bus.postToAll(SparkListenerJobStart( + listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - bus.postToAll(SparkListenerSQLExecutionEnd( + listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - bus.postToAll(SparkListenerJobEnd( + listener.onJobEnd(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobSucceeded )) - assertJobs(store.execution(0), completed = Seq(0)) + assertJobs(statusStore.execution(executionId), completed = Seq(0)) } - sqlStoreTest("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { (store, bus) => + test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + val executionId = 0 val df = createTestDataFrame - bus.postToAll(SparkListenerSQLExecutionStart( + listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - bus.postToAll(SparkListenerJobStart( + listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - bus.postToAll(SparkListenerJobEnd( + listener.onJobEnd(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobSucceeded )) - bus.postToAll(SparkListenerJobStart( + listener.onJobStart(SparkListenerJobStart( jobId = 1, time = System.currentTimeMillis(), stageInfos = Nil, createProperties(executionId))) - bus.postToAll(SparkListenerSQLExecutionEnd( + listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - bus.postToAll(SparkListenerJobEnd( + listener.onJobEnd(SparkListenerJobEnd( jobId = 1, time = System.currentTimeMillis(), JobSucceeded )) - assertJobs(store.execution(0), completed = Seq(0, 1)) + assertJobs(statusStore.execution(executionId), completed = Seq(0, 1)) } - sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { (store, bus) => + test("onExecutionEnd happens before onJobEnd(JobFailed)") { + val statusStore = createStatusStore() + val listener = statusStore.listener.get + val executionId = 0 val df = createTestDataFrame - bus.postToAll(SparkListenerSQLExecutionStart( + listener.onOtherEvent(SparkListenerSQLExecutionStart( executionId, "test", "test", df.queryExecution.toString, SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan), System.currentTimeMillis())) - bus.postToAll(SparkListenerJobStart( + listener.onJobStart(SparkListenerJobStart( jobId = 0, time = System.currentTimeMillis(), stageInfos = Seq.empty, createProperties(executionId))) - bus.postToAll(SparkListenerSQLExecutionEnd( + listener.onOtherEvent(SparkListenerSQLExecutionEnd( executionId, System.currentTimeMillis())) - bus.postToAll(SparkListenerJobEnd( + listener.onJobEnd(SparkListenerJobEnd( jobId = 0, time = System.currentTimeMillis(), JobFailed(new RuntimeException("Oops")) )) - assertJobs(store.execution(0), failed = Seq(0)) + assertJobs(statusStore.execution(executionId), failed = Seq(0)) } test("SPARK-11126: no memory leak when running non SQL jobs") { - val previousStageNumber = statusStore.executionsList().size - spark.sparkContext.parallelize(1 to 10).foreach(i => ()) - spark.sparkContext.listenerBus.waitUntilEmpty(10000) --- End diff -- Previously we did not attach the testing listener to spark event bus, fixed now.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org