Repository: spark Updated Branches: refs/heads/master 718bbc939 -> c6ff59a23
[SPARK-18838][CORE] Add separate listener queues to LiveListenerBus. This change modifies the live listener bus so that all listeners are added to queues; each queue has its own thread to dispatch events, making it possible to separate slow listeners from other, more performance-sensitive ones. The public API has not changed - all listeners added through the existing public entry points (e.g. "SparkContext.addSparkListener"), which after this change mostly means all user-defined listeners, end up in a default queue shared by all non-internal listeners (the "shared" queue). Internally, there's an API allowing listeners to be added to specific queues, and that API is used to separate the internal Spark listeners into 3 categories: application status listeners (e.g. UI), executor management (e.g. dynamic allocation), and the event log. The queueing logic is abstracted away in a separate class and is kept hidden from consumers as much as possible. Aside from choosing their queue, there's no code change needed to take advantage of queues. Test coverage relies on existing tests; a few tests had to be tweaked because they relied on `LiveListenerBus.postToAll` being synchronous, and the change makes that method asynchronous. Other tests were simplified not to use the asynchronous LiveListenerBus. Author: Marcelo Vanzin <van...@cloudera.com> Closes #19211 from vanzin/SPARK-18838. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ff59a2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ff59a2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ff59a2 Branch: refs/heads/master Commit: c6ff59a230758b409fa9cc548b7d283eeb7ebe5d Parents: 718bbc9 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Wed Sep 20 13:41:29 2017 +0800 Committer: Wenchen Fan <wenc...@databricks.com> Committed: Wed Sep 20 13:41:29 2017 +0800 ---------------------------------------------------------------------- .../spark/ExecutorAllocationManager.scala | 2 +- .../org/apache/spark/HeartbeatReceiver.scala | 2 +- .../scala/org/apache/spark/SparkContext.scala | 13 +- .../spark/scheduler/AsyncEventQueue.scala | 196 +++++++++++++ .../spark/scheduler/LiveListenerBus.scala | 277 ++++++++----------- .../scala/org/apache/spark/ui/SparkUI.scala | 24 +- .../spark/ExecutorAllocationManagerSuite.scala | 128 +++++---- .../spark/scheduler/DAGSchedulerSuite.scala | 2 +- .../scheduler/EventLoggingListenerSuite.scala | 6 +- .../spark/scheduler/SparkListenerSuite.scala | 95 ++++--- .../spark/ui/storage/StorageTabSuite.scala | 4 +- .../streaming/StreamingQueryListenerBus.scala | 2 +- .../apache/spark/sql/internal/SharedState.scala | 3 +- .../spark/streaming/StreamingContext.scala | 3 +- .../scheduler/StreamingListenerBus.scala | 2 +- .../spark/streaming/StreamingContextSuite.scala | 4 +- 16 files changed, 473 insertions(+), 290 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index 7a5fb9a..119b426 100644 --- 
a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -217,7 +217,7 @@ private[spark] class ExecutorAllocationManager( * the scheduling task. */ def start(): Unit = { - listenerBus.addListener(listener) + listenerBus.addToManagementQueue(listener) val scheduleTask = new Runnable() { override def run(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala index 5242ab6..ff960b3 100644 --- a/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala +++ b/core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala @@ -63,7 +63,7 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, clock: Clock) this(sc, new SystemClock) } - sc.addSparkListener(this) + sc.listenerBus.addToManagementQueue(this) override val rpcEnv: RpcEnv = sc.env.rpcEnv http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 136f0af..1821bc8 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -419,7 +419,7 @@ class SparkContext(config: SparkConf) extends Logging { // "_jobProgressListener" should be set up before creating SparkEnv because when creating // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them. 
_jobProgressListener = new JobProgressListener(_conf) - listenerBus.addListener(jobProgressListener) + listenerBus.addToStatusQueue(jobProgressListener) // Create the Spark execution environment (cache, map output tracker, etc) _env = createSparkEnv(_conf, isLocal, listenerBus) @@ -442,7 +442,7 @@ class SparkContext(config: SparkConf) extends Logging { _ui = if (conf.getBoolean("spark.ui.enabled", true)) { - Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, + Some(SparkUI.createLiveUI(this, _conf, _jobProgressListener, _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI @@ -522,7 +522,7 @@ class SparkContext(config: SparkConf) extends Logging { new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get, _conf, _hadoopConfiguration) logger.start() - listenerBus.addListener(logger) + listenerBus.addToEventLogQueue(logger) Some(logger) } else { None @@ -1563,7 +1563,7 @@ class SparkContext(config: SparkConf) extends Logging { */ @DeveloperApi def addSparkListener(listener: SparkListenerInterface) { - listenerBus.addListener(listener) + listenerBus.addToSharedQueue(listener) } /** @@ -1879,8 +1879,7 @@ class SparkContext(config: SparkConf) extends Logging { */ def stop(): Unit = { if (LiveListenerBus.withinListenerThread.value) { - throw new SparkException( - s"Cannot stop SparkContext within listener thread of ${LiveListenerBus.name}") + throw new SparkException(s"Cannot stop SparkContext within listener bus thread.") } // Use the stopping variable to ensure no contention for the stop scenario. // Still track the stopped variable for use elsewhere in the code. 
@@ -2378,7 +2377,7 @@ class SparkContext(config: SparkConf) extends Logging { " parameter from breaking Spark's ability to find a valid constructor.") } } - listenerBus.addListener(listener) + listenerBus.addToSharedQueue(listener) logInfo(s"Registered listener $className") } } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala new file mode 100644 index 0000000..8605e1d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler + +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} + +import com.codahale.metrics.{Gauge, Timer} + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ +import org.apache.spark.util.Utils + +/** + * An asynchronous queue for events. All events posted to this queue will be delivered to the child + * listeners in a separate thread. + * + * Delivery will only begin when the `start()` method is called. The `stop()` method should be + * called when no more events need to be delivered. + */ +private class AsyncEventQueue(val name: String, conf: SparkConf, metrics: LiveListenerBusMetrics) + extends SparkListenerBus + with Logging { + + import AsyncEventQueue._ + + // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if + // it's perpetually being added to more quickly than it's being drained. + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + + // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; + // this allows that method to return only when the events in the queue have been fully + // processed (instead of just dequeued). + private val eventCount = new AtomicLong() + + /** A counter for dropped events. It will be reset every time we log it. */ + private val droppedEventsCounter = new AtomicLong(0L) + + /** When `droppedEventsCounter` was logged last time in milliseconds. 
*/ + @volatile private var lastReportTimestamp = 0L + + private val logDroppedEvent = new AtomicBoolean(false) + + private var sc: SparkContext = null + + private val started = new AtomicBoolean(false) + private val stopped = new AtomicBoolean(false) + + private val droppedEvents = metrics.metricRegistry.counter(s"queue.$name.numDroppedEvents") + private val processingTime = metrics.metricRegistry.timer(s"queue.$name.listenerProcessingTime") + + // Remove the queue size gauge first, in case it was created by a previous incarnation of + // this queue that was removed from the listener bus. + metrics.metricRegistry.remove(s"queue.$name.size") + metrics.metricRegistry.register(s"queue.$name.size", new Gauge[Int] { + override def getValue: Int = eventQueue.size() + }) + + private val dispatchThread = new Thread(s"spark-listener-group-$name") { + setDaemon(true) + override def run(): Unit = Utils.tryOrStopSparkContext(sc) { + dispatch() + } + } + + private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) { + try { + var next: SparkListenerEvent = eventQueue.take() + while (next != POISON_PILL) { + val ctx = processingTime.time() + try { + super.postToAll(next) + } finally { + ctx.stop() + } + eventCount.decrementAndGet() + next = eventQueue.take() + } + eventCount.decrementAndGet() + } catch { + case ie: InterruptedException => + logInfo(s"Stopping listener queue $name.", ie) + } + } + + override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { + metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + } + + /** + * Start an asynchronous thread to dispatch events to the underlying listeners. + * + * @param sc Used to stop the SparkContext in case the async dispatcher fails. 
+ */ + private[scheduler] def start(sc: SparkContext): Unit = { + if (started.compareAndSet(false, true)) { + this.sc = sc + dispatchThread.start() + } else { + throw new IllegalStateException(s"$name already started!") + } + } + + /** + * Stop the listener bus. It will wait until the queued events have been processed, but new + * events will be dropped. + */ + private[scheduler] def stop(): Unit = { + if (!started.get()) { + throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") + } + if (stopped.compareAndSet(false, true)) { + eventQueue.put(POISON_PILL) + eventCount.incrementAndGet() + } + dispatchThread.join() + } + + def post(event: SparkListenerEvent): Unit = { + if (stopped.get()) { + return + } + + eventCount.incrementAndGet() + if (eventQueue.offer(event)) { + return + } + + eventCount.decrementAndGet() + droppedEvents.inc() + droppedEventsCounter.incrementAndGet() + if (logDroppedEvent.compareAndSet(false, true)) { + // Only log the following message once to avoid duplicated annoying logs. + logError(s"Dropping event from queue $name. " + + "This likely means one of the listeners is too slow and cannot keep up with " + + "the rate at which tasks are being started by the scheduler.") + } + logTrace(s"Dropping event $event") + + val droppedCount = droppedEventsCounter.get + if (droppedCount > 0) { + // Don't log too frequently + if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { + // There may be multiple threads trying to decrease droppedEventsCounter. + // Use "compareAndSet" to make sure only one thread can win. + // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and + // then that thread will update it. 
+ if (droppedEventsCounter.compareAndSet(droppedCount, 0)) { + val prevLastReportTimestamp = lastReportTimestamp + lastReportTimestamp = System.currentTimeMillis() + val previous = new java.util.Date(prevLastReportTimestamp) + logWarning(s"Dropped $droppedCount events from $name since $previous.") + } + } + } + } + + /** + * For testing only. Wait until there are no more events in the queue. + * + * @return true if the queue is empty. + */ + def waitUntilEmpty(deadline: Long): Boolean = { + while (eventCount.get() != 0) { + if (System.currentTimeMillis > deadline) { + return false + } + Thread.sleep(10) + } + true + } + +} + +private object AsyncEventQueue { + + val POISON_PILL = new SparkListenerEvent() { } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 7d5e980..2f93c49 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -17,20 +17,22 @@ package org.apache.spark.scheduler +import java.util.{List => JList} import java.util.concurrent._ import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} +import scala.collection.JavaConverters._ import scala.collection.mutable +import scala.reflect.ClassTag import scala.util.DynamicVariable -import com.codahale.metrics.{Counter, Gauge, MetricRegistry, Timer} +import com.codahale.metrics.{Counter, MetricRegistry, Timer} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ import org.apache.spark.metrics.MetricsSystem import org.apache.spark.metrics.source.Source -import org.apache.spark.util.Utils /** * Asynchronously passes 
SparkListenerEvents to registered SparkListeners. @@ -39,20 +41,13 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { - - self => +private[spark] class LiveListenerBus(conf: SparkConf) { import LiveListenerBus._ private var sparkContext: SparkContext = _ - // Cap the capacity of the event queue so we get an explicit error (rather than - // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private val eventQueue = - new LinkedBlockingQueue[SparkListenerEvent](conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) - - private[spark] val metrics = new LiveListenerBusMetrics(conf, eventQueue) + private[spark] val metrics = new LiveListenerBusMetrics(conf) // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -65,53 +60,76 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { /** When `droppedEventsCounter` was logged last time in milliseconds. 
*/ @volatile private var lastReportTimestamp = 0L - // Indicate if we are processing some event - // Guarded by `self` - private var processingEvent = false - - private val logDroppedEvent = new AtomicBoolean(false) - - // A counter that represents the number of events produced and consumed in the queue - private val eventLock = new Semaphore(0) - - private val listenerThread = new Thread(name) { - setDaemon(true) - override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) { - LiveListenerBus.withinListenerThread.withValue(true) { - val timer = metrics.eventProcessingTime - while (true) { - eventLock.acquire() - self.synchronized { - processingEvent = true - } - try { - val event = eventQueue.poll - if (event == null) { - // Get out of the while loop and shutdown the daemon thread - if (!stopped.get) { - throw new IllegalStateException("Polling `null` from eventQueue means" + - " the listener bus has been stopped. So `stopped` must be true") - } - return - } - val timerContext = timer.time() - try { - postToAll(event) - } finally { - timerContext.stop() - } - } finally { - self.synchronized { - processingEvent = false - } - } + private val queues = new CopyOnWriteArrayList[AsyncEventQueue]() + + /** Add a listener to queue shared by all non-internal listeners. */ + def addToSharedQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, SHARED_QUEUE) + } + + /** Add a listener to the executor management queue. */ + def addToManagementQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, EXECUTOR_MANAGEMENT_QUEUE) + } + + /** Add a listener to the application status queue. */ + def addToStatusQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, APP_STATUS_QUEUE) + } + + /** Add a listener to the event log queue. 
*/ + def addToEventLogQueue(listener: SparkListenerInterface): Unit = { + addToQueue(listener, EVENT_LOG_QUEUE) + } + + /** + * Add a listener to a specific queue, creating a new queue if needed. Queues are independent + * of each other (each one uses a separate thread for delivering events), allowing slower + * listeners to be somewhat isolated from others. + */ + private def addToQueue(listener: SparkListenerInterface, queue: String): Unit = synchronized { + if (stopped.get()) { + throw new IllegalStateException("LiveListenerBus is stopped.") + } + + queues.asScala.find(_.name == queue) match { + case Some(queue) => + queue.addListener(listener) + + case None => + val newQueue = new AsyncEventQueue(queue, conf, metrics) + newQueue.addListener(listener) + if (started.get()) { + newQueue.start(sparkContext) } - } + queues.add(newQueue) } } - override protected def getTimer(listener: SparkListenerInterface): Option[Timer] = { - metrics.getTimerForListenerClass(listener.getClass.asSubclass(classOf[SparkListenerInterface])) + def removeListener(listener: SparkListenerInterface): Unit = synchronized { + // Remove listener from all queues it was added to, and stop queues that have become empty. + queues.asScala + .filter { queue => + queue.removeListener(listener) + queue.listeners.isEmpty() + } + .foreach { toRemove => + if (started.get() && !stopped.get()) { + toRemove.stop() + } + queues.remove(toRemove) + } + } + + /** Post an event to all queues. */ + def post(event: SparkListenerEvent): Unit = { + if (!stopped.get()) { + metrics.numEventsPosted.inc() + val it = queues.iterator() + while (it.hasNext()) { + it.next().post(event) + } + } } /** @@ -123,46 +141,14 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { * * @param sc Used to stop the SparkContext in case the listener thread dies. 
*/ - def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = { - if (started.compareAndSet(false, true)) { - sparkContext = sc - metricsSystem.registerSource(metrics) - listenerThread.start() - } else { - throw new IllegalStateException(s"$name already started!") - } - } - - def post(event: SparkListenerEvent): Unit = { - if (stopped.get) { - // Drop further events to make `listenerThread` exit ASAP - logDebug(s"$name has already stopped! Dropping event $event") - return - } - metrics.numEventsPosted.inc() - val eventAdded = eventQueue.offer(event) - if (eventAdded) { - eventLock.release() - } else { - onDropEvent(event) + def start(sc: SparkContext, metricsSystem: MetricsSystem): Unit = synchronized { + if (!started.compareAndSet(false, true)) { + throw new IllegalStateException("LiveListenerBus already started.") } - val droppedEvents = droppedEventsCounter.get - if (droppedEvents > 0) { - // Don't log too frequently - if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) { - // There may be multiple threads trying to decrease droppedEventsCounter. - // Use "compareAndSet" to make sure only one thread can win. - // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and - // then that thread will update it. 
- if (droppedEventsCounter.compareAndSet(droppedEvents, 0)) { - val prevLastReportTimestamp = lastReportTimestamp - lastReportTimestamp = System.currentTimeMillis() - logWarning(s"Dropped $droppedEvents SparkListenerEvents since " + - new java.util.Date(prevLastReportTimestamp)) - } - } - } + this.sparkContext = sc + queues.asScala.foreach(_.start(sc)) + metricsSystem.registerSource(metrics) } /** @@ -173,80 +159,64 @@ private[spark] class LiveListenerBus(conf: SparkConf) extends SparkListenerBus { */ @throws(classOf[TimeoutException]) def waitUntilEmpty(timeoutMillis: Long): Unit = { - val finishTime = System.currentTimeMillis + timeoutMillis - while (!queueIsEmpty) { - if (System.currentTimeMillis > finishTime) { - throw new TimeoutException( - s"The event queue is not empty after $timeoutMillis milliseconds") + val deadline = System.currentTimeMillis + timeoutMillis + queues.asScala.foreach { queue => + if (!queue.waitUntilEmpty(deadline)) { + throw new TimeoutException(s"The event queue is not empty after $timeoutMillis ms.") } - /* Sleep rather than using wait/notify, because this is used only for testing and - * wait/notify add overhead in the general case. */ - Thread.sleep(10) } } /** - * For testing only. Return whether the listener daemon thread is still alive. - * Exposed for testing. - */ - def listenerThreadIsAlive: Boolean = listenerThread.isAlive - - /** - * Return whether the event queue is empty. - * - * The use of synchronized here guarantees that all events that once belonged to this queue - * have already been processed by all attached listeners, if this returns true. - */ - private def queueIsEmpty: Boolean = synchronized { eventQueue.isEmpty && !processingEvent } - - /** * Stop the listener bus. It will wait until the queued events have been processed, but drop the * new events after stopping. 
*/ def stop(): Unit = { if (!started.get()) { - throw new IllegalStateException(s"Attempted to stop $name that has not yet started!") + throw new IllegalStateException(s"Attempted to stop bus that has not yet started!") } - if (stopped.compareAndSet(false, true)) { - // Call eventLock.release() so that listenerThread will poll `null` from `eventQueue` and know - // `stop` is called. - eventLock.release() - listenerThread.join() - } else { - // Keep quiet + + if (!stopped.compareAndSet(false, true)) { + return } - } - /** - * If the event queue exceeds its capacity, the new events will be dropped. The subclasses will be - * notified with the dropped events. - * - * Note: `onDropEvent` can be called in any thread. - */ - def onDropEvent(event: SparkListenerEvent): Unit = { - metrics.numDroppedEvents.inc() - droppedEventsCounter.incrementAndGet() - if (logDroppedEvent.compareAndSet(false, true)) { - // Only log the following message once to avoid duplicated annoying logs. - logError("Dropping SparkListenerEvent because no remaining room in event queue. " + - "This likely means one of the SparkListeners is too slow and cannot keep up with " + - "the rate at which tasks are being started by the scheduler.") + synchronized { + queues.asScala.foreach(_.stop()) + queues.clear() } - logTrace(s"Dropping event $event") } + + // For testing only. + private[spark] def findListenersByClass[T <: SparkListenerInterface : ClassTag](): Seq[T] = { + queues.asScala.flatMap { queue => queue.findListenersByClass[T]() } + } + + // For testing only. + private[spark] def listeners: JList[SparkListenerInterface] = { + queues.asScala.flatMap(_.listeners.asScala).asJava + } + + // For testing only. 
+ private[scheduler] def activeQueues(): Set[String] = { + queues.asScala.map(_.name).toSet + } + } private[spark] object LiveListenerBus { // Allows for Context to check whether stop() call is made within listener thread val withinListenerThread: DynamicVariable[Boolean] = new DynamicVariable[Boolean](false) - /** The thread name of Spark listener bus */ - val name = "SparkListenerBus" + private[scheduler] val SHARED_QUEUE = "shared" + + private[scheduler] val APP_STATUS_QUEUE = "appStatus" + + private[scheduler] val EXECUTOR_MANAGEMENT_QUEUE = "executorManagement" + + private[scheduler] val EVENT_LOG_QUEUE = "eventLog" } -private[spark] class LiveListenerBusMetrics( - conf: SparkConf, - queue: LinkedBlockingQueue[_]) +private[spark] class LiveListenerBusMetrics(conf: SparkConf) extends Source with Logging { override val sourceName: String = "LiveListenerBus" @@ -260,25 +230,6 @@ private[spark] class LiveListenerBusMetrics( */ val numEventsPosted: Counter = metricRegistry.counter(MetricRegistry.name("numEventsPosted")) - /** - * The total number of events that were dropped without being delivered to listeners. - */ - val numDroppedEvents: Counter = metricRegistry.counter(MetricRegistry.name("numEventsDropped")) - - /** - * The amount of time taken to post a single event to all listeners. - */ - val eventProcessingTime: Timer = metricRegistry.timer(MetricRegistry.name("eventProcessingTime")) - - /** - * The number of messages waiting in the queue. - */ - val queueSize: Gauge[Int] = { - metricRegistry.register(MetricRegistry.name("queueSize"), new Gauge[Int]{ - override def getValue: Int = queue.size() - }) - } - // Guarded by synchronization. 
private val perListenerClassTimers = mutable.Map[String, Timer]() @@ -303,5 +254,5 @@ private[spark] class LiveListenerBusMetrics( } } } -} +} http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/main/scala/org/apache/spark/ui/SparkUI.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index f3fcf27..6e94073 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -162,13 +162,14 @@ private[spark] object SparkUI { def createLiveUI( sc: SparkContext, conf: SparkConf, - listenerBus: SparkListenerBus, jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String, startTime: Long): SparkUI = { - create(Some(sc), conf, listenerBus, securityManager, appName, - jobProgressListener = Some(jobProgressListener), startTime = startTime) + create(Some(sc), conf, + sc.listenerBus.addToStatusQueue, + securityManager, appName, jobProgressListener = Some(jobProgressListener), + startTime = startTime) } def createHistoryUI( @@ -179,8 +180,7 @@ private[spark] object SparkUI { basePath: String, lastUpdateTime: Option[Long], startTime: Long): SparkUI = { - val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, + val sparkUI = create(None, conf, listenerBus.addListener, securityManager, appName, basePath, lastUpdateTime = lastUpdateTime, startTime = startTime) val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], @@ -202,7 +202,7 @@ private[spark] object SparkUI { private def create( sc: Option[SparkContext], conf: SparkConf, - listenerBus: SparkListenerBus, + addListenerFn: SparkListenerInterface => Unit, securityManager: SecurityManager, appName: String, basePath: String = "", @@ -212,7 +212,7 @@ private[spark] object SparkUI { val _jobProgressListener: 
JobProgressListener = jobProgressListener.getOrElse { val listener = new JobProgressListener(conf) - listenerBus.addListener(listener) + addListenerFn(listener) listener } @@ -222,11 +222,11 @@ private[spark] object SparkUI { val storageListener = new StorageListener(storageStatusListener) val operationGraphListener = new RDDOperationGraphListener(conf) - listenerBus.addListener(environmentListener) - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(executorsListener) - listenerBus.addListener(storageListener) - listenerBus.addListener(operationGraphListener) + addListenerFn(environmentListener) + addListenerFn(storageStatusListener) + addListenerFn(executorsListener) + addListenerFn(storageListener) + addListenerFn(operationGraphListener) new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala index 7da4bae..a91e09b 100644 --- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala @@ -49,6 +49,11 @@ class ExecutorAllocationManagerSuite contexts.foreach(_.stop()) } + private def post(bus: LiveListenerBus, event: SparkListenerEvent): Unit = { + bus.post(event) + bus.waitUntilEmpty(1000) + } + test("verify min/max executors") { val conf = new SparkConf() .setMaster("myDummyLocalExternalClusterManager") @@ -95,7 +100,7 @@ class ExecutorAllocationManagerSuite test("add executors") { sc = createSparkContext(1, 10, 1) val manager = sc.executorAllocationManager.get - 
sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Keep adding until the limit is reached assert(numExecutorsTarget(manager) === 1) @@ -140,7 +145,7 @@ class ExecutorAllocationManagerSuite test("add executors capped by num pending tasks") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 5))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 5))) // Verify that we're capped at number of tasks in the stage assert(numExecutorsTarget(manager) === 0) @@ -156,10 +161,10 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 3))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 3))) + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 6) @@ -172,9 +177,9 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that re-running a task doesn't blow things up - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 3))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 3))) + post(sc.listenerBus, 
SparkListenerTaskStart(2, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(1, 0, "executor-1"))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 9) assert(numExecutorsToAdd(manager) === 2) @@ -183,7 +188,7 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task once we're at our limit doesn't blow things up - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, createTaskInfo(0, 1, "executor-1"))) assert(addExecutors(manager) === 0) assert(numExecutorsTarget(manager) === 10) } @@ -193,13 +198,13 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get // Verify that we're capped at number of tasks including the speculative ones in the stage - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) + post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1)) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) assert(addExecutors(manager) === 1) - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) - sc.listenerBus.postToAll(SparkListenerSpeculativeTaskSubmitted(1)) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1)) + post(sc.listenerBus, SparkListenerSpeculativeTaskSubmitted(1)) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2))) assert(numExecutorsTarget(manager) === 1) assert(numExecutorsToAdd(manager) === 2) assert(addExecutors(manager) === 2) @@ -210,13 +215,13 @@ class ExecutorAllocationManagerSuite assert(numExecutorsToAdd(manager) === 1) // Verify that running a task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, 
SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-1"))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) // Verify that running a speculative task doesn't affect the target - sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) + post(sc.listenerBus, SparkListenerTaskStart(1, 0, createTaskInfo(0, 0, "executor-2", true))) assert(numExecutorsTarget(manager) === 5) assert(addExecutors(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -225,7 +230,7 @@ class ExecutorAllocationManagerSuite test("cancel pending executors when no longer needed") { sc = createSparkContext(0, 10, 0) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, 5))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, 5))) assert(numExecutorsTarget(manager) === 0) assert(numExecutorsToAdd(manager) === 1) @@ -236,15 +241,15 @@ class ExecutorAllocationManagerSuite assert(numExecutorsTarget(manager) === 3) val task1Info = createTaskInfo(0, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task1Info)) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, task1Info)) assert(numExecutorsToAdd(manager) === 4) assert(addExecutors(manager) === 2) val task2Info = createTaskInfo(1, 0, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, task2Info)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) - sc.listenerBus.postToAll(SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) + post(sc.listenerBus, SparkListenerTaskStart(2, 0, task2Info)) + post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task1Info, null)) + post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, task2Info, null)) assert(adjustRequestedExecutors(manager) === -1) } @@ -352,21 +357,22 @@ class ExecutorAllocationManagerSuite sc = 
createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 8))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 8))) // Remove when numExecutorsTarget is the same as the current number of executors assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach { - info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) } assert(executorIds(manager).size === 8) assert(numExecutorsTarget(manager) === 8) assert(maxNumExecutorsNeeded(manager) == 8) assert(!removeExecutor(manager, "1")) // won't work since numExecutorsTarget == numExecutors // Remove executors when numExecutorsTarget is lower than current number of executors - (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { - info => sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, Success, info, null)) } + (1 to 3).map { i => createTaskInfo(i, i, s"$i") }.foreach { info => + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, info, null)) + } adjustRequestedExecutors(manager) assert(executorIds(manager).size === 8) assert(numExecutorsTarget(manager) === 5) @@ -378,7 +384,7 @@ class ExecutorAllocationManagerSuite onExecutorRemoved(manager, "3") // numExecutorsTarget is lower than minNumExecutors - sc.listenerBus.postToAll( + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, createTaskInfo(4, 4, "4"), null)) assert(executorIds(manager).size === 5) assert(numExecutorsTarget(manager) === 5) @@ -390,7 +396,7 @@ class ExecutorAllocationManagerSuite test ("interleaving add and remove") { sc = createSparkContext(5, 12, 5) val manager = sc.executorAllocationManager.get - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) 
// Add a few executors assert(addExecutors(manager) === 1) @@ -569,7 +575,7 @@ class ExecutorAllocationManagerSuite val clock = new ManualClock(2020L) val manager = sc.executorAllocationManager.get manager.setClock(clock) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1000))) // Scheduler queue backlogged onSchedulerBacklogged(manager) @@ -682,26 +688,26 @@ class ExecutorAllocationManagerSuite // Starting a stage should start the add timer val numTasks = 10 - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, numTasks))) assert(addTime(manager) !== NOT_SET) // Starting a subset of the tasks should not cancel the add timer val taskInfos = (0 to numTasks - 1).map { i => createTaskInfo(i, i, "executor-1") } - taskInfos.tail.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, info)) } + taskInfos.tail.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) } assert(addTime(manager) !== NOT_SET) // Starting all remaining tasks should cancel the add timer - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfos.head)) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfos.head)) assert(addTime(manager) === NOT_SET) // Start two different stages // The add timer should be canceled only if all tasks in both stages start running - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, numTasks))) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(2, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, numTasks))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(2, numTasks))) assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(1, 0, info)) } + taskInfos.foreach { info => 
post(sc.listenerBus, SparkListenerTaskStart(1, 0, info)) } assert(addTime(manager) !== NOT_SET) - taskInfos.foreach { info => sc.listenerBus.postToAll(SparkListenerTaskStart(2, 0, info)) } + taskInfos.foreach { info => post(sc.listenerBus, SparkListenerTaskStart(2, 0, info)) } assert(addTime(manager) === NOT_SET) } @@ -715,22 +721,22 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).size === 5) // Starting a task cancel the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(1, 1, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(2, 2, "executor-2"))) assert(removeTimes(manager).size === 3) assert(!removeTimes(manager).contains("executor-1")) assert(!removeTimes(manager).contains("executor-2")) // Finishing all tasks running on an executor should start the remove timer for that executor - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(0, 0, "executor-1"), new TaskMetrics)) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(2, 2, "executor-2"), new TaskMetrics)) assert(removeTimes(manager).size === 4) assert(!removeTimes(manager).contains("executor-1")) // executor-1 has not finished yet assert(removeTimes(manager).contains("executor-2")) - sc.listenerBus.postToAll(SparkListenerTaskEnd( + post(sc.listenerBus, SparkListenerTaskEnd( 0, 0, "task-type", Success, createTaskInfo(1, 1, "executor-1"), new TaskMetrics)) 
assert(removeTimes(manager).size === 5) assert(removeTimes(manager).contains("executor-1")) // executor-1 has now finished @@ -743,13 +749,13 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).isEmpty) // New executors have registered - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(removeTimes(manager).contains("executor-1")) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host2", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -757,14 +763,14 @@ class ExecutorAllocationManagerSuite assert(removeTimes(manager).contains("executor-2")) // Existing executors have disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-1", "")) + post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-1", "")) assert(executorIds(manager).size === 1) assert(!executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 1) assert(!removeTimes(manager).contains("executor-1")) // Unknown executor has disconnected - sc.listenerBus.postToAll(SparkListenerExecutorRemoved(0L, "executor-3", "")) + post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "executor-3", "")) assert(executorIds(manager).size === 1) assert(removeTimes(manager).size === 1) } @@ -775,8 +781,8 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + 
post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) @@ -788,15 +794,15 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(executorIds(manager).isEmpty) assert(removeTimes(manager).isEmpty) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-1", new ExecutorInfo("host1", 1, Map.empty))) - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) + post(sc.listenerBus, SparkListenerTaskStart(0, 0, createTaskInfo(0, 0, "executor-1"))) assert(executorIds(manager).size === 1) assert(executorIds(manager).contains("executor-1")) assert(removeTimes(manager).size === 0) - sc.listenerBus.postToAll(SparkListenerExecutorAdded( + post(sc.listenerBus, SparkListenerExecutorAdded( 0L, "executor-2", new ExecutorInfo("host1", 1, Map.empty))) assert(executorIds(manager).size === 2) assert(executorIds(manager).contains("executor-2")) @@ -809,7 +815,7 @@ class ExecutorAllocationManagerSuite sc = createSparkContext(0, 100000, 0) val manager = sc.executorAllocationManager.get val stage1 = createStageInfo(0, 1000) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stage1)) + post(sc.listenerBus, SparkListenerStageSubmitted(stage1)) assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 2) @@ -820,12 +826,12 @@ class ExecutorAllocationManagerSuite onExecutorAdded(manager, s"executor-$i") } assert(executorIds(manager).size === 15) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stage1)) + post(sc.listenerBus, SparkListenerStageCompleted(stage1)) adjustRequestedExecutors(manager) assert(numExecutorsTarget(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 1000))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 1000))) 
addExecutors(manager) assert(numExecutorsTarget(manager) === 16) } @@ -842,7 +848,7 @@ class ExecutorAllocationManagerSuite // Verify whether the initial number of executors is kept with no pending tasks assert(numExecutorsTarget(manager) === 3) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(1, 2))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(1, 2))) clock.advance(100L) assert(maxNumExecutorsNeeded(manager) === 2) @@ -892,7 +898,7 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo1 = createStageInfo(1, 5, localityPreferences1) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo1)) + post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo1)) assert(localityAwareTasks(manager) === 3) assert(hostToLocalTaskCount(manager) === @@ -904,13 +910,13 @@ class ExecutorAllocationManagerSuite Seq.empty ) val stageInfo2 = createStageInfo(2, 3, localityPreferences2) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(stageInfo2)) + post(sc.listenerBus, SparkListenerStageSubmitted(stageInfo2)) assert(localityAwareTasks(manager) === 5) assert(hostToLocalTaskCount(manager) === Map("host1" -> 2, "host2" -> 4, "host3" -> 4, "host4" -> 3, "host5" -> 2)) - sc.listenerBus.postToAll(SparkListenerStageCompleted(stageInfo1)) + post(sc.listenerBus, SparkListenerStageCompleted(stageInfo1)) assert(localityAwareTasks(manager) === 2) assert(hostToLocalTaskCount(manager) === Map("host2" -> 1, "host3" -> 2, "host4" -> 1, "host5" -> 2)) @@ -921,16 +927,16 @@ class ExecutorAllocationManagerSuite val manager = sc.executorAllocationManager.get assert(maxNumExecutorsNeeded(manager) === 0) - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 1))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 1))) assert(maxNumExecutorsNeeded(manager) === 1) val taskInfo = createTaskInfo(1, 1, "executor-1") - sc.listenerBus.postToAll(SparkListenerTaskStart(0, 0, taskInfo)) + post(sc.listenerBus, 
SparkListenerTaskStart(0, 0, taskInfo)) assert(maxNumExecutorsNeeded(manager) === 1) // If the task is failed, we expect it to be resubmitted later. val taskEndReason = ExceptionFailure(null, null, null, null, None) - sc.listenerBus.postToAll(SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) + post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, taskEndReason, taskInfo, null)) assert(maxNumExecutorsNeeded(manager) === 1) } @@ -942,7 +948,7 @@ class ExecutorAllocationManagerSuite // Allocation manager is reset when adding executor requests are sent without reporting back // executor added. - sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10))) assert(addExecutors(manager) === 1) assert(numExecutorsTarget(manager) === 2) @@ -957,7 +963,7 @@ class ExecutorAllocationManagerSuite assert(executorIds(manager) === Set.empty) // Allocation manager is reset when executors are added. 
- sc.listenerBus.postToAll(SparkListenerStageSubmitted(createStageInfo(0, 10))) + post(sc.listenerBus, SparkListenerStageSubmitted(createStageInfo(0, 10))) addExecutors(manager) addExecutors(manager) http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 703fc1b..6222e57 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -751,7 +751,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi // Helper functions to extract commonly used code in Fetch Failure test cases private def setupStageAbortTest(sc: SparkContext) { - sc.listenerBus.addListener(new EndListener()) + sc.listenerBus.addToSharedQueue(new EndListener()) ended = false jobResult = null } http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index 0afd07b..6b42775 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -164,9 +164,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() listenerBus.start(Mockito.mock(classOf[SparkContext]), Mockito.mock(classOf[MetricsSystem])) - 
listenerBus.addListener(eventLogger) - listenerBus.postToAll(applicationStart) - listenerBus.postToAll(applicationEnd) + listenerBus.addToEventLogQueue(eventLogger) + listenerBus.post(applicationStart) + listenerBus.post(applicationEnd) listenerBus.stop() eventLogger.stop() http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 995df1d..d061c78 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -34,6 +34,8 @@ import org.apache.spark.util.{ResetSystemProperties, RpcUtils} class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers with ResetSystemProperties { + import LiveListenerBus._ + /** Length of time to wait while draining listener events. 
*/ val WAIT_TIMEOUT_MILLIS = 10000 @@ -42,18 +44,28 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match private val mockSparkContext: SparkContext = Mockito.mock(classOf[SparkContext]) private val mockMetricsSystem: MetricsSystem = Mockito.mock(classOf[MetricsSystem]) + private def numDroppedEvents(bus: LiveListenerBus): Long = { + bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount + } + + private def queueSize(bus: LiveListenerBus): Int = { + bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue() + .asInstanceOf[Int] + } + + private def eventProcessingTimeCount(bus: LiveListenerBus): Long = { + bus.metrics.metricRegistry.timer(s"queue.$SHARED_QUEUE.listenerProcessingTime").getCount() + } + test("don't call sc.stop in listener") { sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) - val bus = new LiveListenerBus(sc.conf) - bus.addListener(listener) - // Starting listener bus should flush all buffered events - bus.start(sc, sc.env.metricsSystem) - bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.listenerBus.addToSharedQueue(listener) + sc.listenerBus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) + sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) + sc.stop() - bus.stop() assert(listener.sparkExSeen) } @@ -61,13 +73,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val conf = new SparkConf() val counter = new BasicJobCounter val bus = new LiveListenerBus(conf) - bus.addListener(counter) + bus.addToSharedQueue(counter) // Metrics are initially empty. 
assert(bus.metrics.numEventsPosted.getCount === 0) - assert(bus.metrics.numDroppedEvents.getCount === 0) - assert(bus.metrics.queueSize.getValue === 0) - assert(bus.metrics.eventProcessingTime.getCount === 0) + assert(numDroppedEvents(bus) === 0) + assert(queueSize(bus) === 0) + assert(eventProcessingTimeCount(bus) === 0) // Post five events: (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } @@ -75,7 +87,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Five messages should be marked as received and queued, but no messages should be posted to // listeners yet because the the listener bus hasn't been started. assert(bus.metrics.numEventsPosted.getCount === 5) - assert(bus.metrics.queueSize.getValue === 5) + assert(queueSize(bus) === 5) assert(counter.count === 0) // Starting listener bus should flush all buffered events @@ -83,18 +95,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match Mockito.verify(mockMetricsSystem).registerSource(bus.metrics) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) - assert(bus.metrics.queueSize.getValue === 0) - assert(bus.metrics.eventProcessingTime.getCount === 5) + assert(queueSize(bus) === 0) + assert(eventProcessingTimeCount(bus) === 5) // After listener bus has stopped, posting events should not increment counter bus.stop() (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } assert(counter.count === 5) - assert(bus.metrics.numEventsPosted.getCount === 5) - - // Make sure per-listener-class timers were created: - assert(bus.metrics.getTimerForListenerClass( - classOf[BasicJobCounter].asSubclass(classOf[SparkListenerInterface])).get.getCount == 5) + assert(eventProcessingTimeCount(bus) === 5) // Listener bus must not be started twice intercept[IllegalStateException] { @@ -135,7 +143,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with 
Match val bus = new LiveListenerBus(new SparkConf()) val blockingListener = new BlockingListener - bus.addListener(blockingListener) + bus.addToSharedQueue(blockingListener) bus.start(mockSparkContext, mockMetricsSystem) bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) @@ -168,7 +176,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val listenerStarted = new Semaphore(0) val listenerWait = new Semaphore(0) - bus.addListener(new SparkListener { + bus.addToSharedQueue(new SparkListener { override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { listenerStarted.release() listenerWait.acquire() @@ -180,20 +188,19 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Post a message to the listener bus and wait for processing to begin: bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() - assert(bus.metrics.queueSize.getValue === 0) - assert(bus.metrics.numDroppedEvents.getCount === 0) + assert(queueSize(bus) === 0) + assert(numDroppedEvents(bus) === 0) // If we post an additional message then it should remain in the queue because the listener is // busy processing the first event: bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - assert(bus.metrics.queueSize.getValue === 1) - assert(bus.metrics.numDroppedEvents.getCount === 0) + assert(queueSize(bus) === 1) + assert(numDroppedEvents(bus) === 0) // The queue is now full, so any additional events posted to the listener will be dropped: bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) - assert(bus.metrics.queueSize.getValue === 1) - assert(bus.metrics.numDroppedEvents.getCount === 1) - + assert(queueSize(bus) === 1) + assert(numDroppedEvents(bus) === 1) // Allow the the remaining events to be processed so we can stop the listener bus: listenerWait.release(2) @@ -419,9 +426,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val 
bus = new LiveListenerBus(new SparkConf()) // Propagate events to bad listener first - bus.addListener(badListener) - bus.addListener(jobCounter1) - bus.addListener(jobCounter2) + bus.addToSharedQueue(badListener) + bus.addToSharedQueue(jobCounter1) + bus.addToSharedQueue(jobCounter2) bus.start(mockSparkContext, mockMetricsSystem) // Post events to all listeners, and wait until the queue is drained @@ -429,7 +436,6 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) // The exception should be caught, and the event should be propagated to other listeners - assert(bus.listenerThreadIsAlive) assert(jobCounter1.count === 5) assert(jobCounter2.count === 5) } @@ -449,6 +455,31 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match .count(_.isInstanceOf[FirehoseListenerThatAcceptsSparkConf]) should be (1) } + test("add and remove listeners to/from LiveListenerBus queues") { + val bus = new LiveListenerBus(new SparkConf(false)) + val counter1 = new BasicJobCounter() + val counter2 = new BasicJobCounter() + val counter3 = new BasicJobCounter() + + bus.addToSharedQueue(counter1) + bus.addToStatusQueue(counter2) + bus.addToStatusQueue(counter3) + assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE)) + assert(bus.findListenersByClass[BasicJobCounter]().size === 3) + + bus.removeListener(counter1) + assert(bus.activeQueues() === Set(APP_STATUS_QUEUE)) + assert(bus.findListenersByClass[BasicJobCounter]().size === 2) + + bus.removeListener(counter2) + assert(bus.activeQueues() === Set(APP_STATUS_QUEUE)) + assert(bus.findListenersByClass[BasicJobCounter]().size === 1) + + bus.removeListener(counter3) + assert(bus.activeQueues().isEmpty) + assert(bus.findListenersByClass[BasicJobCounter]().isEmpty) + } + /** * Assert that the given list of numbers has an average that is greater than zero. 
*/ http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 1cb5259..79f02f2 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -27,7 +27,7 @@ import org.apache.spark.storage._ * Test various functionality in the StorageListener that supports the StorageTab. */ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { - private var bus: LiveListenerBus = _ + private var bus: SparkListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ private val memAndDisk = StorageLevel.MEMORY_AND_DISK @@ -43,7 +43,7 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { before { val conf = new SparkConf() - bus = new LiveListenerBus(conf) + bus = new ReplayListenerBus() storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala index 4207013..07e3902 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala 
@@ -40,7 +40,7 @@ class StreamingQueryListenerBus(sparkListenerBus: LiveListenerBus) import StreamingQueryListener._ - sparkListenerBus.addListener(this) + sparkListenerBus.addToSharedQueue(this) /** * RunIds of active queries whose events are supposed to be forwarded by this ListenerBus http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index 7202f12..ad9db30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -28,6 +28,7 @@ import org.apache.hadoop.fs.FsUrlStreamHandlerFactory import org.apache.spark.{SparkConf, SparkContext, SparkException} import org.apache.spark.internal.Logging +import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.sql.{SparkSession, SQLContext} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.execution.CacheManager @@ -148,7 +149,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) extends Logging { if (SparkSession.sqlListener.get() == null) { val listener = new SQLListener(sc.conf) if (SparkSession.sqlListener.compareAndSet(null, listener)) { - sc.addSparkListener(listener) + sc.listenerBus.addToStatusQueue(listener) sc.ui.foreach(new SQLTab(listener, _)) } } http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index f3b4ff2..8c7418e 100644 --- 
a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -659,8 +659,7 @@ class StreamingContext private[streaming] ( def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = { var shutdownHookRefToRemove: AnyRef = null if (LiveListenerBus.withinListenerThread.value) { - throw new SparkException( - s"Cannot stop StreamingContext within listener thread of ${LiveListenerBus.name}") + throw new SparkException(s"Cannot stop StreamingContext within listener bus thread.") } synchronized { // The state should always be Stopped after calling `stop()`, even if we haven't started yet http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala index 5fb0bd0..6a70bf7 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala @@ -76,7 +76,7 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus) * forward them to StreamingListeners. 
*/ def start(): Unit = { - sparkListenerBus.addListener(this) // for getting callbacks on spark events + sparkListenerBus.addToStatusQueue(this) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/c6ff59a2/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 96ab5a2..5810e73 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -575,8 +575,6 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL test("getActive and getActiveOrCreate") { require(StreamingContext.getActive().isEmpty, "context exists from before") - sc = new SparkContext(conf) - var newContextCreated = false def creatingFunc(): StreamingContext = { @@ -603,6 +601,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL // getActiveOrCreate should create new context and getActive should return it only // after starting the context testGetActiveOrCreate { + sc = new SparkContext(conf) ssc = StreamingContext.getActiveOrCreate(creatingFunc _) assert(ssc != null, "no context created") assert(newContextCreated === true, "new context not created") @@ -622,6 +621,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with TimeL // getActiveOrCreate and getActive should return independently created context after activating testGetActiveOrCreate { + sc = new SparkContext(conf) ssc = creatingFunc() // Create assert(StreamingContext.getActive().isEmpty, "new initialized context returned before starting") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org 
For additional commands, e-mail: commits-help@spark.apache.org