Repository: spark
Updated Branches:
  refs/heads/master 224e0e785 -> 207067ead


[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not filter checkpointFilesOfLatestTime with the PATH string.

## What changes were proposed in this pull request?

https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/

```
sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds.
Last failure message: 8 did not equal 2.
        at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420)
        at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438)
        at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
        at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336)
        at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478)
        at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite.scala:172)
        at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211)
```

In `DStreamCheckpointTester.generateOutput`, the check condition is:

```
val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
  _.toString.contains(clock.getTimeMillis.toString)
}
// Checkpoint files are written twice for every batch interval. So assert that both
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
```

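However, the full path string may itself contain `clock.getTimeMillis.toString` (`3500` in this run), because the generated temporary directory name `spark-20035007-9891-4fb6-91c1-cc15b7ccaf15` contains it. As a result, all 8 checkpoint files match the filter instead of the expected 2: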

```
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk
file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500
                                                       ▲▲▲▲
```

So the filter should match only against the file name, not the whole path.
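The failure can be reproduced without Spark using plain strings. This is a minimal sketch, not the test code itself: `CheckpointFilterDemo` and `fileName` are hypothetical names, and `fileName` only stands in for Hadoop's `Path.getName` (the real test filters the `Path` values returned by `Checkpoint.getCheckpointFiles`). The paths are the ones listed above.

```scala
// Hypothetical demo object; reproduces the old and new filters on plain strings.
object CheckpointFilterDemo {
  // Temporary checkpoint directory from the failing run; its name contains "3500".
  val dir =
    "file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15"

  // The 8 checkpoint files present when the manual clock reads 3500 ms.
  val paths: Seq[String] =
    Seq("500", "1000", "1500", "2000", "2500", "3000", "3500.bk", "3500")
      .map(suffix => s"$dir/checkpoint-$suffix")

  // Assumption: stand-in for Hadoop's Path.getName (last path component).
  def fileName(path: String): String = path.substring(path.lastIndexOf('/') + 1)

  val latestTime: Long = 3500L

  // Old filter: matches against the whole path, so every file matches.
  val byFullPath: Seq[String] = paths.filter(_.contains(latestTime.toString))

  // Fixed filter: matches against the file name only.
  val byFileName: Seq[String] =
    paths.filter(p => fileName(p).contains(latestTime.toString))

  def main(args: Array[String]): Unit = {
    println(s"full path matches: ${byFullPath.size}") // prints 8
    println(s"file name matches: ${byFileName.size}") // prints 2
  }
}
```

Only `checkpoint-3500` and `checkpoint-3500.bk` survive the name-based filter, which is the pair the assertion expects.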

## How was this patch tested?

Jenkins.

Author: uncleGen <husty...@gmail.com>

Closes #17167 from uncleGen/flaky-CheckpointSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/207067ea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/207067ea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/207067ea

Branch: refs/heads/master
Commit: 207067ead6db6dc87b0d144a658e2564e3280a89
Parents: 224e0e7
Author: uncleGen <husty...@gmail.com>
Authored: Sun Mar 5 18:17:30 2017 -0800
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Sun Mar 5 18:17:30 2017 -0800

----------------------------------------------------------------------
 .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala  | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/207067ea/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 7fcf45e..ee2fd45 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
       stopSparkContext: Boolean
     ): Seq[Seq[V]] = {
     try {
-      val batchDuration = ssc.graph.batchDuration
       val batchCounter = new BatchCounter(ssc)
       ssc.start()
       val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-      val currentTime = clock.getTimeMillis()
 
       logInfo("Manual clock before advancing = " + clock.getTimeMillis())
       clock.setTime(targetBatchTime.milliseconds)
@@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
 
       eventually(timeout(10 seconds)) {
         val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter {
-          _.toString.contains(clock.getTimeMillis.toString)
+          _.getName.contains(clock.getTimeMillis.toString)
         }
         // Checkpoint files are written twice for every batch interval. So assert that both
         // are written to make sure that both of them have been written.

