Repository: incubator-gearpump Updated Branches: refs/heads/master c1370d9bf -> 24e1a4546
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index d007e09..ca0135d 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -25,13 +25,13 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FunctionRunner} +import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} import org.scalatest.mock.MockitoSugar @@ -61,16 +61,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "DataSourceOp" should { - "chain ChainableOp" in { + "chain TransformOp" in { val dataSource = new AnySource val dataSourceOp = DataSourceOp(dataSource) - val chainableOp = mock[ChainableOp[Any, Any]] + val transformOp = mock[TransformOp[Any, Any]] val fn = mock[FunctionRunner[Any, Any]] + when(transformOp.fn).thenReturn(fn) - val chainedOp = dataSourceOp.chain(chainableOp) + val chainedOp = dataSourceOp.chain(transformOp) chainedOp shouldBe a[DataSourceOp] - verify(chainableOp).fn unchainableOps.foreach { op => intercept[OpChainException] { @@ -79,13 +79,13 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } } - "get Processor of DataSource" in { + "be translated into processor" in { val dataSource = new AnySource val dataSourceOp = DataSourceOp(dataSource) - val processor = dataSourceOp.getProcessor + val processor = dataSourceOp.toProcessor processor shouldBe a[Processor[_]] processor.parallelism shouldBe dataSourceOp.parallelism - processor.description shouldBe dataSourceOp.description + processor.description shouldBe s"${dataSourceOp.description}.globalWindows" } } @@ -94,7 +94,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "not chain any Op" in { val dataSink = new AnySink val dataSinkOp = DataSinkOp(dataSink) - val chainableOp = mock[ChainableOp[Any, Any]] + val chainableOp = mock[TransformOp[Any, Any]] val ops = chainableOp +: unchainableOps ops.foreach { op => intercept[OpChainException] { @@ -103,10 +103,10 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } } - "get Processor of DataSink" in { + "be translated to processor" in { val dataSink = new AnySink val dataSinkOp = DataSinkOp(dataSink) - val processor = dataSinkOp.getProcessor + val processor = dataSinkOp.toProcessor processor shouldBe a[Processor[_]] processor.parallelism shouldBe dataSinkOp.parallelism processor.description shouldBe dataSinkOp.description @@ -117,7 +117,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "not chain any Op" in { val processorOp = new ProcessorOp[AnyTask] - val chainableOp = mock[ChainableOp[Any, Any]] + val chainableOp = mock[TransformOp[Any, Any]] val ops = chainableOp +: unchainableOps ops.foreach { op => intercept[OpChainException] { @@ -126,41 +126,41 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } } - "get Processor" in { + "be translated into processor" in { val processorOp = new ProcessorOp[AnyTask] - val processor = processorOp.getProcessor + val processor = processorOp.toProcessor processor shouldBe a [DefaultProcessor[_]] processor.parallelism shouldBe processorOp.parallelism processor.description shouldBe processorOp.description } } - "ChainableOp" should { + "TransformOp" should { - "chain ChainableOp" in { + "chain TransformOp" in { val fn1 = mock[FunctionRunner[Any, Any]] - val chainableOp1 = ChainableOp[Any, Any](fn1) + val transformOp1 = TransformOp[Any, Any](fn1) val fn2 = mock[FunctionRunner[Any, Any]] - val chainableOp2 = ChainableOp[Any, Any](fn2) + val transformOp2 = TransformOp[Any, Any](fn2) - val chainedOp = chainableOp1.chain(chainableOp2) + val chainedOp = transformOp1.chain(transformOp2) - chainedOp shouldBe a[ChainableOp[_, _]] + chainedOp shouldBe a[TransformOp[_, _]] unchainableOps.foreach { op => intercept[OpChainException] { - chainableOp1.chain(op) + transformOp1.chain(op) } } } - "get Processor" in { + "be translated to processor" in { val fn = mock[FlatMapFunction[Any, Any]] val flatMapper = new FlatMapper(fn, "flatMap") - val chainableOp = ChainableOp[Any, Any](flatMapper) + val transformOp = TransformOp[Any, Any](flatMapper) - val processor = chainableOp.getProcessor + val processor = transformOp.toProcessor processor shouldBe a[Processor[_]] processor.parallelism shouldBe 1 } @@ -168,14 +168,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "GroupByOp" should { - "chain ChainableOp" in { - val groupBy = mock[GroupAlsoByWindow[Any, Any]] - val groupByOp = GroupByOp[Any, Any](groupBy) - val fn = mock[FunctionRunner[Any, Any]] - val chainableOp = mock[ChainableOp[Any, Any]] - when(chainableOp.fn).thenReturn(fn) + val groupBy = (any: Any) => any + val groupByOp = GroupByOp[Any, Any](groupBy) + + "chain WindowTransformOp" in { - val chainedOp = groupByOp.chain(chainableOp) + val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner()) + val windowTransformOp = mock[WindowTransformOp[Any, Any]] + when(windowTransformOp.windowRunner).thenReturn(runner) + + val chainedOp = groupByOp.chain(windowTransformOp) chainedOp shouldBe a[GroupByOp[_, _]] unchainableOps.foreach { op => @@ -185,25 +187,23 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } } - "delegate to groupByFn on getProcessor" in { - val groupBy = mock[GroupAlsoByWindow[Any, Any]] - val groupByOp = GroupByOp[Any, Any](groupBy) - - groupByOp.getProcessor - verify(groupBy).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem]) + "be translated to processor" in { + val processor = groupByOp.toProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe 1 } } "MergeOp" should { - val mergeOp = MergeOp("merge") + val mergeOp = MergeOp() - "chain ChainableOp" in { - val fn = mock[FunctionRunner[Any, Any]] - val chainableOp = mock[ChainableOp[Any, Any]] - when(chainableOp.fn).thenReturn(fn) + "chain WindowTransformOp" in { + val runner = mock[WindowRunner[Any, Any]] + val windowTransformOp = mock[WindowTransformOp[Any, Any]] + when(windowTransformOp.windowRunner).thenReturn(runner) - val chainedOp = mergeOp.chain(chainableOp) + val chainedOp = mergeOp.chain(windowTransformOp) chainedOp shouldBe a [MergeOp] unchainableOps.foreach { op => @@ -213,8 +213,8 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } } - "get Processor" in { - val processor = mergeOp.getProcessor + "be translated to processor" in { + val processor = mergeOp.toProcessor processor shouldBe a[Processor[_]] processor.parallelism shouldBe 1 } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index 70abde9..70d21b5 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -24,16 +24,14 @@ import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction -import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner -import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner} import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.window.api.CountWindows -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph import org.scalatest.mock.MockitoSugar @@ -58,10 +56,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc "Planner" should "chain operations" in { val graph = Graph.empty[Op, OpEdge] val sourceOp = DataSourceOp(new AnySource) - val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1)) + val groupBy = (any: Any) => any val groupByOp = GroupByOp(groupBy) - val flatMapOp = ChainableOp[Any, Any](anyFlatMapper) - val reduceOp = ChainableOp[Any, Option[Any]](anyReducer) + val windowOp = WindowOp(GlobalWindows()) + val flatMapOp = TransformOp[Any, Any](anyFlatMapper) + val reduceOp = TransformOp[Any, Option[Any]](anyReducer) val processorOp = new ProcessorOp[AnyTask] val sinkOp = DataSinkOp(new AnySink) val directEdge = Direct @@ -70,8 +69,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc graph.addVertex(sourceOp) graph.addVertex(groupByOp) graph.addEdge(sourceOp, shuffleEdge, groupByOp) + graph.addVertex(windowOp) + graph.addEdge(groupByOp, directEdge, windowOp) graph.addVertex(flatMapOp) - graph.addEdge(groupByOp, directEdge, flatMapOp) + graph.addEdge(windowOp, directEdge, flatMapOp) graph.addVertex(reduceOp) graph.addEdge(flatMapOp, directEdge, reduceOp) graph.addVertex(processorOp) @@ -86,9 +87,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc .mapVertex(_.description) plan.vertices.toSet should contain theSameElementsAs - Set("source", "groupBy", "processor", "sink") - plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] - plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner] + Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink") + plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe + a[GroupByPartitioner[_, _]] + plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe + a[CoLocationPartitioner] plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner] } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index f5d7c20..6244224 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -19,28 +19,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions import java.time.Instant -import akka.actor.ActorSystem import org.apache.gearpump.Message -import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark} import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction} import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction -import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform -import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} -import org.apache.gearpump.streaming.dsl.window.api.CountWindows -import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow -import org.mockito.ArgumentCaptor +import org.apache.gearpump.streaming.dsl.task.TransformTask +import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} import org.scalatest.mock.MockitoSugar -import scala.concurrent.Await -import scala.concurrent.duration.Duration - class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunnerSpec._ @@ -216,40 +210,6 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { } } - "Emit" should { - - val emitFunction = mock[T => Unit] - val emit = new Emit[T](emitFunction) - - "emit input value when processing input value" in { - val input = mock[T] - - emit.process(input) shouldBe List.empty[Unit] - - verify(emitFunction).apply(input) - } - - "return empty description" in { - emit.description shouldBe "" - } - - "return None on finish" in { - emit.finish() shouldBe List.empty[Unit] - } - - "do nothing on setup" in { - emit.setup() - - verifyZeroInteractions(emitFunction) - } - - "do nothing on teardown" in { - emit.teardown() - - verifyZeroInteractions(emitFunction) - } - } - "Source" should { "iterate over input source and apply attached operator" in { @@ -258,7 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val data = "one two three".split("\\s+") val dataSource = new CollectionDataSource[String](data) - val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) + val runner1 = new DefaultWindowRunner[String, String]( + GlobalWindows(), new DummyRunner[String]) + val conf = UserConfig.empty + .withValue(GEARPUMP_STREAMING_SOURCE, dataSource) + .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) // Source with no transformer val source = new DataSourceTask[String, String]( @@ -275,8 +239,10 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val anotherTaskContext = MockUtil.mockTaskContext val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") + val runner2 = new DefaultWindowRunner[String, String]( + GlobalWindows(), double) val another = new DataSourceTask(anotherTaskContext, - conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) + conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2)) another.onStart(Instant.EPOCH) another.onNext(Message("next")) another.onWatermarkProgress(Watermark.MAX) @@ -287,44 +253,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { } } - "CountTriggerTask" should { - "group input by groupBy Function and " + - "apply attached operator for each group" in { - - val data = "1 2 2 3 3 3" - - val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) => - left + right}), "concat") - - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]]( - GEARPUMP_STREAMING_OPERATOR, concat) - - val taskContext = MockUtil.mockTaskContext - - val groupBy = GroupAlsoByWindow((input: String) => input, - CountWindows.apply[String](1).accumulating) - val task = new CountTriggerTask[String, String](groupBy, taskContext, config) - task.onStart(Instant.EPOCH) - - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) - - data.split("\\s+").foreach { word => - task.onNext(Message(word)) - } - verify(taskContext, times(6)).output(peopleCaptor.capture()) - - import scala.collection.JavaConverters._ - - val values = peopleCaptor.getAllValues.asScala.map(input => - input.value.asInstanceOf[Option[String]].get) - assert(values.mkString(",") == "1,2,22,3,33,333") - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - } - "TransformTask" should { + "MergeTask" should { "accept two stream and apply the attached operator" in { // Source with transformer @@ -332,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val conf = UserConfig.empty val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val transform = new Transform[String, String](taskContext, Some(double)) + val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double) val task = new TransformTask[String, String](transform, taskContext, conf) task.onStart(Instant.EPOCH) http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala index 5b90a3e..c8c8b9f 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M dag.vertices.size shouldBe 2 dag.vertices.foreach { processor => processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName - if (processor.description == "A") { + if (processor.description == "A.globalWindows") { processor.parallelism shouldBe 2 - } else if (processor.description == "B") { + } else if (processor.description == "B.globalWindows") { processor.parallelism shouldBe 3 } else { fail(s"undefined source ${processor.description}") http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala index 4c7e209..ef8f932 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala @@ -22,10 +22,9 @@ import akka.actor._ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join -import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask} -import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, PartitionerDescription} import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} @@ -92,7 +91,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock private def getExpectedDagTopology: Graph[String, String] = { val source = classOf[DataSourceTask[_, _]].getName - val group = classOf[EventTimeTriggerTask[_, _]].getName + val group = classOf[GroupByTask[_, _, _]].getName val merge = classOf[TransformTask[_, _]].getName val join = classOf[Join].getName http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala deleted file mode 100644 index 1a4958a..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.task - -import java.time.Instant - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.api.CountWindows -import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks -import org.scalatest.{Matchers, PropSpec} - -class CountTriggerTaskSpec extends PropSpec with PropertyChecks - with Matchers with MockitoSugar { - - property("CountTriggerTask should trigger output by number of messages in a window") { - - implicit val system = MockUtil.system - - val numGen = Gen.chooseNum[Int](1, 1000) - - forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) => - - val groupBy = mock[GroupAlsoByWindow[Any, Any]] - val window = CountWindows.apply[Any](windowSize) - when(groupBy.window).thenReturn(window) - val windowRunner = mock[WindowRunner] - val userConfig = UserConfig.empty - - val task = new CountTriggerTask[Any, Any](groupBy, windowRunner, - MockUtil.mockTaskContext, userConfig) - val message = mock[Message] - - for (i <- 1 to msgNum) { - task.onNext(message) - } - verify(windowRunner, times(msgNum)).process(message) - verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize)) - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala deleted file mode 100644 index 9414c76..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.task - -import java.time.{Duration, Instant} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows} -import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.{Matchers, PropSpec} -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks - -class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks - with Matchers with MockitoSugar { - - property("EventTimeTriggerTask should trigger on watermark") { - val longGen = Gen.chooseNum[Long](1L, 1000L) - val windowSizeGen = longGen - val windowStepGen = longGen - val watermarkGen = longGen.map(Instant.ofEpochMilli) - - forAll(windowSizeGen, windowStepGen, watermarkGen) { - (windowSize: Long, windowStep: Long, watermark: Instant) => - - val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize), - Duration.ofMillis(windowStep)).triggering(EventTimeTrigger) - val groupBy = mock[GroupAlsoByWindow[Any, Any]] - val windowRunner = mock[WindowRunner] - val context = MockUtil.mockTaskContext - val config = UserConfig.empty - - when(groupBy.window).thenReturn(window) - - val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config) - - val message = mock[Message] - task.onNext(message) - verify(windowRunner).process(message) - - task.onWatermarkProgress(watermark) - verify(windowRunner).trigger(any[Instant]) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala new file mode 100644 index 0000000..0f87a1c --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner +import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows +import org.apache.gearpump.streaming.{Constants, MockUtil} +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} +import org.apache.gearpump.streaming.source.Watermark +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +class GroupByTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("GroupByTask should trigger on watermark") { + val longGen = Gen.chooseNum[Long](1L, 1000L).map(Instant.ofEpochMilli) + + forAll(longGen) { (time: Instant) => + val groupBy = mock[Any => Int] + val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any]) + val context = MockUtil.mockTaskContext + val config = UserConfig.empty + .withValue( + Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)(MockUtil.system) + + val task = new GroupByTask[Any, Int, Any](groupBy, context, config) + val value = time + val message = Message(value, time) + when(groupBy(time)).thenReturn(0) + task.onNext(message) + + task.onWatermarkProgress(Watermark.MAX) + verify(context).output(message) + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala deleted file mode 100644 index cbc9e0c..0000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gearpump.streaming.dsl.task - -import java.time.{Duration, Instant} - -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering -import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows} -import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalacheck.Gen -import org.scalatest.{Matchers, PropSpec} -import org.scalatest.mock.MockitoSugar -import org.scalatest.prop.PropertyChecks - -class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks - with Matchers with MockitoSugar { - - property("ProcessingTimeTriggerTask should trigger on system time interval") { - val longGen = Gen.chooseNum[Long](1L, 1000L) - val windowSizeGen = longGen - val windowStepGen = longGen - val startTimeGen = longGen.map(Instant.ofEpochMilli) - - forAll(windowSizeGen, windowStepGen, startTimeGen) { - (windowSize: Long, windowStep: Long, startTime: Instant) => - - val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize), - Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger) - val groupBy = mock[GroupAlsoByWindow[Any, Any]] - val windowRunner = mock[WindowRunner] - val context = MockUtil.mockTaskContext - val config = UserConfig.empty - - when(groupBy.window).thenReturn(window) - - val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config) - - task.onStart(startTime) - - val message = mock[Message] - task.onNext(message) - verify(windowRunner).process(message) - - task.receiveUnManagedMessage(Triggering) - verify(windowRunner).trigger(any[Instant]) - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 481925a..6b66f01 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -22,11 +22,9 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform -import org.apache.gearpump.streaming.source.Watermark +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} import org.mockito.{Matchers => MockitoMatchers} -import org.mockito.Mockito.{times, verify, when} +import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} import org.scalatest.mock.MockitoSugar @@ -34,43 +32,25 @@ import org.scalatest.prop.PropertyChecks class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli, - Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli) - private val runnerGen = { - val runner = mock[FunctionRunner[Any, Any]] - Gen.oneOf(Some(runner), None) - } - - property("TransformTask should emit on watermark") { - val msgGen = for { - str <- Gen.alphaStr.suchThat(!_.isEmpty) - t <- timeGen - } yield Message(s"$str:$t", t) - val msgsGen = Gen.listOfN(10, msgGen) - - forAll(runnerGen, msgsGen) { - (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) => - val taskContext = MockUtil.mockTaskContext - implicit val system = MockUtil.system - val config = UserConfig.empty - val transform = new Transform[Any, Any](taskContext, runner) - val task = new TransformTask[Any, Any](transform, taskContext, config) - - msgs.foreach(task.onNext) - - runner.foreach(r => when(r.finish()).thenReturn(None)) - task.onWatermarkProgress(Watermark.MIN) - verify(taskContext, times(0)).output(MockitoMatchers.any[Message]) - - msgs.foreach { msg => - runner.foreach(r => - when(r.process(msg.value)).thenReturn(Some(msg.value))) - } - task.onWatermarkProgress(Watermark.MAX) - - msgs.foreach { msg => - verify(taskContext).output(MockitoMatchers.eq(msg)) - } + property("MergeTask should trigger on watermark") { + val longGen = Gen.chooseNum[Long](1L, 1000L) + val watermarkGen = longGen.map(Instant.ofEpochMilli) + + forAll(watermarkGen) { (watermark: Instant) => + val windowRunner = mock[WindowRunner[Any, Any]] + val context = MockUtil.mockTaskContext + val config = UserConfig.empty + val task = new TransformTask[Any, Any](windowRunner, context, config) + val time = watermark.minusMillis(1L) + val value: Any = time + val message = Message(value, time) + + task.onNext(message) + verify(windowRunner).process(TimestampedValue(value, time)) + + when(windowRunner.trigger(watermark)).thenReturn(Some(TimestampedValue(value, time))) + task.onWatermarkProgress(watermark) + verify(context).output(message) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala index fbbee3e..98e9919 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala @@ -21,18 +21,15 @@ package org.apache.gearpump.streaming.dsl.window.impl import java.time.{Duration, Instant} import org.apache.gearpump.Message -import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction -import org.apache.gearpump.streaming.{Constants, MockUtil} +import org.apache.gearpump.streaming.MockUtil import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner import org.apache.gearpump.streaming.dsl.window.api.SessionWindows import org.apache.gearpump.streaming.source.Watermark -import org.mockito.Mockito.{times, verify} import org.scalatest.{Matchers, PropSpec} import org.scalatest.mock.MockitoSugar import org.scalatest.prop.PropertyChecks - class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { @@ -40,34 +37,25 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks val data = List( Message(("foo", 1L), Instant.ofEpochMilli(1L)), - Message(("bar", 1L), Instant.ofEpochMilli(8L)), Message(("foo", 1L), Instant.ofEpochMilli(15L)), - Message(("bar", 1L), Instant.ofEpochMilli(17L)), - Message(("bar", 1L), Instant.ofEpochMilli(18L)), Message(("foo", 1L), Instant.ofEpochMilli(25L)), - Message(("foo", 1L), Instant.ofEpochMilli(26L)), - Message(("bar", 1L), Instant.ofEpochMilli(30L)), - Message(("bar", 1L), Instant.ofEpochMilli(31L)) + Message(("foo", 1L), Instant.ofEpochMilli(26L)) ) type KV = (String, Long) - val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2)) - val operator = new FoldRunner(reduce, "reduce") - val userConfig = UserConfig.empty.withValue( - Constants.GEARPUMP_STREAMING_OPERATOR, operator) - val windows = SessionWindows.apply[KV](Duration.ofMillis(4L)) - val groupBy = GroupAlsoByWindow[KV, String](_._1, windows) - val windowRunner = new DefaultWindowRunner(taskContext, userConfig, groupBy) - - data.foreach(windowRunner.process) - windowRunner.trigger(Watermark.MAX) - - verify(taskContext, times(2)).output(Message(Some(("foo", 1)), Watermark.MAX)) - verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX)) - verify(taskContext, times(2)).output(Message(Some(("bar", 2)), Watermark.MAX)) - verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX)) + val windows = SessionWindows.apply(Duration.ofMillis(4L)) + val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows, + new FoldRunner[KV, Option[KV]](reduce, "reduce")) + + data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) + windowRunner.trigger(Watermark.MAX).toList shouldBe + List( + TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)), + TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(18)), + TimestampedValue(Some(("foo", 2)), Instant.ofEpochMilli(29)) + ) } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala new file mode 100644 index 0000000..038f91d --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gearpump.streaming.partitioner + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.partitioner.GroupByPartitionerSpec.People +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { + + it should "group by message payload and window" in { + val mark = People("Mark", "male") + val tom = People("Tom", "male") + val michelle = People("Michelle", "female") + + val partitionNum = 10 + + val groupBy = new GroupByPartitioner[People, String](_.gender) + groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe + groupBy.getPartition(Message(tom, 2L), partitionNum) + + groupBy.getPartition(Message(mark, 2L), partitionNum) should not be + groupBy.getPartition(Message(michelle, 3L), partitionNum) + } +} + +object GroupByPartitionerSpec { + case class People(name: String, gender: String) +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index 7651251..f7a3a63 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,8 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner -import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -33,21 +32,16 @@ import org.scalatest.prop.PropertyChecks class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { - private val runnerGen = { - val runner = mock[FunctionRunner[Any, Any]] - Gen.oneOf(Some(runner), None) - } - property("DataSourceTask should setup data source") { - forAll(runnerGen, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { - (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) => + forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { + (startTime: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val transform = new Transform[Any, Any](taskContext, runner) - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) + val runner = mock[WindowRunner[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStart(startTime) @@ -56,21 +50,20 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with } property("DataSourceTask should read from DataSource and transform inputs") { - forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { - (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: Instant) => + forAll(Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { + (str: String, timestamp: Instant) => val taskContext = MockUtil.mockTaskContext implicit val system = MockUtil.system val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val transform = new Transform[Any, Any](taskContext, runner) - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) + val runner = mock[WindowRunner[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) val msg = Message(str, timestamp) when(dataSource.read()).thenReturn(msg) - runner.foreach(r => { - when(r.process(str)).thenReturn(Some(str)) - when(r.finish()).thenReturn(None) - }) + + when(runner.trigger(Watermark.MAX)).thenReturn( + Some(TimestampedValue(str.asInstanceOf[Any], timestamp))) sourceTask.onNext(Message("next")) sourceTask.onWatermarkProgress(Watermark.MAX) @@ -80,18 +73,16 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with } property("DataSourceTask should teardown DataSource") { - forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) => - val taskContext = MockUtil.mockTaskContext - implicit val system = MockUtil.system - val dataSource = mock[DataSource] - val config = UserConfig.empty - .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val transform = new Transform[Any, Any](taskContext, runner) - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform) + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val dataSource = mock[DataSource] + val config = UserConfig.empty + .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) + val runner = mock[WindowRunner[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) - sourceTask.onStop() + sourceTask.onStop() - verify(dataSource).close() - } + verify(dataSource).close() } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala index fb0beaa..65cb17a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala @@ -24,9 +24,10 @@ import java.util.Random import org.mockito.Mockito._ import org.scalatest.mock.MockitoSugar import org.scalatest.{FlatSpec, Matchers} -import org.apache.gearpump.{MAX_TIME_MILLIS, Message} +import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner} +import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription} @@ -115,7 +116,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { subscription.sendMessage(Message(randomMessage, clock)) } - assert(subscription.allowSendingMoreMessages() == false) + assert(!subscription.allowSendingMoreMessages()) } it should "report minClock as Long.MaxValue when there is no pending message" in { @@ -124,7 +125,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar { subscription.sendMessage(msg1) assert(subscription.minClock == 70) subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session)) - assert(subscription.minClock == MAX_TIME_MILLIS) + assert(subscription.minClock == Watermark.MAX.toEpochMilli) } private def randomMessage: String = new Random().nextInt.toString
