Repository: spark Updated Branches: refs/heads/branch-2.0 5bc5b49fa -> 9e91a1009
[SPARK-15703][SCHEDULER][CORE][WEBUI] Make ListenerBus event queue size configurable (branch 2.0) ## What changes were proposed in this pull request? Backport #14269 to 2.0. ## How was this patch tested? Jenkins. Author: Dhruve Ashar <dhruveas...@gmail.com> Closes #15222 from zsxwing/SPARK-15703-2.0. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e91a100 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e91a100 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e91a100 Branch: refs/heads/branch-2.0 Commit: 9e91a1009e6f916245b4d4018de1664ea3decfe7 Parents: 5bc5b49 Author: Dhruve Ashar <dhruveas...@gmail.com> Authored: Fri Sep 23 14:59:53 2016 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Fri Sep 23 14:59:53 2016 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 4 +-- .../apache/spark/internal/config/package.scala | 5 ++++ .../spark/scheduler/LiveListenerBus.scala | 23 +++++++++------ .../scheduler/EventLoggingListenerSuite.scala | 4 +-- .../spark/scheduler/SparkListenerSuite.scala | 30 +++++++++++--------- .../storage/BlockManagerReplicationSuite.scala | 9 ++++-- .../spark/storage/BlockManagerSuite.scala | 6 ++-- .../spark/ui/storage/StorageTabSuite.scala | 11 +++---- .../streaming/ReceivedBlockHandlerSuite.scala | 5 +++- 9 files changed, 60 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 251c16f..ffd1227 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -249,7 +249,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def isStopped: Boolean = stopped.get() // An asynchronous listener bus for Spark events - private[spark] val listenerBus = new LiveListenerBus + private[spark] val listenerBus = new LiveListenerBus(this) // This function allows components created by SparkEnv to be mocked in unit tests: private[spark] def createSparkEnv( @@ -2154,7 +2154,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli } } - listenerBus.start(this) + listenerBus.start() _listenerBusStarted = true } http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/internal/config/package.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala index f28a9a5..29f812a 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/package.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala @@ -112,4 +112,9 @@ package object config { // To limit how many applications are shown in the History Server summary ui private[spark] val HISTORY_UI_MAX_APPS = ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE) + + private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE = + ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size") + .intConf + .createWithDefault(10000) } http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 1c21313..bfa3c40 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -22,7 +22,8 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.util.DynamicVariable -import org.apache.spark.SparkContext +import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.internal.config._ import org.apache.spark.util.Utils /** @@ -32,18 +33,24 @@ import org.apache.spark.util.Utils * has started will events be actually propagated to all attached listeners. This listener bus * is stopped when `stop()` is called, and it will drop further events after stopping. */ -private[spark] class LiveListenerBus extends SparkListenerBus { +private[spark] class LiveListenerBus(val sparkContext: SparkContext) extends SparkListenerBus { self => import LiveListenerBus._ - private var sparkContext: SparkContext = null - // Cap the capacity of the event queue so we get an explicit error (rather than // an OOM exception) if it's perpetually being added to more quickly than it's being drained. - private val EVENT_QUEUE_CAPACITY = 10000 - private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + private lazy val EVENT_QUEUE_CAPACITY = validateAndGetQueueSize() + private lazy val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) + + private def validateAndGetQueueSize(): Int = { + val queueSize = sparkContext.conf.get(LISTENER_BUS_EVENT_QUEUE_SIZE) + if (queueSize <= 0) { + throw new SparkException("spark.scheduler.listenerbus.eventqueue.size must be > 0!") + } + queueSize + } // Indicate if `start()` is called private val started = new AtomicBoolean(false) @@ -96,11 +103,9 @@ private[spark] class LiveListenerBus extends SparkListenerBus { * listens for any additional events asynchronously while the listener bus is still running. * This should only be called once. * - * @param sc Used to stop the SparkContext in case the listener thread dies. */ - def start(sc: SparkContext): Unit = { + def start(): Unit = { if (started.compareAndSet(false, true)) { - sparkContext = sc listenerThread.start() } else { throw new IllegalStateException(s"$name already started!") http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala index c4c80b5..7f48592 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala @@ -142,14 +142,14 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit extraConf.foreach { case (k, v) => conf.set(k, v) } val logName = compressionCodec.map("test-" + _).getOrElse("test") val eventLogger = new EventLoggingListener(logName, None, testDirPath.toUri(), conf) - val listenerBus = new LiveListenerBus + val listenerBus = new LiveListenerBus(sc) val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite eventLogger.start() - listenerBus.start(sc) + listenerBus.start() listenerBus.addListener(eventLogger) listenerBus.postToAll(applicationStart) listenerBus.postToAll(applicationEnd) http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 5ba67af..e8a88d4 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -37,13 +37,13 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val jobCompletionTime = 1421191296660L test("don't call sc.stop in listener") { - sc = new SparkContext("local", "SparkListenerSuite") + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val listener = new SparkContextStoppingListener(sc) - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.addListener(listener) // Starting listener bus should flush all buffered events - bus.start(sc) + bus.start() bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) @@ -52,8 +52,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match } test("basic creation and shutdown of LiveListenerBus") { + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) val counter = new BasicJobCounter - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.addListener(counter) // Listener bus hasn't started yet, so posting events should not increment counter @@ -61,7 +62,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match assert(counter.count === 0) // Starting listener bus should flush all buffered events - bus.start(sc) + bus.start() bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS) assert(counter.count === 5) @@ -72,14 +73,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match // Listener bus must not be started twice intercept[IllegalStateException] { - val bus = new LiveListenerBus - bus.start(sc) - bus.start(sc) + val bus = new LiveListenerBus(sc) + bus.start() + bus.start() } // ... or stopped before starting intercept[IllegalStateException] { - val bus = new LiveListenerBus + val bus = new LiveListenerBus(sc) bus.stop() } } @@ -106,12 +107,12 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match drained = true } } - - val bus = new LiveListenerBus + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) + val bus = new LiveListenerBus(sc) val blockingListener = new BlockingListener bus.addListener(blockingListener) - bus.start(sc) + bus.start() bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) listenerStarted.acquire() @@ -353,13 +354,14 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match val badListener = new BadListener val jobCounter1 = new BasicJobCounter val jobCounter2 = new BasicJobCounter - val bus = new LiveListenerBus + sc = new SparkContext("local", "SparkListenerSuite", new SparkConf()) + val bus = new LiveListenerBus(sc) // Propagate events to bad listener first bus.addListener(badListener) bus.addListener(jobCounter1) bus.addListener(jobCounter2) - bus.start(sc) + bus.start() // Post events to all listeners, and wait until the queue is drained (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) } http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala index 31687e6..b9e3a36 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala @@ -38,7 +38,10 @@ import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.storage.StorageLevel._ /** Testsuite that tests block replication in BlockManager */ -class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter { +class BlockManagerReplicationSuite extends SparkFunSuite + with Matchers + with BeforeAndAfter + with LocalSparkContext { private val conf = new SparkConf(false).set("spark.app.id", "test") private var rpcEnv: RpcEnv = null @@ -91,8 +94,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo // to make cached peers refresh frequently conf.set("spark.storage.cachedPeersTtl", "10") + sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(sc))), conf, true) allStores.clear() } http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index 6194d23..e93eee2 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -49,7 +49,7 @@ import org.apache.spark.util._ import org.apache.spark.util.io.ChunkedByteBuffer class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterEach - with PrivateMethodTester with ResetSystemProperties { + with PrivateMethodTester with LocalSparkContext with ResetSystemProperties { import BlockManagerSuite._ @@ -107,8 +107,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) + sc = new SparkContext("local", "test", conf) master = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(sc))), conf, true) val initialize = PrivateMethod[Unit]('initialize) SizeEstimator invokePrivate initialize() http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 411a0dd..f6c8418 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -19,15 +19,14 @@ package org.apache.spark.ui.storage import org.scalatest.BeforeAndAfter -import org.apache.spark.{SparkConf, SparkFunSuite, Success} -import org.apache.spark.executor.TaskMetrics +import org.apache.spark._ import org.apache.spark.scheduler._ import org.apache.spark.storage._ /** * Test various functionality in the StorageListener that supports the StorageTab. */ -class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { +class StorageTabSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter { private var bus: LiveListenerBus = _ private var storageStatusListener: StorageStatusListener = _ private var storageListener: StorageListener = _ @@ -43,8 +42,10 @@ class StorageTabSuite extends SparkFunSuite with BeforeAndAfter { private val bm1 = BlockManagerId("big", "dog", 1) before { - bus = new LiveListenerBus - storageStatusListener = new StorageStatusListener(new SparkConf()) + val conf = new SparkConf() + sc = new SparkContext("local", "test", conf) + bus = new LiveListenerBus(sc) + storageStatusListener = new StorageStatusListener(conf) storageListener = new StorageListener(storageStatusListener) bus.addListener(storageStatusListener) bus.addListener(storageListener) http://git-wip-us.apache.org/repos/asf/spark/blob/9e91a100/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 5d36292..7e66545 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -48,6 +48,7 @@ class ReceivedBlockHandlerSuite extends SparkFunSuite with BeforeAndAfter with Matchers + with LocalSparkContext with Logging { import WriteAheadLogBasedBlockHandler._ @@ -78,8 +79,10 @@ class ReceivedBlockHandlerSuite rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) conf.set("spark.driver.port", rpcEnv.address.port.toString) + sc = new SparkContext("local", "test", conf) blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", - new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) + new BlockManagerMasterEndpoint(rpcEnv, true, conf, + new LiveListenerBus(sc))), conf, true) storageLevel = StorageLevel.MEMORY_ONLY_SER blockManager = createBlockManager(blockManagerSize, conf) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org