Repository: incubator-gearpump Updated Branches: refs/heads/master 0f5f7221e -> 4ad0ec428
[GEARPUMP-334] Fix Java WordCount DSL example Author: manuzhang <[email protected]> Closes #204 from manuzhang/fix_wc_java_dsl. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/4ad0ec42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/4ad0ec42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/4ad0ec42 Branch: refs/heads/master Commit: 4ad0ec42819d81adeac57b13b772b023a8746aac Parents: 0f5f722 Author: manuzhang <[email protected]> Authored: Wed Aug 2 17:53:15 2017 +0800 Committer: manuzhang <[email protected]> Committed: Wed Aug 2 17:53:26 2017 +0800 ---------------------------------------------------------------------- .../gearpump/cluster/client/ClientContext.scala | 37 ++++++++++++++++---- .../cluster/client/RuntimeEnvironment.scala | 5 ++- .../cluster/embedded/EmbeddedCluster.scala | 7 +--- .../embedded/EmbeddedRuntimeEnvironemnt.scala | 6 ++-- .../examples/wordcountjava/dsl/WordCount.java | 14 ++++++-- .../gearpump/akkastream/graph/RemoteGraph.scala | 12 +------ .../experiments/storm/main/GearpumpNimbus.scala | 2 +- .../gearpump/services/MasterService.scala | 2 +- 8 files changed, 52 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala index fc8af59..4840120 100755 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/ClientContext.scala @@ -40,14 +40,10 @@ import scala.concurrent.{Await, Future} import scala.util.{Failure, Success, Try} /** - * ClientContext is a user facing util to submit/manage an application. - * - * TODO: add interface to query master here + * ClientContext is a user facing utility to interact with the master. + * (e.g. submit/manage an application). */ -class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { - def this(config: Config) = { - this(config, null, null) - } +class ClientContext protected(config: Config, sys: ActorSystem, _master: ActorRef) { private val LOG: Logger = LogUtil.getLogger(getClass) implicit val system = Option(sys).getOrElse(ActorSystem(s"client${Util.randInt()}", config)) @@ -178,9 +174,36 @@ class ClientContext(config: Config, sys: ActorSystem, _master: ActorRef) { object ClientContext { + /** + * Create a [[ClientContext]] which will instantiate an actor system + * to interact with the master parsed from `gearpump.cluster.masters`. + * The config is loaded from classpath. + */ def apply(): ClientContext = apply(ClusterConfig.default()) + /** + * Create a [[ClientContext]] which will instantiate an actor system + * to interact with the master parsed from `gearpump.cluster.masters` + * through the given config. + */ def apply(config: Config): ClientContext = { RuntimeEnvironment.newClientContext(config) } + + /** + * Create a [[ClientContext]] for the passed in actor system + * to interact with the master parsed from `gearpump.cluster.masters` + * through the given config. + */ + def apply(config: Config, system: ActorSystem): ClientContext = { + new ClientContext(config, system, null) + } + + /** + * Create a [[ClientContext]] for the passed in actor system + * to interact with the given master. + */ + def apply(config: Config, system: ActorSystem, master: ActorRef): ClientContext = { + new ClientContext(config, system, master) + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala index e90e73b..cf5842f 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/client/RuntimeEnvironment.scala @@ -19,6 +19,7 @@ package org.apache.gearpump.cluster.client import com.typesafe.config.Config +import org.apache.gearpump.cluster.client.RuntimeEnvironment.RemoteClientContext import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt /** @@ -34,13 +35,15 @@ abstract class RuntimeEnvironment { */ class RemoteRuntimeEnvironment extends RuntimeEnvironment { override def newClientContext(akkaConf: Config): ClientContext = { - new ClientContext(akkaConf) + new RemoteClientContext(akkaConf) } } object RuntimeEnvironment { private var envInstance: RuntimeEnvironment = _ + class RemoteClientContext(akkaConf: Config) extends ClientContext(akkaConf, null, null) + def get() : RuntimeEnvironment = { Option(envInstance).getOrElse(new EmbeddedRuntimeEnvironemnt) } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala index a3a3e39..8abcd96 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedCluster.scala @@ -21,12 +21,11 @@ package org.apache.gearpump.cluster.embedded import scala.collection.JavaConverters._ import scala.concurrent.Await import scala.concurrent.duration.Duration - import akka.actor.{ActorRef, ActorSystem, Props} import com.typesafe.config.{Config, ConfigValueFactory} - import org.apache.gearpump.cluster.ClusterConfig import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext import org.apache.gearpump.cluster.master.{Master => MasterActor} import org.apache.gearpump.cluster.worker.{Worker => WorkerActor} import org.apache.gearpump.util.Constants.{GEARPUMP_CLUSTER_EXECUTOR_WORKER_SHARE_SAME_PROCESS, GEARPUMP_CLUSTER_MASTERS, GEARPUMP_METRIC_ENABLED, MASTER} @@ -65,10 +64,6 @@ class EmbeddedCluster(inputConfig: Config) { config } - def newClientContext: ClientContext = { - new ClientContext(config, system, master) - } - def stop(): Unit = { system.stop(master) system.terminate() http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala index 246fabd..fbea53f 100644 --- a/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala +++ b/core/src/main/scala/org/apache/gearpump/cluster/embedded/EmbeddedRuntimeEnvironemnt.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.cluster.embedded import com.typesafe.config.Config import org.apache.gearpump.cluster.client.{ClientContext, RuntimeEnvironment} -import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClientContext +import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.EmbeddedClientContext /** * The EmbeddedRuntimeEnvironemnt is initiated when user trying to launch their application @@ -28,12 +28,12 @@ import org.apache.gearpump.cluster.embedded.EmbeddedRuntimeEnvironemnt.LocalClie */ class EmbeddedRuntimeEnvironemnt extends RuntimeEnvironment { override def newClientContext(akkaConf: Config): ClientContext = { - new LocalClientContext(akkaConf) + new EmbeddedClientContext(akkaConf) } } object EmbeddedRuntimeEnvironemnt { - class LocalClientContext private (cluster: EmbeddedCluster) + class EmbeddedClientContext private(cluster: EmbeddedCluster) extends ClientContext(cluster.config, cluster.system, cluster.master) { def this(akkaConf: Config) { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index 2830b16..e8467fa 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -31,6 +31,7 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.source.Watermark; import org.apache.gearpump.streaming.task.TaskContext; import scala.Tuple2; @@ -46,7 +47,7 @@ public class WordCount { } public static void main(Config akkaConf, String[] args) throws InterruptedException { - ClientContext context = new ClientContext(akkaConf); + ClientContext context = ClientContext.apply(akkaConf); JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"), @@ -69,6 +70,7 @@ public class WordCount { private static class StringSource implements DataSource { private final String str; + private boolean hasNext = true; StringSource(String str) { this.str = str; @@ -80,7 +82,9 @@ public class WordCount { @Override public Message read() { - return new DefaultMessage(str, Instant.now()); + Message msg = new DefaultMessage(str, Instant.now()); + hasNext = false; + return msg; } @Override @@ -89,7 +93,11 @@ public class WordCount { @Override public Instant getWatermark() { - return Instant.now(); + if (hasNext) { + return Instant.now(); + } else { + return Watermark.MAX(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala ---------------------------------------------------------------------- diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala index 4d400ff..f45cae0 100644 --- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala +++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/graph/RemoteGraph.scala @@ -26,7 +26,6 @@ import org.apache.gearpump.akkastream.task.SinkBridgeTask.SinkBridgeTaskClient import org.apache.gearpump.akkastream.task.SourceBridgeTask.SourceBridgeTaskClient import akka.stream.impl.StreamLayout.Module import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.cluster.embedded.EmbeddedCluster import org.apache.gearpump.streaming.ProcessorId import org.apache.gearpump.util.Graph @@ -48,16 +47,8 @@ object RemoteGraph { */ class RemoteGraphMaterializer(useInProcessCluster: Boolean, system: ActorSystem) extends SubGraphMaterializer { - private val local = if (useInProcessCluster) { - Some(EmbeddedCluster()) - } else { - None - } - private val context: ClientContext = local match { - case Some(l) => l.newClientContext - case None => ClientContext(null) - } + private val context: ClientContext = ClientContext() override def materialize(subGraph: SubGraph, inputMatValues: scala.collection.mutable.Map[Module, Any]): @@ -105,7 +96,6 @@ object RemoteGraph { override def shutdown: Unit = { context.close() - local.foreach(_.stop()) } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala ---------------------------------------------------------------------- diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala index e2d421c..987546c 100644 --- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala +++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/main/GearpumpNimbus.scala @@ -60,7 +60,7 @@ object GearpumpNimbus extends AkkaApp with ArgumentsParser { val akkaConf = updateClientConfig(inputAkkaConf) val system = ActorSystem("storm", akkaConf) - val clientContext = new ClientContext(akkaConf, system, null) + val clientContext = ClientContext(akkaConf, system, null) val stormConf = Utils.readStormConfig().asInstanceOf[JMap[AnyRef, AnyRef]] val thriftConf: JMap[AnyRef, AnyRef] = Map( Config.NIMBUS_HOST -> akkaConf.getString(Constants.GEARPUMP_HOSTNAME), http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/4ad0ec42/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala ---------------------------------------------------------------------- diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala index 5ba101b..be96577 100644 --- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala +++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala @@ -160,7 +160,7 @@ class MasterService(val master: ActorRef, val msg = java.net.URLDecoder.decode(request, "UTF-8") val submitApplicationRequest = read[SubmitApplicationRequest](msg) import submitApplicationRequest.{appName, dag, processors, userConfig} - val context = new ClientContext(system.settings.config, system, master) + val context = ClientContext(system.settings.config, system, master) val graph = dag.mapVertex { processorId => processors(processorId)
