Repository: incubator-gearpump Updated Branches: refs/heads/master 4d8c02dfe -> d78683328
[GEARPUMP-289] Add FoldFunction Author: manuzhang <[email protected]> Closes #167 from manuzhang/fold. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/d7868332 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/d7868332 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/d7868332 Branch: refs/heads/master Commit: d78683328e1fb6a1143c5bc448a627de90466d7c Parents: 4d8c02d Author: manuzhang <[email protected]> Authored: Mon Mar 6 22:14:44 2017 +0800 Committer: manuzhang <[email protected]> Committed: Mon Mar 6 22:15:08 2017 +0800 ---------------------------------------------------------------------- .../examples/wordcountjava/dsl/WordCount.java | 8 +- .../dsl/api/functions/FilterFunction.scala | 4 +- .../dsl/api/functions/FoldFunction.scala | 34 ++++++++ .../dsl/api/functions/MapFunction.scala | 4 +- .../dsl/api/functions/ReduceFunction.scala | 17 ++-- .../streaming/dsl/javaapi/JavaStream.scala | 12 ++- .../dsl/javaapi/functions/FlatMapFunction.scala | 2 +- .../dsl/javaapi/functions/GroupByFunction.scala | 9 +- .../dsl/plan/functions/FunctionRunner.scala | 21 ++--- .../streaming/dsl/scalaapi/Stream.scala | 15 +++- .../scalaapi/functions/FlatMapFunction.scala | 16 ++-- .../streaming/dsl/plan/PlannerSpec.scala | 8 +- .../dsl/plan/functions/FunctionRunnerSpec.scala | 88 +++++++++++--------- 13 files changed, 154 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index 7d8400d..fd32408 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -95,7 +95,7 @@ public class WordCount { private static class Split extends FlatMapFunction<String, String> { @Override - public Iterator<String> apply(String s) { + public Iterator<String> flatMap(String s) { return Arrays.asList(s.split("\\s+")).iterator(); } } @@ -103,7 +103,7 @@ public class WordCount { private static class Ones extends MapFunction<String, Tuple2<String, Integer>> { @Override - public Tuple2<String, Integer> apply(String s) { + public Tuple2<String, Integer> map(String s) { return new Tuple2<>(s, 1); } } @@ -111,7 +111,7 @@ public class WordCount { private static class Count extends ReduceFunction<Tuple2<String, Integer>> { @Override - public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { + public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) { return new Tuple2<>(t1._1(), t1._2() + t2._2()); } } @@ -119,7 +119,7 @@ public class WordCount { private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> { @Override - public String apply(Tuple2<String, Integer> tuple) { + public String groupBy(Tuple2<String, Integer> tuple) { return tuple._1(); } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala index e4e7309..25a0929 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala @@ -23,7 +23,7 @@ object FilterFunction { def apply[T](fn: T => Boolean): FilterFunction[T] = { new FilterFunction[T] { - override def apply(t: T): Boolean = { + override def filter(t: T): Boolean = { fn(t) } } @@ -37,6 +37,6 @@ object FilterFunction { */ abstract class FilterFunction[T] extends SerializableFunction { - def apply(t: T): Boolean + def filter(t: T): Boolean } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala new file mode 100644 index 0000000..9ff44a8 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +/** + * Combines input into an accumulator. + * + * @param A type of accumulator + * @param T Type of input + */ +abstract class FoldFunction[T, A] extends SerializableFunction { + + def init: A + + def fold(accumulator: A, t: T): A +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala index 70fe9d4..a4fdca6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala @@ -23,7 +23,7 @@ object MapFunction { def apply[T, R](fn: T => R): MapFunction[T, R] = { new MapFunction[T, R] { - override def apply(t: T): R = { + override def map(t: T): R = { fn(t) } } @@ -38,6 +38,6 @@ object MapFunction { */ abstract class MapFunction[T, R] extends SerializableFunction { - def apply(t: T): R + def map(t: T): R } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala index 25b12be..25f157b 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala @@ -17,13 +17,11 @@ */ package org.apache.gearpump.streaming.dsl.api.functions -import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction - object ReduceFunction { def apply[T](fn: (T, T) => T): ReduceFunction[T] = { new ReduceFunction[T] { - override def apply(t1: T, t2: T): T = { + override def reduce(t1: T, t2: T): T = { fn(t1, t2) } } @@ -35,8 +33,17 @@ object ReduceFunction { * * @param T Type of both inputs and output */ -abstract class ReduceFunction[T] extends SerializableFunction { +abstract class ReduceFunction[T] extends FoldFunction[T, Option[T]] { + + override def init: Option[T] = None - def apply(t1: T, t2: T): T + override def fold(accumulator: Option[T], t: T): Option[T] = { + if (accumulator.isEmpty) { + Option(t) + } else { + accumulator.map(reduce(_, t)) + } + } + def reduce(t1: T, t2: T): T } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 592c4dc..da0e4db 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -18,8 +18,8 @@ package org.apache.gearpump.streaming.dsl.javaapi import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} -import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction} +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream} import org.apache.gearpump.streaming.dsl.window.api.Windows @@ -45,6 +45,10 @@ class JavaStream[T](val stream: Stream[T]) { new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description)) } + def fold[A](fn: FoldFunction[T, A], description: String): JavaStream[A] = { + new JavaStream[A](stream.fold(fn, description)) + } + /** Does aggregation on the stream */ def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { new JavaStream[T](stream.reduce(fn, description)) @@ -65,7 +69,7 @@ class JavaStream[T](val stream: Stream[T]) { */ def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, description: String): JavaStream[T] = { - new JavaStream[T](stream.groupBy(fn.apply, parallelism, description)) + new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description)) } def window(win: Windows[T], description: String): JavaWindowStream[T] = { @@ -84,6 +88,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) { def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, description: String): JavaStream[T] = { - new JavaStream[T](stream.groupBy(fn.apply, parallelism, description)) + new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description)) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala index 85d597d..11e2416 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala @@ -28,5 +28,5 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction */ abstract class FlatMapFunction[T, R] extends SerializableFunction { - def apply(t: T): java.util.Iterator[R] + def flatMap(t: T): java.util.Iterator[R] } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala index 7656cba..5a86a86 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala @@ -25,4 +25,11 @@ import org.apache.gearpump.streaming.dsl.api.functions.MapFunction * @param T Input value type * @param GROUP Group value type */ -abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] +abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] { + + override def map(t: T): GROUP = { + groupBy(t) + } + + def groupBy(t: T): GROUP +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala index 9dfa6ad..c27300f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala @@ -17,7 +17,7 @@ */ package org.apache.gearpump.streaming.dsl.plan.functions -import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction object FunctionRunner { @@ -88,7 +88,7 @@ class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) } override def process(value: IN): TraversableOnce[OUT] = { - fn(value) + fn.flatMap(value) } override def teardown(): Unit = { @@ -96,25 +96,22 @@ class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) } } -class Reducer[T](fn: ReduceFunction[T], val description: String) - extends FunctionRunner[T, T] { +class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String) + extends FunctionRunner[T, A] { - private var state: Option[T] = None + private var state: Option[A] = None override def setup(): Unit = { fn.setup() + state = Option(fn.init) } - override def process(value: T): TraversableOnce[T] = { - if (state.isEmpty) { - state = Option(value) - } else { - state = state.map(fn(_, value)) - } + override def process(value: T): TraversableOnce[A] = { + state = state.map(fn.fold(_, value)) None } - override def finish(): TraversableOnce[T] = { + override def finish(): TraversableOnce[A] = { state } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index f71276b..9a614e8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.scalaapi import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.dsl.plan.functions._ @@ -100,6 +100,17 @@ class Stream[T]( def filter(fn: FilterFunction[T], description: String): Stream[T] = { this.flatMap(FlatMapFunction(fn), description) } + + /** + * Returns a new stream by applying a fold function over all the elements + * + * @param fn fold function + * @return a new stream after fold + */ + def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = { + transform(new FoldRunner(fn, description)) + } + /** * Returns a new stream by applying a reduce function over all the elements. * @@ -119,7 +130,7 @@ class Stream[T]( * @return a new stream after reduce */ def reduce(fn: ReduceFunction[T], description: String): Stream[T] = { - transform(new Reducer[T](fn, description)) + fold(fn, description).map(_.get) } private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala index f10a3db..252b5bd 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala @@ -31,8 +31,8 @@ object FlatMapFunction { fn.setup() } - override def apply(t: T): TraversableOnce[R] = { - fn.apply(t).asScala + override def flatMap(t: T): TraversableOnce[R] = { + fn.flatMap(t).asScala } @@ -44,7 +44,7 @@ object FlatMapFunction { def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = { new FlatMapFunction[T, R] { - override def apply(t: T): TraversableOnce[R] = { + override def flatMap(t: T): TraversableOnce[R] = { fn(t) } } @@ -57,8 +57,8 @@ object FlatMapFunction { fn.setup() } - override def apply(t: T): TraversableOnce[R] = { - Option(fn(t)) + override def flatMap(t: T): TraversableOnce[R] = { + Option(fn.map(t)) } override def teardown(): Unit = { @@ -74,8 +74,8 @@ object FlatMapFunction { fn.setup() } - override def apply(t: T): TraversableOnce[T] = { - if (fn(t)) { + override def flatMap(t: T): TraversableOnce[T] = { + if (fn.filter(t)) { Option(t) } else { None @@ -98,6 +98,6 @@ object FlatMapFunction { */ abstract class FlatMapFunction[T, R] extends SerializableFunction { - def apply(t: T): TraversableOnce[R] + def flatMap(t: T): TraversableOnce[R] } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index 2e4bbb3..70abde9 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -27,13 +27,13 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer} +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.CountWindows import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.{MockUtil, Processor} +import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph import org.scalatest.mock.MockitoSugar @@ -61,7 +61,7 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1)) val groupByOp = GroupByOp(groupBy) val flatMapOp = ChainableOp[Any, Any](anyFlatMapper) - val reduceOp = ChainableOp[Any, Any](anyReducer) + val reduceOp = ChainableOp[Any, Option[Any]](anyReducer) val processorOp = new ProcessorOp[AnyTask] val sinkOp = DataSinkOp(new AnySink) val directEdge = Direct @@ -97,7 +97,7 @@ object PlannerSpec { private val anyFlatMapper = new FlatMapper[Any, Any]( FlatMapFunction(Option(_)), "flatMap") - private val anyReducer = new Reducer[Any]( + private val anyReducer = new FoldRunner[Any, Option[Any]]( ReduceFunction((left: Any, right: Any) => (left, right)), "reduce") class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index d26b7d9..a9b23fe 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark} import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction} import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform @@ -102,7 +102,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map") - val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum") + val sum = new FoldRunner[Int, Option[Int]]( + ReduceFunction({(left, right) => left + right}), "sum") val all = AndThen(split, AndThen(filter, AndThen(map, sum))) @@ -116,11 +117,14 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { five four five """ + + all.setup() // force eager evaluation all.process(data).toList - val result = all.finish().toList + val result = all.finish().toList.map(_.get) assert(result.nonEmpty) assert(result.last == 15) + all.teardown() } } @@ -132,7 +136,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { "call flatMap function when processing input value" in { val input = mock[R] flatMapper.process(input) - verify(flatMapFunction).apply(input) + verify(flatMapFunction).flatMap(input) } "return passed in description" in { @@ -156,54 +160,59 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { } } - "ReduceFunction" should { + "FoldRunner" should { - "call reduce function when processing input value" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") + "call fold function when processing input value" in { + val foldFunction = mock[FoldFunction[T, List[T]]] + val foldRunner = new FoldRunner[T, List[T]](foldFunction, "fold") val input1 = mock[T] val input2 = mock[T] - val output = mock[T] - - when(reduceFunction.apply(input1, input2)).thenReturn(output, output) - - reducer.process(input1) shouldBe List.empty[T] - reducer.process(input2) shouldBe List.empty[T] - reducer.finish() shouldBe List(output) - reducer.teardown() - reducer.process(input1) shouldBe List.empty[T] - reducer.teardown() - reducer.process(input2) shouldBe List.empty[T] - reducer.finish() shouldBe List(input2) + when(foldFunction.init).thenReturn(Nil) + when(foldFunction.fold(Nil, input1)).thenReturn(List(input1)) + when(foldFunction.fold(Nil, input2)).thenReturn(List(input2)) + when(foldFunction.fold(List(input1), input2)).thenReturn(List(input1, input2)) + + foldRunner.setup() + foldRunner.process(input1) shouldBe List.empty[T] + foldRunner.process(input2) shouldBe List.empty[T] + foldRunner.finish() shouldBe List(List(input1, input2)) + foldRunner.teardown() + + foldRunner.setup() + foldRunner.process(input1) shouldBe List.empty[T] + foldRunner.teardown() + foldRunner.setup() + foldRunner.process(input2) shouldBe List.empty[T] + foldRunner.finish() shouldBe List(List(input2)) } "return passed in description" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.description shouldBe "reduce" + val foldFunction = mock[FoldFunction[S, T]] + val foldRunner = new FoldRunner[S, T](foldFunction, "fold") + foldRunner.description shouldBe "fold" } "return None on finish" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.finish() shouldBe List.empty[T] + val foldFunction = mock[FoldFunction[S, T]] + val foldRunner = new FoldRunner[S, T](foldFunction, "fold") + foldRunner.finish() shouldBe List.empty[T] } - "set up reduce function on setup" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.setup() + "set up fold function on setup" in { + val foldFunction = mock[FoldFunction[S, T]] + val foldRunner = new FoldRunner[S, T](foldFunction, "fold") + foldRunner.setup() - verify(reduceFunction).setup() + verify(foldFunction).setup() } - "tear down reduce function on teardown" in { - val reduceFunction = mock[ReduceFunction[T]] - val reducer = new Reducer[T](reduceFunction, "reduce") - reducer.teardown() + "tear down fold function on teardown" in { + val foldFunction = mock[FoldFunction[S, T]] + val foldRunner = new FoldRunner[S, T](foldFunction, "fold") + foldRunner.teardown() - verify(reduceFunction).teardown() + verify(foldFunction).teardown() } } @@ -284,11 +293,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val data = "1 2 2 3 3 3" - val concat = new Reducer[String](ReduceFunction({ (left, right) => + val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) => left + right}), "concat") implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[FunctionRunner[String, String]]( + val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]]( GEARPUMP_STREAMING_OPERATOR, concat) val taskContext = MockUtil.mockTaskContext @@ -307,7 +316,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { import scala.collection.JavaConverters._ - val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) + val values = peopleCaptor.getAllValues.asScala.map(input => + input.msg.asInstanceOf[Option[String]].get) assert(values.mkString(",") == "1,2,22,3,33,333") system.terminate() Await.result(system.whenTerminated, Duration.Inf)
