[1/3] git commit: SPARK-1407 drain event queue before stopping event logger
Repository: spark Updated Branches: refs/heads/branch-1.0 bde9cc11f - 8ca3b2bc9 SPARK-1407 drain event queue before stopping event logger Author: Kan Zhang kzh...@apache.org Closes #366 from kanzhang/SPARK-1407 and squashes the following commits: cd0629f [Kan Zhang] code refactoring and adding test b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64 Branch: refs/heads/branch-1.0 Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20 Parents: bde9cc1 Author: Kan Zhang kzh...@apache.org Authored: Wed Apr 9 15:24:33 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Wed Apr 9 15:25:29 2014 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../spark/scheduler/LiveListenerBus.scala | 32 -- .../spark/scheduler/SparkListenerSuite.scala| 45 .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- 4 files changed, 67 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f775051..7630523 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { ui.stop() -eventLogger.foreach(_.stop()) // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler @@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging { metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() - listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() + listenerBus.stop() + eventLogger.foreach(_.stop()) logInfo(Successfully stopped SparkContext) } else { logInfo(SparkContext already stopped) http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 353a486..76f3e32 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false private var started = false + private val listenerThread = new Thread(SparkListenerBus) { +setDaemon(true) +override def run() { + while (true) { +val event = eventQueue.take +if (event == SparkListenerShutdown) { + // Get out of the while loop and shutdown the daemon thread + return +} +postToAll(event) + } +} + } + + // Exposed for testing + @volatile private[spark] var stopCalled = false /** * Start sending events to attached listeners. @@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (started) { throw new IllegalStateException(Listener bus already started!) } +listenerThread.start() started = true -new Thread(SparkListenerBus) { - setDaemon(true) - override def run() { -while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { -// Get out of the while loop and shutdown the daemon thread -return - } - postToAll(event) -} - } -}.start() } def post(event: SparkListenerEvent) { @@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { } def stop() { +stopCalled = true if (!started) { throw new IllegalStateException(Attempted to stop a listener bus that has not yet started!) } post(SparkListenerShutdown) +listenerThread.join()
git commit: SPARK-1407 drain event queue before stopping event logger
Repository: spark Updated Branches: refs/heads/master bde9cc11f - eb5f2b642 SPARK-1407 drain event queue before stopping event logger Author: Kan Zhang kzh...@apache.org Closes #366 from kanzhang/SPARK-1407 and squashes the following commits: cd0629f [Kan Zhang] code refactoring and adding test b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64 Branch: refs/heads/master Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20 Parents: bde9cc1 Author: Kan Zhang kzh...@apache.org Authored: Wed Apr 9 15:24:33 2014 -0700 Committer: Patrick Wendell pwend...@gmail.com Committed: Wed Apr 9 15:25:29 2014 -0700 -- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../spark/scheduler/LiveListenerBus.scala | 32 -- .../spark/scheduler/SparkListenerSuite.scala| 45 .../org/apache/spark/examples/SparkHdfsLR.scala | 2 +- 4 files changed, 67 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index f775051..7630523 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging { /** Shut down the SparkContext. */ def stop() { ui.stop() -eventLogger.foreach(_.stop()) // Do this only if not stopped already - best case effort. // prevent NPE if stopped more than once. val dagSchedulerCopy = dagScheduler @@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging { metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() - listenerBus.stop() taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() + listenerBus.stop() + eventLogger.foreach(_.stop()) logInfo(Successfully stopped SparkContext) } else { logInfo(SparkContext already stopped) http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala -- diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 353a486..76f3e32 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false private var started = false + private val listenerThread = new Thread(SparkListenerBus) { +setDaemon(true) +override def run() { + while (true) { +val event = eventQueue.take +if (event == SparkListenerShutdown) { + // Get out of the while loop and shutdown the daemon thread + return +} +postToAll(event) + } +} + } + + // Exposed for testing + @volatile private[spark] var stopCalled = false /** * Start sending events to attached listeners. @@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (started) { throw new IllegalStateException(Listener bus already started!) } +listenerThread.start() started = true -new Thread(SparkListenerBus) { - setDaemon(true) - override def run() { -while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { -// Get out of the while loop and shutdown the daemon thread -return - } - postToAll(event) -} - } -}.start() } def post(event: SparkListenerEvent) { @@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { } def stop() { +stopCalled = true if (!started) { throw new IllegalStateException(Attempted to stop a listener bus that has not yet started!) } post(SparkListenerShutdown) +listenerThread.join() } }