Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19981#discussion_r157317442
  
    --- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 ---
    @@ -142,286 +163,277 @@ class SQLListenerSuite extends SparkFunSuite with 
SharedSQLContext with JsonTest
           (id, accumulatorValue)
         }.toMap
     
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq(
             createStageInfo(0, 0),
             createStageInfo(1, 0)
           ),
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 0)))
    +    
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 0)))
     
    -    assert(store.executionMetrics(0).isEmpty)
    +    assert(statusStore.executionMetrics(executionId).isEmpty)
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
     
         // Driver accumulator updates don't belong to this execution should be 
filtered and no
         // exception will be thrown.
    -    bus.postToAll(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
    +    listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 
2L))))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2)))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 3))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 3))
     
         // Retrying a stage should reset the metrics
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1)))
    +    
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 0, 1, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 0, 1, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
     
         // Ignore the task end for the first attempt
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 2))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 2))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 0,
           stageAttemptId = 1,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 5))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 5))
     
         // Summit a new stage
    -    bus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 0)))
    +    
listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
     
    -    bus.postToAll(SparkListenerExecutorMetricsUpdate("", Seq(
    +    
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
           // (task id, stage id, stage attempt, accum updates)
           (0L, 1, 0, createAccumulatorInfos(accumulatorUpdates)),
           (1L, 1, 0, createAccumulatorInfos(accumulatorUpdates))
         )))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 7))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 7))
     
         // Finish two tasks
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
    -    bus.postToAll(SparkListenerTaskEnd(
    +    listener.onTaskEnd(SparkListenerTaskEnd(
           stageId = 1,
           stageAttemptId = 0,
           taskType = "",
           reason = null,
           createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3)),
           null))
     
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 11))
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 11))
     
    -    assertJobs(store.execution(0), running = Seq(0))
    +    assertJobs(statusStore.execution(executionId), running = Seq(0))
     
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    -    checkAnswer(store.executionMetrics(0), accumulatorUpdates.mapValues(_ 
* 11))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
    +
    +    checkAnswer(statusStore.executionMetrics(executionId), 
accumulatorUpdates.mapValues(_ * 11))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobSucceeded)") { 
(store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobSucceeded)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before multiple 
onJobEnd(JobSucceeded)s") { (store, bus) =>
    +  test("onExecutionEnd happens before multiple onJobEnd(JobSucceeded)s") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
             jobId = 0,
             time = System.currentTimeMillis(),
             JobSucceeded
         ))
     
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 1,
           time = System.currentTimeMillis(),
           stageInfos = Nil,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 1,
           time = System.currentTimeMillis(),
           JobSucceeded
         ))
     
    -    assertJobs(store.execution(0), completed = Seq(0, 1))
    +    assertJobs(statusStore.execution(executionId), completed = Seq(0, 1))
       }
     
    -  sqlStoreTest("onExecutionEnd happens before onJobEnd(JobFailed)") { 
(store, bus) =>
    +  test("onExecutionEnd happens before onJobEnd(JobFailed)") {
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +
         val executionId = 0
         val df = createTestDataFrame
    -    bus.postToAll(SparkListenerSQLExecutionStart(
    +    listener.onOtherEvent(SparkListenerSQLExecutionStart(
           executionId,
           "test",
           "test",
           df.queryExecution.toString,
           SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
           System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobStart(
    +    listener.onJobStart(SparkListenerJobStart(
           jobId = 0,
           time = System.currentTimeMillis(),
           stageInfos = Seq.empty,
           createProperties(executionId)))
    -    bus.postToAll(SparkListenerSQLExecutionEnd(
    +    listener.onOtherEvent(SparkListenerSQLExecutionEnd(
           executionId, System.currentTimeMillis()))
    -    bus.postToAll(SparkListenerJobEnd(
    +    listener.onJobEnd(SparkListenerJobEnd(
           jobId = 0,
           time = System.currentTimeMillis(),
           JobFailed(new RuntimeException("Oops"))
         ))
     
    -    assertJobs(store.execution(0), failed = Seq(0))
    +    assertJobs(statusStore.execution(executionId), failed = Seq(0))
       }
     
       test("SPARK-11126: no memory leak when running non SQL jobs") {
    -    val previousStageNumber = statusStore.executionsList().size
    -    spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should ignore the non SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber)
    -
    -    spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
    -    spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    -    // listener should save the SQL stage
    -    assert(statusStore.executionsList().size == previousStageNumber + 1)
    -  }
    -
    -  test("driver side SQL metrics") {
    -    val oldCount = statusStore.executionsList().size
    -    val expectedAccumValue = 12345L
    -    val physicalPlan = MyPlan(sqlContext.sparkContext, expectedAccumValue)
    -    val dummyQueryExecution = new QueryExecution(spark, LocalRelation()) {
    -      override lazy val sparkPlan = physicalPlan
    -      override lazy val executedPlan = physicalPlan
    -    }
    -
    -    SQLExecution.withNewExecutionId(spark, dummyQueryExecution) {
    -      physicalPlan.execute().collect()
    -    }
    -
    -    while (statusStore.executionsList().size < oldCount) {
    -      Thread.sleep(100)
    -    }
    -
    -    // Wait for listener to finish computing the metrics for the execution.
    -    while (statusStore.executionsList().last.metricValues == null) {
    -      Thread.sleep(100)
    +    val statusStore = createStatusStore()
    +    val listener = statusStore.listener.get
    +    try {
    +      sparkContext.addSparkListener(listener)
    +      spark.sparkContext.parallelize(1 to 10).foreach(i => ())
    +      spark.sparkContext.listenerBus.waitUntilEmpty(10000)
    +      // Listener should ignore the non-SQL stages, as the stage data are 
only removed when SQL
    +      // execution ends, which will not be triggered for non-SQL jobs.
    +      assert(listener.stageMetrics.isEmpty)
    +    } finally {
    +      sparkContext.removeSparkListener(listener)
         }
    -
    -    val execId = statusStore.executionsList().last.executionId
    -    val metrics = statusStore.executionMetrics(execId)
    -    val driverMetric = physicalPlan.metrics("dummy")
    -    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, 
Seq(expectedAccumValue))
    -
    -    assert(metrics.contains(driverMetric.id))
    -    assert(metrics(driverMetric.id) === expectedValue)
    --- End diff --
    
    This passed before because the listener was automatically being added to 
the bus using the plugin interface you've removed in this PR.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to