Batch executions should block without timeout.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3867dcd7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3867dcd7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3867dcd7 Branch: refs/heads/master Commit: 3867dcd793adcb030faa4713624542210b86b68d Parents: f47e0eb Author: Sela <ans...@paypal.com> Authored: Mon Feb 20 20:40:18 2017 +0200 Committer: Sela <ans...@paypal.com> Committed: Wed Mar 1 00:18:05 2017 +0200 ---------------------------------------------------------------------- .../apache/beam/runners/spark/TestSparkRunner.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3867dcd7/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java index 985f75d..d2b5186 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java @@ -116,7 +116,6 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { @Override public SparkPipelineResult run(Pipeline pipeline) { SparkPipelineOptions sparkOptions = pipeline.getOptions().as(SparkPipelineOptions.class); - long timeout = sparkOptions.getForcedTimeout(); SparkPipelineResult result = null; try { // clear state of Aggregators, Metrics and Watermarks. @@ -126,14 +125,12 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { TestPipelineOptions testPipelineOptions = pipeline.getOptions().as(TestPipelineOptions.class); LOG.info("About to run test pipeline " + sparkOptions.getJobName()); - result = delegate.run(pipeline); - result.waitUntilFinish(Duration.millis(timeout)); - - assertThat(result, testPipelineOptions.getOnCreateMatcher()); - assertThat(result, testPipelineOptions.getOnSuccessMatcher()); // if the pipeline was executed in streaming mode, validate aggregators. if (isForceStreaming) { + result = delegate.run(pipeline); + long timeout = sparkOptions.getForcedTimeout(); + result.waitUntilFinish(Duration.millis(timeout)); // validate assertion succeeded (at least once). int successAssertions = result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class); assertThat( @@ -154,6 +151,13 @@ public final class TestSparkRunner extends PipelineRunner<SparkPipelineResult> { "Successfully asserted pipeline %s with %d successful assertions.", sparkOptions.getJobName(), successAssertions)); + } else { + // for batch test pipelines, run and block until done. + result = delegate.run(pipeline); + result.waitUntilFinish(); + // assert via matchers. + assertThat(result, testPipelineOptions.getOnCreateMatcher()); + assertThat(result, testPipelineOptions.getOnSuccessMatcher()); } } finally { try {