[FLINK-2079] Add TaskManager deathwatch thread for YARN case
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11b021b0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11b021b0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11b021b0 Branch: refs/heads/master Commit: 11b021b0fb36503c06596323b39d531225057f1e Parents: b2b0fe7 Author: Robert Metzger <rmetz...@apache.org> Authored: Fri May 22 13:51:02 2015 +0200 Committer: Robert Metzger <rmetz...@apache.org> Committed: Wed May 27 09:56:54 2015 +0200 ---------------------------------------------------------------------- .../org/apache/flink/configuration/ConfigConstants.java | 5 +++++ .../org/apache/flink/runtime/process/ProcessReaper.java | 2 +- .../scala/org/apache/flink/runtime/akka/AkkaUtils.scala | 9 ++++++++- .../org/apache/flink/runtime/taskmanager/TaskManager.scala | 5 ++--- .../apache/flink/yarn/appMaster/YarnTaskManagerRunner.java | 3 +++ 5 files changed, 19 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 26d4fbe..92acd3f 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -406,6 +406,11 @@ public final class ConfigConstants { * Timeout for all blocking calls that look up remote actors */ public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout"; + + /** + * Exit JVM on fatal Akka errors + */ + public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error"; // ----------------------------- Streaming 
-------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java index 644d7b7..09e1839 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java @@ -56,7 +56,7 @@ public class ProcessReaper extends UntypedActor { Thread.sleep(100); } catch (InterruptedException e) { - // not really problem if we don't sleep... + // not really a problem if we don't sleep... } } } http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 5b33017..7ffaddd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -139,6 +139,13 @@ object AkkaUtils { val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS, ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS) + val jvmExitOnFatalError = if ( + configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, false)){ + "on" + } else { + "off" + } + val logLifecycleEvents = if (lifecycleEvents) "on" else "off" val logLevel = getLogLevel @@ -152,7 +159,7 @@ object AkkaUtils { | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" | log-config-on-start = off | - | jvm-exit-on-fatal-error = off + | 
jvm-exit-on-fatal-error = $jvmExitOnFatalError | | serialize-messages = off | http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8a45fa4..7bf5bc5 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -158,7 +158,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { private var heartbeatScheduler: Option[Cancellable] = None - // -------------------------------------------------------------------------- // Actor messages and life cycle // -------------------------------------------------------------------------- @@ -192,7 +191,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { */ override def postStop(): Unit = { log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.") - + cancelAndClearEverything(new Exception("TaskManager is shutting down.")) if (isConnected) { @@ -1289,7 +1288,7 @@ object TaskManager { streamingMode, taskManagerClass) - // start a process reaper that watches the JobManager. If the JobManager actor dies, + // start a process reaper that watches the TaskManager.
If the TaskManager actor dies, // the process reaper will kill the JVM process (to ensure easy failure detection) LOG.debug("Starting TaskManager process reaper") taskManagerSystem.actorOf( http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java index 564a0bd..3f13990 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java @@ -85,6 +85,9 @@ public class YarnTaskManagerRunner { LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName() +"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'"); + // tell akka to die in case of an error + configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true); + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername); for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) { ugi.addToken(toks);