[FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c321425
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c321425
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c321425

Branch: refs/heads/master
Commit: 8c32142528590a030693529c7c8d93f194968c0a
Parents: d6ea1f2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:26:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala     | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index b12b82d..5ab550f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -47,7 +47,7 @@ public class ProcessReaper extends UntypedActor {
                if (message instanceof Terminated) {
                        try {
                                Terminated term = (Terminated) message;
-                               String name = term.actor().path().name();
+                               String name = term.actor().path().toSerializationFormat();
                                if (log != null) {
                                        log.error("Actor " + name + " terminated, stopping process...");
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2233dbf..9aa476d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -769,8 +769,14 @@ object JobManager {
       if (executionMode == JobManagerMode.LOCAL) {
         LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
 
-        TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+        val taskManagerActor = TaskManager.startTaskManagerActor(
+          configuration, jobManagerSystem, listeningAddress,
           TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+
+        LOG.debug("Starting TaskManager process reaper")
+        jobManagerSystem.actorOf(
+          Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+          "TaskManager_Process_Reaper")
       }
 
       // start the job manager web frontend

Reply via email to