Repository: spark
Updated Branches:
  refs/heads/branch-1.5 6a616d0d0 -> 4174b94f0


[SPARK-10769] [STREAMING] [TESTS] Fix o.a.s.streaming.CheckpointSuite.maintains 
rate controller

Fixed the following failure in 
https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1787/testReport/junit/org.apache.spark.streaming/CheckpointSuite/recovery_maintains_rate_controller/
```
sbt.ForkMain$ForkError: The code passed to eventually never returned normally. Attempted 660 times over 10.000044392000001 seconds. Last failure message: 9223372036854775807 did not equal 200.
        at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
        at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
        at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
        at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
        at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
        at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply$mcV$sp(CheckpointSuite.scala:413)
        at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
        at org.apache.spark.streaming.CheckpointSuite$$anonfun$15.apply(CheckpointSuite.scala:396)
        at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
        at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
        at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
        at org.scalatest.Transformer.apply(Transformer.scala:22)
```

The test calls `advanceTimeWithRealDelay(ssc, 2)` to run two batch jobs.
However, there is a race condition: both jobs can finish before the receiver
is registered. In that case `UpdateRateLimit` is never sent to the receiver,
so `getDefaultBlockGeneratorRateLimit` never changes and the assertion keeps
seeing 9223372036854775807 (`Long.MaxValue`) instead of 200.

Here are the logs related to this issue:
```
15/09/22 19:28:26.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock before advancing = 2500

15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Finished job streaming job 3000 ms.0 from job set of time 3000 ms
15/09/22 19:28:26.869 JobScheduler INFO JobScheduler: Total delay: 1442975303.869 s for time 3000 ms (execution: 0.711 s)

15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Finished job streaming job 3500 ms.0 from job set of time 3500 ms
15/09/22 19:28:26.873 JobScheduler INFO JobScheduler: Total delay: 1442975303.373 s for time 3500 ms (execution: 0.004 s)

15/09/22 19:28:26.879 sparkDriver-akka.actor.default-dispatcher-3 INFO ReceiverTracker: Registered receiver for stream 0 from localhost:57749

15/09/22 19:28:27.154 pool-1-thread-1-ScalaTest-running-CheckpointSuite INFO CheckpointSuite: Manual clock after advancing = 3500
```
`advanceTimeWithRealDelay(ssc, 2)` triggered the jobs for 3000 ms and 3500 ms,
but the receiver was only registered (at 19:28:26.879) after both jobs had
finished (at 19:28:26.869 and 19:28:26.873).

So we should make sure the receiver is online before running
`advanceTimeWithRealDelay(ssc, 2)`.
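
The ordering problem can be illustrated outside Spark with a rough,
self-contained sketch. Everything in it is a hypothetical stand-in and not
Spark or ScalaTest API: `registered` plays the role of receiver registration,
`onBatchCompleted` plays the role of sending `UpdateRateLimit`, and
`waitUntil` is a simple polling helper in the spirit of `eventually`.
```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

// Hypothetical model of the race; none of these names exist in Spark.
object ReceiverRaceSketch {
  private val registered = new AtomicBoolean(false)
  private val rateLimit = new AtomicLong(Long.MaxValue)

  // Stand-in for a finished batch: the new rate only reaches a receiver
  // that is already registered, otherwise the update is silently lost.
  private def onBatchCompleted(newRate: Long): Unit =
    if (registered.get()) rateLimit.set(newRate)

  // Poll `cond` until it holds or the timeout expires; returns the last result.
  def waitUntil(timeoutMs: Long, intervalMs: Long = 10L)(cond: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = cond
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      ok = cond
    }
    ok
  }

  // Runs two "batches" while registration happens asynchronously, and
  // returns the rate limit the receiver ends up with.
  def simulate(waitForReceiver: Boolean): Long = {
    registered.set(false)
    rateLimit.set(Long.MaxValue)
    val registration = new Thread(new Runnable {
      def run(): Unit = { Thread.sleep(50L); registered.set(true) }
    })
    registration.start()
    // The fix: block until the receiver is online before running batches.
    if (waitForReceiver) waitUntil(5000L)(registered.get())
    onBatchCompleted(200L)
    onBatchCompleted(200L)
    registration.join()
    rateLimit.get()
  }
}
```
With `waitForReceiver = true` the batches run only after registration, so the
limit ends up at 200. With `waitForReceiver = false` the batches will usually
finish first and the limit stays at `Long.MaxValue`, which mirrors the failure
above.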

Author: zsxwing <zsxw...@gmail.com>

Closes #8877 from zsxwing/SPARK-10769.

(cherry picked from commit 50e4634236668a0195390f0080d0ac230d428d05)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4174b94f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4174b94f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4174b94f

Branch: refs/heads/branch-1.5
Commit: 4174b94f05282ca51f1219aa6aba3226e205aee0
Parents: 6a616d0
Author: zsxwing <zsxw...@gmail.com>
Authored: Wed Sep 23 01:29:30 2015 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Wed Sep 23 01:30:21 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/streaming/CheckpointSuite.scala     | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4174b94f/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 1bba7a1..a695653 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -408,10 +408,14 @@ class CheckpointSuite extends TestSuiteBase {
 
     ssc = new StreamingContext(checkpointDir)
     ssc.start()
-    val outputNew = advanceTimeWithRealDelay(ssc, 2)
 
     eventually(timeout(10.seconds)) {
       assert(RateTestReceiver.getActive().nonEmpty)
+    }
+
+    advanceTimeWithRealDelay(ssc, 2)
+
+    eventually(timeout(10.seconds)) {
       assert(RateTestReceiver.getActive().get.getDefaultBlockGeneratorRateLimit() === 200)
     }
     ssc.stop()

