Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157663383
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala ---
    @@ -147,236 +159,246 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates that don't belong to this execution should be filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 5))
     
         // Submit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ * 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { 
(store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple 
onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { 
(store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    --- End diff ---
    
    Previously we didn't test the live data for stages.
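
    A minimal sketch of the kind of live-data check this enables, reusing only the suite helpers that already appear above (createStatusStore, createTestDataFrame, createStageInfo, createProperties); the test name and assertions below are an illustration, not code from this PR:

        test("live execution data is visible while the job is running (sketch)") {
          val statusStore = createStatusStore()
          val listener = statusStore.listener.get

          val executionId = 0
          val df = createTestDataFrame

          // Start an execution, a job, and a stage without finishing any of them,
          // so the data only exists as live (in-flight) state.
          listener.onOtherEvent(SparkListenerSQLExecutionStart(
            executionId,
            "test",
            "test",
            df.queryExecution.toString,
            SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
            System.currentTimeMillis()))
          listener.onJobStart(SparkListenerJobStart(
            jobId = 0,
            time = System.currentTimeMillis(),
            stageInfos = Seq(createStageInfo(0, 0)),
            createProperties(executionId)))
          listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))

          // The still-running execution should already be queryable (assuming the
          // test store flushes live updates immediately), even though no task
          // metrics have been reported yet.
          assert(statusStore.executionsList().nonEmpty)
          assert(statusStore.executionMetrics(executionId).isEmpty)
        }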

