Repository: spark
Updated Branches:
  refs/heads/branch-1.6 10272d5c9 -> 93ac30741


[SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called

As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22

Attempts to join the thread in AsynchronousListenerBus resulted in lock up 
because AsynchronousListenerBus thread was still getting messages 
`SparkListenerExecutorMetricsUpdate` from the DAGScheduler

Author: tedyu <yuzhih...@gmail.com>

Closes #9546 from ted-yu/master.

(cherry picked from commit 3e0a6cf1e02a19b37c68d3026415d53bb57a576b)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ac3074
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ac3074
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ac3074

Branch: refs/heads/branch-1.6
Commit: 93ac30741cd0ed99512f01525fbda8a08c87967a
Parents: 10272d5
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Nov 10 16:51:25 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Nov 10 16:51:33 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/AsynchronousListenerBus.scala     | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/93ac3074/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala 
b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4c..b8481ea 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: 
AnyRef, E](name: Stri
           processingEvent = true
         }
         try {
-          val event = eventQueue.poll
-          if (event == null) {
+          if (stopped.get()) {
             // Get out of the while loop and shutdown the daemon thread
-            if (!stopped.get) {
-              throw new IllegalStateException("Polling `null` from eventQueue 
means" +
-                " the listener bus has been stopped. So `stopped` must be 
true")
-            }
             return
           }
+          val event = eventQueue.poll
+          assert(event != null, "event queue was empty but the listener bus 
was not stopped")
           postToAll(event)
         } finally {
           self.synchronized {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to