[ https://issues.apache.org/jira/browse/GEARPUMP-265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15833375#comment-15833375 ]
ASF GitHub Bot commented on GEARPUMP-265: ----------------------------------------- Github user manuzhang commented on a diff in the pull request: https://github.com/apache/incubator-gearpump/pull/134#discussion_r97216886 --- Diff: core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala --- @@ -217,40 +206,56 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch LOG.error(failed.reason) } - private def getAppMasterStatus(appId: Int): AppMasterStatus = { - if (activeAppMasters.contains(appId)) { - AppMasterActive - } else if (deadAppMasters.contains(appId)) { - AppMasterInActive - } else if (appMasterRegistry.contains(appId)) { - AppMasterPending - } else { - AppMasterNonExist - } - } + def appMasterMessage: Receive = { + case RegisterAppMaster(appId, appMaster, workerInfo) => + val appInfo = applicationRegistry.get(appId) + appInfo match { + case Some(info) => + LOG.info(s"Register AppMaster for app: $appId") + val updatedInfo = info.onAppMasterRegister(appMaster, workerInfo.ref) + context.watch(appMaster) + applicationRegistry += appId -> updatedInfo + kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, applicationRegistry)) + sender ! AppMasterRegistered(appId) + case None => + LOG.error(s"Can not find submitted application $appId") + } - private def shutDownExecutorTimeOut(): Unit = { - LOG.error(s"Shut down executor time out") - } + case ApplicationStatusChanged(appId, newStatus, timeStamp, error) => + applicationRegistry.get(appId) match { + case Some(appRuntimeInfo) => + var updatedStatus: ApplicationRuntimeInfo = null + LOG.info(s"Application $appId change to ${newStatus.toString} at $timeStamp") + newStatus match { + case ApplicationStatus.Active => + updatedStatus = appRuntimeInfo.onActived(timeStamp) + sender ! 
AppMasterActivated(appId) + case finished@ApplicationStatus.Finished => + killAppMasterExecutor(appId, appRuntimeInfo.worker) + updatedStatus = appRuntimeInfo.onTerminalStatus(timeStamp, finished) + appResultListeners.getOrElse(appId, List.empty).foreach{ client => + client ! ApplicationFinished(appId) + } + case failed@ApplicationStatus.Failed => + killAppMasterExecutor(appId, appRuntimeInfo.worker) + updatedStatus = appRuntimeInfo.onTerminalStatus(timeStamp, failed) + appResultListeners.getOrElse(appId, List.empty).foreach{ client => + client ! ApplicationFailed(appId, error) + } + case terminated@ApplicationStatus.Terminated => + updatedStatus = appRuntimeInfo.onTerminalStatus(timeStamp, terminated) + case _ => --- End diff -- What is this for? Any comments? > remove AppMasterRuntimeInfo from AppMasterContext > ------------------------------------------------- > > Key: GEARPUMP-265 > URL: https://issues.apache.org/jira/browse/GEARPUMP-265 > Project: Apache Gearpump > Issue Type: Sub-task > Affects Versions: 0.8.2 > Reporter: Huafeng Wang > -- This message was sent by Atlassian JIRA (v6.3.4#6332)