This is an automated email from the ASF dual-hosted git repository.
xqhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 5696b6e2fd3 Fix out of range (#35051)
5696b6e2fd3 is described below
commit 5696b6e2fd3f560e1a2474aaa054a5b8255cb173
Author: liferoad <[email protected]>
AuthorDate: Sun Jul 20 15:48:11 2025 -0400
Fix out of range (#35051)
* fix user metrics in @OnTimer
* minor fix
* Retry OUT_OF_RANGE errors with Bundle
* spotless
* revert some files
* minor fix
* addressed comments
* fixed the null
* clean up
---
.../bigquery/StorageApiFlushAndFinalizeDoFn.java | 62 ++++++++++++++++++++++
1 file changed, 62 insertions(+)
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
index dec86c3360b..fd3853d15e0 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java
@@ -61,6 +61,8 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class,
"flushOperationsAlreadyExists");
private final Counter flushOperationsInvalidArgument =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class,
"flushOperationsInvalidArgument");
+ // Incremented when a flush fails because the requested offset is beyond the end of the
+ // stream (detected by isOffsetBeyondEndOfStreamError); such flushes are not retried.
+ private final Counter flushOperationsOffsetBeyondEnd =
+ Metrics.counter(StorageApiFlushAndFinalizeDoFn.class,
"flushOperationsOffsetBeyondEnd");
private final Distribution flushLatencyDistribution =
Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class,
"flushOperationLatencyMs");
private final Counter finalizeOperationsSent =
@@ -70,6 +72,52 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
private final Counter finalizeOperationsFailed =
Metrics.counter(StorageApiFlushAndFinalizeDoFn.class,
"finalizeOperationsFailed");
+ /**
+ * Checks if the given throwable indicates that an offset is beyond the end
of a BigQuery stream.
+ * It primarily uses {@code io.grpc.Status.fromThrowable} to determine the
gRPC status code and
+ * then checks for specific message content.
+ */
+ private boolean isOffsetBeyondEndOfStreamError(Throwable t) {
+ if (t == null) {
+ return false;
+ }
+
+ // Status.fromThrowable() searches the cause chain for the most specific
gRPC status.
+ io.grpc.Status grpcStatus = io.grpc.Status.fromThrowable(t);
+
+ // Check if grpcStatus is valid and the code is OUT_OF_RANGE
+ if (grpcStatus != null && grpcStatus.getCode() ==
io.grpc.Status.Code.OUT_OF_RANGE) {
+ // The gRPC status is OUT_OF_RANGE.
+ // Now, verify the message content for the specific "is beyond the end
of the stream" text.
+ // This text might be in the grpcStatus's description, or in the message
of the original
+ // throwable 't', or one of its causes.
+
+ // Check the description from the derived gRPC status first.
+ // grpcStatus is confirmed not null here.
+ String description = grpcStatus.getDescription();
+ if (description != null
+ && description.toLowerCase().contains("is beyond the end of the
stream")) {
+ return true;
+ }
+
+ // If the description didn't match, iterate through the exception chain
of 't'
+ // to find a message that confirms the "offset beyond end of stream"
scenario.
+ Throwable currentThrowable = t;
+ while (currentThrowable != null) {
+ String message = currentThrowable.getMessage();
+ if (message != null && message.toLowerCase().contains("is beyond the
end of the stream")) {
+ // If any exception in the chain has this message, and the overall
gRPC status
+ // (determined by Status.fromThrowable(t)) is OUT_OF_RANGE, we
consider it a match.
+ return true;
+ }
+ currentThrowable = currentThrowable.getCause();
+ }
+ }
+ // If grpcStatus was null, or the gRPC status code was not OUT_OF_RANGE,
+ // or if it was OUT_OF_RANGE but no matching message was found.
+ return false;
+ }
+
@DefaultSchema(JavaFieldSchema.class)
static class Operation implements Comparable<Operation>, Serializable {
final long flushOffset;
@@ -186,6 +234,20 @@ public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>,
if (statusCode.equals(Code.NOT_FOUND)) {
return RetryType.DONT_RETRY;
}
+
+ // check the offset beyond the end of the stream
+ if (isOffsetBeyondEndOfStreamError(error)) {
+ flushOperationsOffsetBeyondEnd.inc();
+ LOG.warn(
+ "Flush of stream {} to offset {} failed because the offset
is beyond the end of the stream. "
+ + "This typically means the stream was finalized or
truncated by BQ. "
+ + "The operation will not be retried on this stream.
Error: {}",
+ streamId,
+ offset,
+ error.toString());
+ // This specific error is not retriable on the same stream.
+ return RetryType.DONT_RETRY;
+ }
}
return RetryType.RETRY_ALL_OPERATIONS;
},