spark git commit: [SPARK-15508][STREAMING][TESTS] Fix flaky test: JavaKafkaStreamSuite.testKafkaStream

2016-05-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 fb7b90f61 -> 6f22ba3e1


[SPARK-15508][STREAMING][TESTS] Fix flaky test: 
JavaKafkaStreamSuite.testKafkaStream

## What changes were proposed in this pull request?

`JavaKafkaStreamSuite.testKafkaStream` assumes when `sent.size == result.size`, 
the contents of `sent` and `result` should be same. However, that's not true. 
The content of `result` may not be the final content.

This PR modified the test to always retry the assertions even if the contents 
of `sent` and `result` are not same.

Here is the failure in Jenkins: 
http://spark-tests.appspot.com/tests/org.apache.spark.streaming.kafka.JavaKafkaStreamSuite/testKafkaStream

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu 

Closes #13281 from zsxwing/flaky-kafka-test.

(cherry picked from commit c9c1c0e54d34773ac2cf5457fe5925559ece36c7)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6f22ba3e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6f22ba3e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6f22ba3e

Branch: refs/heads/branch-2.0
Commit: 6f22ba3e16c93c83f38ef01cbe511792fe0772ef
Parents: fb7b90f
Author: Shixiong Zhu 
Authored: Tue May 24 22:01:40 2016 -0700
Committer: Shixiong Zhu 
Committed: Tue May 24 22:01:46 2016 -0700

--
 .../streaming/kafka/JavaKafkaStreamSuite.java   | 21 ++--
 1 file changed, 15 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6f22ba3e/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
--
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 868df64..98fe38e 100644
--- 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -122,14 +122,23 @@ public class JavaKafkaStreamSuite implements Serializable 
{
 ssc.start();
 
 long startTime = System.currentTimeMillis();
-boolean sizeMatches = false;
-while (!sizeMatches && System.currentTimeMillis() - startTime < 2) {
-  sizeMatches = sent.size() == result.size();
+AssertionError lastError = null;
+while (System.currentTimeMillis() - startTime < 2) {
+  try {
+Assert.assertEquals(sent.size(), result.size());
+for (Map.Entry e : sent.entrySet()) {
+  Assert.assertEquals(e.getValue().intValue(), 
result.get(e.getKey()).intValue());
+}
+return;
+  } catch (AssertionError e) {
+lastError = e;
+  }
   Thread.sleep(200);
 }
-Assert.assertEquals(sent.size(), result.size());
-for (Map.Entry e : sent.entrySet()) {
-  Assert.assertEquals(e.getValue().intValue(), 
result.get(e.getKey()).intValue());
+if (lastError != null) {
+  throw lastError;
+} else {
+  Assert.fail("timeout");
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-15508][STREAMING][TESTS] Fix flaky test: JavaKafkaStreamSuite.testKafkaStream

2016-05-24 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 50b660d72 -> c9c1c0e54


[SPARK-15508][STREAMING][TESTS] Fix flaky test: 
JavaKafkaStreamSuite.testKafkaStream

## What changes were proposed in this pull request?

`JavaKafkaStreamSuite.testKafkaStream` assumes when `sent.size == result.size`, 
the contents of `sent` and `result` should be same. However, that's not true. 
The content of `result` may not be the final content.

This PR modified the test to always retry the assertions even if the contents 
of `sent` and `result` are not same.

Here is the failure in Jenkins: 
http://spark-tests.appspot.com/tests/org.apache.spark.streaming.kafka.JavaKafkaStreamSuite/testKafkaStream

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu 

Closes #13281 from zsxwing/flaky-kafka-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c9c1c0e5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c9c1c0e5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c9c1c0e5

Branch: refs/heads/master
Commit: c9c1c0e54d34773ac2cf5457fe5925559ece36c7
Parents: 50b660d
Author: Shixiong Zhu 
Authored: Tue May 24 22:01:40 2016 -0700
Committer: Shixiong Zhu 
Committed: Tue May 24 22:01:40 2016 -0700

--
 .../streaming/kafka/JavaKafkaStreamSuite.java   | 21 ++--
 1 file changed, 15 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c9c1c0e5/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
--
diff --git 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 868df64..98fe38e 100644
--- 
a/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ 
b/external/kafka-0-8/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -122,14 +122,23 @@ public class JavaKafkaStreamSuite implements Serializable 
{
 ssc.start();
 
 long startTime = System.currentTimeMillis();
-boolean sizeMatches = false;
-while (!sizeMatches && System.currentTimeMillis() - startTime < 2) {
-  sizeMatches = sent.size() == result.size();
+AssertionError lastError = null;
+while (System.currentTimeMillis() - startTime < 2) {
+  try {
+Assert.assertEquals(sent.size(), result.size());
+for (Map.Entry e : sent.entrySet()) {
+  Assert.assertEquals(e.getValue().intValue(), 
result.get(e.getKey()).intValue());
+}
+return;
+  } catch (AssertionError e) {
+lastError = e;
+  }
   Thread.sleep(200);
 }
-Assert.assertEquals(sent.size(), result.size());
-for (Map.Entry e : sent.entrySet()) {
-  Assert.assertEquals(e.getValue().intValue(), 
result.get(e.getKey()).intValue());
+if (lastError != null) {
+  throw lastError;
+} else {
+  Assert.fail("timeout");
 }
   }
 }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org