Streaming tests, especially the ones using checkpoints, need a time buffer to 
finish.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c707431
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c707431
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c707431

Branch: refs/heads/master
Commit: 2c70743121e3da352346c0db4d718c757e36dbe8
Parents: 3c94ff2
Author: Sela <ans...@paypal.com>
Authored: Tue Feb 28 16:16:00 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:11 2017 +0200

----------------------------------------------------------------------
 .../translation/streaming/ResumeFromCheckpointStreamingTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/2c707431/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 4eea383..7706777 100644
--- 
a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ 
b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -166,7 +166,7 @@ public class ResumeFromCheckpointStreamingTest {
 
     // first run will read from Kafka backlog - "auto.offset.reset=smallest"
     SparkPipelineResult res = run(options);
-    res.waitUntilFinish(Duration.standardSeconds(2));
+    res.waitUntilFinish(Duration.standardSeconds(5));
     // assertions 1:
     long processedMessages1 = res.getAggregatorValue("processedMessages", 
Long.class);
     assertThat(
@@ -193,7 +193,7 @@ public class ResumeFromCheckpointStreamingTest {
 
     // recovery should resume from last read offset, and read the second batch 
of input.
     res = runAgain(options);
-    res.waitUntilFinish(Duration.standardSeconds(2));
+    res.waitUntilFinish(Duration.standardSeconds(5));
     // assertions 2:
     long processedMessages2 = res.getAggregatorValue("processedMessages", 
Long.class);
     assertThat(

Reply via email to