Repository: spark Updated Branches: refs/heads/master 224e0e785 -> 207067ead
[SPARK-19822][TEST] CheckpointSuite.testCheckpointedOperation: should not filter checkpointFilesOfLatestTime with the PATH string. ## What changes were proposed in this pull request? https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/73800/testReport/ ``` sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedDueToTimeoutException: The code passed to eventually never returned normally. Attempted 617 times over 10.003740484 seconds. Last failure message: 8 did not equal 2. at org.scalatest.concurrent.Eventually$class.tryTryAgain$1(Eventually.scala:420) at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:438) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478) at org.scalatest.concurrent.Eventually$class.eventually(Eventually.scala:336) at org.scalatest.concurrent.Eventually$.eventually(Eventually.scala:478) at org.apache.spark.streaming.DStreamCheckpointTester$class.generateOutput(CheckpointSuite .scala:172) at org.apache.spark.streaming.CheckpointSuite.generateOutput(CheckpointSuite.scala:211) ``` the check condition is: ``` val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { _.toString.contains(clock.getTimeMillis.toString) } // Checkpoint files are written twice for every batch interval. So assert that both // are written to make sure that both of them have been written. assert(checkpointFilesOfLatestTime.size === 2) ``` the path string may contain the `clock.getTimeMillis.toString`, like `3500` : ``` file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-1500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-2500 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3000 file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500.bk file:/root/dev/spark/assembly/CheckpointSuite/spark-20035007-9891-4fb6-91c1-cc15b7ccaf15/checkpoint-3500 â²â²â²â² ``` so we should only check the filename, but not the whole path. ## How was this patch tested? Jenkins. Author: uncleGen <husty...@gmail.com> Closes #17167 from uncleGen/flaky-CheckpointSuite. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/207067ea Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/207067ea Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/207067ea Branch: refs/heads/master Commit: 207067ead6db6dc87b0d144a658e2564e3280a89 Parents: 224e0e7 Author: uncleGen <husty...@gmail.com> Authored: Sun Mar 5 18:17:30 2017 -0800 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Sun Mar 5 18:17:30 2017 -0800 ---------------------------------------------------------------------- .../test/scala/org/apache/spark/streaming/CheckpointSuite.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/207067ea/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala ---------------------------------------------------------------------- diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index 7fcf45e..ee2fd45 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -152,11 +152,9 @@ trait DStreamCheckpointTester { self: SparkFunSuite => stopSparkContext: Boolean ): Seq[Seq[V]] = { try { - val batchDuration = ssc.graph.batchDuration val batchCounter = new BatchCounter(ssc) ssc.start() val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] - val currentTime = clock.getTimeMillis() logInfo("Manual clock before advancing = " + clock.getTimeMillis()) clock.setTime(targetBatchTime.milliseconds) @@ -171,7 +169,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite => eventually(timeout(10 seconds)) { val checkpointFilesOfLatestTime = Checkpoint.getCheckpointFiles(checkpointDir).filter { - _.toString.contains(clock.getTimeMillis.toString) + _.getName.contains(clock.getTimeMillis.toString) } // Checkpoint files are written twice for every batch interval. So assert that both // are written to make sure that both of them have been written. --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org