Repository: spark
Updated Branches:
  refs/heads/branch-1.6 d98fb19c1 -> 4fdac3c27


[SPARK-6005][TESTS] Fix flaky test: o.a.s.streaming.kafka.DirectKafkaStreamSuite.offset recovery

## What changes were proposed in this pull request?

Because this test extracts data from `DStream.generatedRDDs` before stopping, 
it may capture the offset ranges before the final batches are checkpointed. 
After recovering from the checkpoint, `recoveredOffsetRanges` may then contain 
ranges that are not in `offsetRangesBeforeStop`, which fails the test. Adding 
`Thread.sleep(1000)` before `ssc.stop()` reliably reproduces this failure.

This PR fixes the flaky test by moving the collection of 
`offsetRangesBeforeStop` (renamed to `offsetRangesAfterStop`) to after 
`ssc.stop()`, so that no new RDDs can be generated between the snapshot and 
the stop.
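The race can be sketched without Spark at all. The sketch below is a toy model, 
not Spark's API: `ToyContext`, `generate`, and `generatedRDDs` are hypothetical 
stand-ins for `StreamingContext` and `DStream.generatedRDDs`, purely to show 
why a snapshot taken before `stop()` can miss batches that a snapshot taken 
after `stop()` would include.

```scala
import scala.collection.mutable

// Hypothetical stand-in (not Spark's API) for a streaming context whose
// DStream keeps adding entries to generatedRDDs until stop() is called.
class ToyContext {
  private var running = true
  val generatedRDDs = mutable.LinkedHashMap.empty[Int, String]
  def generate(batch: Int): Unit =
    if (running) generatedRDDs(batch) = s"rdd-$batch"
  def stop(): Unit = running = false
}

object SnapshotAfterStop {
  // Returns (snapshot taken before stop, snapshot taken after stop).
  def run(): (Set[Int], Set[Int]) = {
    val ssc = new ToyContext
    (1 to 3).foreach(ssc.generate)

    // Flaky order: snapshot first, then one more batch slips in before stop().
    val beforeStop = ssc.generatedRDDs.keySet.toSet
    ssc.generate(4)
    ssc.stop()

    // Fixed order (this PR): snapshot only after stop(), so no batch is missed.
    val afterStop = ssc.generatedRDDs.keySet.toSet
    (beforeStop, afterStop)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run()
    println(s"before stop: $before") // misses batch 4, like offsetRangesBeforeStop
    println(s"after stop:  $after")  // complete, like offsetRangesAfterStop
  }
}
```

The checkpoint recovery sees the final state of the context, which is why the 
comparison set must also be taken from the final state, i.e. after `stop()`.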

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #12903 from zsxwing/SPARK-6005.

(cherry picked from commit 9533f5390a3ad7ab96a7bea01cdb6aed89503a51)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4fdac3c2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4fdac3c2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4fdac3c2

Branch: refs/heads/branch-1.6
Commit: 4fdac3c271eccc5db69c45788af15e955752a163
Parents: d98fb19
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue May 10 13:26:53 2016 -0700
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Jun 22 14:10:50 2016 +0100

----------------------------------------------------------------------
 .../kafka/DirectKafkaStreamSuite.scala          | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4fdac3c2/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index 02225d5..feea0ae 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -280,14 +280,20 @@ class DirectKafkaStreamSuite
       sendDataAndWaitForReceive(i)
     }
 
+    ssc.stop()
+
     // Verify that offset ranges were generated
-    val offsetRangesBeforeStop = getOffsetRanges(kafkaStream)
-    assert(offsetRangesBeforeStop.size >= 1, "No offset ranges generated")
+    // Since "offsetRangesAfterStop" will be used to compare with "recoveredOffsetRanges", we should
+    // collect offset ranges after stopping. Otherwise, because new RDDs keep being generated before
+    // stopping, we may not be able to get the latest RDDs, then "recoveredOffsetRanges" will
+    // contain something not in "offsetRangesAfterStop".
+    val offsetRangesAfterStop = getOffsetRanges(kafkaStream)
+    assert(offsetRangesAfterStop.size >= 1, "No offset ranges generated")
     assert(
-      offsetRangesBeforeStop.head._2.forall { _.fromOffset === 0 },
+      offsetRangesAfterStop.head._2.forall { _.fromOffset === 0 },
       "starting offset not zero"
     )
-    ssc.stop()
+
     logInfo("====== RESTARTING ========")
 
     // Recover context from checkpoints
@@ -297,12 +303,14 @@ class DirectKafkaStreamSuite
     // Verify offset ranges have been recovered
     val recoveredOffsetRanges = getOffsetRanges(recoveredStream)
     assert(recoveredOffsetRanges.size > 0, "No offset ranges recovered")
-    val earlierOffsetRangesAsSets = offsetRangesBeforeStop.map { x => (x._1, x._2.toSet) }
+    val earlierOffsetRangesAsSets = offsetRangesAfterStop.map { x => (x._1, x._2.toSet) }
     assert(
       recoveredOffsetRanges.forall { or =>
         earlierOffsetRangesAsSets.contains((or._1, or._2.toSet))
       },
-      "Recovered ranges are not the same as the ones generated"
+      "Recovered ranges are not the same as the ones generated\n" +
+        s"recoveredOffsetRanges: $recoveredOffsetRanges\n" +
+        s"earlierOffsetRangesAsSets: $earlierOffsetRangesAsSets"
     )
     // Restart context, give more data and verify the total at the end
     // If the total is write that means each records has been received only once

