Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19211#discussion_r139341925
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
       /** When `droppedEventsCounter` was logged last time in milliseconds. */
       @volatile private var lastReportTimestamp = 0L
     
    -  // Indicate if we are processing some event
    -  // Guarded by `self`
    -  private var processingEvent = false
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    -
    -  // A counter that represents the number of events produced and consumed 
in the queue
    -  private val eventLock = new Semaphore(0)
    -
    -  private val listenerThread = new Thread(name) {
    -    setDaemon(true)
    -    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    -      LiveListenerBus.withinListenerThread.withValue(true) {
    -        val timer = metrics.eventProcessingTime
    -        while (true) {
    -          eventLock.acquire()
    -          self.synchronized {
    -            processingEvent = true
    -          }
    -          try {
    -            val event = eventQueue.poll
    -            if (event == null) {
    -              // Get out of the while loop and shutdown the daemon thread
    -              if (!stopped.get) {
    -                throw new IllegalStateException("Polling `null` from 
eventQueue means" +
    -                  " the listener bus has been stopped. So `stopped` must 
be true")
    -              }
    -              return
    -            }
    -            val timerContext = timer.time()
    -            try {
    -              postToAll(event)
    -            } finally {
    -              timerContext.stop()
    -            }
    -          } finally {
    -            self.synchronized {
    -              processingEvent = false
    -            }
    -          }
    +  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
    +
    +  /** Add a listener to queue shared by all non-internal listeners. */
    +  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, SHARED_QUEUE)
    +  }
    +
    +  /** Add a listener to the executor management queue. */
    +  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, EXECUTOR_MGMT_QUEUE)
    +  }
    +
    +  /** Add a listener to the application status queue. */
    +  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, APP_STATUS_QUEUE)
    +  }
    +
    +  /** Add a listener to the event log queue. */
    +  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, EVENT_LOG_QUEUE)
    +  }
    +
    +  /**
    +   * Add a listener to a specific queue, creating a new queue if needed. 
Queues are independent
    +   * of each other (each one uses a separate thread for delivering 
events), allowing slower
    +   * listeners to be somewhat isolated from others.
    +   */
    +  private def addToQueue(listener: SparkListenerInterface, queue: String): 
Unit = synchronized {
    +    if (stopped.get()) {
    +      throw new IllegalStateException("LiveListenerBus is stopped.")
    +    }
    +
    +    queues.asScala.find(_.name == queue) match {
    +      case Some(queue) =>
    +        queue.addListener(listener)
    +
    +      case None =>
    +        val newQueue = new AsyncEventQueue(queue, conf, metrics)
    +        newQueue.addListener(listener)
    +        if (started.get() && !stopped.get()) {
    --- End diff --
    
    `stopped.get` is always false here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to