Repository: incubator-gearpump Updated Branches: refs/heads/master a23a40f5e -> 636cd6f8e
[GEARPUMP-262] Invoke setup/teardown methods of SingleInputFunction i⦠â¦n all tasks Author: manuzhang <[email protected]> Closes #132 from manuzhang/setup_teardown. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/636cd6f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/636cd6f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/636cd6f8 Branch: refs/heads/master Commit: 636cd6f8ef566260932848d2cc1b6c77fd8c90b3 Parents: a23a40f Author: manuzhang <[email protected]> Authored: Sat Jan 14 19:56:17 2017 +0800 Committer: manuzhang <[email protected]> Committed: Sat Jan 14 19:56:29 2017 +0800 ---------------------------------------------------------------------- .../streaming/dsl/task/TransformTask.scala | 10 +++ .../streaming/source/DataSourceTask.scala | 2 + .../streaming/dsl/task/TransformTaskSpec.scala | 75 ++++++++++++++++++++ .../streaming/source/DataSourceTaskSpec.scala | 12 +++- 4 files changed, 96 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index c13a4fb..f8fbefa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -17,6 +17,8 @@ */ package org.apache.gearpump.streaming.dsl.task +import java.time.Instant + import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ @@ -31,6 +33,10 @@ class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) } + override def onStart(startTime: Instant): Unit = { + operator.foreach(_.setup()) + } + override def onNext(msg: Message): Unit = { val time = msg.timestamp @@ -43,4 +49,8 @@ class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], taskContext.output(new Message(msg.msg, time)) } } + + override def onStop(): Unit = { + operator.foreach(_.teardown()) + } } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 535497c..450f2d6 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -69,6 +69,7 @@ class DataSourceTask[IN, OUT] private[source]( override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at $startTime") source.open(context, startTime) + operator.foreach(_.setup()) self ! Watermark(source.getWatermark) } @@ -82,6 +83,7 @@ class DataSourceTask[IN, OUT] private[source]( } override def onStop(): Unit = { + operator.foreach(_.teardown()) LOG.info("closing data source...") source.close() } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala new file mode 100644 index 0000000..b6e7342 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.mockito.Mockito.{verify, when} +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar { + + property("TransformTask.onStart should call SingleInputFunction.setup") { + forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) { (startTime: Instant) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val operator = mock[SingleInputFunction[Any, Any]] + val sourceTask = new TransformTask[Any, Any](Some(operator), taskContext, config) + + sourceTask.onStart(startTime) + + verify(operator).setup() + } + } + + property("TransformTask.onNext should call SingleInputFunction.process") { + forAll(Gen.alphaStr) { (str: String) => + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val operator = mock[SingleInputFunction[Any, Any]] + val task = new TransformTask[Any, Any](Some(operator), taskContext, config) + val msg = Message(str) + when(operator.process(str)).thenReturn(Some(str)) + + task.onNext(msg) + + verify(taskContext).output(msg) + } + } + + property("DataSourceTask.onStop should call SingleInputFunction.setup") { + val taskContext = MockUtil.mockTaskContext + implicit val system = MockUtil.system + val config = UserConfig.empty + val operator = mock[SingleInputFunction[Any, Any]] + val task = new TransformTask[Any, Any](Some(operator), taskContext, config) + + task.onStop() + + verify(operator).teardown() + } +} http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/636cd6f8/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index c786047..4e95bdd 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,6 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -38,11 +39,13 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) + val operator = mock[SingleInputFunction[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) sourceTask.onStart(startTime) + verify(dataSource).open(taskContext, startTime) + verify(operator).setup() } } @@ -69,9 +72,12 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, None) + val operator = mock[SingleInputFunction[Any, Any]] + val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, Some(operator)) sourceTask.onStop() + verify(dataSource).close() + verify(operator).teardown() } }
