This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 9cd00549dd8 Do not suppress downstream Exception in BigQueryIO Storage write API FailedStorageApiInserts Handling (#31506) 9cd00549dd8 is described below commit 9cd00549dd8661d2a86543d35efa099c176c9649 Author: Yi Hu <ya...@google.com> AuthorDate: Wed Jun 5 19:12:59 2024 -0400 Do not suppress downstream Exception in BigQueryIO Storage write API FailedStorageApiInserts Handling (#31506) --- .../sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java index ed13e6338f8..02d91e46d69 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java @@ -678,6 +678,7 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> // Convert the message to a TableRow and send it to the failedRows collection. ByteString protoBytes = failedContext.protoRows.getSerializedRows(failedIndex); org.joda.time.Instant timestamp = failedContext.timestamps.get(failedIndex); + BigQueryStorageApiInsertError element = null; try { TableRow failedRow = TableRowToStorageApiProto.tableRowFromMessage( @@ -687,13 +688,16 @@ public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> .getDescriptor()), protoBytes), true); - failedRowsReceiver.outputWithTimestamp( + element = new BigQueryStorageApiInsertError( - failedRow, error.getRowIndexToErrorMessage().get(failedIndex)), - timestamp); + failedRow, error.getRowIndexToErrorMessage().get(failedIndex)); } catch (Exception e) { LOG.error("Failed to insert row and could not parse the result!", e); } + // output outside try {} clause to avoid suppress downstream Exception + if (element != null) { + failedRowsReceiver.outputWithTimestamp(element, timestamp); + } } int numRowsFailed = failedRowIndices.size(); rowsSentToFailedRowsCollection.inc(numRowsFailed);