This is an automated email from the ASF dual-hosted git repository.

xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5696b6e2fd3 Fix out of range (#35051)
5696b6e2fd3 is described below

commit 5696b6e2fd3f560e1a2474aaa054a5b8255cb173
Author: liferoad <[email protected]>
AuthorDate: Sun Jul 20 15:48:11 2025 -0400

    Fix out of range (#35051)
    
    * fix user metrics in @OnTimer
    
    * minor fix
    
    * Retry OUT_OF_RANGE errors with Bundle
    
    * spotless
    
    * revert some files
    
    * minor fix
    
    * addressed comments
    
    * fixed the null
    
    * clean up
---
 .../bigquery/StorageApiFlushAndFinalizeDoFn.java   | 62 ++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
index dec86c3360b..fd3853d15e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
@@ -61,6 +61,8 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
       Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsAlreadyExists");
   private final Counter flushOperationsInvalidArgument =
       Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsInvalidArgument");
+  private final Counter flushOperationsOffsetBeyondEnd =
+      Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsOffsetBeyondEnd");
   private final Distribution flushLatencyDistribution =
       Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, "flushOperationLatencyMs");
   private final Counter finalizeOperationsSent =
@@ -70,6 +72,52 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
   private final Counter finalizeOperationsFailed =
       Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");
 
+  /**
+   * Checks if the given throwable indicates that an offset is beyond the end of a BigQuery stream.
+   * It primarily uses {@code io.grpc.Status.fromThrowable} to determine the gRPC status code and
+   * then checks for specific message content.
+   */
+  private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
+    if (t == null) {
+      return false;
+    }
+
+    // Status.fromThrowable() searches the cause chain for the most specific gRPC status.
+    io.grpc.Status grpcStatus = io.grpc.Status.fromThrowable(t);
+
+    // Check if grpcStatus is valid and the code is OUT_OF_RANGE
+    if (grpcStatus != null && grpcStatus.getCode() == io.grpc.Status.Code.OUT_OF_RANGE) {
+      // The gRPC status is OUT_OF_RANGE.
+      // Now, verify the message content for the specific "is beyond the end of the stream" text.
+      // This text might be in the grpcStatus's description, or in the message of the original
+      // throwable 't', or one of its causes.
+
+      // Check the description from the derived gRPC status first.
+      // grpcStatus is confirmed not null here.
+      String description = grpcStatus.getDescription();
+      if (description != null
+          && description.toLowerCase().contains("is beyond the end of the stream")) {
+        return true;
+      }
+
+      // If the description didn't match, iterate through the exception chain of 't'
+      // to find a message that confirms the "offset beyond end of stream" scenario.
+      Throwable currentThrowable = t;
+      while (currentThrowable != null) {
+        String message = currentThrowable.getMessage();
+        if (message != null && message.toLowerCase().contains("is beyond the end of the stream")) {
+          // If any exception in the chain has this message, and the overall gRPC status
+          // (determined by Status.fromThrowable(t)) is OUT_OF_RANGE, we consider it a match.
+          return true;
+        }
+        currentThrowable = currentThrowable.getCause();
+      }
+    }
+    // If grpcStatus was null, or the gRPC status code was not OUT_OF_RANGE,
+    // or if it was OUT_OF_RANGE but no matching message was found.
+    return false;
+  }
+
   @DefaultSchema(JavaFieldSchema.class)
   static class Operation implements Comparable<Operation>, Serializable {
     final long flushOffset;
@@ -186,6 +234,20 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
               if (statusCode.equals(Code.NOT_FOUND)) {
                 return RetryType.DONT_RETRY;
               }
+
+              // check the offset beyond the end of the stream
+              if (isOffsetBeyondEndOfStreamError(error)) {
+                flushOperationsOffsetBeyondEnd.inc();
+                LOG.warn(
+                    "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+                        + "This typically means the stream was finalized or truncated by BQ. "
+                        + "The operation will not be retried on this stream. Error: {}",
+                    streamId,
+                    offset,
+                    error.toString());
+                // This specific error is not retriable on the same stream.
+                return RetryType.DONT_RETRY;
+              }
             }
             return RetryType.RETRY_ALL_OPERATIONS;
           },

Reply via email to