[FLINK-2079] Add TaskManager deathwatch thread for YARN case

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/11b021b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/11b021b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/11b021b0

Branch: refs/heads/master
Commit: 11b021b0fb36503c06596323b39d531225057f1e
Parents: b2b0fe7
Author: Robert Metzger <rmetz...@apache.org>
Authored: Fri May 22 13:51:02 2015 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Wed May 27 09:56:54 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/configuration/ConfigConstants.java     | 5 +++++
 .../org/apache/flink/runtime/process/ProcessReaper.java     | 2 +-
 .../scala/org/apache/flink/runtime/akka/AkkaUtils.scala     | 9 ++++++++-
 .../org/apache/flink/runtime/taskmanager/TaskManager.scala  | 5 ++---
 .../apache/flink/yarn/appMaster/YarnTaskManagerRunner.java  | 3 +++
 5 files changed, 19 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 26d4fbe..92acd3f 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -406,6 +406,11 @@ public final class ConfigConstants {
         * Timeout for all blocking calls that look up remote actors
         */
        public static final String AKKA_LOOKUP_TIMEOUT = "akka.lookup.timeout";
+
+       /**
+        * Exit JVM on fatal Akka errors
+        */
+       public static final String AKKA_JVM_EXIT_ON_FATAL_ERROR = "akka.jvm-exit-on-fatal-error";
        
        // ----------------------------- Streaming --------------------------------
        

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index 644d7b7..09e1839 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -56,7 +56,7 @@ public class ProcessReaper extends UntypedActor {
                                                Thread.sleep(100);
                                        }
                                        catch (InterruptedException e) {
-                                               // not really problem if we don't sleep...
+                                               // not really a problem if we don't sleep...
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 5b33017..7ffaddd 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -139,6 +139,13 @@ object AkkaUtils {
     val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
       ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)
 
+    val jvmExitOnFatalError = if (
+      configuration.getBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, false)){
+      "on"
+    } else {
+      "off"
+    }
+
     val logLifecycleEvents = if (lifecycleEvents) "on" else "off"
 
     val logLevel = getLogLevel
@@ -152,7 +159,7 @@ object AkkaUtils {
         | logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
         | log-config-on-start = off
         |
-        | jvm-exit-on-fatal-error = off
+        | jvm-exit-on-fatal-error = $jvmExitOnFatalError
         |
         | serialize-messages = off
         |

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 8a45fa4..7bf5bc5 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -158,7 +158,6 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
 
   private var heartbeatScheduler: Option[Cancellable] = None
 
-
   // --------------------------------------------------------------------------
   //  Actor messages and life cycle
   // --------------------------------------------------------------------------
@@ -192,7 +191,7 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging {
    */
   override def postStop(): Unit = {
     log.info(s"Stopping TaskManager ${self.path.toSerializationFormat}.")
-
+    
     cancelAndClearEverything(new Exception("TaskManager is shutting down."))
 
     if (isConnected) {
@@ -1289,7 +1288,7 @@ object TaskManager {
                                                            streamingMode,
                                                            taskManagerClass)
 
-      // start a process reaper that watches the JobManager. If the JobManager actor dies,
+      // start a process reaper that watches the JobManager. If the TaskManager actor dies,
       // the process reaper will kill the JVM process (to ensure easy failure detection)
       LOG.debug("Starting TaskManager process reaper")
       taskManagerSystem.actorOf(

http://git-wip-us.apache.org/repos/asf/flink/blob/11b021b0/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
index 564a0bd..3f13990 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/appMaster/YarnTaskManagerRunner.java
@@ -85,6 +85,9 @@ public class YarnTaskManagerRunner {
                LOG.info("YARN daemon runs as '" + UserGroupInformation.getCurrentUser().getShortUserName()
                                +"' setting user to execute Flink TaskManager to '"+yarnClientUsername+"'");
 
+               // tell akka to die in case of an error
+               configuration.setBoolean(ConfigConstants.AKKA_JVM_EXIT_ON_FATAL_ERROR, true);
+
                UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername);
                for (Token<? extends TokenIdentifier> toks : UserGroupInformation.getCurrentUser().getTokens()) {
                        ugi.addToken(toks);

Reply via email to