gemini-code-assist[bot] commented on code in PR #38948:
URL: https://github.com/apache/beam/pull/38948#discussion_r3404920785


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -1066,7 +1066,22 @@ public void process(
 
         if (numAppends > 0) {
           initializeContexts.accept(contexts);
-          retryManager.run(true);
+          try {
+            retryManager.run(true);
+          } catch (Exception e) {
+            Status.Code statusCode = Status.fromThrowable(e).getCode();
+            String errorMessage =
+                String.format(
+                    "More than %d attempts to call AppendRows failed. Last encountered error: %s",
+                    maxRetries, e.toString());
+            if (statusCode == Status.Code.PERMISSION_DENIED
+                || statusCode == Status.Code.NOT_FOUND) {
+              errorMessage +=
+                  ". Please check if the destination table exists and if the service account has the "
+                      + "TABLES_UPDATE_DATA permission.";
+            }
+            throw new RuntimeException(errorMessage, e);
+          }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   When catching `Exception` from `retryManager.run(true)`, we should handle 
`InterruptedException` separately. Wrapping `InterruptedException` in a 
`RuntimeException` without restoring the interrupted status is a Java 
anti-pattern that can prevent clean thread shutdown in multi-threaded 
environments. Additionally, the IAM permission name should be updated to 
`bigquery.tables.updateData` to match the official GCP permission.
   
   ```java
             try {
               retryManager.run(true);
             } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw e;
             } catch (Exception e) {
               Status.Code statusCode = Status.fromThrowable(e).getCode();
               String errorMessage =
                   String.format(
                       "More than %d attempts to call AppendRows failed. Last encountered error: %s",
                       maxRetries, e.toString());
               if (statusCode == Status.Code.PERMISSION_DENIED
                   || statusCode == Status.Code.NOT_FOUND) {
                 errorMessage +=
                     ". Please check if the destination table exists and if the service account has the "
                         + "bigquery.tables.updateData permission.";
               }
               throw new RuntimeException(errorMessage, e);
             }
   ```



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -1659,6 +1659,48 @@ public void testStreamingStorageApiWriteWithErrorHandling() throws Exception {
     storageWriteWithErrorHandling(false);
   }
 
+  @Test
+  public void testStorageApiWriteFailureExhaustedRetries() throws Exception {
+    assumeTrue(useStorageApi);
+
+    // Set up fake dataset service to return PERMISSION_DENIED for appendRows
+    fakeDatasetService.setAppendRowsError(
+        new io.grpc.StatusRuntimeException(
+            io.grpc.Status.PERMISSION_DENIED.withDescription("Missing permissions")));
+
+    List<Integer> elements = Lists.newArrayList(1, 2, 3);
+
+    BigQueryIO.Write<Integer> write =
+        BigQueryIO.<Integer>write()
+            .to("project-id:dataset-id.table-id")
+            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
+            .withFormatFunction(
+                (SerializableFunction<Integer, TableRow>)
+                    input -> new TableRow().set("number", input))
+            .withSchema(
+                new TableSchema()
+                    .setFields(
+                        ImmutableList.of(
+                            new TableFieldSchema().setName("number").setType("INTEGER"))))
+            .withTestServices(fakeBqServices)
+            .withoutValidation();
+
+    if (useStreaming) {
+      write = write.withTriggeringFrequency(Duration.standardSeconds(30));
+    }
+
+    PCollection<Integer> input = p.apply(Create.of(elements).withCoder(BigEndianIntegerCoder.of()));
+    input.apply("WriteToBQ", write);
+
+    thrown.expect(RuntimeException.class);
+    thrown.expectMessage("More than");
+    thrown.expectMessage("attempts to call AppendRows failed");
+    thrown.expectMessage("PERMISSION_DENIED");
+    thrown.expectMessage("TABLES_UPDATE_DATA");

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Update the expected error message in the test to match the updated 
permission name `bigquery.tables.updateData`.
   
   ```suggestion
       thrown.expectMessage("bigquery.tables.updateData");
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.java:
##########
@@ -792,9 +792,17 @@ long flush(
 
               // Maximum number of times we retry before we fail the work item.
               if (failedContext.failureCount > allowedRetry) {
-                throw new RuntimeException(
+                String errorMessage =
                     String.format(
-                        "More than %d attempts to call AppendRows failed.", allowedRetry));
+                        "More than %d attempts to call AppendRows failed. Last encountered error: %s",
+                        allowedRetry, error != null ? error.toString() : "unknown");
+                if (statusCode == Status.Code.PERMISSION_DENIED
+                    || statusCode == Status.Code.NOT_FOUND) {
+                  errorMessage +=
+                      ". Please check if the destination table exists and if the service account has the "
+                          + "TABLES_UPDATE_DATA permission.";
+                }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The IAM permission required to write data to a BigQuery table is 
`bigquery.tables.updateData`. Using the exact permission name instead of 
`TABLES_UPDATE_DATA` makes the error message much more helpful for users 
configuring IAM roles.
   
   ```suggestion
                   if (statusCode == Status.Code.PERMISSION_DENIED
                       || statusCode == Status.Code.NOT_FOUND) {
                     errorMessage +=
                         ". Please check if the destination table exists and if the service account has the "
                             + "bigquery.tables.updateData permission.";
                   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to