This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c006c5e2d53 Add Default method for OutputReceiver.outputWindowedValue
(#30220)
c006c5e2d53 is described below
commit c006c5e2d53f1544719059db6c1b06ec9bece49d
Author: Yi Hu <[email protected]>
AuthorDate: Tue Feb 6 14:05:45 2024 -0500
Add Default method for OutputReceiver.outputWindowedValue (#30220)
* Revert changes of override method that simply throw
UnsupportedOperationException
in subclasses
---
.../runners/core/construction/SplittableParDo.java | 10 ----------
.../java/org/apache/beam/sdk/transforms/DoFn.java | 7 +++++--
.../extensions/sql/zetasql/BeamZetaSqlCalcRel.java | 12 ------------
.../bigquery/StorageApiWriteUnshardedRecords.java | 20 --------------------
.../apache/beam/sdk/io/gcp/spanner/SpannerIO.java | 12 ------------
.../beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java | 11 -----------
.../beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java | 13 -------------
.../ReadFromSparkReceiverWithOffsetDoFnTest.java | 13 -------------
8 files changed, 5 insertions(+), 93 deletions(-)
diff --git
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
index 9cf0606b68b..5ea2c4968dd 100644
---
a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
+++
b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java
@@ -21,7 +21,6 @@ import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Pr
import com.google.auto.service.AutoService;
import java.io.IOException;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
@@ -670,15 +669,6 @@ public class SplittableParDo<InputT, OutputT,
RestrictionT, WatermarkEstimatorSt
public void outputWithTimestamp(RestrictionT part, Instant
timestamp) {
throw new UnsupportedOperationException();
}
-
- @Override
- public void outputWindowedValue(
- RestrictionT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException();
- }
};
}
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index c22b726c99a..78ec1a8c840 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -395,11 +395,14 @@ public abstract class DoFn<InputT extends @Nullable
Object, OutputT extends @Nul
void outputWithTimestamp(T output, Instant timestamp);
- void outputWindowedValue(
+ default void outputWindowedValue(
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo);
+ PaneInfo paneInfo) {
+ throw new UnsupportedOperationException(
+ String.format("Not implemented: %s.outputWindowedValue",
this.getClass().getName()));
+ }
}
/** Receives tagged output for a multi-output function. */
diff --git
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
index ad856c1c3a7..d60ebe46b37 100644
---
a/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
+++
b/sdks/java/extensions/sql/zetasql/src/main/java/org/apache/beam/sdk/extensions/sql/zetasql/BeamZetaSqlCalcRel.java
@@ -26,7 +26,6 @@ import com.google.zetasql.PreparedExpression;
import com.google.zetasql.Value;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayDeque;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -49,7 +48,6 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
@@ -363,16 +361,6 @@ public class BeamZetaSqlCalcRel extends
AbstractBeamCalcRel {
public void outputWithTimestamp(Row output, Instant timestamp) {
c.output(tag, output, timestamp, w);
}
-
- @Override
- public void outputWindowedValue(
- Row output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException(
- "outputWindowedValue not supported in finish bundle here");
- }
}
private static RuntimeException extractException(Throwable e) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index 3c6c73dd021..846e7e3bddc 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -37,7 +37,6 @@ import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
-import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -70,7 +69,6 @@ import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
@@ -1095,15 +1093,6 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
BigQueryStorageApiInsertError output, org.joda.time.Instant
timestamp) {
context.output(failedRowsTag, output, timestamp,
GlobalWindow.INSTANCE);
}
-
- @Override
- public void outputWindowedValue(
- BigQueryStorageApiInsertError output,
- org.joda.time.Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException("outputWindowedValue not
supported");
- }
};
@Nullable OutputReceiver<TableRow> successfulRowsReceiver = null;
if (successfulRowsTag != null) {
@@ -1118,15 +1107,6 @@ public class
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
public void outputWithTimestamp(TableRow output,
org.joda.time.Instant timestamp) {
context.output(successfulRowsTag, output, timestamp,
GlobalWindow.INSTANCE);
}
-
- @Override
- public void outputWindowedValue(
- TableRow output,
- org.joda.time.Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException("outputWindowedValue
not supported");
- }
};
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 3128de45fde..6db79ab69b4 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -56,7 +56,6 @@ import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -103,11 +102,9 @@ import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
@@ -2005,15 +2002,6 @@ public class SpannerIO {
public void outputWithTimestamp(Iterable<MutationGroup> output, Instant
timestamp) {
c.output(output, timestamp, GlobalWindow.INSTANCE);
}
-
- @Override
- public void outputWindowedValue(
- Iterable<MutationGroup> output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException("outputWindowedValue not
supported");
- }
}
}
diff --git
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
index 845d974af0b..48b5b060a29 100644
---
a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
+++
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFnTest.java
@@ -50,8 +50,6 @@ import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
import
org.apache.beam.sdk.transforms.errorhandling.ErrorHandler.DefaultErrorHandler;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollection.IsBounded;
@@ -342,15 +340,6 @@ public class ReadFromKafkaDoFnTest {
records.add(output);
}
- @Override
- public void outputWindowedValue(
- T output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException("Not expecting
outputWindowedValue");
- }
-
public List<T> getOutputs() {
return this.records;
}
diff --git
a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
index b72fe423efb..273a1915d2b 100644
---
a/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
+++
b/sdks/java/io/pulsar/src/test/java/org/apache/beam/sdk/io/pulsar/ReadFromPulsarDoFnTest.java
@@ -21,14 +21,11 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.internal.DefaultImplementation;
@@ -176,16 +173,6 @@ public class ReadFromPulsarDoFnTest {
records.add(output);
}
- @Override
- public void outputWindowedValue(
- PulsarMessage output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException(
- "unsupported outputWindowedValue in mock outputreceiver");
- }
-
public List<PulsarMessage> getOutputs() {
return records;
}
diff --git
a/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
b/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
index bb0e6524241..33827164c6b 100644
---
a/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
+++
b/sdks/java/io/sparkreceiver/2/src/test/java/org/apache/beam/sdk/io/sparkreceiver/ReadFromSparkReceiverWithOffsetDoFnTest.java
@@ -22,15 +22,12 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
@@ -67,16 +64,6 @@ public class ReadFromSparkReceiverWithOffsetDoFnTest {
records.add(output);
}
- @Override
- public void outputWindowedValue(
- String output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo paneInfo) {
- throw new UnsupportedOperationException(
- "Not expecting to receive call to outputWindowedValue");
- }
-
public List<String> getOutputs() {
return this.records;
}