Repository: incubator-gearpump Updated Branches: refs/heads/master 58a1a63e6 -> 5498ae205
[GEARPUMP-283] Return app exception to client Author: manuzhang <[email protected]> Closes #161 from manuzhang/GEARPUMP-283. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5498ae20 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5498ae20 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5498ae20 Branch: refs/heads/master Commit: 5498ae2058ee0faf98b4107bf2e73e927e80514a Parents: 58a1a63 Author: manuzhang <[email protected]> Authored: Tue Feb 28 11:00:49 2017 +0800 Committer: manuzhang <[email protected]> Committed: Tue Feb 28 11:01:09 2017 +0800 ---------------------------------------------------------------------- .../apache/gearpump/streaming/ClusterMessage.scala | 3 ++- .../gearpump/streaming/appmaster/AppMaster.scala | 17 ++++++++++------- .../gearpump/streaming/appmaster/TaskManager.scala | 4 ++-- .../gearpump/streaming/executor/Executor.scala | 2 +- 4 files changed, 15 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala index 4b801a2..8a76916 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala @@ -77,7 +77,8 @@ object ExecutorToAppMaster { case class RegisterTask(taskId: TaskId, executorId: Int, task: HostPort) case class UnRegisterTask(taskId: TaskId, executorId: Int) - case class MessageLoss(executorId: Int, taskId: TaskId, cause: String) + case class MessageLoss(executorId: Int, taskId: TaskId, + cause: String, ex: Option[Throwable] = None) } object AppMasterToMaster { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 5ace1b2..15df0b3 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -82,7 +82,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli private var taskManager: Option[ActorRef] = None private var clockService: Option[ActorRef] = None private val systemConfig = context.system.settings.config - private var lastFailure = LastFailure(0L, null) + // TODO: Consolidate failure and exception into one which requires refactoring of MessageLoss + private var lastFailure: (LastFailure, Option[Throwable]) = (LastFailure(0L, null), None) private val appMasterBrief = ExecutorBrief(APPMASTER_DEFAULT_EXECUTOR_ID, self.path.toString, @@ -160,8 +161,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli } else { LOG.error(s"replay for invalid appId ${replay.appId}") } - case messageLoss: MessageLoss => - lastFailure = LastFailure(System.currentTimeMillis(), messageLoss.cause) + case messageLoss@MessageLoss(_, _, cause, ex) => + lastFailure = LastFailure(System.currentTimeMillis(), cause) -> ex taskManager.foreach(_ forward messageLoss) case lookupTask: LookupTaskActorRef => taskManager.foreach(_ forward lookupTask) @@ -259,7 +260,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli dagManager forward replaceDAG case GetLastFailure(id) => if (id == appId) { - sender ! lastFailure + sender ! lastFailure._1 } else { LOG.error(s"GetLastFailure for invalid appId $id") } @@ -306,15 +307,17 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli case FailedToRecover(errorMsg) => if (context.children.toList.contains(sender())) { LOG.error(errorMsg) - val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, lastFailure.time, - new Exception(lastFailure.error)) + val (failure, exception) = lastFailure + val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, failure.time, + exception.getOrElse(new Exception(failure.error))) masterProxy ! failed } case AllocateResourceTimeOut => val errorMsg = s"Failed to allocate resource in time, shutdown application $appId" LOG.error(errorMsg) + val (failure, exception) = lastFailure val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, - System.currentTimeMillis(), new Exception(lastFailure.error)) + System.currentTimeMillis(), exception.getOrElse(new Exception(failure.error))) masterProxy ! failed context.stop(self) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala index 81ed79a..51c4de9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala @@ -132,7 +132,7 @@ private[appmaster] class TaskManager( self ! executorStopped context.become(recovery(recoverState)) } - case MessageLoss(executorId, taskId, cause) => + case MessageLoss(executorId, taskId, _, _) => if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && appRestartPolicy.allowRestart) { context.become(recovery(recoverState)) @@ -217,7 +217,7 @@ private[appmaster] class TaskManager( val onMessageLoss: Receive = { case ExecutorStopped(executorId) => context.become(recovery(recoverState)) - case MessageLoss(executorId, taskId, cause) => + case MessageLoss(executorId, taskId, cause, _) => if (state.taskRegistry.isTaskRegisteredForExecutor(executorId) && appRestartPolicy.allowRestart) { context.become(recovery(recoverState)) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5498ae20/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala index 56bf61d..bfc205a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/executor/Executor.scala @@ -124,7 +124,7 @@ class Executor(executorContext: ExecutorContext, userConf : UserConfig, launcher s" MessageLoss, so that the system will replay all lost message" LOG.error(errorMsg, ex) val detailErrorMsg = errorMsg + "\n" + ExceptionUtils.getStackTrace(ex) - taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg)) + taskId.foreach(appMaster ! MessageLoss(executorId, _, detailErrorMsg, Some(ex))) Resume }
