[GEARPUMP-252] Return application status to client

Author: huafengw <[email protected]>

Closes #134 from huafengw/fix265.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/96312a2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/96312a2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/96312a2a

Branch: refs/heads/master
Commit: 96312a2acaa547ba486fb2a827eeaf87da0cb271
Parents: 2913a1f
Author: huafengw <[email protected]>
Authored: Tue Jan 24 12:17:19 2017 +0800
Committer: manuzhang <[email protected]>
Committed: Tue Jan 24 12:17:31 2017 +0800

----------------------------------------------------------------------
 .../gearpump/cluster/AppDescription.scala       |  37 ++-
 .../gearpump/cluster/ClusterMessage.scala       |  58 ++--
 .../appmaster/AppMasterRuntimeEnvironment.scala |  26 +-
 .../appmaster/AppMasterRuntimeInfo.scala        |  38 ---
 .../cluster/appmaster/ApplicationMetaData.scala |  30 ++
 .../appmaster/ApplicationRuntimeInfo.scala      |  52 ++++
 .../cluster/appmaster/ApplicationState.scala    |  47 ----
 .../cluster/client/RunningApplication.scala     |  34 ++-
 .../gearpump/cluster/master/AppManager.scala    | 271 ++++++++++---------
 .../cluster/master/AppMasterLauncher.scala      |  40 +--
 .../apache/gearpump/cluster/master/Master.scala |   6 +-
 .../org/apache/gearpump/util/ActorUtil.scala    |  15 +-
 .../cluster/appmaster/AppManagerSpec.scala      |  37 ++-
 .../AppMasterRuntimeEnvironmentSpec.scala       |   2 +-
 .../appmaster/MasterConnectionKeeperSpec.scala  |   2 +-
 .../apache/gearpump/cluster/main/MainSpec.scala |   5 +-
 .../cluster/master/ApplicationStateSpec.scala   |  11 +-
 .../DistShellAppMasterSpec.scala                |  10 +-
 .../DistServiceAppMasterSpec.scala              |   9 +-
 .../experiments/storm/main/GearpumpNimbus.scala |   9 +-
 .../gearpump/integrationtest/TestSpecBase.scala |   5 +-
 .../checklist/CommandLineSpec.scala             |   6 +-
 .../checklist/RestServiceSpec.scala             |   9 +-
 .../checklist/StabilitySpec.scala               |   9 +-
 .../minicluster/CommandLineClient.scala         |  11 +-
 .../minicluster/RestClient.scala                |  11 +-
 .../gearpump/services/util/UpickleUtil.scala    |  21 +-
 .../services/AppMasterServiceSpec.scala         |   6 +-
 .../streaming/appmaster/AppMaster.scala         |  20 +-
 .../appmaster/StreamAppMasterSummary.scala      |   5 +-
 .../gearpump/streaming/StreamingTestUtil.scala  |   6 +-
 .../streaming/appmaster/AppMasterSpec.scala     |  19 +-
 .../appmaster/ExecutorManagerSpec.scala         |   2 +-
 33 files changed, 472 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
index 91c2675..c31f01f 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/AppDescription.scala
@@ -18,15 +18,16 @@
 
 package org.apache.gearpump.cluster
 
-import scala.reflect.ClassTag
+import java.io.Serializable
 
 import akka.actor.{Actor, ActorRef, ActorSystem}
 import com.typesafe.config.{Config, ConfigFactory}
-
 import org.apache.gearpump.cluster.appmaster.WorkerInfo
 import org.apache.gearpump.cluster.scheduler.Resource
 import org.apache.gearpump.jarstore.FilePath
 
+import scala.reflect.ClassTag
+
 /**
  * This contains all information to run an application
  *
@@ -38,8 +39,7 @@ import org.apache.gearpump.jarstore.FilePath
  *                      really need to change it, please use 
ClusterConfigSource(filePath) to
  *                      construct the object, while filePath points to the 
.conf file.
  */
-case class AppDescription(
-    name: String, appMaster: String, userConfig: UserConfig,
+case class AppDescription(name: String, appMaster: String, userConfig: 
UserConfig,
     clusterConfig: Config = ConfigFactory.empty())
 
 /**
@@ -96,8 +96,6 @@ abstract class ApplicationMaster extends Actor
  * @param appJar application Jar. If the jar is already in classpath, then it 
can be None.
  * @param masterProxy The proxy to master actor, it bridges the messages 
between appmaster
  *                    and master
- * @param registerData AppMaster are required to send this data to Master by 
when doing
- *                     RegisterAppMaster.
  */
 case class AppMasterContext(
     appId: Int,
@@ -105,8 +103,7 @@ case class AppMasterContext(
     resource: Resource,
     workerInfo: WorkerInfo,
     appJar: Option[AppJar],
-    masterProxy: ActorRef,
-    registerData: AppMasterRegisterData)
+    masterProxy: ActorRef)
 
 /**
  * Jar file container in the cluster
@@ -142,4 +139,26 @@ case class ExecutorContext(
 case class ExecutorJVMConfig(
     classPath: Array[String], jvmArguments: Array[String], mainClass: String,
     arguments: Array[String], jar: Option[AppJar], username: String,
-    executorAkkaConfig: Config = ConfigFactory.empty())
\ No newline at end of file
+    executorAkkaConfig: Config = ConfigFactory.empty())
+
+sealed abstract class ApplicationStatus(val status: String)
+  extends Serializable{
+  override def toString: String = status
+}
+
+sealed abstract class ApplicationTerminalStatus(override val status: String)
+  extends ApplicationStatus(status)
+
+object ApplicationStatus {
+  case object PENDING extends ApplicationStatus("pending")
+
+  case object ACTIVE extends ApplicationStatus("active")
+
+  case object SUCCEEDED extends ApplicationTerminalStatus("succeeded")
+
+  case object FAILED extends ApplicationTerminalStatus("failed")
+
+  case object TERMINATED extends ApplicationTerminalStatus("terminated")
+
+  case object NONEXIST extends ApplicationStatus("nonexist")
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index 8aa84b5..73e0649 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -18,16 +18,14 @@
 
 package org.apache.gearpump.cluster
 
-import org.apache.gearpump.cluster.worker.{WorkerSummary, WorkerId}
+import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 
 import scala.util.Try
-
 import akka.actor.ActorRef
 import com.typesafe.config.Config
-
 import org.apache.gearpump.TimeStamp
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import org.apache.gearpump.cluster.master.{MasterNode, MasterSummary}
+import org.apache.gearpump.cluster.appmaster.WorkerInfo
+import org.apache.gearpump.cluster.master.MasterSummary
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
 import org.apache.gearpump.metrics.Metrics.MetricType
 
@@ -114,6 +112,11 @@ object ClientToMaster {
    * Request app master for a short list of cluster app that administrators 
should be aware of.
    */
   case class GetLastFailure(appId: Int)
+
+  /**
+   * Register a client to wait application's result
+   */
+  case class RegisterAppResultListener(appId: Int)
 }
 
 object MasterToClient {
@@ -155,26 +158,20 @@ object MasterToClient {
 
   /** Return the last error of this streaming application job */
   case class LastFailure(time: TimeStamp, error: String)
-}
 
-trait AppMasterRegisterData
+  sealed trait ApplicationResult
 
-object AppMasterToMaster {
+  case class ApplicationSucceeded(appId: Int) extends ApplicationResult
 
-  /**
-   * Activate the AppMaster when an application is ready to run.
-   * @param appId application id
-   */
-  case class ActivateAppMaster(appId: Int)
+  case class ApplicationFailed(appId: Int, error: Throwable) extends 
ApplicationResult
+}
+
+object AppMasterToMaster {
   
   /**
-   * Register an AppMaster by providing a ActorRef, and registerData
-   * @param registerData The registerData is provided by Master when starting 
the app master.
-   *                     App master should return the registerData back to 
master.
-   *                     Typically registerData hold some context information 
for this app Master.
+   * Register an AppMaster by providing a ActorRef, and workerInfo which is 
running on
    */
-
-  case class RegisterAppMaster(appMaster: ActorRef, registerData: 
AppMasterRegisterData)
+  case class RegisterAppMaster(appId: Int, appMaster: ActorRef, workerInfo: 
WorkerInfo)
 
   case class InvalidAppMaster(appId: Int, appMaster: String, reason: Throwable)
 
@@ -210,7 +207,7 @@ object AppMasterToMaster {
     def appId: Int
     def appName: String
     def actorPath: String
-    def status: AppMasterStatus
+    def status: ApplicationStatus
     def startTime: TimeStamp
     def uptime: TimeStamp
     def user: String
@@ -222,7 +219,7 @@ object AppMasterToMaster {
       appType: String = "general",
       appName: String = null,
       actorPath: String = null,
-      status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+      status: ApplicationStatus = ApplicationStatus.ACTIVE,
       startTime: TimeStamp = 0L,
       uptime: TimeStamp = 0L,
       user: String = null)
@@ -242,6 +239,12 @@ object AppMasterToMaster {
 
   /** Response to GetMasterData */
   case class MasterData(masterDescription: MasterSummary)
+
+  /**
+   * Denotes the application state change of an app.
+   */
+  case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
+      timeStamp: TimeStamp, error: Throwable)
 }
 
 object MasterToAppMaster {
@@ -258,17 +261,10 @@ object MasterToAppMaster {
   /** Shutdown the application job */
   case object ShutdownAppMaster
 
-  type AppMasterStatus = String
-  val AppMasterPending: AppMasterStatus = "pending"
-  val AppMasterActive: AppMasterStatus = "active"
-  val AppMasterInActive: AppMasterStatus = "inactive"
-  val AppMasterNonExist: AppMasterStatus = "nonexist"
-
   sealed trait StreamingType
-  case class AppMasterData(
-      status: AppMasterStatus, appId: Int = 0, appName: String = null, 
appMasterPath: String = null,
-      workerPath: String = null, submissionTime: TimeStamp = 0, startTime: 
TimeStamp = 0,
-      finishTime: TimeStamp = 0, user: String = null)
+  case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: 
String = null,
+      appMasterPath: String = null, workerPath: String = null, submissionTime: 
TimeStamp = 0,
+      startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null)
 
   case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
index 170e56a..946a4ae 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironment.scala
@@ -28,6 +28,8 @@ import org.apache.gearpump.cluster.master.MasterProxy
 import org.apache.gearpump.cluster.{AppDescription, AppMasterContext}
 import org.apache.gearpump.util.LogUtil
 
+import scala.concurrent.duration._
+
 /**
  * This serves as runtime environment for AppMaster.
  * When starting an AppMaster, we need to setup the connection to master,
@@ -50,11 +52,9 @@ class AppMasterRuntimeEnvironment(
     masterConnectionKeeperFactory: (MasterActorRef, RegisterAppMaster, 
ListenerActorRef) => Props)
   extends Actor {
 
-  val appId = appContextInput.appId
+  private val appId = appContextInput.appId
   private val LOG = LogUtil.getLogger(getClass, app = appId)
 
-  import scala.concurrent.duration._
-
   private val master = context.actorOf(
     masterFactory(appId, context.actorOf(Props(new MasterProxy(masters, 
30.seconds)))))
   private val appContext = appContextInput.copy(masterProxy = master)
@@ -63,24 +63,25 @@ class AppMasterRuntimeEnvironment(
   private val appMaster = context.actorOf(appMasterFactory(appContext, app))
   context.watch(appMaster)
 
-  private val registerAppMaster = RegisterAppMaster(appMaster, 
appContext.registerData)
+  private val registerAppMaster = RegisterAppMaster(appId, appMaster, 
appContext.workerInfo)
+
   private val masterConnectionKeeper = context.actorOf(
     masterConnectionKeeperFactory(master, registerAppMaster, self))
   context.watch(masterConnectionKeeper)
 
   def receive: Receive = {
     case MasterConnected =>
-      LOG.info(s"Master is connected, start AppMaster ${appId}...")
+      LOG.info(s"Master is connected, start AppMaster $appId...")
       appMaster ! StartAppMaster
     case MasterStopped =>
-      LOG.error(s"Master is stopped, stop AppMaster ${appId}...")
+      LOG.error(s"Master is stopped, stop AppMaster $appId...")
       context.stop(self)
     case Terminated(actor) => actor match {
       case `appMaster` =>
-        LOG.error(s"AppMaster ${appId} is stopped, shutdown myself")
+        LOG.error(s"AppMaster $appId is stopped, shutdown myself")
         context.stop(self)
       case `masterConnectionKeeper` =>
-        LOG.error(s"Master connection keeper is stopped, appId: ${appId}, 
shutdown myself")
+        LOG.error(s"Master connection keeper is stopped, appId: $appId, 
shutdown myself")
         context.stop(self)
       case _ => // Skip
     }
@@ -89,9 +90,8 @@ class AppMasterRuntimeEnvironment(
 
 object AppMasterRuntimeEnvironment {
 
-  def props(
-      masters: Iterable[ActorPath], app: AppDescription, appContextInput: 
AppMasterContext)
-    : Props = {
+  def props(masters: Iterable[ActorPath], app: AppDescription, 
appContextInput: AppMasterContext
+      ): Props = {
 
     val master = (appId: AppId, masterProxy: MasterActorRef) =>
       MasterWithExecutorSystemProvider.props(appId, masterProxy)
@@ -103,8 +103,8 @@ object AppMasterRuntimeEnvironment {
       RegisterAppMaster, listener: ListenerActorRef) => Props(new 
MasterConnectionKeeper(
         registerAppMaster, master, masterStatusListener = listener))
 
-    Props(new AppMasterRuntimeEnvironment(
-      appContextInput, app, masters, master, appMaster, 
masterConnectionKeeper))
+    Props(new AppMasterRuntimeEnvironment(appContextInput, app, masters,
+      master, appMaster, masterConnectionKeeper))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
deleted file mode 100644
index b3ec88c..0000000
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeInfo.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.appmaster
-
-import akka.actor.ActorRef
-import com.typesafe.config.Config
-
-import org.apache.gearpump._
-import org.apache.gearpump.cluster.AppMasterRegisterData
-
-/** Run time info used to start an AppMaster */
-case class AppMasterRuntimeInfo(
-    appId: Int,
-    // AppName is the unique Id for an application
-    appName: String,
-    worker: ActorRef = null,
-    user: String = null,
-    submissionTime: TimeStamp = 0,
-    startTime: TimeStamp = 0,
-    finishTime: TimeStamp = 0,
-    config: Config = null)
-  extends AppMasterRegisterData

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
new file mode 100644
index 0000000..b011a0d
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationMetaData.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import org.apache.gearpump.cluster.{AppDescription, AppJar}
+import akka.routing.MurmurHash._
+
+/**
+ * The meta data of an application, which stores the crucial infomation of how 
to launch
+ * the application, like the application jar file location. This data is 
distributed
+ * across the masters.
+ */
+case class ApplicationMetaData(appId: Int, attemptId: Int, appDesc: 
AppDescription,
+    jar: Option[AppJar], username: String)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
new file mode 100644
index 0000000..d9b73e2
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.cluster.appmaster
+
+import akka.actor.ActorRef
+import com.typesafe.config.{Config, ConfigFactory}
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.cluster.{ApplicationStatus, 
ApplicationTerminalStatus}
+
+/** Run time info of Application */
+case class ApplicationRuntimeInfo(
+    appId: Int,
+    // AppName is the unique Id for an application
+    appName: String,
+    appMaster: ActorRef = ActorRef.noSender,
+    worker: ActorRef = ActorRef.noSender,
+    user: String = "",
+    submissionTime: TimeStamp = 0,
+    startTime: TimeStamp = 0,
+    finishTime: TimeStamp = 0,
+    config: Config = ConfigFactory.empty(),
+    status: ApplicationStatus = ApplicationStatus.NONEXIST) {
+
+  def onAppMasterRegistered(appMaster: ActorRef, worker: ActorRef): 
ApplicationRuntimeInfo = {
+    this.copy(appMaster = appMaster, worker = worker)
+  }
+
+  def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = {
+    this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE)
+  }
+
+  def onFinalStatus(timeStamp: TimeStamp, finalStatus: 
ApplicationTerminalStatus):
+    ApplicationRuntimeInfo = {
+    this.copy(finishTime = timeStamp, status = finalStatus)
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
deleted file mode 100644
index 7240113..0000000
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationState.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.cluster.appmaster
-
-import org.apache.gearpump.cluster.{AppDescription, AppJar}
-
-/**
- * This state for single application, it is be distributed across the masters.
- */
-case class ApplicationState(
-    appId: Int, appName: String, attemptId: Int, app: AppDescription, jar: 
Option[AppJar],
-    username: String, state: Any) extends Serializable {
-
-  override def equals(other: Any): Boolean = {
-    other match {
-      case that: ApplicationState =>
-        if (appId == that.appId && attemptId == that.attemptId) {
-          true
-        } else {
-          false
-        }
-      case _ =>
-        false
-    }
-  }
-
-  override def hashCode: Int = {
-    import akka.routing.MurmurHash._
-    extendHash(appId, attemptId, startMagicA, startMagicB)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
index 153c824..973e1e8 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala
@@ -17,16 +17,19 @@
  */
 package org.apache.gearpump.cluster.client
 
+import akka.actor.{ActorRef, ActorSystem}
 import akka.pattern.ask
-import akka.actor.ActorRef
 import akka.util.Timeout
-import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, 
ShutdownApplication}
-import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ShutdownApplicationResult}
-import org.apache.gearpump.util.ActorUtil
+import org.apache.gearpump.cluster.ClientToMaster.{RegisterAppResultListener, 
ResolveAppId, ShutdownApplication}
+import org.apache.gearpump.cluster.MasterToClient._
+import org.apache.gearpump.cluster.client.RunningApplication._
+import org.apache.gearpump.util.{ActorUtil, LogUtil}
+import org.slf4j.Logger
 
+import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.Future
+import scala.concurrent.duration._
 import scala.util.{Failure, Success}
-import scala.concurrent.ExecutionContext.Implicits.global
 
 class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) {
   lazy val appMaster: Future[ActorRef] = resolveAppMaster(appId)
@@ -40,6 +43,21 @@ class RunningApplication(val appId: Int, master: ActorRef, 
timeout: Timeout) {
     }
   }
 
+  /**
+   * This funtion will block until the application finished or failed.
+   * If failed, an exception will be thrown out
+   */
+  def waitUntilFinish(): Unit = {
+    val result = ActorUtil.askActor[ApplicationResult](master,
+      RegisterAppResultListener(appId), INF_TIMEOUT)
+    result match {
+      case failed: ApplicationFailed =>
+        throw failed.error
+      case _ =>
+        LOG.info(s"Application $appId succeeded")
+    }
+  }
+
   def askAppMaster[T](msg: Any): Future[T] = {
     appMaster.flatMap(_.ask(msg)(timeout).asInstanceOf[Future[T]])
   }
@@ -50,3 +68,9 @@ class RunningApplication(val appId: Int, master: ActorRef, 
timeout: Timeout) {
   }
 }
 
+object RunningApplication {
+  private val LOG: Logger = LogUtil.getLogger(getClass)
+  // This magic number is derived from Akka's configuration, which is the 
maximum delay
+  private val INF_TIMEOUT = new Timeout(2147482 seconds)
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index 0ae7365..24e70dd 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -20,13 +20,16 @@ package org.apache.gearpump.cluster.master
 
 import akka.actor._
 import akka.pattern.ask
+import com.typesafe.config.ConfigFactory
+import org.apache.gearpump._
 import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, 
SaveAppDataFailed, _}
 import org.apache.gearpump.cluster.AppMasterToWorker._
+import org.apache.gearpump.cluster.{ApplicationStatus, 
ApplicationTerminalStatus}
 import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMasterDataRequest, AppMastersDataRequest, _}
 import org.apache.gearpump.cluster.MasterToClient._
 import org.apache.gearpump.cluster.WorkerToAppMaster.{ShutdownExecutorFailed, 
_}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeInfo, 
ApplicationState}
+import org.apache.gearpump.cluster.appmaster.{ApplicationMetaData, 
ApplicationRuntimeInfo}
 import org.apache.gearpump.cluster.master.AppManager._
 import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKVResult, 
PutKVResult, PutKVSuccess, _}
 import org.apache.gearpump.cluster.master.Master._
@@ -46,7 +49,6 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
   private val LOG: Logger = LogUtil.getLogger(getClass)
 
-  private val EXECUTOR_ID: Int = APPMASTER_DEFAULT_EXECUTOR_ID
   private val appMasterMaxRetries: Int = 5
   private val appMasterRetryTimeRange: Duration = 20.seconds
 
@@ -56,15 +58,8 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
   // Next available appId
   private var nextAppId: Int = 1
 
-  // From appId to appMaster data
-  // Applications not in activeAppMasters or deadAppMasters are in pending 
status
-  private var appMasterRegistry = Map.empty[Int, (ActorRef, 
AppMasterRuntimeInfo)]
-
-  // Active appMaster list where applications are in active status
-  private var activeAppMasters = Set.empty[Int]
-
-  // Dead appMaster list where applications are in inactive status
-  private var deadAppMasters = Set.empty[Int]
+  private var applicationRegistry = Map.empty[Int, ApplicationRuntimeInfo]
+  private var appResultListeners = Map.empty[Int, List[ActorRef]]
 
   private var appMasterRestartPolicies = Map.empty[Int, RestartPolicy]
 
@@ -78,9 +73,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       val masterState = result.asInstanceOf[MasterState]
       if (masterState != null) {
         this.nextAppId = masterState.maxId + 1
-        this.activeAppMasters = masterState.activeAppMasters
-        this.deadAppMasters = masterState.deadAppMasters
-        this.appMasterRegistry = masterState.appMasterRegistry
+        this.applicationRegistry = masterState.applicationRegistry
       }
       context.become(receiveHandler)
       unstashAll()
@@ -107,25 +100,33 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
         client ! SubmitApplicationResult(Failure(
           new Exception(s"Application name ${app.name} already existed")))
       } else {
-        context.actorOf(launcher.props(nextAppId, EXECUTOR_ID, app, jar, 
username, context.parent,
-          Some(client)), s"launcher${nextAppId}_${Util.randInt()}")
-
-        val appState = new ApplicationState(nextAppId, app.name, 0, app, jar, 
username, null)
+        context.actorOf(launcher.props(nextAppId, 
APPMASTER_DEFAULT_EXECUTOR_ID, app, jar, username,
+          context.parent, Some(client)), 
s"launcher${nextAppId}_${Util.randInt()}")
         appMasterRestartPolicies += nextAppId ->
           new RestartPolicy(appMasterMaxRetries, appMasterRetryTimeRange)
-        kvService ! PutKV(nextAppId.toString, APP_STATE, appState)
+
+        val appRuntimeInfo = ApplicationRuntimeInfo(nextAppId, app.name,
+          user = username,
+          submissionTime = System.currentTimeMillis(),
+          config = app.clusterConfig,
+          status = ApplicationStatus.PENDING)
+        applicationRegistry += nextAppId -> appRuntimeInfo
+        val appMetaData = ApplicationMetaData(nextAppId, 0, app, jar, username)
+        kvService ! PutKV(nextAppId.toString, APP_METADATA, appMetaData)
+
         nextAppId += 1
+        kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, 
applicationRegistry))
       }
 
     case RestartApplication(appId) =>
       val client = sender()
-      (kvService ? GetKV(appId.toString, 
APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+      (kvService ? GetKV(appId.toString, 
APP_METADATA)).asInstanceOf[Future[GetKVResult]].map {
         case GetKVSuccess(_, result) =>
-          val appState = result.asInstanceOf[ApplicationState]
-          if (appState != null) {
+          val metaData = result.asInstanceOf[ApplicationMetaData]
+          if (metaData != null) {
             LOG.info(s"Shutting down the application (restart), $appId")
             self ! ShutdownApplication(appId)
-            self.tell(SubmitApplication(appState.app, appState.jar, 
appState.username), client)
+            self.tell(SubmitApplication(metaData.appDesc, metaData.jar, 
metaData.username), client)
           } else {
             client ! SubmitApplicationResult(Failure(
               new Exception(s"Failed to restart, because the application 
$appId does not exist.")
@@ -140,19 +141,16 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
     case ShutdownApplication(appId) =>
       LOG.info(s"App Manager Shutting down application $appId")
-      val (_, appInfo) = appMasterRegistry.get(appId)
-        .filter { case (_, info) => !deadAppMasters.contains(info.appId)}
-        .getOrElse((null, null))
-      Option(appInfo) match {
+      val appInfo = applicationRegistry.get(appId).
+        filter(!_.status.isInstanceOf[ApplicationTerminalStatus])
+      appInfo match {
         case Some(info) =>
-          val worker = info.worker
-          val workerPath = Option(worker).map(_.path).orNull
-          LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, 
executorId: $EXECUTOR_ID")
-          cleanApplicationData(appId)
-          val shutdown = ShutdownExecutor(appId, EXECUTOR_ID,
-            s"AppMaster $appId shutdown requested by master...")
-          sendMsgWithTimeOutCallBack(worker, shutdown, 30000, 
shutDownExecutorTimeOut())
+          killAppMaster(appId, info.worker)
           sender ! ShutdownApplicationResult(Success(appId))
+          // Here we use the function to make sure the status is consistent 
because
+          // sending another message to self will involve timing problem
+          this.onApplicationStatusChanged(appId, ApplicationStatus.TERMINATED,
+            System.currentTimeMillis(), null)
         case None =>
           val errorMsg = s"Failed to find registration information for appId: 
$appId"
           LOG.error(errorMsg)
@@ -160,54 +158,49 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       }
 
     case ResolveAppId(appId) =>
-      val (appMaster, _) = appMasterRegistry.getOrElse(appId, (null, null))
-      if (null != appMaster) {
-        sender ! ResolveAppIdResult(Success(appMaster))
-      } else {
-        sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find 
Application: $appId")))
+      val appMaster = applicationRegistry.get(appId).map(_.appMaster)
+      appMaster match {
+        case Some(appMasterActor) =>
+          sender ! ResolveAppIdResult(Success(appMasterActor))
+        case None =>
+          sender ! ResolveAppIdResult(Failure(new Exception(s"Can not find 
Application: $appId")))
       }
 
     case AppMastersDataRequest =>
       var appMastersData = collection.mutable.ListBuffer[AppMasterData]()
-      appMasterRegistry.foreach(pair => {
-        val (id, (appMaster: ActorRef, info: AppMasterRuntimeInfo)) = pair
-        val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
+      applicationRegistry.foreach(pair => {
+        val (id, info: ApplicationRuntimeInfo) = pair
+        val appMasterPath = ActorUtil.getFullPath(context.system, 
info.appMaster)
         val workerPath = Option(info.worker).map(worker =>
-          ActorUtil.getFullPath(context.system, worker.path))
-        val status = getAppMasterStatus(id)
+          ActorUtil.getFullPath(context.system, worker))
         appMastersData += AppMasterData(
-          status, id, info.appName, appMasterPath, workerPath.orNull,
+          info.status, id, info.appName, appMasterPath, workerPath.orNull,
           info.submissionTime, info.startTime, info.finishTime, info.user)
       })
-
       sender ! AppMastersData(appMastersData.toList)
 
     case QueryAppMasterConfig(appId) =>
-      val config =
-        if (appMasterRegistry.contains(appId)) {
-          val (_, info) = appMasterRegistry(appId)
-          info.config
-        } else {
-          null
-        }
+      val config = 
applicationRegistry.get(appId).map(_.config).getOrElse(ConfigFactory.empty())
       sender ! AppMasterConfig(config)
 
     case appMasterDataRequest: AppMasterDataRequest =>
       val appId = appMasterDataRequest.appId
-      val appStatus = getAppMasterStatus(appId)
-
-      appStatus match {
-        case AppMasterNonExist =>
-          sender ! AppMasterData(AppMasterNonExist)
-        case _ =>
-          val (appMaster, info) = appMasterRegistry(appId)
-          val appMasterPath = ActorUtil.getFullPath(context.system, 
appMaster.path)
+      val appRuntimeInfo = applicationRegistry.get(appId)
+      appRuntimeInfo match {
+        case Some(info) =>
+          val appMasterPath = ActorUtil.getFullPath(context.system, 
info.appMaster.path)
           val workerPath = Option(info.worker).map(
             worker => ActorUtil.getFullPath(context.system, 
worker.path)).orNull
           sender ! AppMasterData(
-            appStatus, appId, info.appName, appMasterPath, workerPath,
+            info.status, appId, info.appName, appMasterPath, workerPath,
             info.submissionTime, info.startTime, info.finishTime, info.user)
+        case None =>
+          sender ! AppMasterData(ApplicationStatus.NONEXIST)
       }
+
+    case RegisterAppResultListener(appId) =>
+      val listenerList = appResultListeners.getOrElse(appId, 
List.empty[ActorRef])
+      appResultListeners += appId -> (listenerList :+ sender())
   }
 
   def workerMessage: Receive = {
@@ -217,40 +210,62 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       LOG.error(failed.reason)
   }
 
-  private def getAppMasterStatus(appId: Int): AppMasterStatus = {
-    if (activeAppMasters.contains(appId)) {
-      AppMasterActive
-    } else if (deadAppMasters.contains(appId)) {
-      AppMasterInActive
-    } else if (appMasterRegistry.contains(appId)) {
-      AppMasterPending
-    } else {
-      AppMasterNonExist
-    }
-  }
+  def appMasterMessage: Receive = {
+    case RegisterAppMaster(appId, appMaster, workerInfo) =>
+      val appInfo = applicationRegistry.get(appId)
+      appInfo match {
+        case Some(info) =>
+          LOG.info(s"Register AppMaster for app: $appId")
+          val updatedInfo = info.onAppMasterRegistered(appMaster, 
workerInfo.ref)
+          context.watch(appMaster)
+          applicationRegistry += appId -> updatedInfo
+          kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, 
applicationRegistry))
+          sender ! AppMasterRegistered(appId)
+        case None =>
+          LOG.error(s"Can not find submitted application $appId")
+      }
 
-  private def shutDownExecutorTimeOut(): Unit = {
-    LOG.error(s"Shut down executor time out")
+    case ApplicationStatusChanged(appId, newStatus, timeStamp, error) =>
+      onApplicationStatusChanged(appId, newStatus, timeStamp, error)
   }
 
-  def appMasterMessage: Receive = {
-    case RegisterAppMaster(appMaster, registerBack: AppMasterRuntimeInfo) =>
-      val startTime = System.currentTimeMillis()
-      val register = registerBack.copy(startTime = startTime)
-
-      LOG.info(s"Register AppMaster for app: ${register.appId}, $register")
-      context.watch(appMaster)
-      appMasterRegistry += register.appId -> (appMaster, register)
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
-      sender ! AppMasterRegistered(register.appId)
-
-    case ActivateAppMaster(appId) =>
-      LOG.info(s"Activate AppMaster for app $appId")
-      activeAppMasters += appId
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
-      sender ! AppMasterActivated(appId)
+  private def onApplicationStatusChanged(appId: Int, newStatus: 
ApplicationStatus,
+      timeStamp: TimeStamp, error: Throwable): Unit = {
+    applicationRegistry.get(appId) match {
+      case Some(appRuntimeInfo) =>
+        var updatedStatus: ApplicationRuntimeInfo = null
+        LOG.info(s"Application $appId change to ${newStatus.toString} at 
$timeStamp")
+        newStatus match {
+          case ApplicationStatus.ACTIVE =>
+            updatedStatus = appRuntimeInfo.onAppMasterActivated(timeStamp)
+            sender ! AppMasterActivated(appId)
+          case [email protected] =>
+            killAppMaster(appId, appRuntimeInfo.worker)
+            updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, succeeded)
+            appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
+              client ! ApplicationSucceeded(appId)
+            }
+          case [email protected] =>
+            killAppMaster(appId, appRuntimeInfo.worker)
+            updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, failed)
+            appResultListeners.getOrElse(appId, List.empty).foreach{ client =>
+              client ! ApplicationFailed(appId, error)
+            }
+          case [email protected] =>
+            updatedStatus = appRuntimeInfo.onFinalStatus(timeStamp, terminated)
+          case status =>
+            LOG.error(s"App $appId should not change it's status to $status")
+        }
+
+        if (newStatus.isInstanceOf[ApplicationTerminalStatus]) {
+          kvService ! DeleteKVGroup(appId.toString)
+        }
+        applicationRegistry += appId -> updatedStatus
+        kvService ! PutKV(MASTER_GROUP, MASTER_STATE, MasterState(nextAppId, 
applicationRegistry))
+      case None =>
+        LOG.error(s"Can not find application runtime info for appId $appId 
when it's " +
+          s"status changed to ${newStatus.toString}")
+    }
   }
 
   def appDataStoreService: Receive = {
@@ -265,7 +280,7 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
     case GetAppData(appId, key) =>
       val client = sender()
       (kvService ? GetKV(appId.toString, 
key)).asInstanceOf[Future[GetKVResult]].map {
-        case GetKVSuccess(privateKey, value) =>
+        case GetKVSuccess(_, value) =>
           client ! GetAppDataResult(key, value)
         case GetKVFailed(ex) =>
           client ! GetAppDataResult(key, null)
@@ -279,20 +294,23 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
 
       // Now we assume that the only normal way to stop the application is 
submitting a
       // ShutdownApplication request
-      val application = appMasterRegistry.find { appInfo =>
-        val (_, (actorRef, _)) = appInfo
-        actorRef.compareTo(terminate.actor) == 0
+      val application = applicationRegistry.find { appInfo =>
+        val (_, runtimeInfo) = appInfo
+        terminate.actor.equals(runtimeInfo.appMaster) &&
+          !runtimeInfo.status.isInstanceOf[ApplicationTerminalStatus]
       }
       if (application.nonEmpty) {
         val appId = application.get._1
-        (kvService ? GetKV(appId.toString, 
APP_STATE)).asInstanceOf[Future[GetKVResult]].map {
+        (kvService ? GetKV(appId.toString, 
APP_METADATA)).asInstanceOf[Future[GetKVResult]].map {
           case GetKVSuccess(_, result) =>
-            val appState = result.asInstanceOf[ApplicationState]
-            if (appState != null) {
+            val appMetadata = result.asInstanceOf[ApplicationMetaData]
+            if (appMetadata != null) {
               LOG.info(s"Recovering application, $appId")
-              self ! RecoverApplication(appState)
+              val updatedInfo = application.get._2.copy(status = 
ApplicationStatus.PENDING)
+              applicationRegistry += appId -> updatedInfo
+              self ! RecoverApplication(appMetadata)
             } else {
-              LOG.error(s"Cannot find application state for $appId")
+              LOG.error(s"Cannot find application meta data for $appId")
             }
           case GetKVFailed(ex) =>
             LOG.error(s"Cannot find master state to recover")
@@ -305,50 +323,41 @@ private[cluster] class AppManager(kvService: ActorRef, 
launcher: AppMasterLaunch
       val appId = state.appId
       if (appMasterRestartPolicies.get(appId).get.allowRestart) {
         LOG.info(s"AppManager Recovering Application $appId...")
-        activeAppMasters -= appId
         kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-          MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
-        context.actorOf(launcher.props(appId, EXECUTOR_ID, state.app, 
state.jar, state.username,
-          context.parent, None), s"launcher${appId}_${Util.randInt()}")
+          MasterState(this.nextAppId, applicationRegistry))
+        context.actorOf(launcher.props(appId, APPMASTER_DEFAULT_EXECUTOR_ID, 
state.appDesc,
+          state.jar, state.username, context.parent, None), 
s"launcher${appId}_${Util.randInt()}")
       } else {
         LOG.error(s"Application $appId failed too many times")
       }
   }
 
-  case class RecoverApplication(applicationStatus: ApplicationState)
-
-  private def cleanApplicationData(appId: Int): Unit = {
-    if (appMasterRegistry.contains(appId)) {
-      // Add the dead app to dead appMasters
-      deadAppMasters += appId
-      // Remove the dead app from active appMasters
-      activeAppMasters -= appId
-
-      appMasterRegistry += appId -> {
-        val (ref, info) = appMasterRegistry(appId)
-        (ref, info.copy(finishTime = System.currentTimeMillis()))
-      }
-      kvService ! PutKV(MASTER_GROUP, MASTER_STATE,
-        MasterState(this.nextAppId, appMasterRegistry, activeAppMasters, 
deadAppMasters))
-      kvService ! DeleteKVGroup(appId.toString)
-    }
+  private def killAppMaster(appId: Int, worker: ActorRef): Unit = {
+    val workerPath = Option(worker).map(_.path).orNull
+    LOG.info(s"Shutdown AppMaster at $workerPath, appId: $appId, executorId:" +
+      s" $APPMASTER_DEFAULT_EXECUTOR_ID")
+    val shutdown = ShutdownExecutor(appId, APPMASTER_DEFAULT_EXECUTOR_ID,
+      s"AppMaster $appId shutdown requested by master...")
+    sendMsgWithTimeOutCallBack(worker, shutdown, 30000, 
shutDownExecutorTimeOut())
   }
 
   private def applicationNameExist(appName: String): Boolean = {
-    appMasterRegistry.values.exists { case (_, info) =>
-      info.appName == appName && !deadAppMasters.contains(info.appId)
+    applicationRegistry.values.exists { info =>
+      info.appName == appName && 
!info.status.isInstanceOf[ApplicationTerminalStatus]
     }
   }
+
+  private def shutDownExecutorTimeOut(): Unit = {
+    LOG.error(s"Shut down executor time out")
+  }
 }
 
 object AppManager {
-  final val APP_STATE = "app_state"
+  final val APP_METADATA = "app_metadata"
   // The id is used in KVStore
   final val MASTER_STATE = "master_state"
 
-  case class MasterState(
-      maxId: Int,
-      appMasterRegistry: Map[Int, (ActorRef, AppMasterRuntimeInfo)],
-      activeAppMasters: Set[Int],
-      deadAppMasters: Set[Int])
+  case class RecoverApplication(appMetaData: ApplicationMetaData)
+
+  case class MasterState(maxId: Int, applicationRegistry: Map[Int, 
ApplicationRuntimeInfo])
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
index 9305d5c..2d79558 100644
--- 
a/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
+++ 
b/core/src/main/scala/org/apache/gearpump/cluster/master/AppMasterLauncher.scala
@@ -34,7 +34,7 @@ import 
org.apache.gearpump.cluster.AppMasterToWorker.{LaunchExecutor, ShutdownEx
 import org.apache.gearpump.cluster.MasterToAppMaster.ResourceAllocated
 import org.apache.gearpump.cluster.MasterToClient.SubmitApplicationResult
 import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo, WorkerInfo}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
ApplicationRuntimeInfo, WorkerInfo}
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
 import org.apache.gearpump.cluster.{AppDescription, AppJar, _}
 import org.apache.gearpump.transport.HostPort
@@ -43,7 +43,6 @@ import org.apache.gearpump.util.Constants._
 import org.apache.gearpump.util.{ActorSystemBooter, ActorUtil, LogUtil, Util}
 
 /**
- *
  * AppMasterLauncher is a child Actor of AppManager, it is responsible
  * to launch the AppMaster on the cluster.
  */
@@ -53,11 +52,11 @@ class AppMasterLauncher(
   extends Actor {
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
 
-  val scheduler = context.system.scheduler
-  val systemConfig = context.system.settings.config
-  val TIMEOUT = Duration(15, TimeUnit.SECONDS)
+  private val scheduler = context.system.scheduler
+  private val systemConfig = context.system.settings.config
+  private val TIMEOUT = Duration(15, TimeUnit.SECONDS)
 
-  val appMasterAkkaConfig: Config = app.clusterConfig
+  private val appMasterAkkaConfig: Config = app.clusterConfig
 
   LOG.info(s"Ask Master resource to start AppMaster $appId...")
   master ! RequestResource(appId, ResourceRequest(Resource(1), 
WorkerId.unspecified))
@@ -66,18 +65,12 @@ class AppMasterLauncher(
 
   def waitForResourceAllocation: Receive = {
     case ResourceAllocated(allocations) =>
-
       val ResourceAllocation(resource, worker, workerId) = allocations(0)
-      LOG.info(s"Resource allocated for appMaster $appId on worker 
${workerId}(${worker.path})")
-
-      val submissionTime = System.currentTimeMillis()
+      LOG.info(s"Resource allocated for appMaster $appId on worker 
$workerId(${worker.path})")
 
-      val appMasterInfo = AppMasterRuntimeInfo(appId, app.name, worker, 
username,
-        submissionTime, config = appMasterAkkaConfig)
       val workerInfo = WorkerInfo(workerId, worker)
-      val appMasterContext =
-        AppMasterContext(appId, username, resource, workerInfo, jar, null, 
appMasterInfo)
-      LOG.info(s"Try to launch a executor for AppMaster on worker ${workerId} 
for app $appId")
+      val appMasterContext = AppMasterContext(appId, username, resource, 
workerInfo, jar, null)
+      LOG.info(s"Try to launch a executor for AppMaster on worker $workerId 
for app $appId")
       val name = ActorUtil.actorNameForExecutor(appId, executorId)
       val selfPath = ActorUtil.getFullPath(context.system, self.path)
 
@@ -88,12 +81,11 @@ class AppMasterLauncher(
         username, appMasterAkkaConfig)
 
       worker ! LaunchExecutor(appId, executorId, resource, executorJVM)
-      context.become(waitForActorSystemToStart(worker, appMasterContext, 
app.userConfig, resource))
+      context.become(waitForActorSystemToStart(worker, appMasterContext, 
resource))
   }
 
-  def waitForActorSystemToStart(
-      worker: ActorRef, appContext: AppMasterContext, user: UserConfig, 
resource: Resource)
-    : Receive = {
+  def waitForActorSystemToStart(worker: ActorRef, appContext: AppMasterContext,
+      resource: Resource): Receive = {
     case ExecutorLaunchRejected(reason, ex) =>
       LOG.error(s"Executor Launch failed reason: $reason", ex)
       LOG.info(s"reallocate resource $resource to start appmaster")
@@ -105,8 +97,8 @@ class AppMasterLauncher(
 
       val masterAddress = systemConfig.getStringList(GEARPUMP_CLUSTER_MASTERS)
         .asScala.map(HostPort(_)).map(ActorUtil.getMasterActorPath)
-      sender ! CreateActor(
-        AppMasterRuntimeEnvironment.props(masterAddress, app, appContext), 
s"appdaemon$appId")
+      sender ! CreateActor(AppMasterRuntimeEnvironment.props(masterAddress, 
app, appContext),
+        s"appdaemon$appId")
 
       import context.dispatcher
       val appMasterTimeout = scheduler.scheduleOnce(TIMEOUT, self,
@@ -121,7 +113,7 @@ class AppMasterLauncher(
       LOG.info(s"AppMaster is created, mission complete...")
       replyToClient(SubmitApplicationResult(Success(appId)))
       context.stop(self)
-    case CreateActorFailed(name, reason) =>
+    case CreateActorFailed(_, reason) =>
       cancel.cancel()
       worker ! ShutdownExecutor(appId, executorId, reason.getMessage)
       replyToClient(SubmitApplicationResult(Failure(reason)))
@@ -129,9 +121,7 @@ class AppMasterLauncher(
   }
 
   def replyToClient(result: SubmitApplicationResult): Unit = {
-    if (client.isDefined) {
-      client.get.tell(result, master)
-    }
+    client.foreach(_.tell(result, master))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala 
b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
index 6b4df07..8da417e 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/Master.scala
@@ -184,8 +184,6 @@ private[cluster] class Master extends Actor with Stash {
       scheduler forward request
     case registerAppMaster: RegisterAppMaster =>
       appManager forward registerAppMaster
-    case activateAppMaster: ActivateAppMaster =>
-      appManager forward activateAppMaster
     case save: SaveAppData =>
       appManager forward save
     case get: GetAppData =>
@@ -215,6 +213,8 @@ private[cluster] class Master extends Actor with Stash {
 
     case invalidAppMaster: InvalidAppMaster =>
       appManager forward invalidAppMaster
+    case statusChanged: ApplicationStatusChanged =>
+      appManager forward statusChanged
   }
 
   import scala.util.{Failure, Success}
@@ -257,6 +257,8 @@ private[cluster] class Master extends Actor with Stash {
       appManager forward query
     case QueryMasterConfig =>
       sender ! MasterConfig(ClusterConfig.filterOutDefaultConfig(systemConfig))
+    case register: RegisterAppResultListener =>
+      appManager forward register
   }
 
   def disassociated: Receive = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala 
b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
index 82c7fe2..0f49a59 100644
--- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.util
 
-import org.apache.gearpump.cluster.AppMasterContext
+import org.apache.gearpump.cluster.{ApplicationStatus, AppMasterContext}
 import org.apache.gearpump.cluster.worker.WorkerId
 
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -27,7 +27,7 @@ import akka.actor._
 import akka.pattern.ask
 import org.slf4j.Logger
 import akka.util.Timeout
-import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, 
GetAllWorkers}
+import 
org.apache.gearpump.cluster.AppMasterToMaster.{ApplicationStatusChanged, 
GetAllWorkers}
 import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, 
ResolveWorkerId}
 import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList
 import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ResolveWorkerIdResult}
@@ -44,6 +44,14 @@ object ActorUtil {
     system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
   }
 
+  def getFullPath(system: ActorSystem, actorRef: ActorRef): String = {
+    if (actorRef != ActorRef.noSender) {
+      getFullPath(system, actorRef.path)
+    } else {
+      ""
+    }
+  }
+
   def getFullPath(system: ActorSystem, path: ActorPath): String = {
     path.toStringWithAddress(getSystemAddress(system))
   }
@@ -102,7 +110,8 @@ object ActorUtil {
   def tellMasterIfApplicationReady(workerNum: Option[Int], executorSystemNum: 
Int,
       appContext: AppMasterContext): Unit = {
     if (workerNum.contains(executorSystemNum)) {
-      appContext.masterProxy ! ActivateAppMaster(appContext.appId)
+      appContext.masterProxy ! ApplicationStatusChanged(appContext.appId, 
ApplicationStatus.ACTIVE,
+        System.currentTimeMillis(), null)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
 
b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
index f9b0762..ab0275a 100644
--- 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
+++ 
b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppManagerSpec.scala
@@ -21,13 +21,14 @@ package org.apache.gearpump.cluster.appmaster
 import akka.actor.{Actor, ActorRef, Props}
 import akka.testkit.TestProbe
 import com.typesafe.config.Config
-import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, _}
+import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, 
RegisterAppMaster, _}
 import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, 
ShutdownApplication, SubmitApplication}
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMasterRegistered, AppMastersData, AppMastersDataRequest, _}
 import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, 
ShutdownApplicationResult, SubmitApplicationResult}
-import org.apache.gearpump.cluster.master.{AppMasterLauncherFactory, 
AppManager}
+import org.apache.gearpump.cluster.master.{AppManager, 
AppMasterLauncherFactory}
 import org.apache.gearpump.cluster.master.AppManager._
 import org.apache.gearpump.cluster.master.InMemoryKVService.{GetKV, 
GetKVSuccess, PutKV, PutKVSuccess}
+import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{TestUtil, _}
 import org.apache.gearpump.util.LogUtil
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
@@ -51,7 +52,7 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
     appManager = getActorSystem.actorOf(Props(new AppManager(kvService.ref,
       new DummyAppMasterLauncherFactory(appLauncher))))
     kvService.expectMsgType[GetKV]
-    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty, 
Set.empty, Set.empty)))
+    kvService.reply(GetKVSuccess(MASTER_STATE, MasterState(0, Map.empty)))
   }
 
   override def afterEach(): Unit = {
@@ -59,14 +60,24 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
   }
 
   "AppManager" should "handle AppMaster message correctly" in {
+    val app = TestUtil.dummyApp
+    val submit = SubmitApplication(app, None, "username")
+    val client = TestProbe()(getActorSystem)
+
+    client.send(appManager, submit)
+
     val appMaster = TestProbe()(getActorSystem)
     val appId = 1
 
-    val register = RegisterAppMaster(appMaster.ref, 
AppMasterRuntimeInfo(appId, "appName"))
+    kvService.expectMsgType[PutKV]
+    kvService.expectMsgType[PutKV]
+    appLauncher.expectMsg(LauncherStarted(appId))
+    val register = RegisterAppMaster(appId, appMaster.ref, 
WorkerInfo(WorkerId(1, 0), null))
     appMaster.send(appManager, register)
     appMaster.expectMsgType[AppMasterRegistered]
 
-    appMaster.send(appManager, ActivateAppMaster(appId))
+    val active = ApplicationStatusChanged(appId, ApplicationStatus.ACTIVE, 0, 
null)
+    appMaster.send(appManager, active)
     appMaster.expectMsgType[AppMasterActivated]
   }
 
@@ -101,7 +112,7 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
     
assert(mockClient.receiveN(1).head.asInstanceOf[ResolveAppIdResult].appMaster.isFailure)
 
     mockClient.send(appManager, AppMasterDataRequest(1))
-    mockClient.expectMsg(AppMasterData(AppMasterNonExist))
+    mockClient.expectMsg(AppMasterData(ApplicationStatus.NONEXIST))
   }
 
   "AppManager" should "reject the application submission if the app name 
already existed" in {
@@ -115,9 +126,10 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
     client.send(appManager, submit)
 
     kvService.expectMsgType[PutKV]
+    kvService.expectMsgType[PutKV]
     appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
-      AppMasterRuntimeInfo(appId, app.name)))
+    val register = RegisterAppMaster(appId, appMaster.ref, 
WorkerInfo(WorkerId(1, 0), worker.ref))
+    appMaster.send(appManager, register)
     appMaster.expectMsgType[AppMasterRegistered]
 
     client.send(appManager, submit)
@@ -135,9 +147,10 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
     client.send(appManager, submit)
 
     kvService.expectMsgType[PutKV]
+    kvService.expectMsgType[PutKV]
     appLauncher.expectMsg(LauncherStarted(appId))
-    appMaster.send(appManager, RegisterAppMaster(appMaster.ref,
-      AppMasterRuntimeInfo(appId, app.name)))
+    val register = RegisterAppMaster(appId, appMaster.ref, 
WorkerInfo(WorkerId(1, 0), worker.ref))
+    appMaster.send(appManager, register)
     kvService.expectMsgType[PutKV]
     appMaster.expectMsgType[AppMasterRegistered]
 
@@ -157,8 +170,8 @@ class AppManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with
       // Do recovery
       getActorSystem.stop(appMaster.ref)
       kvService.expectMsgType[GetKV]
-      val appState = ApplicationState(appId, "application1", 1, app, None, 
"username", null)
-      kvService.reply(GetKVSuccess(APP_STATE, appState))
+      val appState = ApplicationMetaData(appId, 1, app, None, "username")
+      kvService.reply(GetKVSuccess(APP_METADATA, appState))
       appLauncher.expectMsg(LauncherStarted(appId))
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
 
b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
index a41856d..3a698ae 100644
--- 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
+++ 
b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/AppMasterRuntimeEnvironmentSpec.scala
@@ -112,7 +112,7 @@ class AppMasterRuntimeEnvironmentSpec extends FlatSpec with 
Matchers with Before
   }
 
   private def setupAppMasterRuntimeEnv(): TestAppMasterEnv = {
-    val appContext = AppMasterContext(0, null, null, null, null, null, null)
+    val appContext = AppMasterContext(0, null, null, null, null, null)
     val app = AppDescription("app", "AppMasterClass", null, null)
     val master = TestProbe()
     val masterFactory = (_: AppId, _: MasterActorRef) => toProps(master)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
 
b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
index 163da0a..3864cc3 100644
--- 
a/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
+++ 
b/core/src/test/scala/org/apache/gearpump/cluster/appmaster/MasterConnectionKeeperSpec.scala
@@ -35,8 +35,8 @@ import 
org.apache.gearpump.cluster.master.MasterProxy.WatchMaster
 class MasterConnectionKeeperSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll {
 
   implicit var system: ActorSystem = null
-  val register = RegisterAppMaster(null, null)
   val appId = 0
+  val register = RegisterAppMaster(appId, null, null)
 
   override def beforeAll(): Unit = {
     system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala 
b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
index 2166976..0ad6883 100644
--- a/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/cluster/main/MainSpec.scala
@@ -28,7 +28,7 @@ import 
org.apache.gearpump.cluster.MasterToClient.{ReplayApplicationResult, Reso
 import org.apache.gearpump.cluster.MasterToWorker.WorkerRegistered
 import org.apache.gearpump.cluster.WorkerToMaster.RegisterNewWorker
 import org.apache.gearpump.cluster.master.MasterProxy
-import org.apache.gearpump.cluster.{MasterHarness, TestUtil}
+import org.apache.gearpump.cluster.{ApplicationStatus, MasterHarness, TestUtil}
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Constants._
 import org.apache.gearpump.util.{Constants, LogUtil, Util}
@@ -107,7 +107,8 @@ class MainSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach with Maste
     }
 
     masterReceiver.expectMsg(PROCESS_BOOT_TIME, AppMastersDataRequest)
-    masterReceiver.reply(AppMastersData(List(AppMasterData(AppMasterActive, 0, 
"appName"))))
+    masterReceiver.reply(AppMastersData(List(AppMasterData(
+      ApplicationStatus.ACTIVE, 0, "appName"))))
   }
 
   "Kill" should "be started without exception" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
 
b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
index a8adaf0..6593836 100644
--- 
a/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
+++ 
b/core/src/test/scala/org/apache/gearpump/cluster/master/ApplicationStateSpec.scala
@@ -18,16 +18,17 @@
 
 package org.apache.gearpump.cluster.master
 
+import org.apache.gearpump.cluster.AppDescription
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
-
-import org.apache.gearpump.cluster.appmaster.ApplicationState
+import org.apache.gearpump.cluster.appmaster.ApplicationMetaData
 
 class ApplicationStateSpec extends FlatSpec with Matchers with 
BeforeAndAfterEach {
 
   "ApplicationState" should "check equal with respect to only appId and 
attemptId" in {
-    val stateA = ApplicationState(0, "application0", 0, null, null, null, "A")
-    val stateB = ApplicationState(0, "application0", 0, null, null, null, "B")
-    val stateC = ApplicationState(0, "application1", 1, null, null, null, "A")
+    val appDescription = AppDescription("app", "AppMaster", null)
+    val stateA = ApplicationMetaData(0, 0, appDescription, null, null)
+    val stateB = ApplicationMetaData(0, 0, appDescription, null, null)
+    val stateC = ApplicationMetaData(0, 1, appDescription, null, null)
 
     assert(stateA == stateB)
     assert(stateA.hashCode == stateB.hashCode)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
 
b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
index e22abaf..f86a78a 100644
--- 
a/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
+++ 
b/examples/distributedshell/src/test/scala/org/apache/gearpump/examples/distributedshell/DistShellAppMasterSpec.scala
@@ -28,7 +28,7 @@ import 
org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, RegisterApp
 import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, 
ResourceAllocated, WorkerList}
 import org.apache.gearpump.cluster._
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
ApplicationRuntimeInfo}
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceAllocation, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.util.ActorSystemBooter.RegisterActorSystem
@@ -49,11 +49,11 @@ class DistShellAppMasterSpec extends WordSpec with Matchers 
with BeforeAndAfter
 
   "DistributedShell AppMaster" should {
     "launch one ShellTask on each worker" in {
-      val appMasterInfo = AppMasterRuntimeInfo(appId, appName = appId.toString)
-      val appMasterContext = AppMasterContext(appId, userName, resource, null, 
appJar,
-        masterProxy, appMasterInfo)
+      val appMasterInfo = ApplicationRuntimeInfo(appId, appName = 
appId.toString)
+      val appMasterContext = AppMasterContext(appId, userName, resource, null, 
appJar, masterProxy)
       TestActorRef[DistShellAppMaster](
-        AppMasterRuntimeEnvironment.props(List(masterProxy.path), 
appDescription, appMasterContext))
+        AppMasterRuntimeEnvironment.props(List(masterProxy.path), 
appDescription,
+          appMasterContext))
       mockMaster.expectMsgType[RegisterAppMaster]
       mockMaster.reply(AppMasterRegistered(appId))
       // The DistributedShell AppMaster asks for worker list from Master.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
 
b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
index 7516138..b78bfc2 100644
--- 
a/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
+++ 
b/examples/distributeservice/src/test/scala/org/apache/gearpump/experiments/distributeservice/DistServiceAppMasterSpec.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers, WordSpec}
 import org.apache.gearpump.cluster.AppMasterToMaster.{GetAllWorkers, 
RegisterAppMaster, RequestResource}
 import org.apache.gearpump.cluster.AppMasterToWorker.LaunchExecutor
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterRegistered, 
ResourceAllocated, WorkerList}
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
ApplicationRuntimeInfo}
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, 
ResourceAllocation, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
TestUtil, UserConfig}
@@ -52,11 +52,10 @@ class DistServiceAppMasterSpec extends WordSpec with 
Matchers with BeforeAndAfte
 
   "DistService AppMaster" should {
     "responsable for service distributing" in {
-      val appMasterInfo = AppMasterRuntimeInfo(appId, "appName", 
mockWorker1.ref)
-      val appMasterContext = AppMasterContext(appId, userName, resource, null, 
appJar, masterProxy,
-        appMasterInfo)
+      val appMasterContext = AppMasterContext(appId, userName, resource, null, 
appJar, masterProxy)
       TestActorRef[DistServiceAppMaster](
-        AppMasterRuntimeEnvironment.props(List(masterProxy.path), 
appDescription, appMasterContext))
+        AppMasterRuntimeEnvironment.props(List(masterProxy.path), 
appDescription,
+          appMasterContext))
       val registerAppMaster = mockMaster.receiveOne(15.seconds)
       assert(registerAppMaster.isInstanceOf[RegisterAppMaster])
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
----------------------------------------------------------------------
diff --git 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
index df1de06..e2d421c 100644
--- 
a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
+++ 
b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala
@@ -21,11 +21,11 @@ package org.apache.gearpump.experiments.storm.main
 import java.io.{File, FileOutputStream, FileWriter}
 import java.nio.ByteBuffer
 import java.nio.channels.{Channels, WritableByteChannel}
-import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+import java.util.{UUID, HashMap => JHashMap, Map => JMap}
+
 import scala.collection.JavaConverters._
 import scala.concurrent.duration.Duration
 import scala.concurrent.{Await, Future}
-
 import akka.actor.ActorSystem
 import com.typesafe.config.ConfigValueFactory
 import backtype.storm.Config
@@ -35,10 +35,9 @@ import backtype.storm.utils.Utils
 import org.apache.storm.shade.org.json.simple.JSONValue
 import org.apache.storm.shade.org.yaml.snakeyaml.Yaml
 import org.slf4j.Logger
-
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster, 
UserConfig}
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormTopology
 import org.apache.gearpump.experiments.storm.util.TimeCacheMapWrapper.Callback
 import org.apache.gearpump.experiments.storm.util.{GraphBuilder, 
StormConstants, StormUtil, TimeCacheMapWrapper}
@@ -271,7 +270,7 @@ class GearpumpNimbus(clientContext: ClientContext, 
stormConf: JMap[AnyRef, AnyRe
     clientContext.listApps.appMasters.foreach { app =>
       val name = app.appName
       if (applications.contains(name)) {
-        if (app.status != MasterToAppMaster.AppMasterActive) {
+        if (app.status != ApplicationStatus.ACTIVE) {
           removeTopology(name)
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
index a00495a..f863a97 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/TestSpecBase.scala
@@ -18,8 +18,7 @@
 package org.apache.gearpump.integrationtest
 
 import org.scalatest._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster}
 import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterData
 import org.apache.gearpump.util.LogUtil
 
@@ -87,7 +86,7 @@ trait TestSpecBase
   def expectAppIsRunning(appId: Int, expectedAppName: String): Unit = {
     Util.retryUntil(() => {
       val app = restClient.queryApp(appId)
-      app.status == MasterToAppMaster.AppMasterActive && app.appName == 
expectedAppName
+      app.status == ApplicationStatus.ACTIVE && app.appName == expectedAppName
     }, s"$expectedAppName is running")
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
index 3fa6f6a..a147377 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/CommandLineSpec.scala
@@ -17,8 +17,8 @@
  */
 package org.apache.gearpump.integrationtest.checklist
 
-import org.apache.gearpump.cluster.MasterToAppMaster
-import org.apache.gearpump.integrationtest.{Util, TestSpecBase}
+import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster}
+import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
 
 /**
  * The test spec checks the command-line usage
@@ -129,7 +129,7 @@ class CommandLineSpec extends TestSpecBase {
       val actual = commandLineClient.queryApp(appId)
       actual.contains(s"application: $appId, ") &&
         actual.contains(s"name: $expectedName, ") &&
-        actual.contains(s"status: ${MasterToAppMaster.AppMasterActive}")
+        actual.contains(s"status: ${ApplicationStatus.ACTIVE.status}")
     }, "application is running")
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
index 8b5b82a..c1c7b84 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/RestServiceSpec.scala
@@ -18,8 +18,7 @@
 package org.apache.gearpump.integrationtest.checklist
 
 import scala.concurrent.duration._
-
-import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster}
 import org.apache.gearpump.cluster.master.MasterStatus
 import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
@@ -205,7 +204,7 @@ class RestServiceSpec extends TestSpecBase {
         runningWorkers.length == expectedWorkersCount
       }, "all workers running")
       runningWorkers.foreach { worker =>
-        worker.state shouldEqual MasterToAppMaster.AppMasterActive
+        worker.state shouldEqual "active"
       }
     }
 
@@ -341,7 +340,7 @@ class RestServiceSpec extends TestSpecBase {
       Util.retryUntil(() => restClient.restartApp(originAppId), "app 
restarted")
       val killedApp = restClient.queryApp(originAppId)
       killedApp.appId shouldEqual originAppId
-      killedApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+      killedApp.status shouldEqual ApplicationStatus.TERMINATED
       val newAppId = originAppId + 1
       expectAppIsRunning(newAppId, wordCountName)
       val runningApps = restClient.listRunningApps()
@@ -360,7 +359,7 @@ class RestServiceSpec extends TestSpecBase {
 
     val actualApp = restClient.queryApp(appId)
     actualApp.appId shouldEqual appId
-    actualApp.status shouldEqual MasterToAppMaster.AppMasterInActive
+    actualApp.status shouldEqual ApplicationStatus.TERMINATED
   }
 
   private def expectMetricsAvailable(condition: => Boolean, 
conditionDescription: String): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
index 4b15055..4ece4c0 100644
--- 
a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
+++ 
b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/StabilitySpec.scala
@@ -18,8 +18,7 @@
 package org.apache.gearpump.integrationtest.checklist
 
 import scala.concurrent.duration.Duration
-
-import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.{ApplicationStatus, MasterToAppMaster}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.integrationtest.{TestSpecBase, Util}
 import org.apache.gearpump.util.{Constants, LogUtil}
@@ -48,7 +47,7 @@ class StabilitySpec extends TestSpecBase {
 
       // verify
       val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.status shouldEqual ApplicationStatus.ACTIVE
       laterAppMaster.clock should be > 0L
     }
   }
@@ -70,7 +69,7 @@ class StabilitySpec extends TestSpecBase {
 
       // verify
       val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.status shouldEqual ApplicationStatus.ACTIVE
       laterAppMaster.clock should be > 0L
     }
   }
@@ -129,7 +128,7 @@ class StabilitySpec extends TestSpecBase {
 
       // verify
       val laterAppMaster = restClient.queryStreamingAppDetail(appId)
-      laterAppMaster.status shouldEqual MasterToAppMaster.AppMasterActive
+      laterAppMaster.status shouldEqual ApplicationStatus.ACTIVE
       laterAppMaster.clock should be > 0L
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
index 884a8d1..2247cf3 100644
--- 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
+++ 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/CommandLineClient.scala
@@ -18,8 +18,7 @@
 package org.apache.gearpump.integrationtest.minicluster
 
 import org.apache.log4j.Logger
-
-import org.apache.gearpump.cluster.MasterToAppMaster
+import org.apache.gearpump.cluster.ApplicationStatus
 import org.apache.gearpump.integrationtest.Docker
 
 /**
@@ -36,14 +35,10 @@ class CommandLineClient(host: String) {
   }
 
   def listRunningApps(): Array[String] =
-    listApps().filter(
-      _.contains(s", status: ${MasterToAppMaster.AppMasterActive}")
-    )
+    listApps().filter(_.contains(s", status: ${ApplicationStatus.ACTIVE}"))
 
   def queryApp(appId: Int): String = try {
-    listApps().filter(
-      _.startsWith(s"application: $appId")
-    ).head
+    listApps().filter(_.startsWith(s"application: $appId")).head
   } catch {
     case ex: Throwable =>
       LOG.warn(s"swallowed an exception: $ex")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
----------------------------------------------------------------------
diff --git 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
index 8fa0679..b8241af 100644
--- 
a/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
+++ 
b/integrationtest/core/src/main/scala/org/apache/gearpump/integrationtest/minicluster/RestClient.scala
@@ -24,12 +24,13 @@ import org.apache.log4j.Logger
 import upickle.Js
 import upickle.default._
 
+import org.apache.gearpump.cluster.ApplicationStatus._
 import org.apache.gearpump.cluster.AppMasterToMaster.MasterData
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMastersData}
 import org.apache.gearpump.cluster.MasterToClient.HistoryMetrics
 import org.apache.gearpump.cluster.master.MasterSummary
 import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.cluster.{AppJar, MasterToAppMaster}
+import org.apache.gearpump.cluster.AppJar
 import org.apache.gearpump.integrationtest.{Docker, Util}
 import org.apache.gearpump.services.AppMasterService.Status
 import org.apache.gearpump.services.MasterService.{AppSubmissionResult, 
BuiltinPartitioners}
@@ -78,7 +79,7 @@ class RestClient(host: String, port: Int) {
   }
 
   def listRunningWorkers(): Array[WorkerSummary] = {
-    listWorkers().filter(_.state == MasterToAppMaster.AppMasterActive)
+    listWorkers().filter(_.state == ACTIVE.status)
   }
 
   def listApps(): Array[AppMasterData] = {
@@ -87,12 +88,12 @@ class RestClient(host: String, port: Int) {
   }
 
   def listPendingOrRunningApps(): Array[AppMasterData] = {
-    listApps().filter(app => app.status == MasterToAppMaster.AppMasterActive
-      || app.status == MasterToAppMaster.AppMasterPending)
+    listApps().filter(app => app.status == ACTIVE
+      || app.status == PENDING)
   }
 
   def listRunningApps(): Array[AppMasterData] = {
-    listApps().filter(_.status == MasterToAppMaster.AppMasterActive)
+    listApps().filter(_.status == ACTIVE)
   }
 
   def getNextAvailableAppId(): Int = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
 
b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
index caa3a33..b5b0293 100644
--- 
a/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
+++ 
b/services/jvm/src/main/scala/org/apache/gearpump/services/util/UpickleUtil.scala
@@ -18,8 +18,8 @@
 
 package org.apache.gearpump.services.util
 
+import org.apache.gearpump.cluster.ApplicationStatus
 import upickle.Js
-
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.util.Graph
 
@@ -37,6 +37,19 @@ object UpickleUtil {
     }
   }
 
+  implicit val appStatusReader: upickle.default.Reader[ApplicationStatus] =
+    upickle.default.Reader[ApplicationStatus] {
+      case Js.Str(str) =>
+        str match {
+          case "pending" => ApplicationStatus.PENDING
+          case "active" => ApplicationStatus.ACTIVE
+          case "succeeded" => ApplicationStatus.SUCCEEDED
+          case "failed" => ApplicationStatus.FAILED
+          case "terminated" => ApplicationStatus.TERMINATED
+          case _ => ApplicationStatus.NONEXIST
+        }
+    }
+
   implicit val workerIdReader: upickle.default.Reader[WorkerId] = 
upickle.default.Reader[WorkerId] {
     case Js.Str(str) =>
       WorkerId.parse(str)
@@ -46,4 +59,10 @@ object UpickleUtil {
     case workerId: WorkerId =>
       Js.Str(WorkerId.render(workerId))
   }
+
+  implicit val appStatusWriter: upickle.default.Writer[ApplicationStatus] =
+    upickle.default.Writer[ApplicationStatus] {
+    case status: ApplicationStatus =>
+      Js.Str(status.toString)
+  }
 }
\ No newline at end of file


Reply via email to