Abacn commented on code in PR #35051:
URL: https://github.com/apache/beam/pull/35051#discussion_r2110616694
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.java:
##########
@@ -170,6 +214,29 @@ public void process(PipelineOptions pipelineOptions, @Element KV<String, Operati
BigQuerySinkMetrics.reportFailedRPCMetrics(
failedContext, BigQuerySinkMetrics.RpcMethod.FLUSH_ROWS);
+ if (error != null && isOffsetBeyondEndOfStreamError(error)) {
+ flushOperationsOffsetBeyondEnd.inc();
+ LOG.warn(
+ "Flush of stream {} to offset {} failed because the offset is beyond the end of the stream. "
+ + "This typically means the stream was finalized or truncated by BQ. "
+ + "The operation will not be retried on this stream. Error: {}",
+ streamId,
+ offset,
+ error.toString());
+ // This specific error is not retriable on the same stream.
+ // Throwing a runtime exception to break out of the RetryManager and signal
+ // to the Beam runner that the bundle should be retried, which will then
+ // allow an upstream DoFn to create a new stream.
+ throw new RuntimeException(
Review Comment:
If I understand correctly, this does not fix the error itself; it handles
the failure and rethrows a clearer exception. It still relies on the runner
retrying the bundle. Is that correct?
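
To make the question concrete, here is a minimal sketch of the pattern as it reads from the diff: transient failures are retried in place, while the "offset beyond end of stream" case is rethrown as an unchecked exception so the caller (in Beam, the runner retrying the bundle) has to recover. This is not the PR's code. `FlushRetrySketch`, `FlushCall`, `OffsetBeyondEndException`, `isOffsetBeyondEnd`, and `flushWithRetry` are hypothetical names, and the retry loop is only a stand-in for the RetryManager mentioned in the diff.

```java
// Hypothetical, dependency-free sketch; no Beam or BigQuery types are used.
class FlushRetrySketch {

  /** Hypothetical stand-in for the "offset beyond end of stream" RPC failure. */
  static final class OffsetBeyondEndException extends Exception {
    OffsetBeyondEndException(String message) {
      super(message);
    }
  }

  /** Hypothetical flush operation that may fail with a checked exception. */
  interface FlushCall {
    void run() throws Exception;
  }

  /** Plays the role of isOffsetBeyondEndOfStreamError in the diff (assumed logic). */
  static boolean isOffsetBeyondEnd(Throwable error) {
    return error instanceof OffsetBeyondEndException;
  }

  /**
   * Retries transient failures up to maxAttempts and returns the number of
   * attempts used on success. A non-retriable failure is rethrown immediately
   * as a RuntimeException, so recovery is left to whoever called this method.
   */
  static int flushWithRetry(FlushCall call, int maxAttempts) {
    Exception last = null;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        call.run();
        return attempt;
      } catch (Exception e) {
        if (isOffsetBeyondEnd(e)) {
          // Retrying on the same stream cannot succeed, so stop here.
          throw new RuntimeException(
              "Flush failed: offset is beyond the end of the stream", e);
        }
        last = e; // Treated as transient: try again.
      }
    }
    throw new RuntimeException(
        "Flush failed after " + maxAttempts + " attempts", last);
  }
}
```

In this sketch the rethrow only changes who handles the failure and how clearly it is reported; nothing inside `flushWithRetry` repairs the stream, which is the behavior the question above is asking about.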