Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 2913a1fd8 -> 96312a2ac


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git 
a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
 
b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
index cec7367..80264d2 100644
--- 
a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
+++ 
b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -20,7 +20,6 @@ package org.apache.gearpump.services
 
 import scala.concurrent.duration._
 import scala.util.{Success, Try}
-
 import akka.actor.ActorRef
 import akka.http.scaladsl.model.headers.`Cache-Control`
 import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest}
@@ -30,12 +29,11 @@ import com.typesafe.config.{Config, ConfigFactory}
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
 import org.slf4j.Logger
 import upickle.default.read
-
 import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
 import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, 
QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, 
AppMasterDataDetailRequest, AppMasterDataRequest}
 import org.apache.gearpump.cluster.MasterToClient._
-import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.cluster.{ApplicationStatus, TestUtil}
 import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, 
ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import org.apache.gearpump.util.LogUtil
@@ -85,7 +83,7 @@ class AppMasterServiceSpec extends FlatSpec with 
ScalatestRouteTest
           sender ! ResolveAppIdResult(Success(mockAppMaster.ref))
           KeepRunning
         case AppMasterDataRequest(appId, _) =>
-          sender ! AppMasterData("active")
+          sender ! AppMasterData(ApplicationStatus.ACTIVE)
           KeepRunning
         case QueryAppMasterConfig(appId) =>
           sender ! AppMasterConfig(null)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index 1341464..1266337 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory
 
 import akka.actor._
 import org.apache.gearpump._
-import org.apache.gearpump.cluster.AppMasterToMaster.ActivateAppMaster
+import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged
 import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, 
AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
 import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, 
HistoryMetricsItem, LastFailure}
@@ -36,7 +36,7 @@ import org.apache.gearpump.streaming._
 import org.apache.gearpump.streaming.appmaster.AppMaster._
 import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, 
LatestDAG, ReplaceProcessor}
 import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, 
GetExecutorInfo}
-import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, 
GetTaskList, TaskList, FailedToRecover}
+import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, 
FailedToRecover, GetTaskList, TaskList}
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, 
ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
 import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster
 import org.apache.gearpump.streaming.task._
@@ -218,7 +218,7 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
             appName = app.name,
             actorPath = address,
             clock = clock,
-            status = MasterToAppMaster.AppMasterActive,
+            status = ApplicationStatus.ACTIVE,
             startTime = startTime,
             uptime = System.currentTimeMillis() - startTime,
             user = username,
@@ -292,7 +292,8 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
 
   def ready: Receive = {
     case ApplicationReady =>
-      masterProxy ! ActivateAppMaster(appId)
+      masterProxy ! ApplicationStatusChanged(appId, ApplicationStatus.ACTIVE,
+        System.currentTimeMillis(), null)
     case AppMasterActivated(id) =>
       LOG.info(s"AppMaster for app$id is activated")
   }
@@ -302,11 +303,16 @@ class AppMaster(appContext: AppMasterContext, app: 
AppDescription) extends Appli
     case FailedToRecover(errorMsg) =>
       if (context.children.toList.contains(sender())) {
         LOG.error(errorMsg)
-        masterProxy ! ShutdownApplication(appId)
+        val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, 
lastFailure.time,
+          new Exception(lastFailure.error))
+        masterProxy ! failed
       }
     case AllocateResourceTimeOut =>
-      LOG.error(s"Failed to allocate resource in time, shutdown application $appId")
-      masterProxy ! ShutdownApplication(appId)
+      val errorMsg = s"Failed to allocate resource in time, shutdown application $appId"
+      LOG.error(errorMsg)
+      val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED,
+        System.currentTimeMillis(), new Exception(lastFailure.error))
+      masterProxy ! failed
       context.stop(self)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index 3d43ee7..126ab92 100644
--- 
a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ 
b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -20,8 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
-import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus
-import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig}
+import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig}
 import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
 import org.apache.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId}
 import org.apache.gearpump.util.Graph
@@ -34,7 +33,7 @@ case class StreamAppMasterSummary(
     appName: String = null,
     actorPath: String = null,
     clock: TimeStamp = 0L,
-    status: AppMasterStatus = MasterToAppMaster.AppMasterActive,
+    status: ApplicationStatus = ApplicationStatus.ACTIVE,
     startTime: TimeStamp = 0L,
     uptime: TimeStamp = 0L,
     user: String = null,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
index f4fffb5..c8478f6 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming
 import akka.actor._
 import akka.testkit.TestActorRef
 import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster
-import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeInfo
+import org.apache.gearpump.cluster.appmaster.ApplicationRuntimeInfo
 import org.apache.gearpump.cluster.scheduler.Resource
 import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, 
MiniCluster, UserConfig}
 import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -34,13 +34,13 @@ object StreamingTestUtil {
 
     implicit val actorSystem = miniCluster.system
     val masterConf = AppMasterContext(appId, testUserName, Resource(1), null,
-      None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = 
appId.toString))
+      None, miniCluster.mockMaster)
 
     val app = StreamApplication("test", Graph.empty, UserConfig.empty)
     val appDescription = AppDescription(app.name, app.appMaster.getName, 
app.userConfig)
     val props = Props(new AppMaster(masterConf, appDescription))
     val appMaster = 
miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]]
-    val registerAppMaster = RegisterAppMaster(appMaster, 
masterConf.registerData)
+    val registerAppMaster = RegisterAppMaster(appId, ActorRef.noSender, null)
     miniCluster.mockMaster.tell(registerAppMaster, appMaster)
 
     appMaster

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index 29dfc57..89dea81 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -29,7 +29,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster._
 import org.apache.gearpump.cluster.MasterToClient.LastFailure
 import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected
 import org.apache.gearpump.cluster._
-import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
AppMasterRuntimeInfo}
+import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, 
ApplicationRuntimeInfo}
 import org.apache.gearpump.cluster.master.MasterProxy
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, 
ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
@@ -72,7 +72,7 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
   var mockWorker: TestProbe = null
   var appDescription: AppDescription = null
   var appMasterContext: AppMasterContext = null
-  var appMasterRuntimeInfo: AppMasterRuntimeInfo = null
+  var appMasterRuntimeInfo: ApplicationRuntimeInfo = null
 
   override def beforeEach(): Unit = {
     startActorSystem()
@@ -82,13 +82,12 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
     mockMaster = TestProbe()(getActorSystem)
     mockWorker = TestProbe()(getActorSystem)
     mockMaster.ignoreMsg(ignoreSaveAppData)
-    appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = 
appId.toString)
+    appMasterRuntimeInfo = ApplicationRuntimeInfo(appId, appName = 
appId.toString)
 
     implicit val system = getActorSystem
     conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref)
-    val mockJar = AppJar("for_test", FilePath("path"))
-    appMasterContext = AppMasterContext(appId, "test", resource, null, 
Some(mockJar),
-      mockMaster.ref, appMasterRuntimeInfo)
+    val mockJar = Some(AppJar("for_test", FilePath("path")))
+    appMasterContext = AppMasterContext(appId, "test", resource, null, 
mockJar, mockMaster.ref)
     val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2)
     val streamApp = StreamApplication("test", graph, conf)
     appDescription = Application.ApplicationToAppDescription(streamApp)
@@ -124,7 +123,8 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
       // triggers ResourceAllocationTimeout in ExecutorSystemScheduler
       mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2),
         mockWorker.ref, workerId))))
-      mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
+      val statusChanged = 
mockMaster.expectMsgType[ApplicationStatusChanged](60.seconds)
+      statusChanged.newStatus shouldBe ApplicationStatus.FAILED
     }
 
     "reschedule the resource when the worker reject to start executor" in {
@@ -248,7 +248,8 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
 
       // fail to recover after restarting a tasks for 5 times
       appMaster.tell(MessageLoss(0, TaskId(0, 0), "message loss"), 
mockTask.ref)
-      mockMaster.expectMsg(60.seconds, ShutdownApplication(appId))
+      val statusChanged = 
mockMaster.expectMsgType[ApplicationStatusChanged](60.seconds)
+      statusChanged.newStatus shouldBe ApplicationStatus.FAILED
 
       workerSystem.terminate()
     }
@@ -281,7 +282,7 @@ class AppMasterSpec extends WordSpec with Matchers with 
BeforeAndAfterEach with
 
   def expectAppStarted(): Unit = {
     // wait for app to get started
-    mockMaster.expectMsg(ActivateAppMaster(appId))
+    mockMaster.expectMsgType[ApplicationStatusChanged]
     mockMaster.reply(AppMasterActivated(appId))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
index 42b2618..fa23f54 100644
--- 
a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
+++ 
b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala
@@ -66,7 +66,7 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with 
BeforeAndAfterAll
     val appName = "app"
     val appJar = Some(AppJar("for_test", FilePath("path")))
 
-    val appMasterContext = AppMasterContext(appId, username, null, null, 
appJar, master.ref, null)
+    val appMasterContext = AppMasterContext(appId, username, null, null, 
appJar, master.ref)
 
     val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: 
ExecutorId) => {
       executor.ref ! StartExecutorActorPlease

Reply via email to