Repository: beam Updated Branches: refs/heads/master 6df661b0e -> 646cbdb70
[BEAM-1777] In certain circumstances PipelineEnforcement shades pipeline's exceptions. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d96a95c5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d96a95c5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d96a95c5 Branch: refs/heads/master Commit: d96a95c5ed561721dcc4cde16bbd3a3308e6f18e Parents: 6df661b Author: Stas Levin <stasle...@apache.org> Authored: Sun Apr 2 14:09:43 2017 +0300 Committer: Stas Levin <stasle...@apache.org> Committed: Wed Apr 5 10:26:08 2017 +0300 ---------------------------------------------------------------------- .../apache/beam/sdk/testing/TestPipeline.java | 76 +++++++++++++------- .../org/apache/beam/sdk/io/TFRecordIOTest.java | 7 +- 2 files changed, 57 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d96a95c5/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java index 6a8335e..a4ab196 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipeline.java @@ -106,7 +106,7 @@ public class TestPipeline extends Pipeline implements TestRule { protected final Pipeline pipeline; - private boolean runInvoked; + protected boolean runAttempted; private PipelineRunEnforcement(final Pipeline pipeline) { this.pipeline = pipeline; @@ -116,12 +116,14 @@ public class TestPipeline extends Pipeline implements TestRule { enableAutoRunIfMissing = enable; } - protected void afterPipelineExecution() { - runInvoked = true; + protected void beforePipelineExecution() { + runAttempted = true; } - protected void afterTestCompletion() { - if (!runInvoked && enableAutoRunIfMissing) { + protected void afterPipelineExecution() {} + + protected void afterUserCodeFinished() { + if (!runAttempted && enableAutoRunIfMissing) { pipeline.run().waitUntilFinish(); } } @@ -174,27 +176,38 @@ public class TestPipeline extends Pipeline implements TestRule { } private void verifyPipelineExecution() { - final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline); - if (runVisitedNodes != null && !runVisitedNodes.equals(pipelineNodes)) { - final boolean hasDanglingPAssert = - FluentIterable.from(pipelineNodes) - .filter(Predicates.not(Predicates.in(runVisitedNodes))) - .anyMatch(isPAssertNode); - if (hasDanglingPAssert) { - throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); - } else { - throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); - } - } else if (runVisitedNodes == null && !enableAutoRunIfMissing) { - if (!isEmptyPipeline(pipeline)) { + if (!isEmptyPipeline(pipeline)) { + if (!runAttempted && !enableAutoRunIfMissing) { throw new PipelineRunMissingException( "The pipeline has not been run (runner: " + pipeline.getOptions().getRunner().getSimpleName() + ")"); + + } else { + final List<TransformHierarchy.Node> pipelineNodes = recordPipelineNodes(pipeline); + if (pipelineRunSucceeded() && !visitedAll(pipelineNodes)) { + final boolean hasDanglingPAssert = + FluentIterable.from(pipelineNodes) + .filter(Predicates.not(Predicates.in(runVisitedNodes))) + .anyMatch(isPAssertNode); + if (hasDanglingPAssert) { + throw new AbandonedNodeException("The pipeline contains abandoned PAssert(s)."); + } else { + throw new AbandonedNodeException("The pipeline contains abandoned PTransform(s)."); + } + } } } } + private boolean visitedAll(final List<TransformHierarchy.Node> pipelineNodes) { + return runVisitedNodes.equals(pipelineNodes); + } + + private boolean pipelineRunSucceeded() { + return runVisitedNodes != null; + } + @Override protected void afterPipelineExecution() { runVisitedNodes = recordPipelineNodes(pipeline); @@ -202,8 +215,8 @@ public class TestPipeline extends Pipeline implements TestRule { } @Override - protected void afterTestCompletion() { - super.afterTestCompletion(); + protected void afterUserCodeFinished() { + super.afterUserCodeFinished(); verifyPipelineExecution(); } } @@ -283,9 +296,19 @@ public class TestPipeline extends Pipeline implements TestRule { @Override public void evaluate() throws Throwable { + setDeducedEnforcementLevel(); + + // statement.evaluate() essentially runs the user code contained in the unit test at hand. + // Exceptions thrown during the execution of the user's test code will propagate here, + // unless the user explicitly handles them with a "catch" clause in his code. If the + // exception is handled by a user's "catch" clause, is does not interrupt the flow and + // we move on to invoking the configured enforcements. + // If the user does not handle a thrown exception, it will propagate here and interrupt + // the flow, preventing the enforcement(s) from being activated. + // The motivation for this is avoiding enforcements over faulty pipelines. statement.evaluate(); - enforcement.get().afterTestCompletion(); + enforcement.get().afterUserCodeFinished(); } }; } @@ -301,8 +324,10 @@ public class TestPipeline extends Pipeline implements TestRule { "Is your TestPipeline declaration missing a @Rule annotation? Usage: " + "@Rule public final transient TestPipeline pipeline = TestPipeline.create();"); + final PipelineResult pipelineResult; try { - return super.run(); + enforcement.get().beforePipelineExecution(); + pipelineResult = super.run(); } catch (RuntimeException exc) { Throwable cause = exc.getCause(); if (cause instanceof AssertionError) { @@ -310,9 +335,12 @@ public class TestPipeline extends Pipeline implements TestRule { } else { throw exc; } - } finally { - enforcement.get().afterPipelineExecution(); } + + // If we reach this point, the pipeline has been run and no exceptions have been thrown during + // its execution. + enforcement.get().afterPipelineExecution(); + return pipelineResult; } /** http://git-wip-us.apache.org/repos/asf/beam/blob/d96a95c5/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java index 70620fb..94530ef 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TFRecordIOTest.java @@ -59,6 +59,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.ExpectedException; +import org.junit.rules.RuleChain; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -96,14 +97,16 @@ public class TFRecordIOTest { private static Path tempFolder; - @Rule + public TestPipeline p = TestPipeline.create(); + public ExpectedException expectedException = ExpectedException.none(); + @Rule public TestPipeline p2 = TestPipeline.create(); @Rule - public ExpectedException expectedException = ExpectedException.none(); + public RuleChain ruleChain = RuleChain.outerRule(expectedException).around(p); @BeforeClass public static void setupClass() throws IOException {