TestDataflowRunner: throw AssertionError only when assertion known failed

It is quite confusing to receive an assertion error when in fact the pipeline
has crashed because of user error interacting with e.g. timers.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1a8a217b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1a8a217b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1a8a217b

Branch: refs/heads/master
Commit: 1a8a217bf8e291a2930c99dba58a305deea3270f
Parents: 8641675
Author: Kenneth Knowles <k...@google.com>
Authored: Sun Apr 30 16:08:48 2017 -0700
Committer: Kenneth Knowles <k...@google.com>
Committed: Wed May 10 14:00:03 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/TestDataflowRunner.java    | 254 ++++++++++++-------
 .../dataflow/TestDataflowRunnerTest.java        |  42 +--
 2 files changed, 172 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1a8a217b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
index b81b487..1abea99 100644
--- 
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
+++ 
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/TestDataflowRunner.java
@@ -29,6 +29,7 @@ import com.google.common.base.Strings;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.List;
+import java.util.concurrent.Callable;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.util.MonitoringUtil;
@@ -45,8 +46,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * {@link TestDataflowRunner} is a pipeline runner that wraps a
- * {@link DataflowRunner} when running tests against the {@link TestPipeline}.
+ * {@link TestDataflowRunner} is a pipeline runner that wraps a {@link 
DataflowRunner} when running
+ * tests against the {@link TestPipeline}.
  *
  * @see TestPipeline
  */
@@ -65,16 +66,12 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     this.runner = DataflowRunner.fromOptions(options);
   }
 
-  /**
-   * Constructs a runner from the provided options.
-   */
+  /** Constructs a runner from the provided options. */
   public static TestDataflowRunner fromOptions(PipelineOptions options) {
     TestDataflowPipelineOptions dataflowOptions = 
options.as(TestDataflowPipelineOptions.class);
-    String tempLocation = Joiner.on("/").join(
-        dataflowOptions.getTempRoot(),
-        dataflowOptions.getJobName(),
-        "output",
-        "results");
+    String tempLocation =
+        Joiner.on("/")
+            .join(dataflowOptions.getTempRoot(), dataflowOptions.getJobName(), 
"output", "results");
     dataflowOptions.setTempLocation(tempLocation);
 
     return new TestDataflowRunner(
@@ -99,88 +96,115 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     final DataflowPipelineJob job;
     job = runner.run(pipeline);
 
-    LOG.info("Running Dataflow job {} with {} expected assertions.",
-        job.getJobId(), expectedNumberOfAssertions);
+    LOG.info(
+        "Running Dataflow job {} with {} expected assertions.",
+        job.getJobId(),
+        expectedNumberOfAssertions);
 
     assertThat(job, testPipelineOptions.getOnCreateMatcher());
 
-    final ErrorMonitorMessagesHandler messageHandler =
+    Boolean jobSuccess;
+    Optional<Boolean> allAssertionsPassed;
+
+    ErrorMonitorMessagesHandler messageHandler =
         new ErrorMonitorMessagesHandler(job, new 
MonitoringUtil.LoggingHandler());
 
-    try {
-      Optional<Boolean> result = Optional.absent();
-
-      if (options.isStreaming()) {
-        // In streaming, there are infinite retries, so rather than timeout
-        // we try to terminate early by polling and canceling if we see
-        // an error message
-        while (true) {
-          State state = job.waitUntilFinish(Duration.standardSeconds(3), 
messageHandler);
-          if (state != null && state.isTerminal()) {
-            break;
-          }
+    if (options.isStreaming()) {
+      jobSuccess = waitForStreamingJobTermination(job, messageHandler);
+      // No metrics in streaming
+      allAssertionsPassed = Optional.absent();
+    } else {
+      jobSuccess = waitForBatchJobTermination(job, messageHandler);
+      allAssertionsPassed = checkForPAssertSuccess(job);
+    }
 
-          if (messageHandler.hasSeenError()) {
-            if (!job.getState().isTerminal()) {
-              LOG.info("Cancelling Dataflow job {}", job.getJobId());
-              job.cancel();
-            }
-            break;
-          }
-        }
+    // If there is a certain assertion failure, throw the most precise 
exception we can.
+    // There are situations where the metric will not be available, but as 
long as we recover
+    // the actionable message from the logs it is acceptable.
+    if (!allAssertionsPassed.isPresent()) {
+      LOG.warn("Dataflow job {} did not output a success or failure metric.", 
job.getJobId());
+    } else if (!allAssertionsPassed.get()) {
+      throw new AssertionError(errorMessage(job, messageHandler));
+    }
 
-        // Whether we canceled or not, this gets the final state of the job or 
times out
-        State finalState =
-            job.waitUntilFinish(
-                Duration.standardSeconds(options.getTestTimeoutSeconds()), 
messageHandler);
+    // Other failures, or jobs where metrics fell through for some reason, 
will manifest
+    // as simply job failures.
+    if (!jobSuccess) {
+      throw new RuntimeException(errorMessage(job, messageHandler));
+    }
 
-        // Getting the final state timed out; it may not indicate a failure.
-        // This cancellation may be the second
-        if (finalState == null || finalState == State.RUNNING) {
-          LOG.info(
-              "Dataflow job {} took longer than {} seconds to complete, 
cancelling.",
-              job.getJobId(),
-              options.getTestTimeoutSeconds());
-          job.cancel();
-        }
+    // If there is no reason to immediately fail, run the success matcher.
+    assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+    return job;
+  }
 
-        if (messageHandler.hasSeenError()) {
-          result = Optional.of(false);
-        }
-      } else {
-        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
-        result = checkForPAssertSuccess(job);
+  /**
+   * Return {@code true} if the job succeeded or {@code false} if it 
terminated in any other manner.
+   */
+  private boolean waitForStreamingJobTermination(
+      final DataflowPipelineJob job, ErrorMonitorMessagesHandler 
messageHandler) {
+    // In streaming, there are infinite retries, so rather than timeout
+    // we try to terminate early by polling and canceling if we see
+    // an error message
+    options.getExecutorService().submit(new CancelOnError(job, 
messageHandler));
+
+    // Whether we canceled or not, this gets the final state of the job or 
times out
+    State finalState;
+    try {
+      finalState =
+          job.waitUntilFinish(
+              Duration.standardSeconds(options.getTestTimeoutSeconds()), 
messageHandler);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (InterruptedException e) {
+      Thread.interrupted();
+      return false;
+    }
+
+    // Getting the final state may have timed out; it may not indicate a 
failure.
+    // This cancellation may be the second
+    if (finalState == null || !finalState.isTerminal()) {
+      LOG.info(
+          "Dataflow job {} took longer than {} seconds to complete, 
cancelling.",
+          job.getJobId(),
+          options.getTestTimeoutSeconds());
+      try {
+        job.cancel();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
       }
+      return false;
+    } else {
+      return finalState == State.DONE && !messageHandler.hasSeenError();
+    }
+  }
 
-      if (!result.isPresent()) {
-        if (options.isStreaming()) {
-          LOG.warn(
-              "Dataflow job {} did not output a success or failure metric."
-                  + " In rare situations, some PAsserts may not have run."
-                  + " This is a known limitation of Dataflow in streaming.",
-              job.getJobId());
-        } else {
-          throw new IllegalStateException(
-              String.format(
-                  "Dataflow job %s did not output a success or failure 
metric.", job.getJobId()));
-        }
-      } else if (!result.get()) {
-        throw new AssertionError(
-            Strings.isNullOrEmpty(messageHandler.getErrorMessage())
-                ? String.format(
-                    "Dataflow job %s terminated in state %s but did not return 
a failure reason.",
-                    job.getJobId(), job.getState())
-                : messageHandler.getErrorMessage());
-      } else {
-        assertThat(job, testPipelineOptions.getOnSuccessMatcher());
+  /**
+   * Return {@code true} if the job succeeded or {@code false} if it 
terminated in any other manner.
+   */
+  private boolean waitForBatchJobTermination(
+      DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    {
+      try {
+        job.waitUntilFinish(Duration.standardSeconds(-1), messageHandler);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        Thread.interrupted();
+        return false;
       }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw new RuntimeException(e);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+
+      return job.getState() == State.DONE && !messageHandler.hasSeenError();
     }
-    return job;
+  }
+
+  private static String errorMessage(
+      DataflowPipelineJob job, ErrorMonitorMessagesHandler messageHandler) {
+    return Strings.isNullOrEmpty(messageHandler.getErrorMessage())
+        ? String.format(
+            "Dataflow job %s terminated in state %s but did not return a 
failure reason.",
+            job.getJobId(), job.getState())
+        : messageHandler.getErrorMessage();
   }
 
   @VisibleForTesting
@@ -199,19 +223,12 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
    * <p>If the pipeline is not in a failed/cancelled state and no PAsserts 
were used within the
    * pipeline, then this method will state that all PAsserts succeeded.
    *
-   * @return Optional.of(false) if we are certain a PAssert or some other 
critical thing has failed,
-   *     Optional.of(true) if we are certain all PAsserts passed, and 
Optional.absent() if the
-   *     evidence is inconclusive.
+   * @return Optional.of(false) if we are certain a PAssert failed. 
Optional.of(true) if we are
+   *     certain all PAsserts passed. Optional.absent() if the evidence is 
inconclusive, including
+   *     when the pipeline may have failed for other reasons.
    */
   @VisibleForTesting
-  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) throws 
IOException {
-
-    // If the job failed, this is a definite failure. We only cancel jobs when 
they fail.
-    State state = job.getState();
-    if (state == State.FAILED || state == State.CANCELLED) {
-      LOG.info("Dataflow job {} terminated in failure state {}", 
job.getJobId(), state);
-      return Optional.of(false);
-    }
+  Optional<Boolean> checkForPAssertSuccess(DataflowPipelineJob job) {
 
     JobMetrics metrics = getJobMetrics(job);
     if (metrics == null || metrics.getMetrics() == null) {
@@ -236,8 +253,12 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
     }
 
     if (failures > 0) {
-      LOG.info("Failure result for Dataflow job {}. Found {} success, {} 
failures out of "
-          + "{} expected assertions.", job.getJobId(), successes, failures,
+      LOG.info(
+          "Failure result for Dataflow job {}. Found {} success, {} failures 
out of "
+              + "{} expected assertions.",
+          job.getJobId(),
+          successes,
+          failures,
           expectedNumberOfAssertions);
       return Optional.of(false);
     } else if (successes >= expectedNumberOfAssertions) {
@@ -251,6 +272,16 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       return Optional.of(true);
     }
 
+    // If the job failed, this is a definite failure. We only cancel jobs when 
they fail.
+    State state = job.getState();
+    if (state == State.FAILED || state == State.CANCELLED) {
+      LOG.info(
+          "Dataflow job {} terminated in failure state {} without reporting a 
failed assertion",
+          job.getJobId(),
+          state);
+      return Optional.absent();
+    }
+
     LOG.info(
         "Inconclusive results for Dataflow job {}."
             + " Found {} success, {} failures out of {} expected assertions.",
@@ -303,8 +334,10 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       for (JobMessage message : messages) {
         if (message.getMessageImportance() != null
             && message.getMessageImportance().equals("JOB_MESSAGE_ERROR")) {
-          LOG.info("Dataflow job {} threw exception. Failure message was: {}",
-              job.getJobId(), message.getMessageText());
+          LOG.info(
+              "Dataflow job {} threw exception. Failure message was: {}",
+              job.getJobId(),
+              message.getMessageText());
           errorMessage.append(message.getMessageText());
           hasSeenError = true;
         }
@@ -319,4 +352,37 @@ public class TestDataflowRunner extends 
PipelineRunner<DataflowPipelineJob> {
       return errorMessage.toString();
     }
   }
+
+  private static class CancelOnError implements Callable<Void> {
+
+    private final DataflowPipelineJob job;
+    private final ErrorMonitorMessagesHandler messageHandler;
+
+    public CancelOnError(DataflowPipelineJob job, ErrorMonitorMessagesHandler 
messageHandler) {
+      this.job = job;
+      this.messageHandler = messageHandler;
+    }
+
+    @Override
+    public Void call() throws Exception {
+      while (true) {
+        State jobState = job.getState();
+
+        // If we see an error, cancel and note failure
+        if (messageHandler.hasSeenError()) {
+          if (!job.getState().isTerminal()) {
+            job.cancel();
+            LOG.info("Cancelling Dataflow job {}", job.getJobId());
+            return null;
+          }
+        }
+
+        if (jobState.isTerminal()) {
+          return null;
+        }
+
+        Thread.sleep(3000L);
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/1a8a217b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
index 883d344..1c0876a 100644
--- 
a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
+++ 
b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/TestDataflowRunnerTest.java
@@ -124,6 +124,10 @@ public class TestDataflowRunnerTest {
     assertEquals(mockJob, runner.run(p, mockRunner));
   }
 
+  /**
+   * Tests that when a batch job terminates in a failure state even if all 
assertions
+   * passed, it throws an error to that effect.
+   */
   @Test
   public void testRunBatchJobThatFails() throws Exception {
     Pipeline p = TestPipeline.create(options);
@@ -140,12 +144,9 @@ public class TestDataflowRunnerTest {
 
     TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
     when(mockClient.getJobMetrics(anyString()))
-        .thenReturn(generateMockMetricResponse(false /* success */, false /* 
tentative */));
-    try {
-      runner.run(p, mockRunner);
-    } catch (AssertionError expected) {
-      return;
-    }
+        .thenReturn(generateMockMetricResponse(true /* success */, false /* 
tentative */));
+    expectedException.expect(RuntimeException.class);
+    runner.run(p, mockRunner);
     // Note that fail throws an AssertionError which is why it is placed out 
here
     // instead of inside the try-catch block.
     fail("AssertionError expected");
@@ -357,22 +358,6 @@ public class TestDataflowRunnerTest {
   }
 
   /**
-   * Tests that if a streaming pipeline terminates with FAIL that the check 
for PAssert
-   * success is a conclusive failure.
-   */
-  @Test
-  public void testStreamingPipelineFailsIfServiceFails() throws Exception {
-    DataflowPipelineJob job = spy(new DataflowPipelineJob(mockClient, 
"test-job", options, null));
-    Pipeline p = TestPipeline.create(options);
-    PCollection<Integer> pc = p.apply(Create.of(1, 2, 3));
-    PAssert.that(pc).containsInAnyOrder(1, 2, 3);
-
-    TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
-    doReturn(State.FAILED).when(job).getState();
-    assertThat(runner.checkForPAssertSuccess(job), 
equalTo(Optional.of(false)));
-  }
-
-  /**
    * Tests that if a streaming pipeline crash loops for a non-assertion reason 
that the test run
    * throws an {@link AssertionError}.
    *
@@ -411,12 +396,8 @@ public class TestDataflowRunnerTest {
         .thenReturn(generateMockMetricResponse(false /* success */, true /* 
tentative */));
     TestDataflowRunner runner = 
TestDataflowRunner.fromOptionsAndClient(options, mockClient);
 
-    try {
-      runner.run(pipeline, mockRunner);
-    } catch (AssertionError exc) {
-      return;
-    }
-    fail("AssertionError expected");
+    expectedException.expect(RuntimeException.class);
+    runner.run(pipeline, mockRunner);
   }
 
   @Test
@@ -581,7 +562,7 @@ public class TestDataflowRunnerTest {
    * Tests that when a streaming pipeline terminates in FAIL that the {@link
    * TestPipelineOptions#setOnSuccessMatcher(SerializableMatcher) on success 
matcher} is not
    * invoked.
-   */
+   */
   @Test
   public void testStreamingOnSuccessMatcherWhenPipelineFails() throws 
Exception {
     options.setStreaming(true);
@@ -603,8 +584,9 @@ public class TestDataflowRunnerTest {
     when(mockJob.waitUntilFinish(any(Duration.class), 
any(JobMessagesHandler.class)))
         .thenReturn(State.FAILED);
 
+    expectedException.expect(RuntimeException.class);
     runner.run(p, mockRunner);
-    // If the onSuccessMatcher were invoked, it would have crashed here.
+    // If the onSuccessMatcher were invoked, it would have crashed here with 
AssertionError
   }
 
   static class TestSuccessMatcher extends BaseMatcher<PipelineResult> 
implements

Reply via email to