Repository: incubator-gearpump Updated Branches: refs/heads/master 5c4d60c5b -> 66017ab7b
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index fb2d898..535497c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.apache.gearpump.streaming.task.{Task, TaskContext} /** @@ -57,15 +57,10 @@ class DataSourceTask[IN, OUT] private[source]( private val processMessage: Message => Unit = operator match { case Some(op) => - op match { - case bad: DummyInputFunction[IN] => - (message: Message) => context.output(message) - case _ => - (message: Message) => { - op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => - context.output(Message(m, message.timestamp)) - } - } + (message: Message) => { + op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => + context.output(Message(m, message.timestamp)) + } } case None => (message: Message) => context.output(message) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index c0b6a29..9a52cc6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -23,7 +23,7 @@ import java.util import java.util.concurrent.TimeUnit import akka.actor._ -import org.apache.gearpump.streaming.source.{Watermark, DataSourceTask} +import org.apache.gearpump.streaming.source.Watermark import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap @@ -308,9 +308,9 @@ class TaskActor( private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = { if (upstreamClock > this.upstreamMinClock) { + this.upstreamMinClock = upstreamClock task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock)) } - this.upstreamMinClock = upstreamClock val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => val subMin = sub._2.minClock http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala index e919a34..e0407ec 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala @@ -21,7 +21,10 @@ package org.apache.gearpump.streaming.dsl import akka.actor.ActorSystem import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import 
org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.util.Graph import org.mockito.Mockito.when import org.scalatest._ import org.scalatest.mock.MockitoSugar @@ -30,7 +33,7 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = null + implicit var system: ActorSystem = _ override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) @@ -45,49 +48,25 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) - app.source(List("A"), 1, "") - app.source(List("B"), 1, "") + val dsl = StreamApp("dsl", context) + dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]] + dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]] - assert(app.graph.vertices.size == 2) - } - - it should "plan the dsl to Processsor(TaskDescription) DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val app = StreamApp("dsl", context) - val parallism = 3 - app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - assert(task.taskClass == classOf[DataSourceTask[_, _]].getName) - assert(task.parallelism == parallism) - } - - it should "produce 3 messages" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) - val list = List[String]( - "0", - "1", - "2" - ) - val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - /* - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - val sum = producer.flatMap(msg => { - LOG.info("in 
flatMap") - assert(msg.msg.isInstanceOf[String]) - val num = msg.msg.asInstanceOf[String].toInt - Array(num) - }).reduce(_+_) - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - */ + val application = dsl.plan() + application shouldBe a [StreamApplication] + application.name shouldBe "dsl" + val dag = application.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + dag.vertices.size shouldBe 2 + dag.vertices.foreach { processor => + processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName + if (processor.description == "A") { + processor.parallelism shouldBe 2 + } else if (processor.description == "B") { + processor.parallelism shouldBe 3 + } else { + fail(s"undefined source ${processor.description}") + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala index 816feef..fdc721b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala @@ -22,10 +22,11 @@ import akka.actor._ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} +import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner 
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -40,7 +41,6 @@ import scala.util.{Either, Left, Right} class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = _ override def beforeAll(): Unit = { @@ -56,7 +56,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) + val dsl = StreamApp("dsl", context) val data = """ @@ -66,30 +66,32 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock five four five """ - val stream = app.source(data.lines.toList, 1, ""). + val stream = dsl.source(data.lines.toList, 1, ""). flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). map(word => (word, 1)). groupBy(_._1, parallelism = 2). reduce((left, right) => (left._1, left._2 + right._2)). 
map[Either[(String, Int), String]](Left(_)) - val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) + val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) stream.merge(query).process[(String, Int)](classOf[Join], 1) - val appDescription = app.plan() + val app: StreamApplication = dsl.plan() + val dag = app.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => + val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => edge.partitionerFactory.partitioner.getClass.getName } val expectedDagTopology = getExpectedDagTopology - assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet)) - assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet)) + dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet + dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet } private def getExpectedDagTopology: Graph[String, String] = { val source = classOf[DataSourceTask[_, _]].getName - val group = classOf[GroupByTask[_, _, _]].getName + val group = classOf[CountTriggerTask[_, _]].getName val merge = classOf[TransformTask[_, _]].getName val join = classOf[Join].getName http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala index fcc646d..f49eb04 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala 
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -18,24 +18,33 @@ package org.apache.gearpump.streaming.dsl.partitioner -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import java.time.Duration +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People +import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn} +import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - it should "use the outpout of groupBy function to do partition" in { + + it should "group by message payload and window" in { val mark = People("Mark", "male") val tom = People("Tom", "male") val michelle = People("Michelle", "female") val partitionNum = 10 - val groupBy = new GroupByPartitioner[People, String](_.gender) - assert(groupBy.getPartition(Message(mark), partitionNum) - == groupBy.getPartition(Message(tom), partitionNum)) + val groupByFn: GroupByFn[People, (String, List[Bucket])] = + GroupAlsoByWindow[People, String](_.gender, FixedWindow.apply(Duration.ofMillis(5))) + val groupBy = new GroupByPartitioner[People, (String, List[Bucket])](groupByFn) + groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe + groupBy.getPartition(Message(tom, 2L), partitionNum) + + groupBy.getPartition(Message(mark, 1L), partitionNum) should not be + groupBy.getPartition(Message(tom, 6L), partitionNum) - assert(groupBy.getPartition(Message(mark), partitionNum) - != groupBy.getPartition(Message(michelle), partitionNum)) + groupBy.getPartition(Message(mark, 2L), partitionNum) should not be + groupBy.getPartition(Message(michelle, 3L), partitionNum) } } 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala new file mode 100644 index 0000000..bf52abc --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.plan + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.Processor.DefaultProcessor +import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + private val unchainableOps: List[Op] = List( + mock[DataSourceOp], + mock[DataSinkOp], + mock[GroupByOp[Any, Any]], + mock[MergeOp], + mock[ProcessorOp[AnyTask]]) + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "DataSourceOp" should { + + "chain ChainableOp" in { + val dataSource = new AnySource + val dataSourceOp = DataSourceOp(dataSource) + val chainableOp = mock[ChainableOp[Any, Any]] + val fn = mock[SingleInputFunction[Any, Any]] + + val chainedOp = dataSourceOp.chain(chainableOp) + + chainedOp shouldBe a[DataSourceOp] + verify(chainableOp).fn + + unchainableOps.foreach { op => + intercept[OpChainException] { + dataSourceOp.chain(op) + } + } + } + + "get Processor of DataSource" in { + val dataSource = new AnySource + val 
dataSourceOp = DataSourceOp(dataSource) + val processor = dataSourceOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe dataSourceOp.parallelism + processor.description shouldBe dataSourceOp.description + } + } + + "DataSinkOp" should { + + "not chain any Op" in { + val dataSink = new AnySink + val dataSinkOp = DataSinkOp(dataSink) + val chainableOp = mock[ChainableOp[Any, Any]] + val ops = chainableOp +: unchainableOps + ops.foreach { op => + intercept[OpChainException] { + dataSinkOp.chain(op) + } + } + } + + "get Processor of DataSink" in { + val dataSink = new AnySink + val dataSinkOp = DataSinkOp(dataSink) + val processor = dataSinkOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe dataSinkOp.parallelism + processor.description shouldBe dataSinkOp.description + } + } + + "ProcessorOp" should { + + "not chain any Op" in { + val processorOp = new ProcessorOp[AnyTask] + val chainableOp = mock[ChainableOp[Any, Any]] + val ops = chainableOp +: unchainableOps + ops.foreach { op => + intercept[OpChainException] { + processorOp.chain(op) + } + } + } + + "get Processor" in { + val processorOp = new ProcessorOp[AnyTask] + val processor = processorOp.getProcessor + processor shouldBe a [DefaultProcessor[_]] + processor.parallelism shouldBe processorOp.parallelism + processor.description shouldBe processorOp.description + } + } + + "ChainableOp" should { + + "chain ChainableOp" in { + val fn1 = mock[SingleInputFunction[Any, Any]] + val chainableOp1 = ChainableOp[Any, Any](fn1) + + val fn2 = mock[SingleInputFunction[Any, Any]] + val chainableOp2 = ChainableOp[Any, Any](fn2) + + val chainedOp = chainableOp1.chain(chainableOp2) + + verify(fn1).andThen(fn2) + chainedOp shouldBe a[ChainableOp[_, _]] + + unchainableOps.foreach { op => + intercept[OpChainException] { + chainableOp1.chain(op) + } + } + } + + "throw exception on getProcessor" in { + val fn1 = mock[SingleInputFunction[Any, Any]] + val chainableOp1 
= ChainableOp[Any, Any](fn1) + intercept[UnsupportedOperationException] { + chainableOp1.getProcessor + } + } + } + + "GroupByOp" should { + + "chain ChainableOp" in { + val groupByFn = mock[GroupByFn[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupByFn) + val fn = mock[SingleInputFunction[Any, Any]] + val chainableOp = mock[ChainableOp[Any, Any]] + when(chainableOp.fn).thenReturn(fn) + + val chainedOp = groupByOp.chain(chainableOp) + chainedOp shouldBe a[GroupByOp[_, _]] + + unchainableOps.foreach { op => + intercept[OpChainException] { + groupByOp.chain(op) + } + } + } + + "delegate to groupByFn on getProcessor" in { + val groupByFn = mock[GroupByFn[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupByFn) + + groupByOp.getProcessor + verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem]) + } + } + + "MergeOp" should { + + val mergeOp = MergeOp("merge") + + "chain ChainableOp" in { + val fn = mock[SingleInputFunction[Any, Any]] + val chainableOp = mock[ChainableOp[Any, Any]] + when(chainableOp.fn).thenReturn(fn) + + val chainedOp = mergeOp.chain(chainableOp) + chainedOp shouldBe a [MergeOp] + + unchainableOps.foreach { op => + intercept[OpChainException] { + mergeOp.chain(op) + } + } + } + + "get Processor" in { + val processor = mergeOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe 1 + } + } +} + +object OpSpec { + class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) + + class AnySource extends DataSource { + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = Message("any") + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + } + + class AnySink extends DataSink { + + override def open(context: TaskContext): Unit = {} + + override def write(message: Message): Unit = {} + + override def close(): Unit = {} +} +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala deleted file mode 100644 index 2112fd0..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.dsl.plan - -import java.time.Instant - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import akka.actor.ActorSystem -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest._ -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.CollectionDataSource -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.source.DataSourceTask - -class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - - "andThen" should "chain multiple single input function" in { - val dummy = new DummyInputFunction[String] - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") - - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val count = all.process(data).toList.last - assert(count == 15) - } - - "Source" should "iterate over input source and apply attached operator" in { - - val taskContext = MockUtil.mockTaskContext - implicit val actorSystem = MockUtil.system - - val data = "one two three".split("\\s") - val dataSource = new CollectionDataSource[String](data) - val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - - // Source with no transformer - val source = new DataSourceTask[String, String]( - taskContext, 
conf) - source.onStart(Instant.EPOCH) - source.onNext(Message("next")) - data.foreach { s => - verify(taskContext, times(1)).output(Message(s)) - } - - // Source with transformer - val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val another = new DataSourceTask(anotherTaskContext, - conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) - another.onStart(Instant.EPOCH) - another.onNext(Message("next")) - data.foreach { s => - verify(anotherTaskContext, times(2)).output(Message(s)) - } - } - - "GroupByTask" should "group input by groupBy Function and " + - "apply attached operator for each group" in { - - val data = "1 2 2 3 3 3" - - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") - - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( - GEARPUMP_STREAMING_OPERATOR, concat) - - val taskContext = MockUtil.mockTaskContext - - val task = new GroupByTask[String, String, String](input => input, taskContext, config) - task.onStart(Instant.EPOCH) - - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) - - data.split("\\s+").foreach { word => - task.onNext(Message(word)) - } - verify(taskContext, times(6)).output(peopleCaptor.capture()) - - import scala.collection.JavaConverters._ - - val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) - assert(values.mkString(",") == "1,2,22,3,33,333") - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - "MergeTask" should "accept two stream and apply the attached operator" in { - - // Source with transformer - val taskContext = MockUtil.mockTaskContext - val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val task = new TransformTask[String, String](Some(double), taskContext, 
conf) - task.onStart(Instant.EPOCH) - - val data = "1 2 2 3 3 3".split("\\s+") - - data.foreach { input => - task.onNext(Message(input)) - } - - verify(taskContext, times(data.length * 2)).output(anyObject()) - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala new file mode 100644 index 0000000..f8666ba --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.plan + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.partitioner.CoLocationPartitioner +import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.{MockUtil, Processor} +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.util.Graph +import org.scalatest.mock.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "Planner" should "chain operations" in { + val graph = Graph.empty[Op, OpEdge] + val sourceOp = DataSourceOp(new AnySource) + val groupByOp = GroupByOp(new AnyGroupByFn) + val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction) + val reduceOp = ChainableOp[Any, Any](anyReduceFunction) + val processorOp = new ProcessorOp[AnyTask] + val sinkOp = DataSinkOp(new AnySink) + val directEdge = Direct + val shuffleEdge = Shuffle + + graph.addVertex(sourceOp) + graph.addVertex(groupByOp) + graph.addEdge(sourceOp, shuffleEdge, groupByOp) + graph.addVertex(flatMapOp) + graph.addEdge(groupByOp, directEdge, flatMapOp) + 
graph.addVertex(reduceOp) + graph.addEdge(flatMapOp, directEdge, reduceOp) + graph.addVertex(processorOp) + graph.addEdge(reduceOp, directEdge, processorOp) + graph.addVertex(sinkOp) + graph.addEdge(processorOp, directEdge, sinkOp) + + implicit val system = MockUtil.system + + val planner = new Planner + val plan = planner.plan(graph) + .mapVertex(_.description) + + plan.vertices.toSet should contain theSameElementsAs + Set("source", "groupBy", "processor", "sink") + plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] + plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner] + plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner] + } +} + +object PlannerSpec { + + private val anyParallelism = 1 + private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap") + private val anyReduceFunction = new ReduceFunction[Any]( + (left: Any, right: Any) => (left, right), "reduce") + + class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) + + class AnySource extends DataSource { + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = Message("any") + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + } + + class AnySink extends DataSink { + + override def open(context: TaskContext): Unit = {} + + override def write(message: Message): Unit = {} + + override def close(): Unit = {} + } + + class AnyGroupByFn extends GroupByFn[Any, Any] { + + override def groupBy(message: Message): Any = message.msg + + override def getProcessor( + parallelism: Int, + description: String, + userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[AnyTask](anyParallelism, description) + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala new file mode 100644 index 0000000..94feae4 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.CollectionDataSource
+import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.mockito.ArgumentCaptor
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalatest.{Matchers, WordSpec}
+import org.scalatest.mock.MockitoSugar
+
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
+
+/**
+ * Tests for the SingleInputFunction implementations (AndThen, FlatMapFunction,
+ * ReduceFunction, EmitFunction) and for the tasks that apply such a function to
+ * incoming messages (DataSourceTask, CountTriggerTask, TransformTask).
+ */
+class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
+  import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._
+
+  "AndThen" should {
+
+    // These mocks are shared by every case in this group, so stubbing done in one
+    // case is still in place when a later case runs.
+    val first = mock[SingleInputFunction[R, S]]
+    val second = mock[SingleInputFunction[S, T]]
+    val andThen = new AndThen(first, second)
+
+    "chain first and second functions when processing input value" in {
+      val input = mock[R]
+      val firstOutput = mock[S]
+      val secondOutput = mock[T]
+      when(first.process(input)).thenReturn(Some(firstOutput))
+      when(second.process(firstOutput)).thenReturn(Some(secondOutput))
+
+      andThen.process(input).toList shouldBe List(secondOutput)
+    }
+
+    "return chained description" in {
+      when(first.description).thenReturn("first")
+      when(second.description).thenReturn("second")
+      andThen.description shouldBe "first.second"
+    }
+
+    "return either first result or second on finish" in {
+      val firstResult = mock[S]
+      val processedFirst = mock[T]
+      val secondResult = mock[T]
+
+      // When the first function finishes with a value, that value is expected to be
+      // run through the second function.
+      when(first.finish()).thenReturn(Some(firstResult))
+      when(second.process(firstResult)).thenReturn(Some(processedFirst))
+      andThen.finish().toList shouldBe List(processedFirst)
+
+      // When the first function finishes with nothing, the second function's own
+      // finish result is expected.
+      when(first.finish()).thenReturn(None)
+      when(second.finish()).thenReturn(Some(secondResult))
+      andThen.finish().toList shouldBe List(secondResult)
+    }
+
+    "clear both states on clearState" in {
+      andThen.clearState()
+
+      verify(first).clearState()
+      verify(second).clearState()
+    }
+
+    "return AndThen on andThen" in {
+      val third = mock[SingleInputFunction[T, Any]]
+      andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+    }
+  }
+
+  "FlatMapFunction" should {
+
+    // Shared by every case in this group.
+    val flatMap = mock[R => TraversableOnce[S]]
+    val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
+
+    "call flatMap function when processing input value" in {
+      val input = mock[R]
+      flatMapFunction.process(input)
+      verify(flatMap).apply(input)
+    }
+
+    "return passed in description" in {
+      flatMapFunction.description shouldBe "flatMap"
+    }
+
+    "return None on finish" in {
+      flatMapFunction.finish() shouldBe List.empty[S]
+    }
+
+    "do nothing on clearState" in {
+      flatMapFunction.clearState()
+      verifyZeroInteractions(flatMap)
+    }
+
+    "return AndThen on andThen" in {
+      val other = mock[SingleInputFunction[S, T]]
+      flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
+    }
+  }
+
+  "ReduceFunction" should {
+
+
+    "call reduce function when processing input value" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val input1 = mock[T]
+      val input2 = mock[T]
+      val output = mock[T]
+
+      when(reduce.apply(input1, input2)).thenReturn(output, output)
+
+      // Nothing is emitted while processing; the reduced value appears on finish.
+      reduceFunction.process(input1) shouldBe List.empty[T]
+      reduceFunction.process(input2) shouldBe List.empty[T]
+      reduceFunction.finish() shouldBe List(output)
+
+      // After clearState only the inputs processed since the reset are retained.
+      reduceFunction.clearState()
+      reduceFunction.process(input1) shouldBe List.empty[T]
+      reduceFunction.clearState()
+      reduceFunction.process(input2) shouldBe List.empty[T]
+      reduceFunction.finish() shouldBe List(input2)
+    }
+
+    "return passed in description" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.description shouldBe "reduce"
+    }
+
+    "return None on finish" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.finish() shouldBe List.empty[T]
+    }
+
+    "do nothing on clearState" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      reduceFunction.clearState()
+      verifyZeroInteractions(reduce)
+    }
+
+    "return AndThen on andThen" in {
+      val reduce = mock[(T, T) => T]
+      val reduceFunction = new ReduceFunction[T](reduce, "reduce")
+      val other = mock[SingleInputFunction[T, Any]]
+      reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
+    }
+  }
+
+  "EmitFunction" should {
+
+    // Shared by every case in this group.
+    val emit = mock[T => Unit]
+    val emitFunction = new EmitFunction[T](emit)
+
+    "emit input value when processing input value" in {
+      val input = mock[T]
+
+      emitFunction.process(input) shouldBe List.empty[Unit]
+
+      verify(emit).apply(input)
+    }
+
+    "return empty description" in {
+      emitFunction.description shouldBe ""
+    }
+
+    "return None on finish" in {
+      emitFunction.finish() shouldBe List.empty[Unit]
+    }
+
+    "do nothing on clearState" in {
+      emitFunction.clearState()
+      verifyZeroInteractions(emit)
+    }
+
+    "throw exception on andThen" in {
+      val other = mock[SingleInputFunction[Unit, Any]]
+      intercept[UnsupportedOperationException] {
+        emitFunction.andThen(other)
+      }
+    }
+  }
+
+  "andThen" should {
+    "chain multiple single input function" in {
+      val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
+
+      val filter = new FlatMapFunction[String, String](word =>
+        if (word.isEmpty) None else Some(word), "filter")
+
+      val map = new FlatMapFunction[String, Int](word => Some(1), "map")
+
+      val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+
+      val all = split.andThen(filter).andThen(map).andThen(sum)
+
+      assert(all.description == "split.filter.map.sum")
+
+      // 5 + 4 + 3 + 2 + 1 = 15 non-empty words, each mapped to 1 and then summed.
+      val data =
+        """
+      five four three two one
+      five four three two
+      five four three
+      five four
+      five
+        """
+      // force eager evaluation
+      all.process(data).toList
+      val result = all.finish().toList
+      assert(result.nonEmpty)
+      assert(result.last == 15)
+    }
+  }
+
+  "Source" should {
+    "iterate over input source and apply attached operator" in {
+
+      val taskContext = MockUtil.mockTaskContext
+      implicit val actorSystem = MockUtil.system
+
+      val data = "one two three".split("\\s")
+      val dataSource = new CollectionDataSource[String](data)
+      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+
+      // Source with no transformer
+      val source = new DataSourceTask[String, String](
+        taskContext, conf)
+      source.onStart(Instant.EPOCH)
+      source.onNext(Message("next"))
+      data.foreach { s =>
+        verify(taskContext, times(1)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+
+      // Source with transformer
+      val anotherTaskContext = MockUtil.mockTaskContext
+      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val another = new DataSourceTask(anotherTaskContext,
+        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+      another.onStart(Instant.EPOCH)
+      another.onNext(Message("next"))
+      data.foreach { s =>
+        verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message](
+          message => message.msg == s))
+      }
+    }
+  }
+
+  "CountTriggerTask" should {
+    "group input by groupBy Function and " +
+      "apply attached operator for each group" in {
+
+      val data = "1 2 2 3 3 3"
+
+      val concat = new ReduceFunction[String]({ (left, right) =>
+        left + right
+      }, "concat")
+
+      implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
+      // This case owns a dedicated actor system; shut it down even when one of the
+      // verifications below fails, so a failing run does not leak the system.
+      try {
+        val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
+          GEARPUMP_STREAMING_OPERATOR, concat)
+
+        val taskContext = MockUtil.mockTaskContext
+
+        val groupBy = GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating)
+        val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
+        task.onStart(Instant.EPOCH)
+
+        val messageCaptor = ArgumentCaptor.forClass(classOf[Message])
+
+        data.split("\\s+").foreach { word =>
+          task.onNext(Message(word))
+        }
+        verify(taskContext, times(6)).output(messageCaptor.capture())
+
+        import scala.collection.JavaConverters._
+
+        // One output per input message, each carrying the concatenation accumulated
+        // so far for that message's group.
+        val values = messageCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
+        assert(values.mkString(",") == "1,2,22,3,33,333")
+      } finally {
+        system.terminate()
+        Await.result(system.whenTerminated, Duration.Inf)
+      }
+    }
+  }
+
+  // NOTE(review): the group is named "MergeTask" but the case below exercises
+  // TransformTask with a single input stream - consider renaming.
+  "MergeTask" should {
+    "accept two stream and apply the attached operator" in {
+
+      // Source with transformer
+      val taskContext = MockUtil.mockTaskContext
+      val conf = UserConfig.empty
+      val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+      val task = new TransformTask[String, String](Some(double), taskContext, conf)
+      task.onStart(Instant.EPOCH)
+
+      val data = "1 2 2 3 3 3".split("\\s+")
+
+      data.foreach { input =>
+        task.onNext(Message(input))
+      }
+
+      // `double` emits two values per input, hence twice as many outputs as inputs.
+      verify(taskContext, times(data.length * 2)).output(anyObject())
+    }
+  }
+}
+
+/** Type aliases that stand in for the input, intermediate and output types in the tests. */
+object SingleInputFunctionSpec {
+  type R = AnyRef
+  type S = AnyRef
+  type T = AnyRef
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
new file mode 100644
index 0000000..871d751
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF)
under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.CountWindow
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+/**
+ * Property test for CountTriggerTask: for generated window sizes and message counts,
+ * every message must reach the WindowRunner and the runner must be triggered once
+ * per completed window.
+ */
+class CountTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("CountTriggerTask should trigger output by number of messages in a window") {
+
+    implicit val system = MockUtil.system
+
+    // The same generator supplies both the window size and the number of messages.
+    val countGen = Gen.chooseNum[Int](1, 1000)
+
+    forAll(countGen, countGen) { (windowSize: Int, msgNum: Int) =>
+      val countWindow = CountWindow.apply(windowSize)
+      val grouping = mock[GroupAlsoByWindow[Any, Any]]
+      when(grouping.window).thenReturn(countWindow)
+      val runner = mock[WindowRunner]
+
+      val task = new CountTriggerTask[Any, Any](grouping, runner,
+        MockUtil.mockTaskContext, UserConfig.empty)
+      val msg = mock[Message]
+
+      (1 to msgNum).foreach(_ => task.onNext(msg))
+
+      // Integer division: only completely filled windows are expected to trigger.
+      val expectedTriggers = msgNum / windowSize
+      val triggerTime = Instant.ofEpochMilli(windowSize)
+      verify(runner, times(msgNum)).process(msg)
+      verify(runner, times(expectedTriggers)).trigger(triggerTime)
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
new file mode 100644
index 0000000..a69abe6
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/**
+ * Property test for EventTimeTriggerTask: a message passed to onNext must be handed
+ * to the WindowRunner, and a watermark update must make the runner trigger.
+ */
+class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("EventTimeTriggerTask should trigger on watermark") {
+    // One millisecond generator drives the window size, the window step and the watermark.
+    val millisGen = Gen.chooseNum[Long](1L, 1000L)
+    val instantGen = millisGen.map(Instant.ofEpochMilli)
+
+    forAll(millisGen, millisGen, instantGen) {
+      (windowSize: Long, windowStep: Long, watermark: Instant) =>
+
+        val size = Duration.ofMillis(windowSize)
+        val step = Duration.ofMillis(windowStep)
+        val slidingWindow = SlidingWindow.apply(size, step).triggering(EventTimeTrigger)
+
+        val grouping = mock[GroupAlsoByWindow[Any, Any]]
+        when(grouping.window).thenReturn(slidingWindow)
+        val runner = mock[WindowRunner]
+
+        val task = new EventTimeTriggerTask[Any, Any](
+          grouping, runner, MockUtil.mockTaskContext, UserConfig.empty)
+
+        // Each incoming message goes to the window runner.
+        val msg = mock[Message]
+        task.onNext(msg)
+        verify(runner).process(msg)
+
+        // A watermark update results in exactly one trigger call, at any instant.
+        task.onWatermarkProgress(watermark)
+        verify(runner).trigger(any[Instant])
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala new file mode 100644 index 0000000..39e1b4c --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow}
+import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
+import org.mockito.Matchers._
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+/**
+ * Property test for ProcessingTimeTriggerTask: a message passed to onNext must be
+ * handed to the WindowRunner, and a Triggering message delivered through
+ * receiveUnManagedMessage must make the runner trigger.
+ */
+class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("ProcessingTimeTriggerTask should trigger on system time interval") {
+    // One millisecond generator drives the window size, the window step and the start time.
+    val millisGen = Gen.chooseNum[Long](1L, 1000L)
+    val instantGen = millisGen.map(Instant.ofEpochMilli)
+
+    forAll(millisGen, millisGen, instantGen) {
+      (windowSize: Long, windowStep: Long, startTime: Instant) =>
+
+        val size = Duration.ofMillis(windowSize)
+        val step = Duration.ofMillis(windowStep)
+        val slidingWindow = SlidingWindow.apply(size, step).triggering(ProcessingTimeTrigger)
+
+        val grouping = mock[GroupAlsoByWindow[Any, Any]]
+        when(grouping.window).thenReturn(slidingWindow)
+        val runner = mock[WindowRunner]
+
+        val task = new ProcessingTimeTriggerTask[Any, Any](
+          grouping, runner, MockUtil.mockTaskContext, UserConfig.empty)
+
+        task.onStart(startTime)
+
+        // Each incoming message goes to the window runner.
+        val msg = mock[Message]
+        task.onNext(msg)
+        verify(runner).process(msg)
+
+        // Delivering the Triggering message results in exactly one trigger call.
+        task.receiveUnManagedMessage(Triggering)
+        verify(runner).trigger(any[Instant])
+    }
+  }
+
+}
