Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19211#discussion_r139594359
  
    --- Diff: core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---
    @@ -42,59 +44,65 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
       private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext])
       private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem])
     
    +  private def numDroppedEvents(bus: LiveListenerBus): Long = {
    +    bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
    +  }
    +
    +  private def queueSize(bus: LiveListenerBus): Int = {
    +    bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
    +      .asInstanceOf[Int]
    +  }
    +
    +  private def eventProcessingTimeCount(bus: LiveListenerBus): Long = {
    +    bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount()
    +  }
    +
       test("don't call sc.stop in listener") {
         sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
         val listener = new SparkContextStoppingListener(sc)
    -    val bus = new LiveListenerBus(sc.conf)
    -    bus.addListener(listener)
     
    -    // Starting listener bus should flush all buffered events
    -    bus.start(sc, sc.env.metricsSystem)
    -    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
    -    bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    sc.listenerBus.addToSharedQueue(listener)
    +    sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
    +    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
    +    sc.stop()
     
    -    bus.stop()
         assert(listener.sparkExSeen)
       }
     
       test("basic creation and shutdown of LiveListenerBus") {
         val conf = new SparkConf()
         val counter = new BasicJobCounter
         val bus = new LiveListenerBus(conf)
    -    bus.addListener(counter)
    +    bus.addToSharedQueue(counter)
     
         // Metrics are initially empty.
         assert(bus.metrics.numEventsPosted.getCount === 0)
    -    assert(bus.metrics.numDroppedEvents.getCount === 0)
    -    assert(bus.metrics.queueSize.getValue === 0)
    -    assert(bus.metrics.eventProcessingTime.getCount === 0)
    +    assert(numDroppedEvents(bus) === 0)
    +    assert(queueSize(bus) === 0)
    +    assert(eventProcessingTimeCount(bus) === 0)
     
         // Post five events:
         (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
     
         // Five messages should be marked as received and queued, but no messages should be posted to
         // listeners yet because the listener bus hasn't been started.
         assert(bus.metrics.numEventsPosted.getCount === 5)
    -    assert(bus.metrics.queueSize.getValue === 5)
    +    assert(queueSize(bus) === 5)
         assert(counter.count === 0)
     
         // Starting listener bus should flush all buffered events
         bus.start(mockSparkContext, mockMetricsSystem)
         Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
         bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
         assert(counter.count === 5)
    -    assert(bus.metrics.queueSize.getValue === 0)
    -    assert(bus.metrics.eventProcessingTime.getCount === 5)
    +    assert(queueSize(bus) === 0)
    +    assert(eventProcessingTimeCount(bus) === 5)
     
         // After listener bus has stopped, posting events should not increment counter
         bus.stop()
         (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
         assert(counter.count === 5)
    -    assert(bus.metrics.numEventsPosted.getCount === 5)
    -
    -    // Make sure per-listener-class timers were created:
    --- End diff --
    
    oh, seems we don't need it anymore.
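    
    For the record, if we ever want that per-listener check back, a rough sketch against the new registry could look like the following (the `perListenerTimerCount` helper and the class-name-suffix matching are my assumptions about how per-listener timers would be named, not something this diff pins down):
    
        import scala.collection.JavaConverters._
    
        // Sum the counts of any timers whose name ends with the listener's class name.
        // Mirrors the name-based registry lookups in the helpers above; the suffix
        // convention is assumed for illustration, not taken from this PR.
        private def perListenerTimerCount(bus: LiveListenerBus, listenerClassName: String): Long = {
          bus.metrics.metricRegistry.getTimers().asScala
            .collect { case (name, timer) if name.endsWith(listenerClassName) => timer.getCount }
            .sum
        }
    
    Then something like `assert(perListenerTimerCount(bus, counter.getClass.getName) === 5)` would stand in for the old check, if we ever need it again.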

