Repository: spark
Updated Branches:
  refs/heads/master 0869b3a5f -> 0b71d9ae0


[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable

## What changes were proposed in this pull request?
This change adds a new configuration entry to specify the size of the Spark
listener bus event queue. The value for this config
("spark.scheduler.listenerbus.eventqueue.size") defaults to 10000.

Note:
I haven't documented the configuration entry yet. We can decide whether it
would be appropriate to make it a public configuration or keep it
undocumented. Refer to the JIRA for more details.

## How was this patch tested?
Ran existing jobs and verified the event queue size using debug logs and the
Spark WebUI Environment tab.

Author: Dhruve Ashar <dhruveas...@gmail.com>

Closes #14269 from dhruve/bug/SPARK-15703.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b71d9ae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b71d9ae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b71d9ae

Branch: refs/heads/master
Commit: 0b71d9ae0804b0394e4abd02c7cebf52a9102216
Parents: 0869b3a
Author: Dhruve Ashar <dhruveas...@gmail.com>
Authored: Tue Jul 26 13:23:33 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Tue Jul 26 13:23:33 2016 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  4 +--
 .../apache/spark/internal/config/package.scala  |  5 ++++
 .../spark/scheduler/LiveListenerBus.scala       | 23 +++++++++------
 .../scheduler/EventLoggingListenerSuite.scala   |  4 +--
 .../spark/scheduler/SparkListenerSuite.scala    | 30 +++++++++++---------
 .../storage/BlockManagerReplicationSuite.scala  |  9 ++++--
 .../spark/storage/BlockManagerSuite.scala       |  6 ++--
 .../spark/ui/storage/StorageTabSuite.scala      | 11 +++----
 .../streaming/ReceivedBlockHandlerSuite.scala   |  5 +++-
 9 files changed, 60 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6d7f05d..d48e2b4 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def isStopped: Boolean = stopped.get()
 
   // An asynchronous listener bus for Spark events
-  private[spark] val listenerBus = new LiveListenerBus
+  private[spark] val listenerBus = new LiveListenerBus(this)
 
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
@@ -2148,7 +2148,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         }
     }
 
-    listenerBus.start(this)
+    listenerBus.start()
     _listenerBusStarted = true
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 05dd683..ebb21e9 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -103,4 +103,9 @@ package object config {
     .stringConf
     .checkValues(Set("hive", "in-memory"))
     .createWithDefault("in-memory")
+
+  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
+    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
+      .intConf
+      .createWithDefault(10000)
 }
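
For context, an entry declared with ConfigBuilder is read back in a typed way
through SparkConf.get(ConfigEntry), an internal (private[spark]) overload; this
is how LiveListenerBus consumes the entry below. A minimal sketch, valid only
from code inside the org.apache.spark package:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config._

// Sketch: the typed read falls back to the declared default (10000) when
// spark.scheduler.listenerbus.eventqueue.size has not been set by the user.
val conf = new SparkConf()
val queueSize: Int = conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
```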

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 1c21313..bfa3c40 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.util.DynamicVariable
 
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.internal.config._
 import org.apache.spark.util.Utils
 
 /**
@@ -32,18 +33,24 @@ import org.apache.spark.util.Utils
  * has started will events be actually propagated to all attached listeners. This listener bus
  * is stopped when `stop()` is called, and it will drop further events after stopping.
  */
-private[spark] class LiveListenerBus extends SparkListenerBus {
+private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus {
 
   self =>
 
   import LiveListenerBus._
 
-  private var sparkContext: SparkContext = null
-
   // Cap the capacity of the event queue so we get an explicit error (rather than
   // an OOM exception) if it's perpetually being added to more quickly than it's being drained.
-  private val EVENT_QUEUE_CAPACITY = 10000
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+  private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize()
+  private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
+
+  private def validateAndGetQueueSize(): Int = {
+    val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE)
+    if (queueSize <= 0) {
+      throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!")
+    }
+    queueSize
+  }
 
   // Indicate if `start()` is called
   private val started = new AtomicBoolean(false)
@@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus {
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
    *
-   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start(sc: SparkContext): Unit = {
+  def start(): Unit = {
     if (started.compareAndSet(false, true)) {
-      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")

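As an aside, the bounded-queue behavior the capacity cap relies on is plain
java.util.concurrent semantics, not anything Spark-specific. A standalone
sketch:

```scala
import java.util.concurrent.LinkedBlockingQueue

// Sketch: a bounded LinkedBlockingQueue rejects offer() once it is full rather
// than growing without limit, which lets the bus drop events instead of OOMing.
val q = new LinkedBlockingQueue[String](2)
assert(q.offer("a"))
assert(q.offer("b"))
assert(!q.offer("c")) // at capacity: offer fails fast instead of allocating
```
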
http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index c4c80b5..7f48592 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
     val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf)
-    val listenerBus = new LiveListenerBus
+    val listenerBus = new LiveListenerBus(sc)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start(sc)
+    listenerBus.start()
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index 5ba67af..e8a88d4 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   val jobCompletionTime = 1421191296660L
 
   test("don't call sc.stop in listener") {
-    sc = new SparkContext("local", "SparkListenerSuite")
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val listener = new SparkContextStoppingListener(sc)
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(sc)
     bus.addListener(listener)
 
     // Starting listener bus should flush all buffered events
-    bus.start(sc)
+    bus.start()
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
@@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
   }
 
   test("basic creation and shutdown of LiveListenerBus") {
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
     val counter = new BasicJobCounter
-    val bus = new LiveListenerBus
+    val bus = new LiveListenerBus(sc)
     bus.addListener(counter)
 
     // Listener bus hasn't started yet, so posting events should not increment counter
@@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start(sc)
+    bus.start()
     bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(counter.count === 5)
 
@@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
 
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
-      bus.start(sc)
-      bus.start(sc)
+      val bus = new LiveListenerBus(sc)
+      bus.start()
+      bus.start()
     }
 
     // ... or stopped before starting
     intercept[IllegalStateException] {
-      val bus = new LiveListenerBus
+      val bus = new LiveListenerBus(sc)
       bus.stop()
     }
   }
@@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
         drained = true
       }
     }
-
-    val bus = new LiveListenerBus
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val bus = new LiveListenerBus(sc)
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start(sc)
+    bus.start()
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
     val badListener = new BadListener
     val jobCounter1 = new BasicJobCounter
     val jobCounter2 = new BasicJobCounter
-    val bus = new LiveListenerBus
+    sc = new SparkContext("local", "SparkListenerSuite", new SparkConf())
+    val bus = new LiveListenerBus(sc)
 
     // Propagate events to bad listener first
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start(sc)
+    bus.start()
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 31687e6..b9e3a36 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.storage.StorageLevel._
 
 /** Testsuite that tests block replication in BlockManager */
-class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
+class BlockManagerReplicationSuite extends SparkFunSuite
+    with Matchers
+    with BeforeAndAfter
+    with LocalSparkContext {
 
   private val conf = new SparkConf(false).set("spark.app.id", "test")
   private var rpcEnv: RpcEnv = null
@@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
     // to make cached peers refresh frequently
     conf.set("spark.storage.cachedPeersTtl", "10")
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
     allStores.clear()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 8077a1b..87c8628 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -49,7 +49,7 @@ import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
 
 class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach
-  with PrivateMethodTester with ResetSystemProperties {
+  with PrivateMethodTester with LocalSparkContext with ResetSystemProperties {
 
   import BlockManagerSuite._
 
@@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index 411a0dd..f6c8418 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -19,15 +19,14 @@ package org.apache.spark.ui.storage
 
 import org.scalatest.BeforeAndAfter
 
-import org.apache.spark.{SparkConf, SparkFunSuite, Success}
-import org.apache.spark.executor.TaskMetrics
+import org.apache.spark._
 import org.apache.spark.scheduler._
 import org.apache.spark.storage._
 
 /**
  * Test various functionality in the StorageListener that supports the StorageTab.
  */
-class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
+class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter {
   private var bus: LiveListenerBus = _
   private var storageStatusListener: StorageStatusListener = _
   private var storageListener: StorageListener = _
@@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter {
   private val bm1 = BlockManagerId("big", "dog", 1)
 
   before {
-    bus = new LiveListenerBus
-    storageStatusListener = new StorageStatusListener(new SparkConf())
+    val conf = new SparkConf()
+    sc = new SparkContext("local", "test", conf)
+    bus = new LiveListenerBus(sc)
+    storageStatusListener = new StorageStatusListener(conf)
     storageListener = new StorageListener(storageStatusListener)
     bus.addListener(storageStatusListener)
     bus.addListener(storageListener)

http://git-wip-us.apache.org/repos/asf/spark/blob/0b71d9ae/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index e974279..feb5c30 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -47,6 +47,7 @@ class ReceivedBlockHandlerSuite
   extends SparkFunSuite
   with BeforeAndAfter
   with Matchers
+  with LocalSparkContext
   with Logging {
 
   import WriteAheadLogBasedBlockHandler._
@@ -77,8 +78,10 @@ class ReceivedBlockHandlerSuite
     rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
     conf.set("spark.driver.port", rpcEnv.address.port.toString)
 
+    sc = new SparkContext("local", "test", conf)
     blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
-      new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
+      new BlockManagerMasterEndpoint(rpcEnv, true, conf,
+        new LiveListenerBus(sc))), conf, true)
 
     storageLevel = StorageLevel.MEMORY_ONLY_SER
     blockManager = createBlockManager(blockManagerSize, conf)

