Repository: spark
Updated Branches:
  refs/heads/master d292f7483 -> 767d288b6


[SPARK-11655][CORE] Fix deadlock in handling of launcher stop().

The stop() callback was trying to close the launcher connection in the
same thread that handles connection data, which ended up causing a
deadlock. So avoid that by dispatching the stop() request in its own
thread.

On top of that, add some exception safety to a few parts of the code,
and use Java 8's "destroyForcibly()" when it's available to forcibly
kill the child process. The downside is that "kill()" may not actually
terminate the child process when running on Java 7.

Author: Marcelo Vanzin <van...@cloudera.com>

Closes #9633 from vanzin/SPARK-11655.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767d288b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767d288b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767d288b

Branch: refs/heads/master
Commit: 767d288b6b33a79d99324b70c2ac079fcf484a50
Parents: d292f74
Author: Marcelo Vanzin <van...@cloudera.com>
Authored: Thu Nov 12 14:29:16 2015 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Thu Nov 12 14:29:16 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/launcher/LauncherBackend.scala | 12 ++++++++++--
 .../cluster/SparkDeploySchedulerBackend.scala   | 20 +++++++++++---------
 .../spark/launcher/ChildProcAppHandle.java      | 17 +++++++++++++++--
 .../apache/spark/launcher/SparkAppHandle.java   |  3 +++
 4 files changed, 39 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
index 3ea984c..a5d41a1 100644
--- a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
+++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala
@@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket}
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.launcher.LauncherProtocol._
-import org.apache.spark.util.ThreadUtils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A class that can be used to talk to a launcher server. Users should extend this class to
@@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend {
    */
   protected def onDisconnected() : Unit = { }
 
+  private def fireStopRequest(): Unit = {
+    val thread = LauncherBackend.threadFactory.newThread(new Runnable() {
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        onStopRequest()
+      }
+    })
+    thread.start()
+  }
 
   private class BackendConnection(s: Socket) extends LauncherConnection(s) {
 
     override protected def handle(m: Message): Unit = m match {
       case _: Stop =>
-        onStopRequest()
+        fireStopRequest()
 
       case _ =>
         throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}")

http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 05d9bc9..5105475 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend(
   }
 
   private def stop(finalState: SparkAppHandle.State): Unit = synchronized {
-    stopping = true
+    try {
+      stopping = true
 
-    launcherBackend.setState(finalState)
-    launcherBackend.close()
+      super.stop()
+      client.stop()
 
-    super.stop()
-    client.stop()
-
-    val callback = shutdownCallback
-    if (callback != null) {
-      callback(this)
+      val callback = shutdownCallback
+      if (callback != null) {
+        callback(this)
+      }
+    } finally {
+      launcherBackend.setState(finalState)
+      launcherBackend.close()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
index de50f14..1bfda28 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java
@@ -18,6 +18,7 @@
 package org.apache.spark.launcher;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ThreadFactory;
@@ -102,8 +103,20 @@ class ChildProcAppHandle implements SparkAppHandle {
       disconnect();
     }
     if (childProc != null) {
-      childProc.destroy();
-      childProc = null;
+      try {
+        childProc.exitValue();
+      } catch (IllegalThreadStateException e) {
+        // Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
+        // fall back to the old API if it's not there.
+        try {
+          Method destroy = childProc.getClass().getMethod("destroyForcibly");
+          destroy.invoke(childProc);
+        } catch (Exception inner) {
+          childProc.destroy();
+        }
+      } finally {
+        childProc = null;
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
----------------------------------------------------------------------
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
index 13dd9f1..e9caf0b 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java
@@ -89,6 +89,9 @@ public interface SparkAppHandle {
   * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send
   * a {@link #stop()} message to the application, so it's recommended that users first try to
   * stop the application cleanly and only resort to this method if that fails.
+   * <p>
+   * Note that if the application is running as a child process, this method may fail to kill
+   * the process when using Java 7. This may happen if, for example, the application is deadlocked.
    */
   void kill();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to