Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0d061ff9e -> b6182ce89
[SPARK-7777] [STREAMING] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite Just added a guard to make sure a batch has completed before moving to the next batch. Author: zsxwing <zsxw...@gmail.com> Closes #6306 from zsxwing/SPARK-7777 and squashes the following commits: ecee529 [zsxwing] Fix the failure message 58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite (cherry picked from commit 895baf8f77e630ce32b0e25b00bf5ee45d17398f) Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6182ce8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6182ce8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6182ce8 Branch: refs/heads/branch-1.4 Commit: b6182ce891d9d92b46bf536dffe4c648fe3d4d39 Parents: 0d061ff Author: zsxwing <zsxw...@gmail.com> Authored: Wed May 20 19:56:01 2015 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Wed May 20 19:56:10 2015 -0700 ---------------------------------------------------------------------- .../org/apache/spark/streaming/BasicOperationsSuite.scala | 7 +++++++ 1 file changed, 7 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/b6182ce8/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala index 87bc20f..f269cb7 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala @@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase { withTestServer(new 
TestServer()) { testServer => withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc => testServer.start() + + val batchCounter = new BatchCounter(ssc) + // Set up the streaming context and input streams val networkStream = ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK) @@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase { for (i <- 0 until input.size) { testServer.send(input(i).toString + "\n") Thread.sleep(200) + val numCompletedBatches = batchCounter.getNumCompletedBatches clock.advance(batchDuration.milliseconds) + if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) { + fail("Batch took more than 5 seconds to complete") + } collectRddInfo() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org