reuvenlax commented on code in PR #38058:
URL: https://github.com/apache/beam/pull/38058#discussion_r3121493300
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiDynamicDestinations.java:
##########
@@ -32,17 +34,26 @@ public interface MessageConverter<T> {
DescriptorProtos.DescriptorProto getDescriptor(boolean includeCdcColumns)
throws Exception;
StorageApiWritePayload toMessage(
- T element, @Nullable RowMutationInformation rowMutationInformation)
throws Exception;
+ T element,
+ @Nullable RowMutationInformation rowMutationInformation,
+ TableRowToStorageApiProto.ErrorCollector collectedExceptions)
+ throws Exception;
TableRow toFailsafeTableRow(T element);
+
+ void updateSchemaFromTable() throws IOException, InterruptedException;
Review Comment:
Let me think about this a bit more. In the past, such logic has caused
problems with pipeline update.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -884,72 +1003,107 @@ public void process(
}
}
};
- Instant now = Instant.now();
- List<AppendRowsContext> contexts = Lists.newArrayList();
- RetryManager<AppendRowsResponse, AppendRowsContext> retryManager =
- new RetryManager<>(
- Duration.standardSeconds(1),
- Duration.standardSeconds(20),
- maxRetries,
-
BigQuerySinkMetrics.throttledTimeCounter(BigQuerySinkMetrics.RpcMethod.APPEND_ROWS));
- int numAppends = 0;
- for (SplittingIterable.Value splitValue : messages) {
- // Handle the case of a row that is too large.
- if (splitValue.getProtoRows().getSerializedSize() >= maxRequestSize) {
- if (splitValue.getProtoRows().getSerializedRowsCount() > 1) {
- // TODO(reuvenlax): Is it worth trying to handle this case by
splitting the protoRows?
- // Given that we split
- // the ProtoRows iterable at 2MB and the max request size is 10MB,
this scenario seems
- // nearly impossible.
- LOG.error(
- "A request containing more than one row is over the request
size limit of {}. This is unexpected. All rows in the request will be sent to
the failed-rows PCollection.",
- maxRequestSize);
- }
- for (int i = 0; i <
splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
- org.joda.time.Instant timestamp =
splitValue.getTimestamps().get(i);
- TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
- if (failedRow == null) {
- ByteString rowBytes =
splitValue.getProtoRows().getSerializedRows(i);
- failedRow = appendClientInfo.get().toTableRow(rowBytes,
Predicates.alwaysTrue());
- }
- o.get(failedRowsTag)
- .outputWithTimestamp(
- new BigQueryStorageApiInsertError(
- failedRow,
- "Row payload too large. Maximum size " +
maxRequestSize,
- tableReference),
- timestamp);
- }
- int numRowsFailed =
splitValue.getProtoRows().getSerializedRowsCount();
- rowsSentToFailedRowsCollection.inc(numRowsFailed);
- BigQuerySinkMetrics.appendRowsRowStatusCounter(
- BigQuerySinkMetrics.RowStatus.FAILED,
- BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
- shortTableId)
- .inc(numRowsFailed);
- } else {
- ++numAppends;
- // RetryManager
- AppendRowsContext context =
- new AppendRowsContext(
- element.getKey(),
- splitValue.getProtoRows(),
- splitValue.getTimestamps(),
- splitValue.getFailsafeTableRows());
- contexts.add(context);
- retryManager.addOperation(runOperation, onError, onSuccess, context);
-
recordsAppended.inc(splitValue.getProtoRows().getSerializedRowsCount());
-
appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
+
+ BackOff backoff =
+ FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.standardSeconds(1))
+ .withMaxBackoff(Duration.standardMinutes(1))
+ .withMaxRetries(500)
+ .withThrottledTimeCounter(
+ BigQuerySinkMetrics.throttledTimeCounter(
+ BigQuerySinkMetrics.RpcMethod.OPEN_WRITE_STREAM))
+ .backoff();
+ CreateRetryManagerResult<DestinationT> createRetryManagerResult;
+ do {
+ // Each ProtoRows object contains at most 1MB of rows.
+ // TODO: Push messageFromTableRow up to top level. That way we can
skip TableRow entirely if
+ // already proto or already schema.
+ Iterable<SplittingIterable.Value> messages =
+ new SplittingIterable(
+ element.getValue(),
+ splitSize,
+ // Unknown field merger
+ (bytes, tableRow) ->
+ appendClientInfo.get().mergeNewFields(bytes, tableRow,
ignoreUnknownValues),
+ // Convert back to TableRow
+ bytes -> appendClientInfo.get().toTableRow(bytes,
Predicates.alwaysTrue()),
+ // Failed rows consumer
+ (failedRow, errorMessage) -> {
+ o.get(failedRowsTag)
+ .outputWithTimestamp(
+ new BigQueryStorageApiInsertError(
+ failedRow.getValue(), errorMessage,
tableReference),
+ failedRow.getTimestamp());
+ rowsSentToFailedRowsCollection.inc();
+ BigQuerySinkMetrics.appendRowsRowStatusCounter(
+ BigQuerySinkMetrics.RowStatus.FAILED,
+ BigQuerySinkMetrics.PAYLOAD_TOO_LARGE,
+ shortTableId)
+ .inc(1);
+ },
+ // Get the currently-known TableSchema hash
+ () -> appendClientInfo.get().getTableSchemaHash(),
+ () ->
+ TableRowToStorageApiProto.wrapDescriptorProto(
+ messageConverter.getDescriptor(false)),
+ autoUpdateSchema,
+ elementTs);
+
+ createRetryManagerResult =
Review Comment:
But we need it outside of the do loop, so we can't.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -833,6 +950,8 @@ public void process(
|| statusCode.equals(Code.INVALID_ARGUMENT)
|| statusCode.equals(Code.NOT_FOUND)
|| statusCode.equals(Code.FAILED_PRECONDITION);
+ streamDoesNotExist = streamDoesNotExist && !schemaMismatchError;
+
if (offsetMismatch || streamDoesNotExist) {
appendOffsetFailures.inc();
Review Comment:
We want to reset and create a new stream "connection" (reset the RPC). We
specifically do not want to delete the stream and recreate it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]