Repository: spark
Updated Branches:
  refs/heads/branch-1.4 0d061ff9e -> b6182ce89


[SPARK-7777] [STREAMING] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite

Added a guard that, after advancing the clock, waits for the triggered batch to
complete (failing the test after 5 seconds) before moving on to the next batch.

Author: zsxwing <zsxw...@gmail.com>

Closes #6306 from zsxwing/SPARK-7777 and squashes the following commits:

ecee529 [zsxwing] Fix the failure message
58634fe [zsxwing] Fix the flaky test in org.apache.spark.streaming.BasicOperationsSuite

(cherry picked from commit 895baf8f77e630ce32b0e25b00bf5ee45d17398f)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b6182ce8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b6182ce8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b6182ce8

Branch: refs/heads/branch-1.4
Commit: b6182ce891d9d92b46bf536dffe4c648fe3d4d39
Parents: 0d061ff
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed May 20 19:56:01 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed May 20 19:56:10 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/BasicOperationsSuite.scala     | 7 +++++++
 1 file changed, 7 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b6182ce8/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 87bc20f..f269cb7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -557,6 +557,9 @@ class BasicOperationsSuite extends TestSuiteBase {
     withTestServer(new TestServer()) { testServer =>
       withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
         testServer.start()
+
+        val batchCounter = new BatchCounter(ssc)
+
         // Set up the streaming context and input streams
         val networkStream =
           ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
@@ -587,7 +590,11 @@ class BasicOperationsSuite extends TestSuiteBase {
         for (i <- 0 until input.size) {
           testServer.send(input(i).toString + "\n")
           Thread.sleep(200)
+          val numCompletedBatches = batchCounter.getNumCompletedBatches
           clock.advance(batchDuration.milliseconds)
+          if (!batchCounter.waitUntilBatchesCompleted(numCompletedBatches + 1, 5000)) {
+            fail("Batch took more than 5 seconds to complete")
+          }
           collectRddInfo()
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to