This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a3af5d54e25 feat(BigQueryWriteSchemaTransformProvider): add output
collection names and error handling check (#36937)
a3af5d54e25 is described below
commit a3af5d54e257fc5da8e923916d8956ef1f31f1b3
Author: liferoad <[email protected]>
AuthorDate: Sat Jan 3 13:49:53 2026 -0500
feat(BigQueryWriteSchemaTransformProvider): add output collection names and
error handling check (#36937)
Declare the failed-rows and errors output collection names for the BigQuery write
transform, and route bounded input through the Storage Write API (instead of file
loads) whenever error handling is configured.
---
.../providers/BigQueryWriteSchemaTransformProvider.java | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
index abab169d693..a741c637a19 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
import com.google.auto.service.AutoService;
+import java.util.Arrays;
+import java.util.List;
import org.apache.beam.model.pipeline.v1.ExternalTransforms;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -47,6 +49,11 @@ public class BigQueryWriteSchemaTransformProvider
return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE);
}
+ @Override
+ public List<String> outputCollectionNames() {
+ return Arrays.asList("FailedRows", "FailedRowsWithErrors", "errors");
+ }
+
@Override
protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
return new BigQueryWriteSchemaTransform(configuration);
@@ -62,9 +69,10 @@ public class BigQueryWriteSchemaTransformProvider
@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
- if
(input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED))
{
+ if
(input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)
+ && configuration.getErrorHandling() == null) {
return input.apply(new
BigQueryFileLoadsSchemaTransformProvider().from(configuration));
- } else { // UNBOUNDED
+ } else { // UNBOUNDED or error handling specified
return input.apply(
new
BigQueryStorageWriteApiSchemaTransformProvider().from(configuration));
}