Repository: incubator-gearpump Updated Branches: refs/heads/master a743a9ca6 -> 5bf7c7cb6
[GEARPUMP-201] integration test failure Author: manuzhang <[email protected]> Closes #79 from manuzhang/fix_it. Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/5bf7c7cb Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/5bf7c7cb Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/5bf7c7cb Branch: refs/heads/master Commit: 5bf7c7cb606aca8c8ffc93375b8c6316f8f6624c Parents: a743a9c Author: manuzhang <[email protected]> Authored: Mon Sep 5 15:39:04 2016 +0800 Committer: manuzhang <[email protected]> Committed: Mon Sep 5 15:39:04 2016 +0800 ---------------------------------------------------------------------- .../streaming/examples/wordcount/Split.scala | 14 +++------ .../examples/wordcount/WordCount.scala | 12 +++---- .../examples/wordcount/WordCountSpec.scala | 2 +- .../checklist/DynamicDagSpec.scala | 33 ++++++++++---------- 4 files changed, 27 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala index ad6f41a..c07e124 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala @@ -29,14 +29,12 @@ import scala.collection.mutable.ArrayBuffer class Split extends DataSource { - val result = ArrayBuffer[Message]() - var item = -1 + private val result = ArrayBuffer[String]() + private var item = -1 Split.TEXT_TO_SPLIT.lines.foreach { line => line.split("[\\s]+").filter(_.nonEmpty).foreach { msg => - result.append(new Message(msg, System.currentTimeMillis())) - + result.append(msg) } - } override def open(context: TaskContext, startTime: Instant): Unit = {} @@ -45,10 +43,10 @@ class Split extends DataSource { if (item < result.size - 1) { item += 1 - result(item) + Message(result(item), System.currentTimeMillis()) } else { item = 0 - result(item) + Message(result(item), System.currentTimeMillis()) } } @@ -57,10 +55,8 @@ class Split extends DataSource { override def getWatermark: Instant = Instant.now() - } - object Split { val TEXT_TO_SPLIT = """ http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala index 99b83ad..9580e63 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/WordCount.scala @@ -36,7 +36,7 @@ object WordCount extends AkkaApp with ArgumentsParser { val RUN_FOR_EVER = -1 override val options: Array[(String, CLIOption[Any])] = Array( - "source" -> CLIOption[Int]("<how many source tasks>", required = false, + "split" -> CLIOption[Int]("<how many source tasks>", required = false, defaultValue = Some(1)), "sum" -> CLIOption[Int]("<how many sum tasks>", required = false, defaultValue = Some(1)), "debug" -> CLIOption[Boolean]("<true|false>", required = false, defaultValue = Some(false)), @@ -48,14 +48,12 @@ object WordCount extends AkkaApp with ArgumentsParser { implicit val actorSystem = system val sumNum = config.getInt("sum") - val sourceNum = config.getInt("source") - val source = new Split - val sourceProcessor = DataSourceProcessor(source, sourceNum) + val splitNum = config.getInt("split") + val split = new Split + val sourceProcessor = DataSourceProcessor(split, splitNum, "Split") val sum = Processor[Sum](sumNum) val partitioner = new HashPartitioner - val computation = sourceProcessor ~ partitioner ~> - sum - + val computation = sourceProcessor ~ partitioner ~> sum val app = StreamApplication("wordCount", Graph(computation), UserConfig.empty) app } http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala ---------------------------------------------------------------------- diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala index 5121815..f703552 100644 --- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala +++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/WordCountSpec.scala @@ -44,7 +44,7 @@ class WordCountSpec property("WordCount should succeed to submit application with required arguments") { val requiredArgs = Array.empty[String] val optionalArgs = Array( - "-source", "1", + "-split", "1", "-sum", "1") val args = { http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/5bf7c7cb/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala ---------------------------------------------------------------------- diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala index 81e7b2a..89b8ef7 100644 --- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala +++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala @@ -24,10 +24,9 @@ import org.apache.gearpump.streaming.appmaster.ProcessorSummary class DynamicDagSpec extends TestSpecBase { + val sourceTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProducer" + val sinkTaskClass = "org.apache.gearpump.streaming.examples.sol.SOLStreamProcessor" lazy val solJar = cluster.queryBuiltInExampleJars("sol-").head - val splitTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Split" - val sumTaskClass = "org.apache.gearpump.streaming.examples.wordcount.Sum" - val solName = "sol" "dynamic dag" should { "can retrieve a list of built-in partitioner classes" in { @@ -42,13 +41,13 @@ class DynamicDagSpec extends TestSpecBase { // todo: blocked by #1450 } - "can replace downstream with wordcount's sum processor (new processor will have metrics)" in { + "can replace downstream with SOLStreamProcessor (new processor will have metrics)" in { // setup - val appId = expectSolJarSubmittedWithAppId() + val appId = expectWordCountJarSubmittedWithAppId() // exercise val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 1, sumTaskClass) + replaceProcessor(appId, 1, sinkTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors @@ -57,13 +56,13 @@ class DynamicDagSpec extends TestSpecBase { processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput") } - "can replace upstream with wordcount's split processor (new processor will have metrics)" in { + "can replace upstream with SOLStreamProducer (new processor will have metrics)" in { // setup - val appId = expectSolJarSubmittedWithAppId() + val appId = expectWordCountJarSubmittedWithAppId() // exercise val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 0, splitTaskClass) + replaceProcessor(appId, 0, sourceTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors @@ -74,11 +73,11 @@ class DynamicDagSpec extends TestSpecBase { "fall back to last dag version when replacing a processor failid" in { // setup - val appId = expectSolJarSubmittedWithAppId() + val appId = expectWordCountJarSubmittedWithAppId() // exercise val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 1, sumTaskClass) + replaceProcessor(appId, 1, sinkTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors @@ -99,12 +98,12 @@ class DynamicDagSpec extends TestSpecBase { "fall back to last dag version when AppMaster HA triggered" in { // setup - val appId = expectSolJarSubmittedWithAppId() + val appId = expectWordCountJarSubmittedWithAppId() // exercise val formerAppMaster = restClient.queryApp(appId).appMasterPath val formerProcessors = restClient.queryStreamingAppDetail(appId).processors - replaceProcessor(appId, 1, sumTaskClass) + replaceProcessor(appId, 1, sinkTaskClass) var laterProcessors: Map[ProcessorId, ProcessorSummary] = null Util.retryUntil(() => { laterProcessors = restClient.queryStreamingAppDetail(appId).processors @@ -120,11 +119,11 @@ class DynamicDagSpec extends TestSpecBase { } } - private def expectSolJarSubmittedWithAppId(): Int = { + private def expectWordCountJarSubmittedWithAppId(): Int = { val appId = restClient.getNextAvailableAppId() - val success = restClient.submitApp(solJar, cluster.getWorkerHosts.length) + val success = restClient.submitApp(wordCountJar, cluster.getWorkerHosts.length) success shouldBe true - expectAppIsRunning(appId, solName) + expectAppIsRunning(appId, wordCountName) Util.retryUntil(() => restClient.queryStreamingAppDetail(appId).clock > 0, "app running") appId } @@ -135,7 +134,7 @@ class DynamicDagSpec extends TestSpecBase { newTaskClass: String, newProcessorDescription: String = "", newParallelism: Int = 1): Unit = { - val uploadedJar = restClient.uploadJar(wordCountJar) + val uploadedJar = restClient.uploadJar(solJar) val replaceMe = new ProcessorDescription(formerProcessorId, newTaskClass, newParallelism, newProcessorDescription, jar = uploadedJar)
