This is an automated email from the ASF dual-hosted git repository. zsxwing pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new c212c9d [SPARK-28574][CORE] Allow to config different sizes for event queues c212c9d is described below commit c212c9d9ed7375cd1ea16c118733edd84037ec0d Author: yunzoud <yun....@databricks.com> AuthorDate: Fri Aug 2 15:27:33 2019 -0700 [SPARK-28574][CORE] Allow to config different sizes for event queues ## What changes were proposed in this pull request? Add configuration spark.scheduler.listenerbus.eventqueue.${name}.capacity to allow configuration of different event queue size. ## How was this patch tested? Unit test in core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala Closes #25307 from yunzoud/SPARK-28574. Authored-by: yunzoud <yun....@databricks.com> Signed-off-by: Shixiong Zhu <zsxw...@gmail.com> --- .../apache/spark/scheduler/AsyncEventQueue.scala | 14 +++++++++-- .../apache/spark/scheduler/LiveListenerBus.scala | 4 ++++ .../spark/scheduler/SparkListenerSuite.scala | 28 ++++++++++++++++++++++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala index 7cd2b86..11e2c47 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/AsyncEventQueue.scala @@ -46,8 +46,18 @@ private class AsyncEventQueue( // Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if // it's perpetually being added to more quickly than it's being drained. 
- private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent]( - conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + // The capacity can be configured by spark.scheduler.listenerbus.eventqueue.${name}.capacity, + // if no such conf is specified, use the value specified in + // LISTENER_BUS_EVENT_QUEUE_CAPACITY + private[scheduler] def capacity: Int = { + val queuesize = conf.getInt(s"spark.scheduler.listenerbus.eventqueue.${name}.capacity", + conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY)) + assert(queuesize > 0, s"capacity for event queue $name must be greater than 0, " + + s"but $queuesize is configured.") + queuesize + } + + private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](capacity) // Keep the event count separately, so that waitUntilEmpty() can be implemented properly; // this allows that method to return only when the events in the queue have been fully diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index d135190..302ebd3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -236,6 +236,10 @@ private[spark] class LiveListenerBus(conf: SparkConf) { queues.asScala.map(_.name).toSet } + // For testing only. 
+ private[scheduler] def getQueueCapacity(name: String): Option[Int] = { + queues.asScala.find(_.name == name).map(_.capacity) + } } private[spark] object LiveListenerBus { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index a7869d3..8903e10 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -532,6 +532,34 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } } + test("event queue size can be configured through spark conf") { + // configure the shared queue size to be 1, event log queue size to be 2, + // and listener bus event queue size to be 5 + val conf = new SparkConf(false) + .set(LISTENER_BUS_EVENT_QUEUE_CAPACITY, 5) + .set(s"spark.scheduler.listenerbus.eventqueue.${SHARED_QUEUE}.capacity", "1") + .set(s"spark.scheduler.listenerbus.eventqueue.${EVENT_LOG_QUEUE}.capacity", "2") + + val bus = new LiveListenerBus(conf) + val counter1 = new BasicJobCounter() + val counter2 = new BasicJobCounter() + val counter3 = new BasicJobCounter() + + // add a new shared, status and event queue + bus.addToSharedQueue(counter1) + bus.addToStatusQueue(counter2) + bus.addToEventLogQueue(counter3) + + assert(bus.activeQueues() === Set(SHARED_QUEUE, APP_STATUS_QUEUE, EVENT_LOG_QUEUE)) + // check the size of shared queue is 1 as configured + assert(bus.getQueueCapacity(SHARED_QUEUE) == Some(1)) + // no specific size of status queue is configured, + // it should use the LISTENER_BUS_EVENT_QUEUE_CAPACITY + assert(bus.getQueueCapacity(APP_STATUS_QUEUE) == Some(5)) + // check the size of event log queue is 2 as configured + assert(bus.getQueueCapacity(EVENT_LOG_QUEUE) == Some(2)) + } + /** * Assert that the given list of numbers has an average that is greater than zero. 
*/ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org