Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19211#discussion_r139187703
  
    --- Diff: 
core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---
    @@ -65,53 +60,61 @@ private[spark] class LiveListenerBus(conf: SparkConf) 
extends SparkListenerBus {
       /** When `droppedEventsCounter` was logged last time in milliseconds. */
       @volatile private var lastReportTimestamp = 0L
     
    -  // Indicate if we are processing some event
    -  // Guarded by `self`
    -  private var processingEvent = false
    -
    -  private val logDroppedEvent = new AtomicBoolean(false)
    -
    -  // A counter that represents the number of events produced and consumed 
in the queue
    -  private val eventLock = new Semaphore(0)
    -
    -  private val listenerThread = new Thread(name) {
    -    setDaemon(true)
    -    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
    -      LiveListenerBus.withinListenerThread.withValue(true) {
    -        val timer = metrics.eventProcessingTime
    -        while (true) {
    -          eventLock.acquire()
    -          self.synchronized {
    -            processingEvent = true
    -          }
    -          try {
    -            val event = eventQueue.poll
    -            if (event == null) {
    -              // Get out of the while loop and shutdown the daemon thread
    -              if (!stopped.get) {
    -                throw new IllegalStateException("Polling `null` from 
eventQueue means" +
    -                  " the listener bus has been stopped. So `stopped` must 
be true")
    -              }
    -              return
    -            }
    -            val timerContext = timer.time()
    -            try {
    -              postToAll(event)
    -            } finally {
    -              timerContext.stop()
    -            }
    -          } finally {
    -            self.synchronized {
    -              processingEvent = false
    -            }
    -          }
    +  private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()
    --- End diff --
    
    Can we have something like:
    ```
    private val defaultQueue = ...
    private val executorManagementQueue = ...
    ...
    ```
    
    I think the number of queues will be small.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to