liferoad commented on code in PR #35051:
URL: https://github.com/apache/beam/pull/35051#discussion_r2110675591


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
             BigQuerySinkMetrics.reportFailedRPCMetrics(
                 failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
 
+            if (error != null && isOffsetBeyondEndOfStreamError(error)) {
+              flushOperationsOffsetBeyondEnd.inc();
+              LOG.warn(
+                  "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+                      + "This typically means the stream was finalized or truncated by BQ. "
+                      + "The operation will not be retried on this stream. Error: {}",
+                  streamId,
+                  offset,
+                  error.toString());
+              // This specific error is not retriable on the same stream.
+              // Throwing a runtime exception to break out of the RetryManager and signal
+              // to the Beam runner that the bundle should be retried, which will then
+              // allow an upstream DoFn to create a new stream.
+              throw new RuntimeException(

Review Comment:
   Yes, it fails the entire bundle and tracks the error counts. I hope this
cleans up the bad stream. Maybe we could just use `RetryType.DONT_RETRY`?
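
The two options under discussion are throwing to fail the bundle, as the diff does, and returning `RetryType.DONT_RETRY`, as suggested above. They can be contrasted with a small self-contained model. This is a hypothetical sketch and not Beam's `RetryManager`. The names `runWithRetries`, `OffsetBeyondEndException`, `failBundle`, `stopRetrying`, `Outcome` and the two-value `RetryType` enum are illustrative. A plain `int` stands in for the `flushOperationsOffsetBeyondEnd` counter.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified model of the two error-handling options. This is NOT
// Beam's RetryManager; only the name DONT_RETRY echoes the review comment.
public class FlushRetrySketch {

  // Hypothetical retry decision returned by an error handler.
  enum RetryType { RETRY, DONT_RETRY }

  // How a retry loop ended when it did not throw.
  enum Outcome { SUCCEEDED, GAVE_UP }

  // Hypothetical stand-in for the "offset is beyond the end of the stream" error.
  static class OffsetBeyondEndException extends RuntimeException {
    OffsetBeyondEndException(String message) {
      super(message);
    }
  }

  // Plain int standing in for the flushOperationsOffsetBeyondEnd counter.
  static int offsetBeyondEndCount = 0;

  // Calls `attempt` until it returns null (success), the handler returns DONT_RETRY,
  // or maxAttempts is used up. Anything the handler throws propagates to the caller.
  static Outcome runWithRetries(
      Supplier<Throwable> attempt, Function<Throwable, RetryType> onError, int maxAttempts) {
    for (int i = 0; i < maxAttempts; i++) {
      Throwable error = attempt.get();
      if (error == null) {
        return Outcome.SUCCEEDED;
      }
      if (onError.apply(error) == RetryType.DONT_RETRY) {
        return Outcome.GAVE_UP;
      }
    }
    return Outcome.GAVE_UP;
  }

  // Option A (the diff): count the error, then throw to escape the retry loop.
  // Inside a DoFn, an uncaught exception fails the bundle, which the runner typically retries.
  static RetryType failBundle(Throwable error) {
    if (error instanceof OffsetBeyondEndException) {
      offsetBeyondEndCount++;
      throw new RuntimeException("Offset beyond end of stream; failing bundle", error);
    }
    return RetryType.RETRY;
  }

  // Option B (the review suggestion): count the error and stop retrying without throwing.
  static RetryType stopRetrying(Throwable error) {
    if (error instanceof OffsetBeyondEndException) {
      offsetBeyondEndCount++;
      return RetryType.DONT_RETRY;
    }
    return RetryType.RETRY;
  }

  public static void main(String[] args) {
    Supplier<Throwable> beyondEnd = () -> new OffsetBeyondEndException("offset beyond end");
    try {
      runWithRetries(beyondEnd, FlushRetrySketch::failBundle, 5);
    } catch (RuntimeException e) {
      System.out.println("Option A threw: " + e.getMessage());
    }
    Outcome b = runWithRetries(beyondEnd, FlushRetrySketch::stopRetrying, 5);
    System.out.println("Option B outcome: " + b);
    System.out.println("Offset-beyond-end count: " + offsetBeyondEndCount);
  }
}
```

Running `main` shows that option A throws and option B ends with `GAVE_UP`. Both paths increment the counter once. In this model, option B only stops the loop. Nothing recreates the stream unless the caller acts on `GAVE_UP`. Option A hands recovery to the runner's bundle retry, which is what the diff's comment relies on to let an upstream DoFn create a new stream.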



-- 
This is an automated message from the Apache Git Service.