Streaming tests, especially the ones using checkpoints, need a time buffer to finish.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2c707431
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2c707431
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2c707431

Branch: refs/heads/master
Commit: 2c70743121e3da352346c0db4d718c757e36dbe8
Parents: 3c94ff2
Author: Sela <ans...@paypal.com>
Authored: Tue Feb 28 16:16:00 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:11 2017 +0200

----------------------------------------------------------------------
 .../translation/streaming/ResumeFromCheckpointStreamingTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/beam/blob/2c707431/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 4eea383..7706777 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -166,7 +166,7 @@ public class ResumeFromCheckpointStreamingTest {
 
     // first run will read from Kafka backlog - "auto.offset.reset=smallest"
     SparkPipelineResult res = run(options);
-    res.waitUntilFinish(Duration.standardSeconds(2));
+    res.waitUntilFinish(Duration.standardSeconds(5));
     // assertions 1:
     long processedMessages1 = res.getAggregatorValue("processedMessages", Long.class);
     assertThat(
@@ -193,7 +193,7 @@ public class ResumeFromCheckpointStreamingTest {
 
     // recovery should resume from last read offset, and read the second batch of input.
     res = runAgain(options);
-    res.waitUntilFinish(Duration.standardSeconds(2));
+    res.waitUntilFinish(Duration.standardSeconds(5));
     // assertions 2:
     long processedMessages2 = res.getAggregatorValue("processedMessages", Long.class);
     assertThat(