[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15204584#comment-15204584 ]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56855904

    --- Diff: flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---
    @@ -886,10 +957,42 @@ class JobManager(
           if (instanceManager.isRegistered(taskManager)) {
             log.info(s"Task manager ${taskManager.path} wants to disconnect, because $msg.")

    -        instanceManager.unregisterTaskManager(taskManager, false)
    +        instanceManager.unregisterTaskManager(taskManager, false)
             context.unwatch(taskManager)
           }

    +    case msg: StopCluster =>
    +
    +      log.info(s"Stopping JobManager with final application status ${msg.finalStatus()} " +
    +        s"and diagnostics: ${msg.message()}")
    +
    +      val respondTo = sender()
    +
    +      // stop all task managers
    +      instanceManager.getAllRegisteredInstances.asScala foreach {
    +        instance =>
    +          instance.getActorGateway.tell(msg)
    +      }
    +
    +      // send resource manager the ok
    +      currentResourceManager match {
    +        case Some(rm) =>
    +
    +          // inform rm
    +          rm ! decorateMessage(msg)
    +
    +          respondTo ! decorateMessage(StopClusterSuccessful.get())
    +
    +          // trigger shutdown
    +          shutdown()
    +
    +        case None =>
    +          // retry
    +          context.system.scheduler.scheduleOnce(
    +            2 seconds, self, decorateMessage(msg)
    +          )(context.dispatcher)
    --- End diff --

    What if we never re-establish a connection to the RM for some reason? Wouldn't that mean that we never shut down the JM?

> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
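
The concern in the review comment is that the `case None` branch reschedules the message without any bound, so shutdown could be postponed indefinitely. One possible way to address it, sketched here in plain Scala without Akka, is to cap the number of retries and shut down anyway once the cap is reached. The names `StopAction`, `StopClusterPolicy`, `decide`, `retriesSoFar`, and `maxRetries` are hypothetical and do not appear in the pull request; this only models the decision and is not the change that was actually made.

```scala
// Hypothetical sketch, not code from the pull request: models the StopCluster
// decision with a retry bound so that shutdown cannot be postponed forever.
sealed trait StopAction
// A resource manager is known: inform it, acknowledge the sender, shut down.
case object NotifyRmAndShutdown extends StopAction
// No resource manager yet: reschedule the message, carrying the new retry count.
final case class RetryLater(retriesSoFar: Int) extends StopAction
// Retries exhausted: shut down even though no resource manager was informed.
case object ShutdownWithoutRm extends StopAction

object StopClusterPolicy {
  // maxRetries is an assumed configuration value, not an existing Flink setting.
  def decide[A](
      currentResourceManager: Option[A],
      retriesSoFar: Int,
      maxRetries: Int): StopAction =
    currentResourceManager match {
      case Some(_)                         => NotifyRmAndShutdown
      case None if retriesSoFar < maxRetries => RetryLater(retriesSoFar + 1)
      case None                            => ShutdownWithoutRm
    }
}
```

In the actor, the retry count would have to travel with the rescheduled message or be kept as actor state; the `StopCluster` message in the diff has no such field, so that part is an assumption. A deadline-based bound (stop retrying after a fixed total time) would work just as well as a count.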