Repository: incubator-gearpump
Updated Branches:
  refs/heads/master f200da531 -> 385a612bb
[GEARPUMP-261] Translate ChainableOp to Processor of TransformTask

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been updated
   and new documentation added for new functionality.

Author: manuzhang <[email protected]>

Closes #130 from manuzhang/chainable_op.

Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/385a612b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/385a612b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/385a612b

Branch: refs/heads/master
Commit: 385a612bb916512fdf40afc8cc9c8cab888554c1
Parents: f200da5
Author: manuzhang <[email protected]>
Authored: Mon Jan 9 08:57:02 2017 +0800
Committer: huafengw <[email protected]>
Committed: Mon Jan 9 08:57:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/gearpump/streaming/dsl/plan/OP.scala  |  3 ++-
 .../apache/gearpump/streaming/dsl/plan/OpSpec.scala  | 15 ++++++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/385a612b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 744976b..f15d875 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -141,7 +141,8 @@ case class ChainableOp[IN, OUT](
   }
 
   override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+    Processor[TransformTask[Any, Any]](1, description,
+      userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/385a612b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index bf52abc..98bf24f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -155,12 +155,17 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "throw exception on getProcessor" in {
-      val fn1 = mock[SingleInputFunction[Any, Any]]
-      val chainableOp1 = ChainableOp[Any, Any](fn1)
-      intercept[UnsupportedOperationException] {
-        chainableOp1.getProcessor
+    "get Processor" in {
+      val fn = new SingleInputFunction[Any, Any] {
+        override def process(value: Any): TraversableOnce[Any] = null
+
+        override def description: String = null
       }
+      val chainableOp = ChainableOp[Any, Any](fn)
+
+      val processor = chainableOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
     }
   }
 
