Repository: beam
Updated Branches:
  refs/heads/master 6df661b0e -> 646cbdb70


[BEAM-1777] In certain circumstances PipelineEnforcement masks the pipeline's 
exceptions.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d96a95c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d96a95c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d96a95c5

Branch: refs/heads/master
Commit: d96a95c5ed561721dcc4cde16bbd3a3308e6f18e
Parents: 6df661b
Author: Stas Levin <stasle...@apache.org>
Authored: Sun Apr 2 14:09:43 2017 +0300
Committer: Stas Levin <stasle...@apache.org>
Committed: Wed Apr 5 10:26:08 2017 +0300

----------------------------------------------------------------------
 .../apache/beam/sdk/testing/TestPipeline.java   | 76 +++++++++++++-------
 .../org/apache/beam/sdk/io/TFRecordIOTest.java  |  7 +-
 2 files changed, 57 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d96a95c5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
index 6a8335e..a4ab196 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java
@@ -106,7 +106,7 @@ public class TestPipeline extends Pipeline implements 
TestRule {
 
     protected final Pipeline pipeline;
 
-    private boolean runInvoked;
+    protected boolean runAttempted;
 
     private PipelineRunEnforcement(final Pipeline pipeline) {
       this.pipeline = pipeline;
@@ -116,12 +116,14 @@ public class TestPipeline extends Pipeline implements 
TestRule {
       enableAutoRunIfMissing = enable;
     }
 
-    protected void afterPipelineExecution() {
-      runInvoked = true;
+    protected void beforePipelineExecution() {
+      runAttempted = true;
     }
 
-    protected void afterTestCompletion() {
-      if (!runInvoked && enableAutoRunIfMissing) {
+    protected void afterPipelineExecution() {}
+
+    protected void afterUserCodeFinished() {
+      if (!runAttempted && enableAutoRunIfMissing) {
         pipeline.run().waitUntilFinish();
       }
     }
@@ -174,27 +176,38 @@ public class TestPipeline extends Pipeline implements 
TestRule {
     }
 
     private void verifyPipelineExecution() {
-      final List<TransformHierarchy.Node> pipelineNodes = 
recordPipelineNodes(pipeline);
-      if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) {
-        final boolean hasDanglingPAssert =
-            FluentIterable.from(pipelineNodes)
-                .filter(Predicates.not(Predicates.in(runVisitedNodes)))
-                .anyMatch(isPAssertNode);
-        if (hasDanglingPAssert) {
-          throw new AbandonedNodeException("The pipeline contains abandoned 
PAssert(s).");
-        } else {
-          throw new AbandonedNodeException("The pipeline contains abandoned 
PTransform(s).");
-        }
-      } else if (runVisitedNodes == null && !enableAutoRunIfMissing) {
-        if (!isEmptyPipeline(pipeline)) {
+      if (!isEmptyPipeline(pipeline)) {
+        if (!runAttempted && !enableAutoRunIfMissing) {
           throw new PipelineRunMissingException(
               "The pipeline has not been run (runner: "
                   + pipeline.getOptions().getRunner().getSimpleName()
                   + ")");
+
+        } else {
+          final List<TransformHierarchy.Node> pipelineNodes = 
recordPipelineNodes(pipeline);
+          if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) {
+            final boolean hasDanglingPAssert =
+                FluentIterable.from(pipelineNodes)
+                              
.filter(Predicates.not(Predicates.in(runVisitedNodes)))
+                              .anyMatch(isPAssertNode);
+            if (hasDanglingPAssert) {
+              throw new AbandonedNodeException("The pipeline contains 
abandoned PAssert(s).");
+            } else {
+              throw new AbandonedNodeException("The pipeline contains 
abandoned PTransform(s).");
+            }
+          }
         }
       }
     }
 
+    private boolean visitedAll(final List<TransformHierarchy.Node> 
pipelineNodes) {
+      return runVisitedNodes.equals(pipelineNodes);
+    }
+
+    private boolean pipelineRunSucceeded() {
+      return runVisitedNodes != null;
+    }
+
     @Override
     protected void afterPipelineExecution() {
       runVisitedNodes = recordPipelineNodes(pipeline);
@@ -202,8 +215,8 @@ public class TestPipeline extends Pipeline implements 
TestRule {
     }
 
     @Override
-    protected void afterTestCompletion() {
-      super.afterTestCompletion();
+    protected void afterUserCodeFinished() {
+      super.afterUserCodeFinished();
       verifyPipelineExecution();
     }
   }
@@ -283,9 +296,19 @@ public class TestPipeline extends Pipeline implements 
TestRule {
 
       @Override
       public void evaluate() throws Throwable {
+
         setDeducedEnforcementLevel();
+
+        // statement.evaluate() essentially runs the user code contained in 
the unit test at hand.
+        // Exceptions thrown during the execution of the user's test code will 
propagate here,
+        // unless the user explicitly handles them with a "catch" clause in 
their code. If the
+        // exception is handled by a user's "catch" clause, it does not 
interrupt the flow and
+        // we move on to invoking the configured enforcements.
+        // If the user does not handle a thrown exception, it will propagate 
here and interrupt
+        // the flow, preventing the enforcement(s) from being activated.
+        // The motivation for this is avoiding enforcements over faulty 
pipelines.
         statement.evaluate();
-        enforcement.get().afterTestCompletion();
+        enforcement.get().afterUserCodeFinished();
       }
     };
   }
@@ -301,8 +324,10 @@ public class TestPipeline extends Pipeline implements 
TestRule {
         "Is your TestPipeline declaration missing a @Rule annotation? Usage: "
         + "@Rule public final transient TestPipeline pipeline = 
TestPipeline.create();");
 
+    final PipelineResult pipelineResult;
     try {
-      return super.run();
+      enforcement.get().beforePipelineExecution();
+      pipelineResult = super.run();
     } catch (RuntimeException exc) {
       Throwable cause = exc.getCause();
       if (cause instanceof AssertionError) {
@@ -310,9 +335,12 @@ public class TestPipeline extends Pipeline implements 
TestRule {
       } else {
         throw exc;
       }
-    } finally {
-      enforcement.get().afterPipelineExecution();
     }
+
+    // If we reach this point, the pipeline has been run and no exceptions 
have been thrown during
+    // its execution.
+    enforcement.get().afterPipelineExecution();
+    return pipelineResult;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/d96a95c5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
index 70620fb..94530ef 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java
@@ -59,6 +59,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.ExpectedException;
+import org.junit.rules.RuleChain;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
@@ -96,14 +97,16 @@ public class TFRecordIOTest {
 
   private static Path tempFolder;
 
-  @Rule
+
   public TestPipeline p = TestPipeline.create();
 
+  public ExpectedException expectedException = ExpectedException.none();
+
   @Rule
   public TestPipeline p2 = TestPipeline.create();
 
   @Rule
-  public ExpectedException expectedException = ExpectedException.none();
+  public RuleChain ruleChain = 
RuleChain.outerRule(expectedException).around(p);
 
   @BeforeClass
   public static void setupClass() throws IOException {

Reply via email to