Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/19211#discussion_r139748516 --- Diff: core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala --- @@ -173,80 +159,62 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { */ @throws(classOf[TimeoutException]) def waitUntilEmpty(timeoutMillis: Long): Unit = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!queueIsEmpty) { - if (System.currentTimeMillis > finishTime) { - throw new TimeoutException( - s"The event queue is not empty after $timeoutMillis milliseconds") + val deadline = System.currentTimeMillis + timeoutMillis + queues.asScala.foreach { queue => + if (!queue.waitUntilEmpty(deadline)) { + throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.") } - /* Sleep rather than using wait/notify, because this is used only for testing and - * wait/notify add overhead in the general case. */ - Thread.sleep(10) } } /** - * For testing only. Return whether the listener daemon thread is still alive. - * Exposed for testing. - */ - def listenerThreadIsAlive: Boolean = listenerThread.isAlive - - /** - * Return whether the event queue is empty. - * - * The use of synchronized here guarantees that all events that once belonged to this queue - * have already been processed by all attached listeners, if this returns true. - */ - private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent } - - /** * Stop the listener bus. It will wait until the queued events have been processed, but drop the * new events after stopping. */ def stop(): Unit = { if (!started.get()) { - throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") + throw new IllegalStateException(s"Attempted to stop bus that has not yet started!") } - if (stopped.compareAndSet(false, true)) { - // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know - // `stop` is called. - eventLock.release() - listenerThread.join() - } else { - // Keep quiet + + if (!stopped.compareAndSet(false, true)) { + return } - } - /** - * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be - * notified with the dropped events. - * - * Note: `onDropEvent` can be called in any thread. - */ - def onDropEvent(event: SparkListenerEvent): Unit = { - metrics.numDroppedEvents.inc() - droppedEventsCounter.incrementAndGet() - if (logDroppedEvent.compareAndSet(false, true)) { - // Only log the following message once to avoid duplicated annoying logs. - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + - "This likely means one of the SparkListeners is too slow and cannot keep up with " + - "the rate at which tasks are being started by the scheduler.") + synchronized { + queues.asScala.foreach(_.stop()) + queues.clear() } - logTrace(s"Dropping event $event") } + + private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = { + queues.asScala.flatMap { queue => queue.findListenersByClass[T]() } + } + + private[spark] def listeners: JList[SparkListenerInterface] = { --- End diff -- Used by tests.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org