GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19211#discussion_r139342594
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
       /** When `droppedEventsCounter` was logged last time in milliseconds. */
       @volatile private var lastReportTimestamp = 0L
     
    -  // Indicate if we are processing some event
    -  // Guarded by `self`
    -  private var processingEvent = false
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    -
    -  // A counter that represents the number of events produced and consumed 
in the queue
    -  private val eventLock = new Semaphore(0)
    -
    -  private val listenerThread = new Thread(name) {
    -    setDaemon(true)
    -    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    -      LiveListenerBus.withinListenerThread.withValue(true) {
    -        val timer = metrics.eventProcessingTime
    -        while (true) {
    -          eventLock.acquire()
    -          self.synchronized {
    -            processingEvent = true
    -          }
    -          try {
    -            val event = eventQueue.poll
    -            if (event == null) {
    -              // Get out of the while loop and shutdown the daemon thread
    -              if (!stopped.get) {
    -                throw new IllegalStateException("Polling `null` from 
eventQueue means" +
    -                  " the listener bus has been stopped. So `stopped` must 
be true")
    -              }
    -              return
    -            }
    -            val timerContext = timer.time()
    -            try {
    -              postToAll(event)
    -            } finally {
    -              timerContext.stop()
    -            }
    -          } finally {
    -            self.synchronized {
    -              processingEvent = false
    -            }
    -          }
    +  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
    +
    +  /** Add a listener to queue shared by all non-internal listeners. */
    +  def addToSharedQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, SHARED_QUEUE)
    +  }
    +
    +  /** Add a listener to the executor management queue. */
    +  def addToManagementQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, EXECUTOR_MGMT_QUEUE)
    +  }
    +
    +  /** Add a listener to the application status queue. */
    +  def addToStatusQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, APP_STATUS_QUEUE)
    +  }
    +
    +  /** Add a listener to the event log queue. */
    +  def addToEventLogQueue(listener: SparkListenerInterface): Unit = {
    +    addToQueue(listener, EVENT_LOG_QUEUE)
    +  }
    +
    +  /**
    +   * Add a listener to a specific queue, creating a new queue if needed. 
Queues are independent
    +   * of each other (each one uses a separate thread for delivering 
events), allowing slower
    +   * listeners to be somewhat isolated from others.
    +   */
    +  private def addToQueue(listener: SparkListenerInterface, queue: String): 
Unit = synchronized {
    +    if (stopped.get()) {
    +      throw new IllegalStateException("LiveListenerBus is stopped.")
    +    }
    +
    +    queues.asScala.find(_.name == queue) match {
    +      case Some(queue) =>
    +        queue.addListener(listener)
    +
    +      case None =>
    +        val newQueue = new AsyncEventQueue(queue, conf, metrics)
    +        newQueue.addListener(listener)
    +        if (started.get() && !stopped.get()) {
    +          newQueue.start(sparkContext)
             }
    -      }
    +        queues.add(newQueue)
         }
       }
     
    -  override protected def getTimer(listener: SparkListenerInterface): 
Option[Timer] = {
    -    
metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface]))
    +  def removeListener(listener: SparkListenerInterface): Unit = 
synchronized {
    +    // Remove listener from all queues it was added to, and stop queues 
that have become empty.
    +    queues.asScala
    +      .filter { queue =>
    +        queue.removeListener(listener)
    +        queue.listeners.isEmpty()
    +      }
    +      .foreach { toRemove =>
    +        if (started.get() && !stopped.get()) {
    +          toRemove.stop()
    --- End diff --
    
    Oh, I see your point: we can re-create and start this queue later if needed. 
Is stopping the empty queue really necessary, though? Can't we keep all the queues active all the time?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to