Repository: spark Updated Branches: refs/heads/branch-1.6 d98fb19c1 -> 4fdac3c27
[SPARK-6005][TESTS] Fix flaky test: o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery ## What changes were proposed in this pull request? Because this test extracts data from `DStream.generatedRDDs` before stopping, it may get data before checkpointing. Then after recovering from the checkpoint, `recoveredOffsetRanges` may contain something not in `offsetRangesBeforeStop`, which will fail the test. Adding `Thread.sleep(1000)` before `ssc.stop()` will reproduce this failure. This PR just moves the logic of `offsetRangesBeforeStop` (also renamed to `offsetRangesAfterStop`) after `ssc.stop()` to fix the flaky test. ## How was this patch tested? Jenkins unit tests. Author: Shixiong Zhu <shixi...@databricks.com> Closes #12903 from zsxwing/SPARK-6005. (cherry picked from commit 9533f5390a3ad7ab96a7bea01cdb6aed89503a51) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fdac3c2 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fdac3c2 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fdac3c2 Branch: refs/heads/branch-1.6 Commit: 4fdac3c271eccc5db69c45788af15e955752a163 Parents: d98fb19 Author: Shixiong Zhu <shixi...@databricks.com> Authored: Tue May 10 13:26:53 2016 -0700 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Jun 22 14:10:50 2016 +0100 ---------------------------------------------------------------------- .../kafka/DirectKafkaStreamSuite.scala | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4fdac3c2/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 02225d5..feea0ae 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -280,14 +280,20 @@ class DirectKafkaStreamSuite sendDataAndWaitForReceive(i) } + ssc.stop() + // Verify that offset ranges were generated - val offsetRangesBeforeStop = getOffsetRanges(kafkaStream) - assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated") + // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should + // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before + // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will + // contain something not in "offsetRangesAfterStop". + val offsetRangesAfterStop = getOffsetRanges(kafkaStream) + assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated") assert( - offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 }, + offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 }, "starting offset not zero" ) - ssc.stop() + logInfo("====== RESTARTING ========") // Recover context from checkpoints @@ -297,12 +303,14 @@ class DirectKafkaStreamSuite // Verify offset ranges have been recovered val recoveredOffsetRanges = getOffsetRanges(recoveredStream) assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered") - val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) } + val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) } assert( recoveredOffsetRanges.forall { or => earlierOffsetRangesAsSets.contains((or._1, or._2.toSet)) }, - "Recovered ranges are not the same as the ones generated" + "Recovered ranges are not the same as the ones generated\n" + + s"recoveredOffsetRanges: $recoveredOffsetRanges\n" + + s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets" ) // Restart context, give more data and verify the total at the end // If the total is write that means each records has been received only once --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org