This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch revert-25245-beam13015.5 in repository https://gitbox.apache.org/repos/asf/beam.git
commit 8e9279c8a88107d863a8b698f074b0ec8cf8b573 Author: Yi Hu <huu...@gmail.com> AuthorDate: Mon Feb 6 10:27:22 2023 -0500 Revert "Optimize to use cached output receiver instead of creating one on DoFn invocation #21250 (#25245)" This reverts commit b1c9d8aec07ce72e946bd349eb4345417efbfc9c. --- .../beam/sdk/transforms/DoFnOutputReceivers.java | 2 +- .../apache/beam/fn/harness/FnApiDoFnRunner.java | 310 ++------------------- 2 files changed, 17 insertions(+), 295 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index 27fbb9754ec..a17264da35d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -115,7 +115,7 @@ public class DoFnOutputReceivers { checkState(outputCoder != null, "No output tag for " + tag); checkState( outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); + "Output with tag " + tag + " must have a schema in order to call " + " getRowReceiver"); return DoFnOutputReceivers.rowReceiver(context, tag, (SchemaCoder<T>) outputCoder); } } diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index 13d85d27006..2b449e0200b 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -85,6 +85,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.DoFnOutputReceivers; import org.apache.beam.sdk.transforms.DoFnSchemaInformation; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; @@ -2412,7 +2413,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator /** Base implementation that does not override methods which need to be window aware. */ private abstract class ProcessBundleContextBase extends DoFn<InputT, OutputT>.ProcessContext - implements DoFnInvoker.ArgumentProvider<InputT, OutputT>, OutputReceiver<OutputT> { + implements DoFnInvoker.ArgumentProvider<InputT, OutputT> { private ProcessBundleContextBase() { doFn.super(); @@ -2477,112 +2478,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { - return this; + return DoFnOutputReceivers.windowedReceiver(this, null); } - private final OutputReceiver<Row> mainRowOutputReceiver = - mainOutputSchemaCoder == null - ? null - : new OutputReceiver<Row>() { - private final SerializableFunction<Row, OutputT> fromRowFunction = - mainOutputSchemaCoder.getFromRowFunction(); - - @Override - public void output(Row output) { - ProcessBundleContextBase.this.outputWithTimestamp( - fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - ProcessBundleContextBase.this.outputWithTimestamp( - fromRowFunction.apply(output), timestamp); - } - }; - @Override public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { - checkState( - mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); - return mainRowOutputReceiver; - } - - /** A {@link MultiOutputReceiver} which caches created instances to re-use across bundles. */ - private final MultiOutputReceiver taggedOutputReceiver = - new MultiOutputReceiver() { - private final Map<TupleTag<?>, OutputReceiver<?>> taggedOutputReceivers = new HashMap<>(); - private final Map<TupleTag<?>, OutputReceiver<Row>> taggedRowReceivers = new HashMap<>(); - - private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) { - if (tag == null || mainOutputTag.equals(tag)) { - return (OutputReceiver<T>) ProcessBundleContextBase.this; - } - return new OutputReceiver<T>() { - @Override - public void output(T output) { - ProcessBundleContextBase.this.outputWithTimestamp( - tag, output, currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - ProcessBundleContextBase.this.outputWithTimestamp( - tag, output, currentElement.getTimestamp()); - } - }; - } - - private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) { - if (tag == null || mainOutputTag.equals(tag)) { - checkState( - mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); - return mainRowOutputReceiver; - } - - Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag); - checkState(outputCoder != null, "No output tag for " + tag); - checkState( - outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); - return new OutputReceiver<Row>() { - private SerializableFunction<Row, T> fromRowFunction = - ((SchemaCoder) outputCoder).getFromRowFunction(); - - @Override - public void output(Row output) { - ProcessBundleContextBase.this.outputWithTimestamp( - tag, fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - ProcessBundleContextBase.this.outputWithTimestamp( - tag, fromRowFunction.apply(output), timestamp); - } - }; - } - - @Override - public <T> OutputReceiver<T> get(TupleTag<T> tag) { - return (OutputReceiver<T>) - taggedOutputReceivers.computeIfAbsent(tag, this::createTaggedOutputReceiver); - } - - @Override - public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) { - return taggedRowReceivers.computeIfAbsent(tag, this::createTaggedRowReceiver); - } - }; + return DoFnOutputReceivers.rowReceiver(this, null, mainOutputSchemaCoder); + } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { - return taggedOutputReceiver; + return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders); } @Override @@ -2657,8 +2563,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator * DoFn.OnWindowExpiration @OnWindowExpiration}. */ private class OnWindowExpirationContext<K> extends BaseArgumentProvider<InputT, OutputT> { - private class Context extends DoFn<InputT, OutputT>.OnWindowExpirationContext - implements OutputReceiver<OutputT> { + private class Context extends DoFn<InputT, OutputT>.OnWindowExpirationContext { private Context() { doFn.super(); } @@ -2766,108 +2671,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { - return context; + return DoFnOutputReceivers.windowedReceiver(context, null); } - private final OutputReceiver<Row> mainRowOutputReceiver = - mainOutputSchemaCoder == null - ? null - : new OutputReceiver<Row>() { - private final SerializableFunction<Row, OutputT> fromRowFunction = - mainOutputSchemaCoder.getFromRowFunction(); - - @Override - public void output(Row output) { - context.outputWithTimestamp( - fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); - } - }; - @Override public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { - checkState( - mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); - return mainRowOutputReceiver; - } - - /** A {@link MultiOutputReceiver} which caches created instances to re-use across bundles. */ - private final MultiOutputReceiver taggedOutputReceiver = - new MultiOutputReceiver() { - private final Map<TupleTag<?>, OutputReceiver<?>> taggedOutputReceivers = new HashMap<>(); - private final Map<TupleTag<?>, OutputReceiver<Row>> taggedRowReceivers = new HashMap<>(); - - private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) { - if (tag == null || mainOutputTag.equals(tag)) { - return (OutputReceiver<T>) context; - } - return new OutputReceiver<T>() { - @Override - public void output(T output) { - context.outputWithTimestamp(tag, output, currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, currentElement.getTimestamp()); - } - }; - } - - private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) { - if (tag == null || mainOutputTag.equals(tag)) { - checkState( - mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); - return mainRowOutputReceiver; - } - - Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag); - checkState(outputCoder != null, "No output tag for " + tag); - checkState( - outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); - return new OutputReceiver<Row>() { - private SerializableFunction<Row, T> fromRowFunction = - ((SchemaCoder) outputCoder).getFromRowFunction(); - - @Override - public void output(Row output) { - context.outputWithTimestamp( - tag, fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); - } - }; - } - - @Override - public <T> OutputReceiver<T> get(TupleTag<T> tag) { - return (OutputReceiver<T>) - taggedOutputReceivers.computeIfAbsent(tag, this::createTaggedOutputReceiver); - } - - @Override - public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) { - return taggedRowReceivers.computeIfAbsent(tag, this::createTaggedRowReceiver); - } - }; + return DoFnOutputReceivers.rowReceiver(context, null, mainOutputSchemaCoder); + } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { - return taggedOutputReceiver; + return DoFnOutputReceivers.windowedMultiReceiver(context); } @Override @@ -2902,8 +2716,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer @OnTimer}. */ private class OnTimerContext<K> extends BaseArgumentProvider<InputT, OutputT> { - private class Context extends DoFn<InputT, OutputT>.OnTimerContext - implements OutputReceiver<OutputT> { + private class Context extends DoFn<InputT, OutputT>.OnTimerContext { private Context() { doFn.super(); } @@ -3027,108 +2840,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator @Override public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) { - return context; + return DoFnOutputReceivers.windowedReceiver(context, null); } - private final OutputReceiver<Row> mainRowOutputReceiver = - mainOutputSchemaCoder == null - ? null - : new OutputReceiver<Row>() { - private final SerializableFunction<Row, OutputT> fromRowFunction = - mainOutputSchemaCoder.getFromRowFunction(); - - @Override - public void output(Row output) { - context.outputWithTimestamp( - fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); - } - }; - @Override public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) { - checkState( - mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); - return mainRowOutputReceiver; - } - - /** A {@link MultiOutputReceiver} which caches created instances to re-use across bundles. */ - private final MultiOutputReceiver taggedOutputReceiver = - new MultiOutputReceiver() { - private final Map<TupleTag<?>, OutputReceiver<?>> taggedOutputReceivers = new HashMap<>(); - private final Map<TupleTag<?>, OutputReceiver<Row>> taggedRowReceivers = new HashMap<>(); - - private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> tag) { - if (tag == null || mainOutputTag.equals(tag)) { - return (OutputReceiver<T>) context; - } - return new OutputReceiver<T>() { - @Override - public void output(T output) { - context.outputWithTimestamp(tag, output, currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(T output, Instant timestamp) { - context.outputWithTimestamp(tag, output, currentElement.getTimestamp()); - } - }; - } - - private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> tag) { - if (tag == null || mainOutputTag.equals(tag)) { - checkState( - mainOutputSchemaCoder != null, - "Output with tag " - + mainOutputTag - + " must have a schema in order to call getRowReceiver"); - return mainRowOutputReceiver; - } - - Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag); - checkState(outputCoder != null, "No output tag for " + tag); - checkState( - outputCoder instanceof SchemaCoder, - "Output with tag " + tag + " must have a schema in order to call getRowReceiver"); - return new OutputReceiver<Row>() { - private SerializableFunction<Row, T> fromRowFunction = - ((SchemaCoder) outputCoder).getFromRowFunction(); - - @Override - public void output(Row output) { - context.outputWithTimestamp( - tag, fromRowFunction.apply(output), currentElement.getTimestamp()); - } - - @Override - public void outputWithTimestamp(Row output, Instant timestamp) { - context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); - } - }; - } - - @Override - public <T> OutputReceiver<T> get(TupleTag<T> tag) { - return (OutputReceiver<T>) - taggedOutputReceivers.computeIfAbsent(tag, this::createTaggedOutputReceiver); - } - - @Override - public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) { - return taggedRowReceivers.computeIfAbsent(tag, this::createTaggedRowReceiver); - } - }; + return DoFnOutputReceivers.rowReceiver(context, null, mainOutputSchemaCoder); + } @Override public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> doFn) { - return taggedOutputReceiver; + return DoFnOutputReceivers.windowedMultiReceiver(context); } @Override