Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139187703 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -65,53 +60,61 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. */ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { - setDaemon(true) - override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { - val timer = metrics.eventProcessingTime - while (true) { - eventLock.acquire() - self.synchronized { - processingEvent = true - } - try { - val event = eventQueue.poll - if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return - } - val timerContext = timer.time() - try { - postToAll(event) - } finally { - timerContext.stop() - } - } finally { - self.synchronized { - processingEvent = false - } - } + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() --- End diff -- can we have something like ``` private val defaultQueue = ... private val executorManagementQueue = ... ... ``` I think the number of queues will be small.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org