This is an automated email from the ASF dual-hosted git repository. lcwik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 360c4fe [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods. new a167255 Merge pull request #10989 from lukecwik/beam9397 360c4fe is described below commit 360c4fe092413bbe9fb16ebfbe2d2e39fe31cb73 Author: Luke Cwik <lc...@google.com> AuthorDate: Thu Feb 27 10:05:00 2020 -0800 [BEAM-9397] Pass all but output receiver parameters to start bundle/finish bundle methods. The remaining work is covered by BEAM-1287. --- .../construction/SplittableParDoNaiveBounded.java | 14 +- .../apache/beam/runners/core/SimpleDoFnRunner.java | 308 +++------------------ .../core/SplittableParDoViaKeyedWorkItems.java | 12 +- .../java/org/apache/beam/sdk/transforms/DoFn.java | 6 + .../org/apache/beam/sdk/transforms/DoFnTester.java | 10 + .../sdk/transforms/reflect/DoFnSignatures.java | 8 +- .../sdk/transforms/reflect/DoFnSignaturesTest.java | 17 +- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 10 + 8 files changed, 113 insertions(+), 272 deletions(-) diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java index 30a44da..92a443a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java @@ -146,6 +146,11 @@ public class SplittableParDoNaiveBounded { } @Override + public PipelineOptions pipelineOptions() { + return c.getPipelineOptions(); + } + + @Override public String getErrorContext() { return "SplittableParDoNaiveBounded/StartBundle"; } @@ -200,19 +205,24 @@ public class SplittableParDoNaiveBounded { public void output( @Nullable OutputT output, Instant timestamp, BoundedWindow 
window) { throw new UnsupportedOperationException( - "Output from FinishBundle for SDF is not supported"); + "Output from FinishBundle for SDF is not supported in naive implementation"); } @Override public <T> void output( TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { throw new UnsupportedOperationException( - "Output from FinishBundle for SDF is not supported"); + "Output from FinishBundle for SDF is not supported in naive implementation"); } }; } @Override + public PipelineOptions pipelineOptions() { + return c.getPipelineOptions(); + } + + @Override public String getErrorContext() { return "SplittableParDoNaiveBounded/StartBundle"; } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 71efa12..a37644a 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -168,7 +168,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out public void startBundle() { // This can contain user code. Wrap it in case it throws an exception. try { - invoker.invokeStartBundle(new DoFnStartBundleContext()); + invoker.invokeStartBundle(new DoFnStartBundleArgumentProvider()); } catch (Throwable t) { // Exception in user code. throw wrapUserCodeException(t); @@ -231,7 +231,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out public void finishBundle() { // This can contain user code. Wrap it in case it throws an exception. try { - invoker.invokeFinishBundle(new DoFnFinishBundleContext()); + invoker.invokeFinishBundle(new DoFnFinishBundleArgumentProvider()); } catch (Throwable t) { // Exception in user code. 
throw wrapUserCodeException(t); @@ -258,298 +258,80 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out outputManager.output(tag, windowedElem); } - /** A concrete implementation of {@link DoFn.StartBundleContext}. */ - private class DoFnStartBundleContext extends DoFn<InputT, OutputT>.StartBundleContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - private DoFnStartBundleContext() { - fn.super(); - } + /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle @StartBundle}. */ + private class DoFnStartBundleArgumentProvider + extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> { + /** A concrete implementation of {@link DoFn.StartBundleContext}. */ + private class Context extends DoFn<InputT, OutputT>.StartBundleContext { + private Context() { + fn.super(); + } - @Override - public PipelineOptions getPipelineOptions() { - return options; + @Override + public PipelineOptions getPipelineOptions() { + return options; + } } - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException( - "Cannot access window outside of @ProcessElement and @OnTimer methods."); - } - - @Override - public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access paneInfo outside of @ProcessElement methods."); - } + private final Context context = new Context(); @Override public PipelineOptions pipelineOptions() { - return getPipelineOptions(); + return options; } @Override public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { - return this; - } - - @Override - public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( - DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access FinishBundleContext outside of @FinishBundle method."); - } - - @Override - public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - throw new 
UnsupportedOperationException( - "Cannot access ProcessContext outside of @ProcessElement method."); - } - - @Override - public InputT element(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Element parameters are not supported outside of @ProcessElement method."); - } - - @Override - public InputT sideInput(String tagId) { - throw new UnsupportedOperationException( - "SideInput parameters are not supported outside of @ProcessElement method."); - } - - @Override - public Object schemaElement(int index) { - throw new UnsupportedOperationException( - "Element parameters are not supported outside of @ProcessElement method."); - } - - @Override - public Instant timestamp(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access timestamp outside of @ProcessElement method."); - } - - @Override - public String timerId(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException("Cannot access timerId outside of @OnTimer method."); - } - - @Override - public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access time domain outside of @ProcessTimer method."); + return context; } @Override - public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access output receiver outside of @ProcessElement method."); - } - - @Override - public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access output receiver outside of @ProcessElement method."); - } - - @Override - public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access output receiver outside of @ProcessElement method."); - } - - @Override - public Object restriction() { - throw new UnsupportedOperationException("@Restriction parameters are not supported."); - } - - @Override - 
public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access OnTimerContext outside of @OnTimer methods."); - } - - @Override - public RestrictionTracker<?, ?> restrictionTracker() { - throw new UnsupportedOperationException( - "Cannot access RestrictionTracker outside of @ProcessElement method."); - } - - @Override - public State state(String stateId, boolean alwaysFetched) { - throw new UnsupportedOperationException( - "Cannot access state outside of @ProcessElement and @OnTimer methods."); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException( - "Cannot access timers outside of @ProcessElement and @OnTimer methods."); - } - - @Override - public TimerMap timerFamily(String tagId) { - throw new UnsupportedOperationException( - "Cannot access timer family outside of @ProcessElement and @OnTimer methods"); - } - - @Override - public BundleFinalizer bundleFinalizer() { - throw new UnsupportedOperationException( - "Bundle finalization is not supported in non-portable pipelines."); + public String getErrorContext() { + return "SimpleDoFnRunner/StartBundle"; } } - /** B A concrete implementation of {@link DoFn.FinishBundleContext}. */ - private class DoFnFinishBundleContext extends DoFn<InputT, OutputT>.FinishBundleContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { - private DoFnFinishBundleContext() { - fn.super(); - } + /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.FinishBundle @FinishBundle}. */ + private class DoFnFinishBundleArgumentProvider + extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> { + /** A concrete implementation of {@link DoFn.FinishBundleContext}. 
*/ + private class Context extends DoFn<InputT, OutputT>.FinishBundleContext { + private Context() { + fn.super(); + } - @Override - public PipelineOptions getPipelineOptions() { - return options; - } + @Override + public PipelineOptions getPipelineOptions() { + return options; + } - @Override - public BoundedWindow window() { - throw new UnsupportedOperationException( - "Cannot access window outside of @ProcessElement and @OnTimer methods."); - } + @Override + public void output(OutputT output, Instant timestamp, BoundedWindow window) { + output(mainOutputTag, output, timestamp, window); + } - @Override - public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access paneInfo outside of @ProcessElement methods."); + @Override + public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { + outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); + } } - @Override - public PipelineOptions pipelineOptions() { - return getPipelineOptions(); - } + private final Context context = new Context(); @Override - public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access StartBundleContext outside of @StartBundle method."); + public PipelineOptions pipelineOptions() { + return options; } @Override public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( DoFn<InputT, OutputT> doFn) { - return this; + return context; } @Override - public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access ProcessContext outside of @ProcessElement method."); - } - - @Override - public InputT element(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access element outside of @ProcessElement method."); - } - - @Override - public InputT sideInput(String 
tagId) { - throw new UnsupportedOperationException( - "Cannot access sideInput outside of @ProcessElement method."); - } - - @Override - public Object schemaElement(int index) { - throw new UnsupportedOperationException( - "Cannot access element outside of @ProcessElement method."); - } - - @Override - public Instant timestamp(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access timestamp outside of @ProcessElement method."); - } - - @Override - public String timerId(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access timerId as parameter outside of @OnTimer method."); - } - - @Override - public TimeDomain timeDomain(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access time domain outside of @ProcessTimer method."); - } - - @Override - public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access outputReceiver in @FinishBundle method."); - } - - @Override - public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access outputReceiver in @FinishBundle method."); - } - - @Override - public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access outputReceiver in @FinishBundle method."); - } - - @Override - public Object restriction() { - throw new UnsupportedOperationException("@Restriction parameters are not supported."); - } - - @Override - public DoFn<InputT, OutputT>.OnTimerContext onTimerContext(DoFn<InputT, OutputT> doFn) { - throw new UnsupportedOperationException( - "Cannot access OnTimerContext outside of @OnTimer methods."); - } - - @Override - public RestrictionTracker<?, ?> restrictionTracker() { - throw new UnsupportedOperationException( - "Cannot access RestrictionTracker outside of @ProcessElement method."); - } - - 
@Override - public State state(String stateId, boolean alwaysFetched) { - throw new UnsupportedOperationException( - "Cannot access state outside of @ProcessElement and @OnTimer methods."); - } - - @Override - public Timer timer(String timerId) { - throw new UnsupportedOperationException( - "Cannot access timers outside of @ProcessElement and @OnTimer methods."); - } - - @Override - public TimerMap timerFamily(String tagId) { - throw new UnsupportedOperationException( - "Cannot access timerFamily outside of @ProcessElement and @OnTimer methods."); - } - - @Override - public void output(OutputT output, Instant timestamp, BoundedWindow window) { - output(mainOutputTag, output, timestamp, window); - } - - @Override - public <T> void output(TupleTag<T> tag, T output, Instant timestamp, BoundedWindow window) { - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window, PaneInfo.NO_FIRING)); - } - - @Override - public BundleFinalizer bundleFinalizer() { - throw new UnsupportedOperationException( - "Bundle finalization is not supported in non-portable pipelines."); + public String getErrorContext() { + return "SimpleDoFnRunner/FinishBundle"; } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java index dc795a55..28277f9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java @@ -432,6 +432,11 @@ public class SplittableParDoViaKeyedWorkItems { } @Override + public PipelineOptions pipelineOptions() { + return baseContext.getPipelineOptions(); + } + + @Override public String getErrorContext() { return "SplittableParDoViaKeyedWorkItems/StartBundle"; } @@ -464,13 +469,18 @@ public class SplittableParDoViaKeyedWorkItems { private void 
throwUnsupportedOutput() { throw new UnsupportedOperationException( String.format( - "Splittable DoFn can only output from @%s", + "KWI Splittable DoFn can only output from @%s", ProcessElement.class.getSimpleName())); } }; } @Override + public PipelineOptions pipelineOptions() { + return baseContext.getPipelineOptions(); + } + + @Override public String getErrorContext() { return "SplittableParDoViaKeyedWorkItems/FinishBundle"; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 9500599..d00950d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -592,6 +592,8 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <ul> * <li>If one of the parameters is of type {@link DoFn.StartBundleContext}, then it will be * passed a context object for the current execution. + * <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the + * options for the current pipeline. * <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a * mechanism to register a callback that will be invoked after the runner successfully * commits the output of this bundle. See <a @@ -810,11 +812,15 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD * <ul> * <li>If one of the parameters is of type {@link DoFn.FinishBundleContext}, then it will be * passed a context object for the current execution. + * <li>If one of the parameters is of type {@link PipelineOptions}, then it will be passed the + * options for the current pipeline. * <li>If one of the parameters is of type {@link BundleFinalizer}, then it will be passed a * mechanism to register a callback that will be invoked after the runner successfully * commits the output of this bundle. 
See <a * href="https://s.apache.org/beam-finalizing-bundles">Apache Beam Portability API: How to * Finalize Bundles</a> for further details. + * <li>TODO(BEAM-1287): Add support for an {@link OutputReceiver} and {@link + * MultiOutputReceiver} that can output to a window. * </ul> * * <p>Note that {@link FinishBundle @FinishBundle} is invoked before the runner commits the output diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index ba6bc17..112046a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -482,6 +482,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override + public PipelineOptions pipelineOptions() { + return options; + } + + @Override public String getErrorContext() { return "DoFnTester/StartBundle"; } @@ -510,6 +515,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } @Override + public PipelineOptions pipelineOptions() { + return options; + } + + @Override public String getErrorContext() { return "DoFnTester/FinishBundle"; } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java index d34baba..fb76330 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java @@ -132,11 +132,15 @@ public class DoFnSignatures { private static final ImmutableList<Class<? 
extends Parameter>> ALLOWED_START_BUNDLE_PARAMETERS = ImmutableList.of( - Parameter.StartBundleContextParameter.class, Parameter.BundleFinalizerParameter.class); + Parameter.PipelineOptionsParameter.class, + Parameter.StartBundleContextParameter.class, + Parameter.BundleFinalizerParameter.class); private static final ImmutableList<Class<? extends Parameter>> ALLOWED_FINISH_BUNDLE_PARAMETERS = ImmutableList.of( - Parameter.FinishBundleContextParameter.class, Parameter.BundleFinalizerParameter.class); + Parameter.PipelineOptionsParameter.class, + Parameter.FinishBundleContextParameter.class, + Parameter.BundleFinalizerParameter.class); private static final ImmutableList<Class<? extends Parameter>> ALLOWED_ON_TIMER_PARAMETERS = ImmutableList.of( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java index 24e581b..1414371 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java @@ -53,6 +53,7 @@ import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.state.WatermarkHoldState; import org.apache.beam.sdk.testing.SerializableMatchers; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.BundleFinalizerParameter; @@ -362,13 +363,17 @@ public class DoFnSignaturesTest { @StartBundle public void startBundle( - StartBundleContext context, BundleFinalizer bundleFinalizer) {} + StartBundleContext context, + BundleFinalizer bundleFinalizer, + PipelineOptions options) {} }.getClass()); - assertThat(sig.startBundle().extraParameters().size(), 
equalTo(2)); + assertThat(sig.startBundle().extraParameters().size(), equalTo(3)); assertThat( sig.startBundle().extraParameters().get(0), instanceOf(StartBundleContextParameter.class)); assertThat( sig.startBundle().extraParameters().get(1), instanceOf(BundleFinalizerParameter.class)); + assertThat( + sig.startBundle().extraParameters().get(2), instanceOf(PipelineOptionsParameter.class)); } @Test @@ -397,14 +402,18 @@ public class DoFnSignaturesTest { @FinishBundle public void finishBundle( - FinishBundleContext context, BundleFinalizer bundleFinalizer) {} + FinishBundleContext context, + BundleFinalizer bundleFinalizer, + PipelineOptions pipelineOptions) {} }.getClass()); - assertThat(sig.finishBundle().extraParameters().size(), equalTo(2)); + assertThat(sig.finishBundle().extraParameters().size(), equalTo(3)); assertThat( sig.finishBundle().extraParameters().get(0), instanceOf(FinishBundleContextParameter.class)); assertThat( sig.finishBundle().extraParameters().get(1), instanceOf(BundleFinalizerParameter.class)); + assertThat( + sig.finishBundle().extraParameters().get(2), instanceOf(PipelineOptionsParameter.class)); } @Test diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index d0f70d5..224abf1 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -945,6 +945,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> { } @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override public BundleFinalizer bundleFinalizer() { return bundleFinalizer; } @@ -992,6 +997,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, OutputT> { } @Override + public PipelineOptions pipelineOptions() { + return pipelineOptions; + } + + @Override public 
BundleFinalizer bundleFinalizer() { return bundleFinalizer; }