Remove exception suppression from PAssert.SideInputCheckerDoFn
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3669146c Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3669146c Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3669146c Branch: refs/heads/gearpump-runner Commit: 3669146c95e265c0fbde4444ce7d04f6b787cdac Parents: 59fd45b Author: Aviem Zur <aviem...@gmail.com> Authored: Fri Mar 10 23:15:15 2017 +0200 Committer: Aviem Zur <aviem...@gmail.com> Committed: Fri Mar 10 23:15:15 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/beam/sdk/testing/PAssert.java | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/3669146c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index d88c4d6..2596335 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -40,7 +40,6 @@ import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.MapCoder; import org.apache.beam.sdk.coders.VarIntCoder; -import org.apache.beam.sdk.options.StreamingOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.transforms.Aggregator; @@ -1102,15 +1101,8 @@ public class PAssert { @ProcessElement public void processElement(ProcessContext c) { - try { - ActualT actualContents = c.sideInput(actual); - doChecks(actualContents, checkerFn, success, failure); - } catch (Throwable t) { - // Suppress exception in streaming - if (!c.getPipelineOptions().as(StreamingOptions.class).isStreaming()) { - throw t; - } - } + ActualT actualContents = c.sideInput(actual); + doChecks(actualContents, checkerFn, success, failure); } }