Repository: incubator-gearpump Updated Branches: refs/heads/master 636cd6f8e -> ab640f712
[GEARPUMP-252] Return meaningful result than app id when submitting application in ClientContext Author: huafengw <[email protected]> Closes #133 from huafengw/refactorClient. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/ab640f71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/ab640f71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/ab640f71 Branch: refs/heads/master Commit: ab640f71285c95d8243818667c84f75d0833fa7c Parents: 636cd6f Author: huafengw <[email protected]> Authored: Wed Jan 18 13:43:21 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Jan 18 13:43:44 2017 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/client/ClientContext.scala | 76 +++++++++------ .../gearpump/cluster/client/MasterClient.scala | 81 ---------------- .../cluster/client/RunningApplication.scala | 52 ++++++++++ .../org/apache/gearpump/util/ActorUtil.scala | 16 +++- .../cluster/client/RunningApplicationSpec.scala | 99 ++++++++++++++++++++ .../distributedshell/DistributedShell.scala | 4 +- .../distributeservice/DistributeService.scala | 4 +- .../experiments/storm/main/GearpumpNimbus.scala | 2 +- .../gearpump/services/MasterService.scala | 2 +- .../streaming/dsl/javaapi/JavaStreamApp.scala | 4 +- 10 files changed, 217 insertions(+), 123 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala index 34582dd..48b95d8 100755 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala @@ -23,8 +23,9 @@ import java.util.concurrent.TimeUnit import akka.actor.{ActorRef, ActorSystem} import akka.util.Timeout import com.typesafe.config.{Config, ConfigValueFactory} -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, ReplayFromTimestampWindowTrailingEdge} -import org.apache.gearpump.cluster.MasterToClient.ReplayApplicationResult +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication, SubmitApplication} +import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest, ReplayFromTimestampWindowTrailingEdge} +import org.apache.gearpump.cluster.MasterToClient._ import org.apache.gearpump.cluster._ import org.apache.gearpump.cluster.master.MasterProxy import org.apache.gearpump.jarstore.JarStoreClient @@ -32,10 +33,11 @@ import org.apache.gearpump.util.Constants._ import org.apache.gearpump.util.{ActorUtil, Constants, LogUtil, Util} import org.slf4j.Logger +import scala.concurrent.ExecutionContext.Implicits.global import scala.collection.JavaConverters._ import scala.concurrent.duration.Duration import scala.concurrent.{Await, Future} -import scala.util.Try +import scala.util.{Failure, Success, Try} /** * ClientContext is a user facing util to submit/manage an application. @@ -43,7 +45,6 @@ import scala.util.Try * TODO: add interface to query master here */ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { - def this(system: ActorSystem) = { this(system.settings.config, system, null) } @@ -53,20 +54,20 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { } private val LOG: Logger = LogUtil.getLogger(getClass) - private implicit val timeout = Timeout(5, TimeUnit.SECONDS) - implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config)) LOG.info(s"Starting system ${system.name}") - val shouldCleanupSystem = Option(sys).isEmpty - private val jarStoreClient = new JarStoreClient(config, system) + private val masterClientTimeout = { + val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90) + Timeout(timeout, TimeUnit.SECONDS) + } private lazy val master: ActorRef = { val masters = config.getStringList(Constants.GEARPUMP_CLUSTER_MASTERS).asScala .flatMap(Util.parseHostList) val master = Option(_master).getOrElse(system.actorOf(MasterProxy.props(masters), s"masterproxy${system.name}")) - LOG.info(s"Creating master proxy ${master} for master list: $masters") + LOG.info(s"Creating master proxy $master for master list: $masters") master } @@ -75,26 +76,25 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { * defined. Otherwise, it assumes the jar is on the target runtime classpath, thus will * not send the jar across the wire. */ - def submit(app: Application): Int = { + def submit(app: Application): RunningApplication = { submit(app, System.getProperty(GEARPUMP_APP_JAR)) } - def submit(app: Application, jar: String): Int = { - submit(app, jar, getExecutorNum()) + def submit(app: Application, jar: String): RunningApplication = { + submit(app, jar, getExecutorNum) } - def submit(app: Application, jar: String, executorNum: Int): Int = { - val client = getMasterClient + def submit(app: Application, jar: String, executorNum: Int): RunningApplication = { val appName = checkAndAddNamePrefix(app.name, System.getProperty(GEARPUMP_APP_NAME_PREFIX)) val submissionConfig = getSubmissionConfig(config) .withValue(APPLICATION_EXECUTOR_NUMBER, ConfigValueFactory.fromAnyRef(executorNum)) val appDescription = AppDescription(appName, app.appMaster.getName, app.userConfig, submissionConfig) val appJar = Option(jar).map(loadFile) - client.submitApplication(appDescription, appJar) + submitApplication(SubmitApplication(appDescription, appJar)) } - private def getExecutorNum(): Int = { + private def getExecutorNum: Int = { Try(System.getProperty(APPLICATION_EXECUTOR_NUMBER).toInt).getOrElse(1) } @@ -102,8 +102,11 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { ClusterConfig.filterOutDefaultConfig(config) } + def listApps: AppMastersData = { + ActorUtil.askActor[AppMastersData](master, AppMastersDataRequest, masterClientTimeout) + } + def replayFromTimestampWindowTrailingEdge(appId: Int): ReplayApplicationResult = { - import scala.concurrent.ExecutionContext.Implicits.global val result = Await.result( ActorUtil.askAppMaster[ReplayApplicationResult](master, appId, ReplayFromTimestampWindowTrailingEdge(appId)), Duration.Inf) @@ -111,27 +114,29 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { } def askAppMaster[T](appId: Int, msg: Any): Future[T] = { - import scala.concurrent.ExecutionContext.Implicits.global ActorUtil.askAppMaster[T](master, appId, msg) } - def listApps: AppMastersData = { - val client = getMasterClient - client.listApplications - } - def shutdown(appId: Int): Unit = { - val client = getMasterClient - client.shutdownApplication(appId) + val result = ActorUtil.askActor[ShutdownApplicationResult](master, + ShutdownApplication(appId), masterClientTimeout) + result.appId match { + case Success(_) => + case Failure(ex) => throw ex + } } def resolveAppID(appId: Int): ActorRef = { - val client = getMasterClient - client.resolveAppId(appId) + val result = ActorUtil.askActor[ResolveAppIdResult](master, + ResolveAppId(appId), masterClientTimeout) + result.appMaster match { + case Success(appMaster) => appMaster + case Failure(ex) => throw ex + } } def close(): Unit = { - if (shouldCleanupSystem) { + if (sys == null) { LOG.info(s"Shutting down system ${system.name}") system.terminate() } @@ -161,9 +166,18 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { fullName } - private def getMasterClient: MasterClient = { - val timeout = Try(config.getInt(Constants.GEARPUMP_MASTERCLIENT_TIMEOUT)).getOrElse(90) - new MasterClient(master, akka.util.Timeout(timeout, TimeUnit.SECONDS)) + private def submitApplication(submitApplication: SubmitApplication): RunningApplication = { + val result = ActorUtil.askActor[SubmitApplicationResult](master, + submitApplication, masterClientTimeout) + val application = result.appId match { + case Success(appId) => + // scalastyle:off println + Console.println(s"Submit application succeed. The application id is $appId") + // scalastyle:on println + new RunningApplication(appId, master, masterClientTimeout) + case Failure(ex) => throw ex + } + application } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala deleted file mode 100644 index 77ebedf..0000000 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/MasterClient.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.cluster.client - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, Future} -import scala.util.{Failure, Success} - -import akka.actor.ActorRef -import akka.pattern.ask -import akka.util.Timeout - -import org.apache.gearpump.cluster.ClientToMaster._ -import org.apache.gearpump.cluster.MasterToAppMaster.{AppMastersData, AppMastersDataRequest} -import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult, SubmitApplicationResult} -import org.apache.gearpump.cluster.{AppDescription, AppJar} - -/** - * Client to inter-operate with Master node. - * - * NOTE: Stateless, thread safe - */ -class MasterClient(master: ActorRef, timeout: Timeout) { - implicit val masterClientTimeout = timeout - - def submitApplication(app: AppDescription, appJar: Option[AppJar]): Int = { - val result = Await.result( - (master ? SubmitApplication(app, appJar)).asInstanceOf[Future[SubmitApplicationResult]], - Duration.Inf) - val appId = result.appId match { - case Success(appId) => - // scalastyle:off println - Console.println(s"Submit application succeed. The application id is $appId") - // scalastyle:on println - appId - case Failure(ex) => throw ex - } - appId - } - - def resolveAppId(appId: Int): ActorRef = { - val result = Await.result( - (master ? ResolveAppId(appId)).asInstanceOf[Future[ResolveAppIdResult]], Duration.Inf) - result.appMaster match { - case Success(appMaster) => appMaster - case Failure(ex) => throw ex - } - } - - def shutdownApplication(appId: Int): Unit = { - val result = Await.result( - (master ? ShutdownApplication(appId)).asInstanceOf[Future[ShutdownApplicationResult]], - Duration.Inf) - result.appId match { - case Success(_) => - case Failure(ex) => throw ex - } - } - - def listApplications: AppMastersData = { - val result = Await.result( - (master ? AppMastersDataRequest).asInstanceOf[Future[AppMastersData]], Duration.Inf) - result - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala new file mode 100644 index 0000000..153c824 --- /dev/null +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RunningApplication.scala @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.cluster.client + +import akka.pattern.ask +import akka.actor.ActorRef +import akka.util.Timeout +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.util.ActorUtil + +import scala.concurrent.Future +import scala.util.{Failure, Success} +import scala.concurrent.ExecutionContext.Implicits.global + +class RunningApplication(val appId: Int, master: ActorRef, timeout: Timeout) { + lazy val appMaster: Future[ActorRef] = resolveAppMaster(appId) + + def shutDown(): Unit = { + val result = ActorUtil.askActor[ShutdownApplicationResult](master, + ShutdownApplication(appId), timeout) + result.appId match { + case Success(_) => + case Failure(ex) => throw ex + } + } + + def askAppMaster[T](msg: Any): Future[T] = { + appMaster.flatMap(_.ask(msg)(timeout).asInstanceOf[Future[T]]) + } + + private def resolveAppMaster(appId: Int): Future[ActorRef] = { + master.ask(ResolveAppId(appId))(timeout). + asInstanceOf[Future[ResolveAppIdResult]].map(_.appMaster.get) + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala index 09f2969..82c7fe2 100644 --- a/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala +++ b/core/src/main/scala/org/apache/gearpump/util/ActorUtil.scala @@ -21,13 +21,12 @@ package org.apache.gearpump.util import org.apache.gearpump.cluster.AppMasterContext import org.apache.gearpump.cluster.worker.WorkerId -import scala.concurrent.{ExecutionContext, Future} - +import scala.concurrent.{Await, ExecutionContext, Future} import akka.actor.Actor.Receive import akka.actor._ import akka.pattern.ask import org.slf4j.Logger - +import akka.util.Timeout import org.apache.gearpump.cluster.AppMasterToMaster.{ActivateAppMaster, GetAllWorkers} import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ResolveWorkerId} import org.apache.gearpump.cluster.MasterToAppMaster.WorkerList @@ -36,6 +35,8 @@ import org.apache.gearpump.cluster.appmaster.ExecutorSystemScheduler.{ExecutorSy import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest} import org.apache.gearpump.transport.HostPort +import scala.concurrent.duration.Duration + object ActorUtil { private val LOG: Logger = LogUtil.getLogger(getClass) @@ -136,4 +137,13 @@ object ActorUtil { implicit val timeout = Constants.FUTURE_TIMEOUT (actor ? msg).asInstanceOf[Future[T]] } + + def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout)(implicit ex: ExecutionContext): T = { + askActor(actor, msg, timeout, ActorRef.noSender) + } + + def askActor[T](actor: ActorRef, msg: Any, timeout: Timeout, sender: ActorRef) + (implicit ex: ExecutionContext): T = { + Await.result(actor.ask(msg)(timeout, sender).asInstanceOf[Future[T]], Duration.Inf) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala new file mode 100644 index 0000000..5f0d5e4 --- /dev/null +++ b/core/src/test/scala/org/apache/gearpump/cluster/client/RunningApplicationSpec.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.cluster.client + +import java.util.concurrent.TimeUnit + +import akka.actor.ActorSystem +import akka.testkit.TestProbe +import akka.util.Timeout +import org.apache.gearpump.cluster.ClientToMaster.{ResolveAppId, ShutdownApplication} +import org.apache.gearpump.cluster.MasterToClient.{ResolveAppIdResult, ShutdownApplicationResult} +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.client.RunningApplicationSpec.{MockAskAppMasterRequest, MockAskAppMasterResponse} +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.{Await, Future} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success} +import scala.concurrent.ExecutionContext.Implicits.global + +class RunningApplicationSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "RunningApplication" should "be able to shutdown application" in { + val errorMsg = "mock exception" + val master = TestProbe() + val timeout = Timeout(90, TimeUnit.SECONDS) + val application = new RunningApplication(1, master.ref, timeout) + Future { + application.shutDown() + } + master.expectMsg(ShutdownApplication(1)) + master.reply(ShutdownApplicationResult(Success(1))) + + val result = Future { + intercept[Exception] { + application.shutDown() + } + } + master.expectMsg(ShutdownApplication(1)) + master.reply(ShutdownApplicationResult(Failure(new Exception(errorMsg)))) + val exception = Await.result(result, Duration.Inf) + assert(exception.getMessage.equals(errorMsg)) + } + + "RunningApplication" should "be able to ask appmaster" in { + val master = TestProbe() + val appMaster = TestProbe() + val appId = 1 + val timeout = Timeout(90, TimeUnit.SECONDS) + val request = MockAskAppMasterRequest("request") + val application = new RunningApplication(appId, master.ref, timeout) + val future = application.askAppMaster[MockAskAppMasterResponse](request) + master.expectMsg(ResolveAppId(appId)) + master.reply(ResolveAppIdResult(Success(appMaster.ref))) + appMaster.expectMsg(MockAskAppMasterRequest("request")) + appMaster.reply(MockAskAppMasterResponse("response")) + val result = Await.result(future, Duration.Inf) + assert(result.res.equals("response")) + + // ResolveAppId should not be called multiple times + val future2 = application.askAppMaster[MockAskAppMasterResponse](request) + appMaster.expectMsg(MockAskAppMasterRequest("request")) + appMaster.reply(MockAskAppMasterResponse("response")) + val result2 = Await.result(future2, Duration.Inf) + assert(result2.res.equals("response")) + } +} + +object RunningApplicationSpec { + case class MockAskAppMasterRequest(req: String) + + case class MockAskAppMasterResponse(res: String) +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala ---------------------------------------------------------------------- diff --git a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala index c4eec07..6db8531 100644 --- a/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala +++ b/examples/distributedshell/src/main/scala/org/apache/gearpump/examples/distributedshell/DistributedShell.scala @@ -33,9 +33,9 @@ object DistributedShell extends AkkaApp with ArgumentsParser { override def main(akkaConf: Config, args: Array[String]): Unit = { LOG.info(s"Distributed shell submitting application...") val context = ClientContext(akkaConf) - val appId = context.submit(Application[DistShellAppMaster]("DistributedShell", + val app = context.submit(Application[DistShellAppMaster]("DistributedShell", UserConfig.empty)) context.close() - LOG.info(s"Distributed Shell Application started with appId $appId !") + LOG.info(s"Distributed Shell Application started with appId ${app.appId} !") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala ---------------------------------------------------------------------- diff --git a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala index df7a517..655389b 100644 --- a/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala +++ b/examples/distributeservice/src/main/scala/org/apache/gearpump/experiments/distributeservice/DistributeService.scala @@ -33,9 +33,9 @@ object DistributeService extends AkkaApp with ArgumentsParser { override def main(akkaConf: Config, args: Array[String]): Unit = { LOG.info(s"Distribute Service submitting application...") val context = ClientContext(akkaConf) - val appId = context.submit(Application[DistServiceAppMaster]("DistributedService", + val app = context.submit(Application[DistServiceAppMaster]("DistributedService", UserConfig.empty)) context.close() - LOG.info(s"Distribute Service Application started with appId $appId !") + LOG.info(s"Distribute Service Application started with appId ${app.appId} !") } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index 544a4eb..df1de06 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -163,7 +163,7 @@ class GearpumpNimbus(clientContext: ClientContext, stormConf: JMap[AnyRef, AnyRe .withValue[JMap[AnyRef, AnyRef]](StormConstants.STORM_CONFIG, stormConfig) val app = StreamApplication(name, processorGraph, config) LOG.info(s"jar file uploaded to $uploadedJarLocation") - val appId = clientContext.submit(app, uploadedJarLocation, workerNum) + val appId = clientContext.submit(app, uploadedJarLocation, workerNum).appId applications += name -> appId topologies += name -> TopologyData(topology, stormConfig, uploadedJarLocation) LOG.info(s"Storm Application $appId submitted") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index 62a431a..ed15121 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -169,7 +169,7 @@ class MasterService(val master: ActorRef, } val effectiveConfig = if (userConfig == null) UserConfig.empty else userConfig - val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)) + val appId = context.submit(new StreamApplication(appName, effectiveConfig, graph)).appId import upickle.default.write val submitApplicationResultValue = SubmitApplicationResultValue(appId) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/ab640f71/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala index b8d1f4c..f5b2910 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.dsl.javaapi import java.util.Collection import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication} import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp} import org.apache.gearpump.streaming.source.DataSource @@ -42,7 +42,7 @@ class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description)) } - def submit(): Int = { + def submit(): RunningApplication = { context.submit(streamApp) } }
