Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 5cf79bfd3 -> 791f45a0c
[GEARPUMP-245] Invoke GroupByFunction.apply in JavaStream DSL Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [x] Make sure the commit message is formatted like: `[GEARPUMP-<Jira issue #>] Meaningful description of pull request` - [x] Make sure tests pass via `sbt clean test`. - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality. Author: manuzhang <[email protected]> Closes #119 from manuzhang/GEARPUMP-246. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/791f45a0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/791f45a0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/791f45a0 Branch: refs/heads/master Commit: 791f45a0cd789bf2b2190167f701800fc4315be0 Parents: 5cf79bf Author: manuzhang <[email protected]> Authored: Thu Dec 8 11:45:06 2016 +0800 Committer: huafengw <[email protected]> Committed: Thu Dec 8 11:45:06 2016 +0800 ---------------------------------------------------------------------- .../examples/wordcountjava/dsl/WordCount.java | 82 +++++++++++--------- .../streaming/dsl/javaapi/JavaStream.scala | 2 +- .../streaming/dsl/task/CountTriggerTask.scala | 1 - .../dsl/task/EventTimeTriggerTask.scala | 1 - .../dsl/task/ProcessingTimeTriggerTask.scala | 1 - 5 files changed, 45 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java 
b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index 0ecc42e..a453d8c 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -19,21 +19,18 @@ package org.apache.gearpump.streaming.examples.wordcountjava.dsl; import com.typesafe.config.Config; +import org.apache.gearpump.Message; import org.apache.gearpump.cluster.ClusterConfig; import org.apache.gearpump.cluster.UserConfig; import org.apache.gearpump.cluster.client.ClientContext; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; -import org.apache.gearpump.streaming.javaapi.dsl.functions.FlatMapFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.GroupByFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.MapFunction; -import org.apache.gearpump.streaming.javaapi.dsl.functions.ReduceFunction; +import org.apache.gearpump.streaming.source.DataSource; +import org.apache.gearpump.streaming.task.TaskContext; import scala.Tuple2; -import java.util.ArrayList; +import java.time.Instant; import java.util.Arrays; -import java.util.Iterator; -import java.util.List; /** Java version of WordCount with high level DSL API */ public class WordCount { @@ -45,41 +42,50 @@ public class WordCount { public static void main(Config akkaConf, String[] args) throws InterruptedException { ClientContext context = new ClientContext(akkaConf); JavaStreamApp app = new JavaStreamApp("JavaDSL", context, UserConfig.empty()); - List<String> source = new ArrayList<>(Arrays.asList("This is a good start, bingo!! 
bingo!!")); - - JavaStream<String> sentence = app.source(source, 1, UserConfig.empty(), "source"); - - JavaStream<String> words = sentence.flatMap(new FlatMapFunction<String, String>() { - @Override - public Iterator<String> apply(String s) { - return new ArrayList<String>(Arrays.asList(s.split("\\s+"))).iterator(); - } - }, "flatMap"); - - JavaStream<Tuple2<String, Integer>> ones = words.map(new MapFunction<String, Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> apply(String s) { - return new Tuple2<String, Integer>(s, 1); - } - }, "map"); - - JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new GroupByFunction<Tuple2<String, Integer>, String>() { - @Override - public String apply(Tuple2<String, Integer> tuple) { - return tuple._1(); - } - }, 1, "groupBy"); - - JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new ReduceFunction<Tuple2<String, Integer>>() { - @Override - public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { - return new Tuple2<String, Integer>(t1._1(), t1._2() + t2._2()); - } - }, "reduce"); + + JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! 
bingo!!"), + 1, UserConfig.empty(), "source"); + + JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(), + "flatMap"); + + JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map"); + + JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy"); + + JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce( + (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce"); wordcount.log(); app.run(); context.close(); } + + private static class StringSource implements DataSource { + + private final String str; + + StringSource(String str) { + this.str = str; + } + + @Override + public void open(TaskContext context, Instant startTime) { + } + + @Override + public Message read() { + return Message.apply(str, Instant.now().toEpochMilli()); + } + + @Override + public void close() { + } + + @Override + public Instant getWatermark() { + return Instant.now(); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 3003b98..f68731e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -65,7 +65,7 @@ class JavaStream[T](val stream: Stream[T]) { */ def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, description: String): JavaStream[T] = { - new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description)) + new JavaStream[T](stream.groupBy(fn.apply, parallelism, description)) } def window(win: Window, description: String): JavaWindowStream[T] = { 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala index 4ee2fa8..06f2964 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task import java.time.Instant -import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala index 4b7649f..0674339 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala @@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task import java.time.Instant -import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/791f45a0/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala index 980a54b..78ba762 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala @@ -21,7 +21,6 @@ import java.time.Instant import java.util.concurrent.TimeUnit import akka.actor.Actor.Receive -import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._
