Repository: spark Updated Branches: refs/heads/master d292f7483 -> 767d288b6
[SPARK-11655][CORE] Fix deadlock in handling of launcher stop(). The stop() callback was trying to close the launcher connection in the same thread that handles connection data, which ended up causing a deadlock. So avoid that by dispatching the stop() request in its own thread. On top of that, add some exception safety to a few parts of the code, and use "destroyForcibly" from Java 8 if it's available, to force kill the child process. The flip side is that "kill()" may not actually work if running Java 7. Author: Marcelo Vanzin <van...@cloudera.com> Closes #9633 from vanzin/SPARK-11655. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/767d288b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/767d288b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/767d288b Branch: refs/heads/master Commit: 767d288b6b33a79d99324b70c2ac079fcf484a50 Parents: d292f74 Author: Marcelo Vanzin <van...@cloudera.com> Authored: Thu Nov 12 14:29:16 2015 -0800 Committer: Marcelo Vanzin <van...@cloudera.com> Committed: Thu Nov 12 14:29:16 2015 -0800 ---------------------------------------------------------------------- .../apache/spark/launcher/LauncherBackend.scala | 12 ++++++++++-- .../cluster/SparkDeploySchedulerBackend.scala | 20 +++++++++++--------- .../spark/launcher/ChildProcAppHandle.java | 17 +++++++++++++++-- .../apache/spark/launcher/SparkAppHandle.java | 3 +++ 4 files changed, 39 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala index 3ea984c..a5d41a1 100644 --- 
a/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala +++ b/core/src/main/scala/org/apache/spark/launcher/LauncherBackend.scala @@ -21,7 +21,7 @@ import java.net.{InetAddress, Socket} import org.apache.spark.SPARK_VERSION import org.apache.spark.launcher.LauncherProtocol._ -import org.apache.spark.util.ThreadUtils +import org.apache.spark.util.{ThreadUtils, Utils} /** * A class that can be used to talk to a launcher server. Users should extend this class to @@ -88,12 +88,20 @@ private[spark] abstract class LauncherBackend { */ protected def onDisconnected() : Unit = { } + private def fireStopRequest(): Unit = { + val thread = LauncherBackend.threadFactory.newThread(new Runnable() { + override def run(): Unit = Utils.tryLogNonFatalError { + onStopRequest() + } + }) + thread.start() + } private class BackendConnection(s: Socket) extends LauncherConnection(s) { override protected def handle(m: Message): Unit = m match { case _: Stop => - onStopRequest() + fireStopRequest() case _ => throw new IllegalArgumentException(s"Unexpected message type: ${m.getClass().getName()}") http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 05d9bc9..5105475 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -191,17 +191,19 @@ private[spark] class SparkDeploySchedulerBackend( } private def stop(finalState: SparkAppHandle.State): Unit = synchronized { - stopping = true + try { + stopping = true - launcherBackend.setState(finalState) - launcherBackend.close() + 
super.stop() + client.stop() - super.stop() - client.stop() - - val callback = shutdownCallback - if (callback != null) { - callback(this) + val callback = shutdownCallback + if (callback != null) { + callback(this) + } + } finally { + launcherBackend.setState(finalState) + launcherBackend.close() } } http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index de50f14..1bfda28 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -18,6 +18,7 @@ package org.apache.spark.launcher; import java.io.IOException; +import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ThreadFactory; @@ -102,8 +103,20 @@ class ChildProcAppHandle implements SparkAppHandle { disconnect(); } if (childProc != null) { - childProc.destroy(); - childProc = null; + try { + childProc.exitValue(); + } catch (IllegalThreadStateException e) { + // Child is still alive. Try to use Java 8's "destroyForcibly()" if available, + // fall back to the old API if it's not there. 
+ try { + Method destroy = childProc.getClass().getMethod("destroyForcibly"); + destroy.invoke(childProc); + } catch (Exception inner) { + childProc.destroy(); + } + } finally { + childProc = null; + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/767d288b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java ---------------------------------------------------------------------- diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java index 13dd9f1..e9caf0b 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkAppHandle.java @@ -89,6 +89,9 @@ public interface SparkAppHandle { * Tries to kill the underlying application. Implies {@link #disconnect()}. This will not send * a {@link #stop()} message to the application, so it's recommended that users first try to * stop the application cleanly and only resort to this method if that fails. + * <p> + * Note that if the application is running as a child process, this method may fail to kill the + * process when using Java 7. This may happen if, for example, the application is deadlocked. */ void kill(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org