Repository: spark
Updated Branches:
  refs/heads/master 3afc1de89 -> d04634701


[SPARK-9504] [STREAMING] [TESTS] Use eventually to fix the flaky test

The previous code used `ssc.awaitTerminationOrTimeout(500)`. Since nothing 
stops the context during `awaitTerminationOrTimeout`, the call behaves just 
like `sleep(500)`. On a heavily overloaded Jenkins worker, the receiver may not 
be able to start within 500 milliseconds. The log of 
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/39149/ 
confirms this: there is no log entry about the receiver starting before the 
failure. That is why `assert(runningCount > 0)` failed.

This PR replaces `awaitTerminationOrTimeout` with `eventually`, which retries 
`assert(runningCount > 0)` every 10 milliseconds for up to 10 seconds and 
should therefore be more reliable.
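
As a rough illustration of why polling is more robust than a fixed wait, the 
sketch below hand-rolls a simplified stand-in for ScalaTest's `eventually`. 
The helper `eventuallyWithin`, the object name, and the 200 ms simulated 
startup delay are hypothetical and not part of this commit; the real test uses 
`eventually(timeout(10.seconds), interval(10.millis))` from ScalaTest.

```scala
object EventuallySketch {
  // Written by the background thread and read by the polling thread,
  // so it is marked @volatile (mirroring the change in this commit).
  @volatile var runningCount = 0

  // Hypothetical helper: re-runs `check` until it stops throwing
  // AssertionError, or rethrows the last failure once the timeout elapses.
  def eventuallyWithin(timeoutMs: Long, intervalMs: Long)(check: => Unit): Unit = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var done = false
    while (!done) {
      try {
        check
        done = true
      } catch {
        case e: AssertionError =>
          if (System.nanoTime() >= deadline) throw e
          Thread.sleep(intervalMs)
      }
    }
  }

  def main(args: Array[String]): Unit = {
    // Simulates a receiver that needs some time before producing data.
    val worker = new Thread(() => {
      Thread.sleep(200)
      runningCount += 1
    })
    worker.start()

    // Returns as soon as the condition holds instead of always
    // waiting for a fixed period that may turn out to be too short.
    eventuallyWithin(timeoutMs = 10000, intervalMs = 10) {
      assert(runningCount > 0)
    }

    worker.join()
    println("Running count = " + runningCount)
  }
}
```

With this pattern a slow start only delays the test, while a fast start lets 
it proceed almost immediately; the timeout bounds the worst case.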

Author: zsxwing <zsxw...@gmail.com>

Closes #7823 from zsxwing/SPARK-9504 and squashes the following commits:

7af66a6 [zsxwing] Remove wrong assertion
5ba2c99 [zsxwing] Use eventually to fix the flaky test


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0463470
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0463470
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0463470

Branch: refs/heads/master
Commit: d04634701413410938a133358fe1d9fbc077645e
Parents: 3afc1de
Author: zsxwing <zsxw...@gmail.com>
Authored: Fri Jul 31 12:10:55 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Fri Jul 31 12:10:55 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/streaming/StreamingContextSuite.scala    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0463470/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 84a5fbb..b7db280 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -261,7 +261,7 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
     for (i <- 1 to 4) {
       logInfo("==================================\n\n\n")
       ssc = new StreamingContext(sc, Milliseconds(100))
-      var runningCount = 0
+      @volatile var runningCount = 0
       TestReceiver.counter.set(1)
       val input = ssc.receiverStream(new TestReceiver)
       input.count().foreachRDD { rdd =>
@@ -270,14 +270,14 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
         logInfo("Count = " + count + ", Running count = " + runningCount)
       }
       ssc.start()
-      ssc.awaitTerminationOrTimeout(500)
+      eventually(timeout(10.seconds), interval(10.millis)) {
+        assert(runningCount > 0)
+      }
       ssc.stop(stopSparkContext = false, stopGracefully = true)
       logInfo("Running count = " + runningCount)
       logInfo("TestReceiver.counter = " + TestReceiver.counter.get())
-      assert(runningCount > 0)
       assert(
-        (TestReceiver.counter.get() == runningCount + 1) ||
-          (TestReceiver.counter.get() == runningCount + 2),
+        TestReceiver.counter.get() == runningCount + 1,
         "Received records = " + TestReceiver.counter.get() + ", " +
           "processed records = " + runningCount
       )


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
