Batch executions should block without timeout.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3867dcd7
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3867dcd7
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3867dcd7

Branch: refs/heads/master
Commit: 3867dcd793adcb030faa4713624542210b86b68d
Parents: f47e0eb
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 20 20:40:18 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:18:05 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/spark/TestSparkRunner.java  | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3867dcd7/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
index 985f75d..d2b5186 100644
--- 
a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
+++ 
b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkRunner.java
@@ -116,7 +116,6 @@ public final class TestSparkRunner extends 
PipelineRunner<SparkPipelineResult> {
   @Override
   public SparkPipelineResult run(Pipeline pipeline) {
     SparkPipelineOptions sparkOptions = 
pipeline.getOptions().as(SparkPipelineOptions.class);
-    long timeout = sparkOptions.getForcedTimeout();
     SparkPipelineResult result = null;
     try {
       // clear state of Aggregators, Metrics and Watermarks.
@@ -126,14 +125,12 @@ public final class TestSparkRunner extends 
PipelineRunner<SparkPipelineResult> {
 
       TestPipelineOptions testPipelineOptions = 
pipeline.getOptions().as(TestPipelineOptions.class);
       LOG.info("About to run test pipeline " + sparkOptions.getJobName());
-      result = delegate.run(pipeline);
-      result.waitUntilFinish(Duration.millis(timeout));
-
-      assertThat(result, testPipelineOptions.getOnCreateMatcher());
-      assertThat(result, testPipelineOptions.getOnSuccessMatcher());
 
       // if the pipeline was executed in streaming mode, validate aggregators.
       if (isForceStreaming) {
+        result = delegate.run(pipeline);
+        long timeout = sparkOptions.getForcedTimeout();
+        result.waitUntilFinish(Duration.millis(timeout));
         // validate assertion succeeded (at least once).
         int successAssertions = 
result.getAggregatorValue(PAssert.SUCCESS_COUNTER, Integer.class);
         assertThat(
@@ -154,6 +151,13 @@ public final class TestSparkRunner extends 
PipelineRunner<SparkPipelineResult> {
                 "Successfully asserted pipeline %s with %d successful 
assertions.",
                 sparkOptions.getJobName(),
                 successAssertions));
+      } else {
+        // for batch test pipelines, run and block until done.
+        result = delegate.run(pipeline);
+        result.waitUntilFinish();
+        // assert via matchers.
+        assertThat(result, testPipelineOptions.getOnCreateMatcher());
+        assertThat(result, testPipelineOptions.getOnSuccessMatcher());
       }
     } finally {
       try {

Reply via email to