This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a3af5d54e25 feat(BigQueryWriteSchemaTransformProvider): add output 
collection names and error handling check (#36937)
a3af5d54e25 is described below

commit a3af5d54e257fc5da8e923916d8956ef1f31f1b3
Author: liferoad <[email protected]>
AuthorDate: Sat Jan 3 13:49:53 2026 -0500

    feat(BigQueryWriteSchemaTransformProvider): add output collection names and 
error handling check (#36937)
    
    Add support for failed rows output collections and modify bounded check to 
consider error handling configuration
---
 .../providers/BigQueryWriteSchemaTransformProvider.java      | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
index abab169d693..a741c637a19 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryWriteSchemaTransformProvider.java
@@ -20,6 +20,8 @@ package org.apache.beam.sdk.io.gcp.bigquery.providers;
 import static org.apache.beam.sdk.util.construction.BeamUrns.getUrn;
 
 import com.google.auto.service.AutoService;
+import java.util.Arrays;
+import java.util.List;
 import org.apache.beam.model.pipeline.v1.ExternalTransforms;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.schemas.NoSuchSchemaException;
@@ -47,6 +49,11 @@ public class BigQueryWriteSchemaTransformProvider
     return getUrn(ExternalTransforms.ManagedTransforms.Urns.BIGQUERY_WRITE);
   }
 
+  @Override
+  public List<String> outputCollectionNames() {
+    return Arrays.asList("FailedRows", "FailedRowsWithErrors", "errors");
+  }
+
   @Override
   protected SchemaTransform from(BigQueryWriteConfiguration configuration) {
     return new BigQueryWriteSchemaTransform(configuration);
@@ -62,9 +69,10 @@ public class BigQueryWriteSchemaTransformProvider
 
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
-      if 
(input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED))
 {
+      if 
(input.getSinglePCollection().isBounded().equals(PCollection.IsBounded.BOUNDED)
+          && configuration.getErrorHandling() == null) {
         return input.apply(new 
BigQueryFileLoadsSchemaTransformProvider().from(configuration));
-      } else { // UNBOUNDED
+      } else { // UNBOUNDED or error handling specified
         return input.apply(
             new 
BigQueryStorageWriteApiSchemaTransformProvider().from(configuration));
       }

Reply via email to