Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 3c0ebb13f -> a743a9ca6


[GEARPUMP-192] refactor example sources task to use DataSourceAPI

[GEARPUMP-192] Refactor the example source tasks to use the DataSource API.

Author: Roshanson <[email protected]>
Author: [email protected] <doyouta123>

Closes #78 from Roshanson/fix-192.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/a743a9ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/a743a9ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/a743a9ca

Branch: refs/heads/master
Commit: a743a9ca6e9bd074c35097cd9602e606a351f46a
Parents: 3c0ebb1
Author: Roshanson <[email protected]>
Authored: Fri Sep 2 23:19:20 2016 +0800
Committer: manuzhang <[email protected]>
Committed: Fri Sep 2 23:19:20 2016 +0800

----------------------------------------------------------------------
 .../streaming/examples/wordcount/Split.scala    | 48 +++++++++++++-------
 .../examples/wordcount/WordCount.scala          | 23 ++++++----
 .../examples/wordcount/SplitSpec.scala          | 30 ++++--------
 .../examples/wordcount/WordCountSpec.scala      |  2 +-
 4 files changed, 58 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index 44cf211..ad6f41a 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -19,33 +19,48 @@
 package org.apache.gearpump.streaming.examples.wordcount
 
 import java.time.Instant
-import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.source.Watermark
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
 
-class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
-  import taskContext.output
+import scala.collection.mutable.ArrayBuffer
+
+
+class Split extends DataSource {
+
+  val result = ArrayBuffer[Message]()
+  var item = -1
+  Split.TEXT_TO_SPLIT.lines.foreach { line =>
+    line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
+      result.append(new Message(msg, System.currentTimeMillis()))
+
+    }
 
-  override def onStart(startTime: Instant): Unit = {
-    self ! Watermark(Instant.now)
   }
 
-  override def onNext(msg: Message): Unit = {
-    Split.TEXT_TO_SPLIT.lines.foreach { line =>
-      line.split("[\\s]+").filter(_.nonEmpty).foreach { msg =>
-        output(new Message(msg, System.currentTimeMillis()))
-      }
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  override def read(): Message = {
+
+    if (item < result.size - 1) {
+      item += 1
+      result(item)
+    } else {
+      item = 0
+      result(item)
     }
 
-    import scala.concurrent.duration._
-    taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
-      Watermark(Instant.now))
   }
+
+  override def close(): Unit = {}
+
+  override def getWatermark: Instant = Instant.now()
+
+
 }
 
+
 object Split {
   val TEXT_TO_SPLIT =
     """
@@ -66,3 +81,4 @@ object Split {
       |   limitations under the License.
     """.stripMargin
 }
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
index 9917d9f..99b83ad 100644
--- 
a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
+++ 
b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala
@@ -18,16 +18,17 @@
 
 package org.apache.gearpump.streaming.examples.wordcount
 
-import org.slf4j.Logger
-
+import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.embedded.EmbeddedCluster
 import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption, 
ParseResult}
 import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.source.DataSourceProcessor
 import org.apache.gearpump.streaming.{Processor, StreamApplication}
 import org.apache.gearpump.util.Graph.Node
 import org.apache.gearpump.util.{AkkaApp, Graph, LogUtil}
+import org.slf4j.Logger
 
 /** Same WordCount with low level Processor Graph syntax */
 object WordCount extends AkkaApp with ArgumentsParser {
@@ -35,21 +36,27 @@ object WordCount extends AkkaApp with ArgumentsParser {
   val RUN_FOR_EVER = -1
 
   override val options: Array[(String, CLIOption[Any])] = Array(
-    "split" -> CLIOption[Int]("<how many split tasks>", required = false, 
defaultValue = Some(1)),
+    "source" -> CLIOption[Int]("<how many source tasks>", required = false,
+      defaultValue = Some(1)),
     "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, 
defaultValue = Some(1)),
     "debug" -> CLIOption[Boolean]("<true|false>", required = false, 
defaultValue = Some(false)),
     "sleep" -> CLIOption[Int]("how many seconds to sleep for debug mode", 
required = false,
       defaultValue = Some(30))
   )
 
-  def application(config: ParseResult): StreamApplication = {
-    val splitNum = config.getInt("split")
+  def application(config: ParseResult, system: ActorSystem): StreamApplication = {
+    implicit val actorSystem = system
+
     val sumNum = config.getInt("sum")
-    val split = Processor[Split](splitNum)
+    val sourceNum = config.getInt("source")
+    val source = new Split
+    val sourceProcessor = DataSourceProcessor(source, sourceNum)
     val sum = Processor[Sum](sumNum)
     val partitioner = new HashPartitioner
+    val computation = sourceProcessor ~ partitioner ~>
+      sum
 
-    val app = StreamApplication("wordCount", Graph(split ~ partitioner ~> sum), UserConfig.empty)
+    val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty)
     app
   }
 
@@ -72,7 +79,7 @@ object WordCount extends AkkaApp with ArgumentsParser {
       case None => ClientContext(akkaConf)
     }
 
-    val app = application(config)
+    val app = application(config, context.system)
     context.submit(app)
 
     if (debugMode) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
index 46d9e97..7c9de35 100644
--- 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -19,20 +19,16 @@ package org.apache.gearpump.streaming.examples.wordcount
 
 import java.time.Instant
 
-import org.apache.gearpump.streaming.source.Watermark
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 import akka.actor.ActorSystem
+import org.apache.gearpump.Message
 import akka.testkit.TestProbe
-import org.mockito.Matchers._
+import org.apache.gearpump.cluster.TestUtil
+import org.apache.gearpump.streaming.MockUtil
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
 
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.MockUtil
+import scala.concurrent.Await
+import scala.concurrent.duration.Duration
 
 class SplitSpec extends WordSpec with Matchers {
 
@@ -45,19 +41,13 @@ class SplitSpec extends WordSpec with Matchers {
 
       val mockTaskActor = TestProbe()
 
-      // Mock self ActorRef
       when(taskContext.self).thenReturn(mockTaskActor.ref)
 
-      val conf = UserConfig.empty
-      val split = new Split(taskContext, conf)
-      split.onStart(Instant.EPOCH)
-      mockTaskActor.expectMsgType[Watermark]
-
-      val expectedWordCount = Split.TEXT_TO_SPLIT.split( 
"""[\s\n]+""").count(_.nonEmpty)
-
-      split.onNext(Message("next"))
-      verify(taskContext, times(expectedWordCount)).output(anyObject())
-
+      val split = new Split
+      split.open(taskContext, Instant.now())
+      split.read() shouldBe a[Message]
+      split.close()
+      split.getWatermark
       system.terminate()
       Await.result(system.whenTerminated, Duration.Inf)
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/a743a9ca/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
----------------------------------------------------------------------
diff --git 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
index f703552..5121815 100644
--- 
a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
+++ 
b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala
@@ -44,7 +44,7 @@ class WordCountSpec
   property("WordCount should succeed to submit application with required 
arguments") {
     val requiredArgs = Array.empty[String]
     val optionalArgs = Array(
-      "-split", "1",
+      "-source", "1",
       "-sum", "1")
 
     val args = {

Reply via email to