This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch revert-25245-beam13015.5
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 8e9279c8a88107d863a8b698f074b0ec8cf8b573
Author: Yi Hu <huu...@gmail.com>
AuthorDate: Mon Feb 6 10:27:22 2023 -0500

    Revert "Optimize to use cached output receiver instead of creating one on DoFn invocation #21250 (#25245)"
    
    This reverts commit b1c9d8aec07ce72e946bd349eb4345417efbfc9c.
---
 .../beam/sdk/transforms/DoFnOutputReceivers.java   |   2 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 310 ++-------------------
 2 files changed, 17 insertions(+), 295 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
index 27fbb9754ec..a17264da35d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java
@@ -115,7 +115,7 @@ public class DoFnOutputReceivers {
       checkState(outputCoder != null, "No output tag for " + tag);
       checkState(
           outputCoder instanceof SchemaCoder,
-          "Output with tag " + tag + " must have a schema in order to call 
getRowReceiver");
+          "Output with tag " + tag + " must have a schema in order to call " + 
" getRowReceiver");
       return DoFnOutputReceivers.rowReceiver(context, tag, (SchemaCoder<T>) 
outputCoder);
     }
   }
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 13d85d27006..2b449e0200b 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -85,6 +85,7 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
 import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver;
 import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.transforms.DoFnOutputReceivers;
 import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
@@ -2412,7 +2413,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
   /** Base implementation that does not override methods which need to be 
window aware. */
   private abstract class ProcessBundleContextBase extends DoFn<InputT, 
OutputT>.ProcessContext
-      implements DoFnInvoker.ArgumentProvider<InputT, OutputT>, 
OutputReceiver<OutputT> {
+      implements DoFnInvoker.ArgumentProvider<InputT, OutputT> {
 
     private ProcessBundleContextBase() {
       doFn.super();
@@ -2477,112 +2478,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
     @Override
     public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
-      return this;
+      return DoFnOutputReceivers.windowedReceiver(this, null);
     }
 
-    private final OutputReceiver<Row> mainRowOutputReceiver =
-        mainOutputSchemaCoder == null
-            ? null
-            : new OutputReceiver<Row>() {
-              private final SerializableFunction<Row, OutputT> fromRowFunction 
=
-                  mainOutputSchemaCoder.getFromRowFunction();
-
-              @Override
-              public void output(Row output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    fromRowFunction.apply(output), 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(Row output, Instant timestamp) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    fromRowFunction.apply(output), timestamp);
-              }
-            };
-
     @Override
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      checkState(
-          mainOutputSchemaCoder != null,
-          "Output with tag "
-              + mainOutputTag
-              + " must have a schema in order to call getRowReceiver");
-      return mainRowOutputReceiver;
-    }
-
-    /** A {@link MultiOutputReceiver} which caches created instances to re-use 
across bundles. */
-    private final MultiOutputReceiver taggedOutputReceiver =
-        new MultiOutputReceiver() {
-          private final Map<TupleTag<?>, OutputReceiver<?>> 
taggedOutputReceivers = new HashMap<>();
-          private final Map<TupleTag<?>, OutputReceiver<Row>> 
taggedRowReceivers = new HashMap<>();
-
-          private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> 
tag) {
-            if (tag == null || mainOutputTag.equals(tag)) {
-              return (OutputReceiver<T>) ProcessBundleContextBase.this;
-            }
-            return new OutputReceiver<T>() {
-              @Override
-              public void output(T output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, output, currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(T output, Instant timestamp) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, output, currentElement.getTimestamp());
-              }
-            };
-          }
-
-          private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> 
tag) {
-            if (tag == null || mainOutputTag.equals(tag)) {
-              checkState(
-                  mainOutputSchemaCoder != null,
-                  "Output with tag "
-                      + mainOutputTag
-                      + " must have a schema in order to call getRowReceiver");
-              return mainRowOutputReceiver;
-            }
-
-            Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag);
-            checkState(outputCoder != null, "No output tag for " + tag);
-            checkState(
-                outputCoder instanceof SchemaCoder,
-                "Output with tag " + tag + " must have a schema in order to 
call getRowReceiver");
-            return new OutputReceiver<Row>() {
-              private SerializableFunction<Row, T> fromRowFunction =
-                  ((SchemaCoder) outputCoder).getFromRowFunction();
-
-              @Override
-              public void output(Row output) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(Row output, Instant timestamp) {
-                ProcessBundleContextBase.this.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), timestamp);
-              }
-            };
-          }
-
-          @Override
-          public <T> OutputReceiver<T> get(TupleTag<T> tag) {
-            return (OutputReceiver<T>)
-                taggedOutputReceivers.computeIfAbsent(tag, 
this::createTaggedOutputReceiver);
-          }
-
-          @Override
-          public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
-            return taggedRowReceivers.computeIfAbsent(tag, 
this::createTaggedRowReceiver);
-          }
-        };
+      return DoFnOutputReceivers.rowReceiver(this, null, 
mainOutputSchemaCoder);
+    }
 
     @Override
     public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
-      return taggedOutputReceiver;
+      return DoFnOutputReceivers.windowedMultiReceiver(this, outputCoders);
     }
 
     @Override
@@ -2657,8 +2563,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
    * DoFn.OnWindowExpiration @OnWindowExpiration}.
    */
   private class OnWindowExpirationContext<K> extends 
BaseArgumentProvider<InputT, OutputT> {
-    private class Context extends DoFn<InputT, 
OutputT>.OnWindowExpirationContext
-        implements OutputReceiver<OutputT> {
+    private class Context extends DoFn<InputT, 
OutputT>.OnWindowExpirationContext {
       private Context() {
         doFn.super();
       }
@@ -2766,108 +2671,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
     @Override
     public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
-      return context;
+      return DoFnOutputReceivers.windowedReceiver(context, null);
     }
 
-    private final OutputReceiver<Row> mainRowOutputReceiver =
-        mainOutputSchemaCoder == null
-            ? null
-            : new OutputReceiver<Row>() {
-              private final SerializableFunction<Row, OutputT> fromRowFunction 
=
-                  mainOutputSchemaCoder.getFromRowFunction();
-
-              @Override
-              public void output(Row output) {
-                context.outputWithTimestamp(
-                    fromRowFunction.apply(output), 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(Row output, Instant timestamp) {
-                context.outputWithTimestamp(fromRowFunction.apply(output), 
timestamp);
-              }
-            };
-
     @Override
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      checkState(
-          mainOutputSchemaCoder != null,
-          "Output with tag "
-              + mainOutputTag
-              + " must have a schema in order to call getRowReceiver");
-      return mainRowOutputReceiver;
-    }
-
-    /** A {@link MultiOutputReceiver} which caches created instances to re-use 
across bundles. */
-    private final MultiOutputReceiver taggedOutputReceiver =
-        new MultiOutputReceiver() {
-          private final Map<TupleTag<?>, OutputReceiver<?>> 
taggedOutputReceivers = new HashMap<>();
-          private final Map<TupleTag<?>, OutputReceiver<Row>> 
taggedRowReceivers = new HashMap<>();
-
-          private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> 
tag) {
-            if (tag == null || mainOutputTag.equals(tag)) {
-              return (OutputReceiver<T>) context;
-            }
-            return new OutputReceiver<T>() {
-              @Override
-              public void output(T output) {
-                context.outputWithTimestamp(tag, output, 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(T output, Instant timestamp) {
-                context.outputWithTimestamp(tag, output, 
currentElement.getTimestamp());
-              }
-            };
-          }
-
-          private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> 
tag) {
-            if (tag == null || mainOutputTag.equals(tag)) {
-              checkState(
-                  mainOutputSchemaCoder != null,
-                  "Output with tag "
-                      + mainOutputTag
-                      + " must have a schema in order to call getRowReceiver");
-              return mainRowOutputReceiver;
-            }
-
-            Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag);
-            checkState(outputCoder != null, "No output tag for " + tag);
-            checkState(
-                outputCoder instanceof SchemaCoder,
-                "Output with tag " + tag + " must have a schema in order to 
call getRowReceiver");
-            return new OutputReceiver<Row>() {
-              private SerializableFunction<Row, T> fromRowFunction =
-                  ((SchemaCoder) outputCoder).getFromRowFunction();
-
-              @Override
-              public void output(Row output) {
-                context.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(Row output, Instant timestamp) {
-                context.outputWithTimestamp(tag, 
fromRowFunction.apply(output), timestamp);
-              }
-            };
-          }
-
-          @Override
-          public <T> OutputReceiver<T> get(TupleTag<T> tag) {
-            return (OutputReceiver<T>)
-                taggedOutputReceivers.computeIfAbsent(tag, 
this::createTaggedOutputReceiver);
-          }
-
-          @Override
-          public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
-            return taggedRowReceivers.computeIfAbsent(tag, 
this::createTaggedRowReceiver);
-          }
-        };
+      return DoFnOutputReceivers.rowReceiver(context, null, 
mainOutputSchemaCoder);
+    }
 
     @Override
     public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
-      return taggedOutputReceiver;
+      return DoFnOutputReceivers.windowedMultiReceiver(context);
     }
 
     @Override
@@ -2902,8 +2716,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   /** Provides arguments for a {@link DoFnInvoker} for {@link DoFn.OnTimer 
@OnTimer}. */
   private class OnTimerContext<K> extends BaseArgumentProvider<InputT, 
OutputT> {
 
-    private class Context extends DoFn<InputT, OutputT>.OnTimerContext
-        implements OutputReceiver<OutputT> {
+    private class Context extends DoFn<InputT, OutputT>.OnTimerContext {
       private Context() {
         doFn.super();
       }
@@ -3027,108 +2840,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
 
     @Override
     public OutputReceiver<OutputT> outputReceiver(DoFn<InputT, OutputT> doFn) {
-      return context;
+      return DoFnOutputReceivers.windowedReceiver(context, null);
     }
 
-    private final OutputReceiver<Row> mainRowOutputReceiver =
-        mainOutputSchemaCoder == null
-            ? null
-            : new OutputReceiver<Row>() {
-              private final SerializableFunction<Row, OutputT> fromRowFunction 
=
-                  mainOutputSchemaCoder.getFromRowFunction();
-
-              @Override
-              public void output(Row output) {
-                context.outputWithTimestamp(
-                    fromRowFunction.apply(output), 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(Row output, Instant timestamp) {
-                context.outputWithTimestamp(fromRowFunction.apply(output), 
timestamp);
-              }
-            };
-
     @Override
     public OutputReceiver<Row> outputRowReceiver(DoFn<InputT, OutputT> doFn) {
-      checkState(
-          mainOutputSchemaCoder != null,
-          "Output with tag "
-              + mainOutputTag
-              + " must have a schema in order to call getRowReceiver");
-      return mainRowOutputReceiver;
-    }
-
-    /** A {@link MultiOutputReceiver} which caches created instances to re-use 
across bundles. */
-    private final MultiOutputReceiver taggedOutputReceiver =
-        new MultiOutputReceiver() {
-          private final Map<TupleTag<?>, OutputReceiver<?>> 
taggedOutputReceivers = new HashMap<>();
-          private final Map<TupleTag<?>, OutputReceiver<Row>> 
taggedRowReceivers = new HashMap<>();
-
-          private <T> OutputReceiver<T> createTaggedOutputReceiver(TupleTag<T> 
tag) {
-            if (tag == null || mainOutputTag.equals(tag)) {
-              return (OutputReceiver<T>) context;
-            }
-            return new OutputReceiver<T>() {
-              @Override
-              public void output(T output) {
-                context.outputWithTimestamp(tag, output, 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(T output, Instant timestamp) {
-                context.outputWithTimestamp(tag, output, 
currentElement.getTimestamp());
-              }
-            };
-          }
-
-          private <T> OutputReceiver<Row> createTaggedRowReceiver(TupleTag<T> 
tag) {
-            if (tag == null || mainOutputTag.equals(tag)) {
-              checkState(
-                  mainOutputSchemaCoder != null,
-                  "Output with tag "
-                      + mainOutputTag
-                      + " must have a schema in order to call getRowReceiver");
-              return mainRowOutputReceiver;
-            }
-
-            Coder<T> outputCoder = (Coder<T>) outputCoders.get(tag);
-            checkState(outputCoder != null, "No output tag for " + tag);
-            checkState(
-                outputCoder instanceof SchemaCoder,
-                "Output with tag " + tag + " must have a schema in order to 
call getRowReceiver");
-            return new OutputReceiver<Row>() {
-              private SerializableFunction<Row, T> fromRowFunction =
-                  ((SchemaCoder) outputCoder).getFromRowFunction();
-
-              @Override
-              public void output(Row output) {
-                context.outputWithTimestamp(
-                    tag, fromRowFunction.apply(output), 
currentElement.getTimestamp());
-              }
-
-              @Override
-              public void outputWithTimestamp(Row output, Instant timestamp) {
-                context.outputWithTimestamp(tag, 
fromRowFunction.apply(output), timestamp);
-              }
-            };
-          }
-
-          @Override
-          public <T> OutputReceiver<T> get(TupleTag<T> tag) {
-            return (OutputReceiver<T>)
-                taggedOutputReceivers.computeIfAbsent(tag, 
this::createTaggedOutputReceiver);
-          }
-
-          @Override
-          public <T> OutputReceiver<Row> getRowReceiver(TupleTag<T> tag) {
-            return taggedRowReceivers.computeIfAbsent(tag, 
this::createTaggedRowReceiver);
-          }
-        };
+      return DoFnOutputReceivers.rowReceiver(context, null, 
mainOutputSchemaCoder);
+    }
 
     @Override
     public MultiOutputReceiver taggedOutputReceiver(DoFn<InputT, OutputT> 
doFn) {
-      return taggedOutputReceiver;
+      return DoFnOutputReceivers.windowedMultiReceiver(context);
     }
 
     @Override

Reply via email to