This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c006c5e2d53 Add Default method for OutputReceiver.outputWindowedValue (#30220)
c006c5e2d53 is described below

commit c006c5e2d53f1544719059db6c1b06ec9bece49d
Author: Yi Hu <[email protected]>
AuthorDate: Tue Feb 6 14:05:45 2024 -0500

    Add Default method for OutputReceiver.outputWindowedValue (#30220)
    
    * Revert changes of override methods that simply throw
      UnsupportedOperationException in subclasses
---
 .../runners/core/construction/SplittableParDo.java   | 10 ----------
 .../java/org/apache/beam/sdk/transforms/DoFn.java    |  7 +++++--
 .../extensions/sql/zetasql/BeamZetaSqlCalcRel.java   | 12 ------------
 .../bigquery/StorageApiWriteUnshardedRecords.java    | 20 --------------------
 .../apache/beam/sdk/io/gcp/spanner/SpannerIO.java    | 12 ------------
 .../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java     | 11 -----------
 .../beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java   | 13 -------------
 .../ReadFromSparkReceiverWithOffsetDoFnTest.java     | 13 -------------
 8 files changed, 5 insertions(+), 93 deletions(-)

diff --git 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 9cf0606b68b..5ea2c4968dd 100644
--- 
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++ 
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -21,7 +21,6 @@ import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
 
 import com.google.auto.service.AutoService;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -670,15 +669,6 @@ public class SplittableParDo<InputT, OutputT, 
RestrictionT, WatermarkEstimatorSt
                     public void outputWithTimestamp(RestrictionT part, Instant 
timestamp) {
                       throw new UnsupportedOperationException();
                     }
-
-                    @Override
-                    public void outputWindowedValue(
-                        RestrictionT output,
-                        Instant timestamp,
-                        Collection<? extends BoundedWindow> windows,
-                        PaneInfo paneInfo) {
-                      throw new UnsupportedOperationException();
-                    }
                   };
                 }
 
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index c22b726c99a..78ec1a8c840 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -395,11 +395,14 @@ public abstract class DoFn<InputT extends @Nullable 
Object, OutputT extends @Nul
 
     void outputWithTimestamp(T output, Instant timestamp);
 
-    void outputWindowedValue(
+    default void outputWindowedValue(
         T output,
         Instant timestamp,
         Collection<? extends BoundedWindow> windows,
-        PaneInfo paneInfo);
+        PaneInfo paneInfo) {
+      throw new UnsupportedOperationException(
+          String.format("Not implemented: %s.outputWindowedValue", 
this.getClass().getName()));
+    }
   }
 
   /** Receives tagged output for a multi-output function. */
diff --git 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index ad856c1c3a7..d60ebe46b37 100644
--- 
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++ 
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -26,7 +26,6 @@ import com.google.zetasql.PreparedExpression;
 import com.google.zetasql.Value;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.util.ArrayDeque;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -49,7 +48,6 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -363,16 +361,6 @@ public class BeamZetaSqlCalcRel extends 
AbstractBeamCalcRel {
       public void outputWithTimestamp(Row output, Instant timestamp) {
         c.output(tag, output, timestamp, w);
       }
-
-      @Override
-      public void outputWindowedValue(
-          Row output,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo paneInfo) {
-        throw new UnsupportedOperationException(
-            "outputWindowedValue not supported in finish bundle here");
-      }
     }
 
     private static RuntimeException extractException(Throwable e) {
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 3c6c73dd021..846e7e3bddc 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -37,7 +37,6 @@ import io.grpc.Status;
 import java.io.IOException;
 import java.time.Instant;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -70,7 +69,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
@@ -1095,15 +1093,6 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                 BigQueryStorageApiInsertError output, org.joda.time.Instant 
timestamp) {
               context.output(failedRowsTag, output, timestamp, 
GlobalWindow.INSTANCE);
             }
-
-            @Override
-            public void outputWindowedValue(
-                BigQueryStorageApiInsertError output,
-                org.joda.time.Instant timestamp,
-                Collection<? extends BoundedWindow> windows,
-                PaneInfo paneInfo) {
-              throw new UnsupportedOperationException("outputWindowedValue not 
supported");
-            }
           };
       @Nullable OutputReceiver<TableRow> successfulRowsReceiver = null;
       if (successfulRowsTag != null) {
@@ -1118,15 +1107,6 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
               public void outputWithTimestamp(TableRow output, 
org.joda.time.Instant timestamp) {
                 context.output(successfulRowsTag, output, timestamp, 
GlobalWindow.INSTANCE);
               }
-
-              @Override
-              public void outputWindowedValue(
-                  TableRow output,
-                  org.joda.time.Instant timestamp,
-                  Collection<? extends BoundedWindow> windows,
-                  PaneInfo paneInfo) {
-                throw new UnsupportedOperationException("outputWindowedValue 
not supported");
-              }
             };
       }
 
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 3128de45fde..6db79ab69b4 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -56,7 +56,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -103,11 +102,9 @@ import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.Wait;
 import org.apache.beam.sdk.transforms.WithTimestamps;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.BackOff;
 import org.apache.beam.sdk.util.FluentBackoff;
@@ -2005,15 +2002,6 @@ public class SpannerIO {
       public void outputWithTimestamp(Iterable<MutationGroup> output, Instant 
timestamp) {
         c.output(output, timestamp, GlobalWindow.INSTANCE);
       }
-
-      @Override
-      public void outputWindowedValue(
-          Iterable<MutationGroup> output,
-          Instant timestamp,
-          Collection<? extends BoundedWindow> windows,
-          PaneInfo paneInfo) {
-        throw new UnsupportedOperationException("outputWindowedValue not 
supported");
-      }
     }
   }
 
diff --git 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 845d974af0b..48b5b060a29 100644
--- 
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++ 
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -50,8 +50,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
 import 
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -342,15 +340,6 @@ public class ReadFromKafkaDoFnTest {
       records.add(output);
     }
 
-    @Override
-    public void outputWindowedValue(
-        T output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo paneInfo) {
-      throw new UnsupportedOperationException("Not expecting 
outputWindowedValue");
-    }
-
     public List<T> getOutputs() {
       return this.records;
     }
diff --git 
a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
 
b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
index b72fe423efb..273a1915d2b 100644
--- 
a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
+++ 
b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
@@ -21,14 +21,11 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -176,16 +173,6 @@ public class ReadFromPulsarDoFnTest {
       records.add(output);
     }
 
-    @Override
-    public void outputWindowedValue(
-        PulsarMessage output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo paneInfo) {
-      throw new UnsupportedOperationException(
-          "unsupported outputWindowedValue in mock outputreceiver");
-    }
-
     public List<PulsarMessage> getOutputs() {
       return records;
     }
diff --git 
a/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
 
b/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
index bb0e6524241..33827164c6b 100644
--- 
a/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
+++ 
b/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
@@ -22,15 +22,12 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.checkerframework.checker.initialization.qual.Initialized;
 import org.checkerframework.checker.nullness.qual.NonNull;
 import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
@@ -67,16 +64,6 @@ public class ReadFromSparkReceiverWithOffsetDoFnTest {
       records.add(output);
     }
 
-    @Override
-    public void outputWindowedValue(
-        String output,
-        Instant timestamp,
-        Collection<? extends BoundedWindow> windows,
-        PaneInfo paneInfo) {
-      throw new UnsupportedOperationException(
-          "Not expecting to receive call to outputWindowedValue");
-    }
-
     public List<String> getOutputs() {
       return this.records;
     }

Reply via email to