Repository: spark
Updated Branches:
  refs/heads/branch-1.4 3d6a9214e -> 5ad9f950c


[SPARK-9446] Clear Active SparkContext in stop() method

In the mailing-list thread 'stopped SparkContext remaining active', Andres 
observed the following in the driver log (note the INFO line that a concurrent 
thread interleaved into the middle of the stack trace):
```
15/07/29 15:17:09 WARN YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster has disassociated: <address removed>
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Shutting down all executors
Exception in thread "Yarn application state monitor" org.apache.spark.SparkException: Error asking standalone scheduler to shut down executors
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:261)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stop(CoarseGrainedSchedulerBackend.scala:266)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.stop(YarnClientSchedulerBackend.scala:158)
        at org.apache.spark.scheduler.TaskSchedulerImpl.stop(TaskSchedulerImpl.scala:416)
        at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1411)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1644)
        at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$$anon$1.run(YarnClientSchedulerBackend.scala:139)
Caused by: java.lang.InterruptedException
        at java.util.concurrent.locks.AbstractQueuedSynchronizer.tryAcquireSharedNanos(AbstractQueuedSynchronizer.java:1325)
        at scala.concurrent.impl.Promise$DefaultPromise.tryAwait(Promise.scala:208)
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:218)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:190)
15/07/29 15:17:09 INFO YarnClientSchedulerBackend: Asking each executor to shut down
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102)
        at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78)
        at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.stopExecutors(CoarseGrainedSchedulerBackend.scala:257)
        ... 6 more
```
The effect of the above exception is that a stopped SparkContext is handed 
back to the user as the active one, because SparkContext.clearActiveContext() 
is never reached.
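
The failure mode is easy to reproduce in miniature. In the sketch below 
(illustrative names only, not Spark's actual fields), an exception thrown 
partway through stop() skips every step after it, so the active-context 
bookkeeping is never cleared:
```
object StopSkipsCleanup {
  // Stand-in for SparkContext's "active context" registry.
  @volatile var activeContext: Option[String] = Some("app-1234")

  def stop(): Unit = {
    // Pre-patch shape: cleanup steps run sequentially with no guard,
    // so a throw here (like the SparkException in the trace above)...
    throw new RuntimeException("Error asking standalone scheduler to shut down executors")
    // ...means the clearing step below is never reached:
    // activeContext = None
  }

  def main(args: Array[String]): Unit = {
    try stop() catch { case e: Exception => println(s"stop() failed: $e") }
    // The context is fully stopped, yet still registered as active.
    println(s"still active: ${activeContext.isDefined}") // prints: still active: true
  }
}
```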

Author: tedyu <yuzhih...@gmail.com>

Closes #7756 from tedyu/master and squashes the following commits:

7339ff2 [tedyu] Move null assignment out of tryLogNonFatalError block
6e02cd9 [tedyu] Use Utils.tryLogNonFatalError to guard resource release
f5fb519 [tedyu] Clear Active SparkContext in stop() method using finally
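
The first of these squashed commits encodes a subtle ordering choice, visible 
in the diff further down: the field reset stays outside the guarded block, so 
the reference is dropped even when the component's stop() throws. Excerpted 
from the resulting code, with comments added:
```
if (_dagScheduler != null) {
  Utils.tryLogNonFatalError {
    _dagScheduler.stop()  // may throw; the guard logs instead of propagating
  }
  _dagScheduler = null    // outside the guard, so it runs unconditionally
}
```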

(cherry picked from commit 27ae851ce16082775ffbcb5b8fc6bdbe65dc70fc)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5ad9f950
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5ad9f950
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5ad9f950

Branch: refs/heads/branch-1.4
Commit: 5ad9f950c4bd0042d79cdccb5277c10f8412be85
Parents: 3d6a921
Author: tedyu <yuzhih...@gmail.com>
Authored: Fri Jul 31 18:16:55 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Fri Jul 31 18:17:05 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 50 +++++++++++++++-----
 1 file changed, 37 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5ad9f950/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index d499aba..3caae87 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1630,33 +1630,57 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       Utils.removeShutdownHook(_shutdownHookRef)
     }
 
-    postApplicationEnd()
-    _ui.foreach(_.stop())
+    Utils.tryLogNonFatalError {
+      postApplicationEnd()
+    }
+    Utils.tryLogNonFatalError {
+      _ui.foreach(_.stop())
+    }
     if (env != null) {
-      env.metricsSystem.report()
+      Utils.tryLogNonFatalError {
+        env.metricsSystem.report()
+      }
     }
     if (metadataCleaner != null) {
-      metadataCleaner.cancel()
+      Utils.tryLogNonFatalError {
+        metadataCleaner.cancel()
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _cleaner.foreach(_.stop())
+    }
+    Utils.tryLogNonFatalError {
+      _executorAllocationManager.foreach(_.stop())
     }
-    _cleaner.foreach(_.stop())
-    _executorAllocationManager.foreach(_.stop())
     if (_dagScheduler != null) {
-      _dagScheduler.stop()
+      Utils.tryLogNonFatalError {
+        _dagScheduler.stop()
+      }
       _dagScheduler = null
     }
     if (_listenerBusStarted) {
-      listenerBus.stop()
-      _listenerBusStarted = false
+      Utils.tryLogNonFatalError {
+        listenerBus.stop()
+        _listenerBusStarted = false
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _eventLogger.foreach(_.stop())
     }
-    _eventLogger.foreach(_.stop())
     if (env != null && _heartbeatReceiver != null) {
-      env.rpcEnv.stop(_heartbeatReceiver)
+      Utils.tryLogNonFatalError {
+        env.rpcEnv.stop(_heartbeatReceiver)
+      }
+    }
+    Utils.tryLogNonFatalError {
+      _progressBar.foreach(_.stop())
     }
-    _progressBar.foreach(_.stop())
     _taskScheduler = null
     // TODO: Cache.stop()?
     if (_env != null) {
-      _env.stop()
+      Utils.tryLogNonFatalError {
+        _env.stop()
+      }
       SparkEnv.set(null)
     }
     SparkContext.clearActiveContext()
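
The patch works because Utils.tryLogNonFatalError takes the cleanup step as a 
by-name parameter and catches it with scala.util.control.NonFatal, logging 
rather than rethrowing, so every later step, and ultimately 
SparkContext.clearActiveContext(), still runs. Note that NonFatal deliberately 
does not match a bare InterruptedException; in the trace above the interrupt 
surfaces wrapped in a SparkException, which the guard does catch. A minimal 
sketch of the helper's shape (paraphrased, not copied from this commit; see 
org.apache.spark.util.Utils for the real definition):
```
import scala.util.control.NonFatal

object TryLogNonFatalErrorSketch {
  // By-name parameter: the block is only evaluated inside the try, and
  // NonFatal lets JVM-fatal throwables (OutOfMemoryError, etc.) propagate.
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        // Spark logs this via logError; println keeps the sketch dependency-free.
        println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
    }
  }

  def main(args: Array[String]): Unit = {
    tryLogNonFatalError { sys.error("scheduler backend failed") } // logged, not rethrown
    println("later cleanup steps still run")
  }
}
```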


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
