[1/3] git commit: SPARK-1407 drain event queue before stopping event logger

2014-04-10 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-1.0 bde9cc11f -> 8ca3b2bc9


SPARK-1407 drain event queue before stopping event logger

Author: Kan Zhang kzh...@apache.org

Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:

cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64

Branch: refs/heads/branch-1.0
Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20
Parents: bde9cc1
Author: Kan Zhang kzh...@apache.org
Authored: Wed Apr 9 15:24:33 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Wed Apr 9 15:25:29 2014 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../spark/scheduler/LiveListenerBus.scala   | 32 --
 .../spark/scheduler/SparkListenerSuite.scala| 45 
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 4 files changed, 67 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f775051..7630523 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
 ui.stop()
-eventLogger.foreach(_.stop())
 // Do this only if not stopped already - best case effort.
 // prevent NPE if stopped more than once.
 val dagSchedulerCopy = dagScheduler
@@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
   metadataCleaner.cancel()
   cleaner.foreach(_.stop())
   dagSchedulerCopy.stop()
-  listenerBus.stop()
   taskScheduler = null
   // TODO: Cache.stop()?
   env.stop()
   SparkEnv.set(null)
   ShuffleMapTask.clearCache()
   ResultTask.clearCache()
+  listenerBus.stop()
+  eventLogger.foreach(_.stop())
   logInfo("Successfully stopped SparkContext")
 } else {
   logInfo("SparkContext already stopped")

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 353a486..76f3e32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   private val eventQueue = new 
LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+  private val listenerThread = new Thread("SparkListenerBus") {
+setDaemon(true)
+override def run() {
+  while (true) {
+val event = eventQueue.take
+if (event == SparkListenerShutdown) {
+  // Get out of the while loop and shutdown the daemon thread
+  return
+}
+postToAll(event)
+  }
+}
+  }
+
+  // Exposed for testing
+  @volatile private[spark] var stopCalled = false
 
   /**
* Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
 if (started) {
   throw new IllegalStateException("Listener bus already started!")
 }
+listenerThread.start()
 started = true
-new Thread(SparkListenerBus) {
-  setDaemon(true)
-  override def run() {
-while (true) {
-  val event = eventQueue.take
-  if (event == SparkListenerShutdown) {
-// Get out of the while loop and shutdown the daemon thread
-return
-  }
-  postToAll(event)
-}
-  }
-}.start()
   }
 
   def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   }
 
   def stop() {
+stopCalled = true
 if (!started) {
   throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
 }
 post(SparkListenerShutdown)
+listenerThread.join()

git commit: SPARK-1407 drain event queue before stopping event logger

2014-04-09 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/master bde9cc11f -> eb5f2b642


SPARK-1407 drain event queue before stopping event logger

Author: Kan Zhang kzh...@apache.org

Closes #366 from kanzhang/SPARK-1407 and squashes the following commits:

cd0629f [Kan Zhang] code refactoring and adding test
b073ee6 [Kan Zhang] SPARK-1407 drain event queue before stopping event logger


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb5f2b64
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb5f2b64
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb5f2b64

Branch: refs/heads/master
Commit: eb5f2b64230faa69a53815cb61bcc87aeb233d20
Parents: bde9cc1
Author: Kan Zhang kzh...@apache.org
Authored: Wed Apr 9 15:24:33 2014 -0700
Committer: Patrick Wendell pwend...@gmail.com
Committed: Wed Apr 9 15:25:29 2014 -0700

--
 .../scala/org/apache/spark/SparkContext.scala   |  4 +-
 .../spark/scheduler/LiveListenerBus.scala   | 32 --
 .../spark/scheduler/SparkListenerSuite.scala| 45 
 .../org/apache/spark/examples/SparkHdfsLR.scala |  2 +-
 4 files changed, 67 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index f775051..7630523 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
   /** Shut down the SparkContext. */
   def stop() {
 ui.stop()
-eventLogger.foreach(_.stop())
 // Do this only if not stopped already - best case effort.
 // prevent NPE if stopped more than once.
 val dagSchedulerCopy = dagScheduler
@@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
   metadataCleaner.cancel()
   cleaner.foreach(_.stop())
   dagSchedulerCopy.stop()
-  listenerBus.stop()
   taskScheduler = null
   // TODO: Cache.stop()?
   env.stop()
   SparkEnv.set(null)
   ShuffleMapTask.clearCache()
   ResultTask.clearCache()
+  listenerBus.stop()
+  eventLogger.foreach(_.stop())
   logInfo("Successfully stopped SparkContext")
 } else {
   logInfo("SparkContext already stopped")

http://git-wip-us.apache.org/repos/asf/spark/blob/eb5f2b64/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
index 353a486..76f3e32 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala
@@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   private val eventQueue = new 
LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
   private var started = false
+  private val listenerThread = new Thread("SparkListenerBus") {
+setDaemon(true)
+override def run() {
+  while (true) {
+val event = eventQueue.take
+if (event == SparkListenerShutdown) {
+  // Get out of the while loop and shutdown the daemon thread
+  return
+}
+postToAll(event)
+  }
+}
+  }
+
+  // Exposed for testing
+  @volatile private[spark] var stopCalled = false
 
   /**
* Start sending events to attached listeners.
@@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
 if (started) {
   throw new IllegalStateException("Listener bus already started!")
 }
+listenerThread.start()
 started = true
-new Thread(SparkListenerBus) {
-  setDaemon(true)
-  override def run() {
-while (true) {
-  val event = eventQueue.take
-  if (event == SparkListenerShutdown) {
-// Get out of the while loop and shutdown the daemon thread
-return
-  }
-  postToAll(event)
-}
-  }
-}.start()
   }
 
   def post(event: SparkListenerEvent) {
@@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends 
SparkListenerBus with Logging {
   }
 
   def stop() {
+stopCalled = true
 if (!started) {
   throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
 }
 post(SparkListenerShutdown)
+listenerThread.join()
   }
 }