This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cd00549dd8 Do not suppress downstream Exception in BigQueryIO Storage 
write API FailedStorageApiInserts Handling (#31506)
9cd00549dd8 is described below

commit 9cd00549dd8661d2a86543d35efa099c176c9649
Author: Yi Hu <ya...@google.com>
AuthorDate: Wed Jun 5 19:12:59 2024 -0400

    Do not suppress downstream Exception in BigQueryIO Storage write API 
FailedStorageApiInserts Handling (#31506)
---
 .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java   | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
index ed13e6338f8..02d91e46d69 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java
@@ -678,6 +678,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, 
ElementT>
                   // Convert the message to a TableRow and send it to the 
failedRows collection.
                   ByteString protoBytes = 
failedContext.protoRows.getSerializedRows(failedIndex);
                   org.joda.time.Instant timestamp = 
failedContext.timestamps.get(failedIndex);
+                  BigQueryStorageApiInsertError element = null;
                   try {
                     TableRow failedRow =
                         TableRowToStorageApiProto.tableRowFromMessage(
@@ -687,13 +688,16 @@ public class 
StorageApiWriteUnshardedRecords<DestinationT, ElementT>
                                         .getDescriptor()),
                                 protoBytes),
                             true);
-                    failedRowsReceiver.outputWithTimestamp(
+                    element =
                         new BigQueryStorageApiInsertError(
-                            failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex)),
-                        timestamp);
+                            failedRow, 
error.getRowIndexToErrorMessage().get(failedIndex));
                   } catch (Exception e) {
                     LOG.error("Failed to insert row and could not parse the 
result!", e);
                   }
+                  // output outside try {} clause to avoid suppress downstream 
Exception
+                  if (element != null) {
+                    failedRowsReceiver.outputWithTimestamp(element, timestamp);
+                  }
                 }
                 int numRowsFailed = failedRowIndices.size();
                 rowsSentToFailedRowsCollection.inc(numRowsFailed);

Reply via email to