Repository: incubator-gearpump Updated Branches: refs/heads/master 2913a1fd8 -> 96312a2ac
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala index cec7367..80264d2 100644 --- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala +++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala @@ -20,7 +20,6 @@ package org.apache.gearpump.services import scala.concurrent.duration._ import scala.util.{Success, Try} - import akka.actor.ActorRef import akka.http.scaladsl.model.headers.`Cache-Control` import akka.http.scaladsl.testkit.{RouteTestTimeout, ScalatestRouteTest} @@ -30,12 +29,11 @@ import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.slf4j.Logger import upickle.default.read - import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId} import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest} import org.apache.gearpump.cluster.MasterToClient._ -import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.{ApplicationStatus, TestUtil} import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer} import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import org.apache.gearpump.util.LogUtil @@ -85,7 +83,7 @@ class AppMasterServiceSpec extends FlatSpec with ScalatestRouteTest sender ! ResolveAppIdResult(Success(mockAppMaster.ref)) KeepRunning case AppMasterDataRequest(appId, _) => - sender ! AppMasterData("active") + sender ! AppMasterData(ApplicationStatus.ACTIVE) KeepRunning case QueryAppMasterConfig(appId) => sender ! AppMasterConfig(null) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala index 1341464..1266337 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala @@ -22,7 +22,7 @@ import java.lang.management.ManagementFactory import akka.actor._ import org.apache.gearpump._ -import org.apache.gearpump.cluster.AppMasterToMaster.ActivateAppMaster +import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged import org.apache.gearpump.cluster.ClientToMaster._ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge} import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem, LastFailure} @@ -36,7 +36,7 @@ import org.apache.gearpump.streaming._ import org.apache.gearpump.streaming.appmaster.AppMaster._ import org.apache.gearpump.streaming.appmaster.DagManager.{GetLatestDAG, LatestDAG, ReplaceProcessor} import org.apache.gearpump.streaming.appmaster.ExecutorManager.{ExecutorInfo, GetExecutorInfo} -import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, GetTaskList, TaskList, FailedToRecover} +import org.apache.gearpump.streaming.appmaster.TaskManager.{ApplicationReady, FailedToRecover, GetTaskList, TaskList} import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig} import org.apache.gearpump.streaming.storage.InMemoryAppStoreOnMaster import org.apache.gearpump.streaming.task._ @@ -218,7 +218,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli appName = app.name, actorPath = address, clock = clock, - status = MasterToAppMaster.AppMasterActive, + status = ApplicationStatus.ACTIVE, startTime = startTime, uptime = System.currentTimeMillis() - startTime, user = username, @@ -292,7 +292,8 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli def ready: Receive = { case ApplicationReady => - masterProxy ! ActivateAppMaster(appId) + masterProxy ! ApplicationStatusChanged(appId, ApplicationStatus.ACTIVE, + System.currentTimeMillis(), null) case AppMasterActivated(id) => LOG.info(s"AppMaster for app$id is activated") } @@ -302,11 +303,16 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli case FailedToRecover(errorMsg) => if (context.children.toList.contains(sender())) { LOG.error(errorMsg) - masterProxy ! ShutdownApplication(appId) + val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, lastFailure.time, + new Exception(lastFailure.error)) + masterProxy ! failed } case AllocateResourceTimeOut => - LOG.error(s"Failed to allocate resource in time, shutdown application $appId") - masterProxy ! ShutdownApplication(appId) + val errorMsg = s"Failed to allocate resource in time, shutdown application $appId" + LOG.error(errorMsg) + val failed = ApplicationStatusChanged(appId, ApplicationStatus.FAILED, + System.currentTimeMillis(), new Exception(lastFailure.error)) + masterProxy ! failed context.stop(self) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala index 3d43ee7..126ab92 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala @@ -20,8 +20,7 @@ package org.apache.gearpump.streaming.appmaster import org.apache.gearpump._ import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary -import org.apache.gearpump.cluster.MasterToAppMaster.AppMasterStatus -import org.apache.gearpump.cluster.{MasterToAppMaster, UserConfig} +import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig} import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief import org.apache.gearpump.streaming.{ExecutorId, LifeTime, ProcessorId} import org.apache.gearpump.util.Graph @@ -34,7 +33,7 @@ case class StreamAppMasterSummary( appName: String = null, actorPath: String = null, clock: TimeStamp = 0L, - status: AppMasterStatus = MasterToAppMaster.AppMasterActive, + status: ApplicationStatus = ApplicationStatus.ACTIVE, startTime: TimeStamp = 0L, uptime: TimeStamp = 0L, user: String = null, http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala index f4fffb5..c8478f6 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/StreamingTestUtil.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming import akka.actor._ import akka.testkit.TestActorRef import org.apache.gearpump.cluster.AppMasterToMaster.RegisterAppMaster -import org.apache.gearpump.cluster.appmaster.AppMasterRuntimeInfo +import org.apache.gearpump.cluster.appmaster.ApplicationRuntimeInfo import org.apache.gearpump.cluster.scheduler.Resource import org.apache.gearpump.cluster.{AppDescription, AppMasterContext, MiniCluster, UserConfig} import org.apache.gearpump.streaming.appmaster.AppMaster @@ -34,13 +34,13 @@ object StreamingTestUtil { implicit val actorSystem = miniCluster.system val masterConf = AppMasterContext(appId, testUserName, Resource(1), null, - None, miniCluster.mockMaster, AppMasterRuntimeInfo(appId, appName = appId.toString)) + None, miniCluster.mockMaster) val app = StreamApplication("test", Graph.empty, UserConfig.empty) val appDescription = AppDescription(app.name, app.appMaster.getName, app.userConfig) val props = Props(new AppMaster(masterConf, appDescription)) val appMaster = miniCluster.launchActor(props).asInstanceOf[TestActorRef[AppMaster]] - val registerAppMaster = RegisterAppMaster(appMaster, masterConf.registerData) + val registerAppMaster = RegisterAppMaster(appId, ActorRef.noSender, null) miniCluster.mockMaster.tell(registerAppMaster, appMaster) appMaster http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala index 29dfc57..89dea81 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala @@ -29,7 +29,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster._ import org.apache.gearpump.cluster.MasterToClient.LastFailure import org.apache.gearpump.cluster.WorkerToAppMaster.ExecutorLaunchRejected import org.apache.gearpump.cluster._ -import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, AppMasterRuntimeInfo} +import org.apache.gearpump.cluster.appmaster.{AppMasterRuntimeEnvironment, ApplicationRuntimeInfo} import org.apache.gearpump.cluster.master.MasterProxy import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest} import org.apache.gearpump.cluster.worker.WorkerId @@ -72,7 +72,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with var mockWorker: TestProbe = null var appDescription: AppDescription = null var appMasterContext: AppMasterContext = null - var appMasterRuntimeInfo: AppMasterRuntimeInfo = null + var appMasterRuntimeInfo: ApplicationRuntimeInfo = null override def beforeEach(): Unit = { startActorSystem() @@ -82,13 +82,12 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with mockMaster = TestProbe()(getActorSystem) mockWorker = TestProbe()(getActorSystem) mockMaster.ignoreMsg(ignoreSaveAppData) - appMasterRuntimeInfo = AppMasterRuntimeInfo(appId, appName = appId.toString) + appMasterRuntimeInfo = ApplicationRuntimeInfo(appId, appName = appId.toString) implicit val system = getActorSystem conf = UserConfig.empty.withValue(AppMasterSpec.MASTER, mockMaster.ref) - val mockJar = AppJar("for_test", FilePath("path")) - appMasterContext = AppMasterContext(appId, "test", resource, null, Some(mockJar), - mockMaster.ref, appMasterRuntimeInfo) + val mockJar = Some(AppJar("for_test", FilePath("path"))) + appMasterContext = AppMasterContext(appId, "test", resource, null, mockJar, mockMaster.ref) val graph = Graph(taskDescription1 ~ partitioner ~> taskDescription2) val streamApp = StreamApplication("test", graph, conf) appDescription = Application.ApplicationToAppDescription(streamApp) @@ -124,7 +123,8 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with // triggers ResourceAllocationTimeout in ExecutorSystemScheduler mockMaster.reply(ResourceAllocated(Array(ResourceAllocation(Resource(2), mockWorker.ref, workerId)))) - mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) + val statusChanged = mockMaster.expectMsgType[ApplicationStatusChanged](60.seconds) + statusChanged.newStatus shouldBe ApplicationStatus.FAILED } "reschedule the resource when the worker reject to start executor" in { @@ -248,7 +248,8 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with // fail to recover after restarting a tasks for 5 times appMaster.tell(MessageLoss(0, TaskId(0, 0), "message loss"), mockTask.ref) - mockMaster.expectMsg(60.seconds, ShutdownApplication(appId)) + val statusChanged = mockMaster.expectMsgType[ApplicationStatusChanged](60.seconds) + statusChanged.newStatus shouldBe ApplicationStatus.FAILED workerSystem.terminate() } @@ -281,7 +282,7 @@ class AppMasterSpec extends WordSpec with Matchers with BeforeAndAfterEach with def expectAppStarted(): Unit = { // wait for app to get started - mockMaster.expectMsg(ActivateAppMaster(appId)) + mockMaster.expectMsgType[ApplicationStatusChanged] mockMaster.reply(AppMasterActivated(appId)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/96312a2a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala index 42b2618..fa23f54 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ExecutorManagerSpec.scala @@ -66,7 +66,7 @@ class ExecutorManagerSpec extends FlatSpec with Matchers with BeforeAndAfterAll val appName = "app" val appJar = Some(AppJar("for_test", FilePath("path"))) - val appMasterContext = AppMasterContext(appId, username, null, null, appJar, master.ref, null) + val appMasterContext = AppMasterContext(appId, username, null, null, appJar, master.ref) val executorFactory = (_: ExecutorContext, _: UserConfig, _: Address, _: ExecutorId) => { executor.ref ! StartExecutorActorPlease
