Repository: incubator-gearpump Updated Branches: refs/heads/akka-streams f1bec6709 -> cc0578e5d
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index f8666ba..3f23fa9 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -23,10 +23,12 @@ import java.time.Instant import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.CoLocationPartitioner +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource @@ -56,8 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc val graph = Graph.empty[Op, OpEdge] val sourceOp = DataSourceOp(new AnySource) val groupByOp = GroupByOp(new AnyGroupByFn) - val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction) - val reduceOp = ChainableOp[Any, Any](anyReduceFunction) + val flatMapOp = ChainableOp[Any, Any](anyFlatMapper) + val reduceOp = ChainableOp[Any, Any](anyReducer) val processorOp = new 
ProcessorOp[AnyTask] val sinkOp = DataSinkOp(new AnySink) val directEdge = Direct @@ -92,9 +94,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc object PlannerSpec { private val anyParallelism = 1 - private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap") - private val anyReduceFunction = new ReduceFunction[Any]( - (left: Any, right: Any) => (left, right), "reduce") + private val anyFlatMapper = new FlatMapper[Any, Any]( + FlatMapFunction(Option(_)), "flatMap") + private val anyReducer = new Reducer[Any]( + ReduceFunction((left: Any, right: Any) => (left, right)), "reduce") class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala index 94feae4..2c03e1c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -23,9 +23,11 @@ import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.CollectionDataSource import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource +import 
org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.CountWindow import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow @@ -45,7 +47,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { val first = mock[SingleInputFunction[R, S]] val second = mock[SingleInputFunction[S, T]] - val andThen = new AndThen(first, second) + val andThen = AndThen(first, second) "chain first and second functions when processing input value" in { val input = mock[R] @@ -77,161 +79,164 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { andThen.finish().toList shouldBe List(secondResult) } - "clear both states on clearState" in { - andThen.clearState() + "set up both functions on setup" in { + andThen.setup() - verify(first).clearState() - verify(second).clearState() + verify(first).setup() + verify(second).setup() } - "return AndThen on andThen" in { - val third = mock[SingleInputFunction[T, Any]] - andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]] + "tear down both functions on teardown" in { + andThen.teardown() + + verify(first).teardown() + verify(second).teardown() + } + + "chain multiple single input function" in { + val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split") + + val filter = new FlatMapper[String, String]( + FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter") + + val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map") + + val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum") + + val all = AndThen(split, AndThen(filter, AndThen(map, sum))) + + assert(all.description == "split.filter.map.sum") + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + // force eager evaluation + 
all.process(data).toList + val result = all.finish().toList + assert(result.nonEmpty) + assert(result.last == 15) } } - "FlatMapFunction" should { + "FlatMapper" should { - val flatMap = mock[R => TraversableOnce[S]] - val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap") + val flatMapFunction = mock[FlatMapFunction[R, S]] + val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap") "call flatMap function when processing input value" in { val input = mock[R] - flatMapFunction.process(input) - verify(flatMap).apply(input) + flatMapper.process(input) + verify(flatMapFunction).apply(input) } "return passed in description" in { - flatMapFunction.description shouldBe "flatMap" + flatMapper.description shouldBe "flatMap" } "return None on finish" in { - flatMapFunction.finish() shouldBe List.empty[S] + flatMapper.finish() shouldBe List.empty[S] } - "do nothing on clearState" in { - flatMapFunction.clearState() - verifyZeroInteractions(flatMap) + "set up FlatMapFunction on setup" in { + flatMapper.setup() + + verify(flatMapFunction).setup() } - "return AndThen on andThen" in { - val other = mock[SingleInputFunction[S, T]] - flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]] + "tear down FlatMapFunction on teardown" in { + flatMapper.teardown() + + verify(flatMapFunction).teardown() } } "ReduceFunction" should { - "call reduce function when processing input value" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") val input1 = mock[T] val input2 = mock[T] val output = mock[T] - when(reduce.apply(input1, input2)).thenReturn(output, output) + when(reduceFunction.apply(input1, input2)).thenReturn(output, output) - reduceFunction.process(input1) shouldBe List.empty[T] - reduceFunction.process(input2) shouldBe List.empty[T] - reduceFunction.finish() shouldBe List(output) + 
reducer.process(input1) shouldBe List.empty[T] + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(output) - reduceFunction.clearState() - reduceFunction.process(input1) shouldBe List.empty[T] - reduceFunction.clearState() - reduceFunction.process(input2) shouldBe List.empty[T] - reduceFunction.finish() shouldBe List(input2) + reducer.teardown() + reducer.process(input1) shouldBe List.empty[T] + reducer.teardown() + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(input2) } "return passed in description" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.description shouldBe "reduce" + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.description shouldBe "reduce" } "return None on finish" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.finish() shouldBe List.empty[T] + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.finish() shouldBe List.empty[T] } - "do nothing on clearState" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.clearState() - verifyZeroInteractions(reduce) + "set up reduce function on setup" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.setup() + + verify(reduceFunction).setup() } - "return AndThen on andThen" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - val other = mock[SingleInputFunction[T, Any]] - reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]] + "tear down reduce function on teardown" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.teardown() + 
+ verify(reduceFunction).teardown() } } - "EmitFunction" should { + "Emit" should { - val emit = mock[T => Unit] - val emitFunction = new EmitFunction[T](emit) + val emitFunction = mock[T => Unit] + val emit = new Emit[T](emitFunction) "emit input value when processing input value" in { val input = mock[T] - emitFunction.process(input) shouldBe List.empty[Unit] + emit.process(input) shouldBe List.empty[Unit] - verify(emit).apply(input) + verify(emitFunction).apply(input) } "return empty description" in { - emitFunction.description shouldBe "" + emit.description shouldBe "" } "return None on finish" in { - emitFunction.finish() shouldBe List.empty[Unit] + emit.finish() shouldBe List.empty[Unit] } - "do nothing on clearState" in { - emitFunction.clearState() - verifyZeroInteractions(emit) - } + "do nothing on setup" in { + emit.setup() - "throw exception on andThen" in { - val other = mock[SingleInputFunction[Unit, Any]] - intercept[UnsupportedOperationException] { - emitFunction.andThen(other) - } + verifyZeroInteractions(emitFunction) } - } - "andThen" should { - "chain multiple single input function" in { - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") - - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = split.andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") + "do nothing on teardown" in { + emit.teardown() - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - // force eager evaluation - all.process(data).toList - val result = all.finish().toList - assert(result.nonEmpty) - assert(result.last == 15) + verifyZeroInteractions(emitFunction) } } @@ -241,7 +246,7 @@ class SingleInputFunctionSpec extends 
WordSpec with Matchers with MockitoSugar { val taskContext = MockUtil.mockTaskContext implicit val actorSystem = MockUtil.system - val data = "one two three".split("\\s") + val data = "one two three".split("\\s+") val dataSource = new CollectionDataSource[String](data) val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) @@ -257,7 +262,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { // Source with transformer val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") val another = new DataSourceTask(anotherTaskContext, conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) another.onStart(Instant.EPOCH) @@ -275,9 +281,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { val data = "1 2 2 3 3 3" - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") + val concat = new Reducer[String](ReduceFunction({ (left, right) => + left + right}), "concat") implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( @@ -311,7 +316,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { // Source with transformer val taskContext = MockUtil.mockTaskContext val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") val task = new TransformTask[String, String](Some(double), taskContext, conf) task.onStart(Instant.EPOCH) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala new file mode 100644 index 0000000..5b90a3e --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.TestUtil +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.dsl.scalaapi +import org.apache.gearpump.streaming.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} +import org.apache.gearpump.util.Graph +import org.mockito.Mockito.when +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + it should "be able to generate multiple new streams" in { + val context: ClientContext = mock[ClientContext] + when(context.system).thenReturn(system) + + val dsl = StreamApp("dsl", context) + dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]] + dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]] + + val application = dsl.plan() + application shouldBe a [StreamApplication] + application.name shouldBe "dsl" + val dag = application.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + dag.vertices.size shouldBe 2 + dag.vertices.foreach { processor => + processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName + if (processor.description == "A") { + processor.parallelism shouldBe 2 + } else if (processor.description == "B") { + processor.parallelism shouldBe 3 + } else { + fail(s"undefined source ${processor.description}") + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala new file mode 100644 index 0000000..62a3bcb --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.scalaapi + +import akka.actor._ +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} +import org.apache.gearpump.util.Graph +import org.apache.gearpump.util.Graph._ +import org.mockito.Mockito.when +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration +import scala.util.{Either, Left, Right} + +class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + it should "translate the DSL to a DAG" in { + val context: ClientContext = mock[ClientContext] + when(context.system).thenReturn(system) + + val dsl = StreamApp("dsl", context) + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + val stream = dsl.source(data.lines.toList, 1, ""). + flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). + map(word => (word, 1)). + groupBy(_._1, parallelism = 2). + reduce((left, right) => (left._1, left._2 + right._2)). 
+ map[Either[(String, Int), String]]({t: (String, Int) => Left(t)}) + + val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]]( + {s: String => Right(s)}) + stream.merge(query).process[(String, Int)](classOf[Join], 1) + + val app: StreamApplication = dsl.plan() + val dag = app.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + + val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => + edge.partitionerFactory.partitioner.getClass.getName + } + val expectedDagTopology = getExpectedDagTopology + + dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet + dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet + } + + private def getExpectedDagTopology: Graph[String, String] = { + val source = classOf[DataSourceTask[_, _]].getName + val group = classOf[CountTriggerTask[_, _]].getName + val merge = classOf[TransformTask[_, _]].getName + val join = classOf[Join].getName + + val hash = classOf[HashPartitioner].getName + val groupBy = classOf[GroupByPartitioner[_, _]].getName + val colocation = classOf[CoLocationPartitioner].getName + + val expectedDagTopology = Graph( + source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join, + source ~ hash ~> merge + ) + expectedDagTopology + } +} + +object StreamSpec { + + class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { + + var query: String = _ + + override def onNext(msg: Message): Unit = { + msg.msg match { + case Left(wordCount: (String @unchecked, Int @unchecked)) => + if (query != null && wordCount._1 == query) { + taskContext.output(new Message(wordCount)) + } + + case Right(query: String) => + this.query = query + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala 
---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala new file mode 100644 index 0000000..b6e7342 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.mockito.Mockito.{verify, when} +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("TransformTask.onStart should call SingleInputFunction.setup") { + forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val operator = mock[SingleInputFunction[Any, Any]] + val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config) + + sourceTask.onStart(startTime) + + verify(operator).setup() + } + } + + property("TransformTask.onNext should call SingleInputFunction.process") { + forAll(Gen.alphaStr) { (str: String) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val operator = mock[SingleInputFunction[Any, Any]] + val task = new TransformTask[Any, Any](Some(operator), taskContext, config) + val msg = Message(str) + when(operator.process(str)).thenReturn(Some(str)) + + task.onNext(msg) + + verify(taskContext).output(msg) + } + } + + property("TransformTask.onStop should call SingleInputFunction.teardown") { + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val operator = mock[SingleInputFunction[Any, Any]] + val task = new TransformTask[Any, Any](Some(operator), taskContext, config) + + task.onStop() + + verify(operator).teardown() + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala new file mode 100644 index 0000000..277a31c --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/PartitionerSpec.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message +import org.scalatest.{FlatSpec, Matchers} + +class PartitionerSpec extends FlatSpec with Matchers { + val NUM = 10 + + "HashPartitioner" should "hash same key to same slots" in { + val partitioner = new HashPartitioner + + val data = new Array[Byte](1000) + (new java.util.Random()).nextBytes(data) + val msg = Message(data) + + val partition = partitioner.getPartition(msg, NUM) + assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") + + assert(partition == partitioner.getPartition(msg, NUM), "multiple run should return " + + "consistent result") + } + + "ShufflePartitioner" should "hash same key randomly" in { + val partitioner = new ShufflePartitioner + + val data = new Array[Byte](1000) + (new java.util.Random()).nextBytes(data) + val msg = Message(data) + + val partition = partitioner.getPartition(msg, NUM) + assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") + + assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return " + + "different result") + } + + "BroadcastPartitioner" should "return all partitions" in { + val partitioner = new BroadcastPartitioner + + val data = new Array[Byte](1000) + (new java.util.Random()).nextBytes(data) + val msg = Message(data) + val partitions = partitioner.getPartitions(msg, NUM) + + partitions should contain theSameElementsAs 0.until(NUM) + } + + + "ShuffleGroupingPartitioner" should "hash same key randomly" in { + val partitioner = new ShuffleGroupingPartitioner + + val data = new Array[Byte](1000) + (new java.util.Random()).nextBytes(data) + val msg = Message(data) + + val partition = partitioner.getPartition(msg, NUM) + assert(partition >= 0 && partition < NUM, "Partition Id should be >= 0") + + assert(partition != partitioner.getPartition(msg, NUM), "multiple run should return " + + "different result") + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index c786047..4e95bdd 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,6 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -38,11 +39,13 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) + val operator = mock[SingleInputFunction[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) sourceTask.onStart(startTime) + verify(dataSource).open(taskContext, startTime) + verify(operator).setup() } } @@ -69,9 +72,12 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) + val operator = mock[SingleInputFunction[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) 
sourceTask.onStop() + verify(dataSource).close() + verify(operator).teardown() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala index cfe47eb..bd3ddb8 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriberSpec.scala @@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.task import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.task.SubscriberSpec.TestTask import org.apache.gearpump.streaming.{DAG, ProcessorDescription} import org.apache.gearpump.util.Graph http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index 258a5ff..d128ace 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -26,7 +26,7 @@ import org.scalatest.{FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import 
org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala index 48901d2..8deee78 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/TaskActorSpec.scala @@ -25,7 +25,7 @@ import org.scalatest.{BeforeAndAfterEach, Matchers, WordSpec} import org.apache.gearpump.Message import org.apache.gearpump.cluster.{MasterHarness, TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} import org.apache.gearpump.serializer.{FastKryoSerializer, SerializationFramework} import org.apache.gearpump.streaming.AppMasterToExecutor.{ChangeTask, MsgLostException, StartTask, TaskChanged, TaskRegistered} import org.apache.gearpump.streaming.task.TaskActorSpec.TestTask http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/version.sbt ---------------------------------------------------------------------- diff --git a/version.sbt b/version.sbt index 6f96fe0..f817483 100644 --- a/version.sbt +++ b/version.sbt @@ -16,4 +16,4 @@ * limitations under the License. */ -version in ThisBuild := "0.8.2-SNAPSHOT" +version in ThisBuild := "0.8.3-SNAPSHOT"
