[FLINK-1796] [jobmanager] In local mode, the embedded TaskManager is watched by a process reaper as well.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8c321425
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8c321425
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8c321425

Branch: refs/heads/master
Commit: 8c32142528590a030693529c7c8d93f194968c0a
Parents: d6ea1f2
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Mar 27 20:26:33 2015 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Mar 29 18:34:32 2015 +0200

----------------------------------------------------------------------
 .../java/org/apache/flink/runtime/process/ProcessReaper.java | 2 +-
 .../org/apache/flink/runtime/jobmanager/JobManager.scala | 8 +++++++-
 2 files changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
index b12b82d..5ab550f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/process/ProcessReaper.java
@@ -47,7 +47,7 @@ public class ProcessReaper extends UntypedActor {
         if (message instanceof Terminated) {
             try {
                 Terminated term = (Terminated) message;
-                String name = term.actor().path().name();
+                String name = term.actor().path().toSerializationFormat();
                 if (log != null) {
                     log.error("Actor " + name + " terminated, stopping process...");
                 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8c321425/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2233dbf..9aa476d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -769,8 +769,14 @@ object JobManager {
 
     if (executionMode == JobManagerMode.LOCAL) {
       LOG.info("Starting embedded TaskManager for JobManager's LOCAL execution mode")
-      TaskManager.startTaskManagerActor(configuration, jobManagerSystem, listeningAddress,
+      val taskManagerActor = TaskManager.startTaskManagerActor(
+        configuration, jobManagerSystem, listeningAddress,
         TaskManager.TASK_MANAGER_NAME, true, true, classOf[TaskManager])
+
+      LOG.debug("Starting TaskManager process reaper")
+      jobManagerSystem.actorOf(
+        Props(classOf[ProcessReaper], taskManagerActor, LOG, RUNTIME_FAILURE_RETURN_CODE),
+        "TaskManager_Process_Reaper")
     }
 
     // start the job manager web frontend