Repository: spark
Updated Branches:
  refs/heads/branch-1.6 10272d5c9 -> 93ac30741
[SPARK-11572] Exit AsynchronousListenerBus thread when stop() is called

As vonnagy reported in the following thread:
http://search-hadoop.com/m/q3RTtk982kvIow22

Attempts to join the thread in AsynchronousListenerBus resulted in a lockup because the AsynchronousListenerBus thread was still receiving `SparkListenerExecutorMetricsUpdate` messages from the DAGScheduler.

Author: tedyu <yuzhih...@gmail.com>

Closes #9546 from ted-yu/master.

(cherry picked from commit 3e0a6cf1e02a19b37c68d3026415d53bb57a576b)
Signed-off-by: Andrew Or <and...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/93ac3074
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/93ac3074
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/93ac3074

Branch: refs/heads/branch-1.6
Commit: 93ac30741cd0ed99512f01525fbda8a08c87967a
Parents: 10272d5
Author: tedyu <yuzhih...@gmail.com>
Authored: Tue Nov 10 16:51:25 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Nov 10 16:51:33 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/util/AsynchronousListenerBus.scala | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/93ac3074/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
index 61b5a4c..b8481ea 100644
--- a/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/AsynchronousListenerBus.scala
@@ -67,15 +67,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
           processingEvent = true
         }
         try {
-          val event = eventQueue.poll
-          if (event == null) {
+          if (stopped.get()) {
             // Get out of the while loop and shutdown the daemon thread
-            if (!stopped.get) {
-              throw new IllegalStateException("Polling `null` from eventQueue means" +
-                " the listener bus has been stopped. So `stopped` must be true")
-            }
             return
           }
+          val event = eventQueue.poll
+          assert(event != null, "event queue was empty but the listener bus was not stopped")
           postToAll(event)
         } finally {
           self.synchronized {
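
For readers without the full file at hand, the following is a minimal, self-contained sketch of the loop shape this patch produces. It is not Spark's actual class: `MiniListenerBus`, `handle`, and `isAlive` are hypothetical names introduced for illustration, and the real AsynchronousListenerBus has further state (for example `processingEvent`) that is omitted here. The sketch assumes a semaphore that receives one permit per queued event plus one extra permit from stop(), and shows why checking `stopped` before polling lets the thread exit even when producers keep the queue non-empty.

```scala
import java.util.concurrent.{LinkedBlockingQueue, Semaphore}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical, simplified stand-in for AsynchronousListenerBus (illustration only).
class MiniListenerBus[E](handle: E => Unit) {
  private val eventQueue = new LinkedBlockingQueue[E]()
  // One permit per queued event, plus one released by stop() to wake the thread.
  private val eventLock = new Semaphore(0)
  private val stopped = new AtomicBoolean(false)

  private val listenerThread = new Thread("mini-listener-bus") {
    setDaemon(true)
    override def run(): Unit = {
      var running = true
      while (running) {
        eventLock.acquire()
        if (stopped.get()) {
          // Check the flag first: leave the loop even if events are still queued.
          running = false
        } else {
          val event = eventQueue.poll()
          assert(event != null, "event queue was empty but the listener bus was not stopped")
          handle(event)
        }
      }
    }
  }

  def start(): Unit = listenerThread.start()

  // Enqueue an event and release a permit so the listener thread wakes up.
  def post(event: E): Unit = {
    if (eventQueue.offer(event)) eventLock.release()
  }

  // Set the flag before releasing the permit, then wait for the thread to exit.
  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) {
      eventLock.release()
      listenerThread.join()
    }
  }

  def isAlive: Boolean = listenerThread.isAlive
}
```

With the pre-patch ordering (poll first, exit only on `null`), a thread like this one would keep running for as long as posts arrive, so the `join()` in stop() might never return. The trade-off of the new ordering, at least in this sketch, is that events still queued when stop() is called are not delivered.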