Repository: spark
Updated Branches:
  refs/heads/branch-1.5 125827a4f -> 03bcf627d


[SPARK-9519] [YARN] Confirm stop sc successfully when application was killed

Currently, when we kill an application on Yarn, sc.stop() is called from the Yarn 
application state monitor thread. YarnClientSchedulerBackend.stop() then interrupts 
that thread, which prevents the SparkContext from stopping fully because we are 
still waiting for the executors to exit.
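
The patch below fixes this by only interrupting the monitor thread while it is
still blocked waiting on YARN; once the monitor itself decides to stop the
SparkContext, it refuses further interrupts so the shutdown can complete. As a
rough standalone sketch of that guard (names such as GuardedMonitorSketch,
waitForApp and shutdown are stand-ins for illustration, not Spark's API):

    // Sketch only: the guarded-interrupt pattern this patch introduces.
    object GuardedMonitorSketch {

      class MonitorThread(waitForApp: () => Unit, shutdown: () => Unit) extends Thread {
        @volatile private var allowInterrupt = true

        override def run(): Unit = {
          try {
            waitForApp()            // stand-in for client.monitorApplication(...)
            allowInterrupt = false  // from here on the shutdown must not be cut short
            shutdown()              // stand-in for sc.stop()
          } catch {
            case _: InterruptedException => println("monitor thread interrupted")
          }
        }

        // Called from the backend's stop(): interrupt only while still waiting.
        def stopMonitor(): Unit = if (allowInterrupt) interrupt()
      }

      def main(args: Array[String]): Unit = {
        val t = new MonitorThread(() => Thread.sleep(60000), () => println("stopping context"))
        t.setDaemon(true)
        t.start()
        Thread.sleep(100)
        t.stopMonitor()  // safe: the monitor is still just waiting on the application
        t.join()
      }
    }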

Author: linweizhong <linweizh...@huawei.com>

Closes #7846 from Sephiroth-Lin/SPARK-9519 and squashes the following commits:

1ae736d [linweizhong] Update comments
2e8e365 [linweizhong] Add comment explaining the code
ad0e23b [linweizhong] Update
243d2c7 [linweizhong] Confirm stop sc successfully when application was killed

(cherry picked from commit 7a969a6967c4ecc0f004b73bff27a75257a94e86)
Signed-off-by: Marcelo Vanzin <van...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03bcf627
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03bcf627
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03bcf627

Branch: refs/heads/branch-1.5
Commit: 03bcf627dd829c0e9bbcaf1d42626909511eccbf
Parents: 125827a
Author: linweizhong <linweizh...@huawei.com>
Authored: Wed Aug 5 10:16:12 2015 -0700
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Aug 5 10:16:26 2015 -0700

----------------------------------------------------------------------
 .../cluster/YarnClientSchedulerBackend.scala    | 47 +++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/03bcf627/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d97fa2e..d225061 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class YarnClientSchedulerBackend(
 
   private var client: Client = null
   private var appId: ApplicationId = null
-  private var monitorThread: Thread = null
+  private var monitorThread: MonitorThread = null
 
   /**
    * Create a Yarn client to submit an application to the ResourceManager.
@@ -132,23 +132,41 @@ private[spark] class YarnClientSchedulerBackend(
   }
 
   /**
+   * We create this class for SPARK-9519. Basically, when we interrupt the monitor thread it's
+   * because the SparkContext is being shut down (sc.stop() called by user code), but if
+   * monitorApplication returns, it means the Yarn application finished before sc.stop() was
+   * called, which means we should call sc.stop() here, and we don't allow the monitor to be
+   * interrupted before the SparkContext stops successfully.
+   */
+  private class MonitorThread extends Thread {
+    private var allowInterrupt = true
+
+    override def run() {
+      try {
+        val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+        logError(s"Yarn application has already exited with state $state!")
+        allowInterrupt = false
+        sc.stop()
+      } catch {
+        case e: InterruptedException => logInfo("Interrupting monitor thread")
+      }
+    }
+
+    def stopMonitor(): Unit = {
+      if (allowInterrupt) {
+        this.interrupt()
+      }
+    }
+  }
+
+  /**
    * Monitor the application state in a separate thread.
    * If the application has exited for any reason, stop the SparkContext.
    * This assumes both `client` and `appId` have already been set.
    */
-  private def asyncMonitorApplication(): Thread = {
+  private def asyncMonitorApplication(): MonitorThread = {
     assert(client != null && appId != null, "Application has not been submitted yet!")
-    val t = new Thread {
-      override def run() {
-        try {
-          val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
-          logError(s"Yarn application has already exited with state $state!")
-          sc.stop()
-        } catch {
-          case e: InterruptedException => logInfo("Interrupting monitor thread")
-        }
-      }
-    }
+    val t = new MonitorThread
     t.setName("Yarn application state monitor")
     t.setDaemon(true)
     t
@@ -160,7 +178,7 @@ private[spark] class YarnClientSchedulerBackend(
   override def stop() {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
     if (monitorThread != null) {
-      monitorThread.interrupt()
+      monitorThread.stopMonitor()
     }
     super.stop()
     client.stop()
@@ -174,5 +192,4 @@ private[spark] class YarnClientSchedulerBackend(
       super.applicationId
     }
   }
-
 }

