reuvenlax commented on code in PR #35478:
URL: https://github.com/apache/beam/pull/35478#discussion_r2301559694
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -889,31 +889,67 @@ public void process(
                // Given that we split
                // the ProtoRows iterable at 2MB and the max request size is 10MB, this scenario seems
                // nearly impossible.
-               LOG.error(
-                   "A request containing more than one row is over the request size limit of "
-                       + maxRequestSize
-                       + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
+               LOG.warn(
+                   "A request containing more than one row is over the request size limit of {}. "
+                       + "This is unexpected. BigQueryIO will now split the request and send valid rows individually.",
+                   maxRequestSize);
              }
+             // We will split the ProtoRows and send them in smaller batches.
+             ProtoRows.Builder nextRows = ProtoRows.newBuilder();
+             List<org.joda.time.Instant> nextTimestamps = Lists.newArrayList();
+             List<@Nullable TableRow> nextFailsafeTableRows = Lists.newArrayList();
+
              for (int i = 0; i < splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
-               org.joda.time.Instant timestamp = splitValue.getTimestamps().get(i);
-               TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
-               if (failedRow == null) {
-                 ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i);
-                 failedRow = appendClientInfo.get().toTableRow(rowBytes, Predicates.alwaysTrue());
+               ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i);
+               if (nextRows.build().getSerializedSize() + rowBytes.size() >= maxRequestSize
+                   && nextRows.getSerializedRowsCount() > 0) {
+                 AppendRowsContext context =
+                     new AppendRowsContext(
+                         element.getKey(), nextRows.build(), nextTimestamps, nextFailsafeTableRows);
+                 contexts.add(context);
+                 retryManager.addOperation(runOperation, onError, onSuccess, context);
+                 recordsAppended.inc(nextRows.getSerializedRowsCount());
+                 appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
+                 ++numAppends;
+
+                 nextRows = ProtoRows.newBuilder();
+                 nextTimestamps = Lists.newArrayList();
+                 nextFailsafeTableRows = Lists.newArrayList();
+               }
+               if (rowBytes.size() >= maxRequestSize) {

Review Comment:
   I think filtering of too-large rows should happen earlier in the pipeline, in StorageApiConvertMessages.java.
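   For illustration, here is a minimal sketch of what such an up-front filter could look like as a multi-output Beam DoFn. This is only a sketch, not the actual StorageApiConvertMessages.java implementation: the element type (KV of destination to serialized row bytes), the maxRequestSize parameter, the class name, and the output tags are assumptions made for the example.

   // Illustrative sketch only -- not the real StorageApiConvertMessages code.
   // Assumes rows arrive as KV<destination, serialized proto bytes>. A single row at or above
   // maxRequestSize can never fit in an AppendRows request, so it is diverted to a failed-rows
   // output here instead of being discovered after sharding.
   import org.apache.beam.sdk.transforms.DoFn;
   import org.apache.beam.sdk.values.KV;
   import org.apache.beam.sdk.values.TupleTag;

   class FilterOversizedRowsFn extends DoFn<KV<String, byte[]>, KV<String, byte[]>> {
     // Hypothetical tags; the real transform would reuse its existing failed-rows output.
     static final TupleTag<KV<String, byte[]>> VALID_ROWS = new TupleTag<KV<String, byte[]>>() {};
     static final TupleTag<KV<String, byte[]>> OVERSIZED_ROWS = new TupleTag<KV<String, byte[]>>() {};

     private final long maxRequestSize;

     FilterOversizedRowsFn(long maxRequestSize) {
       this.maxRequestSize = maxRequestSize;
     }

     @ProcessElement
     public void process(@Element KV<String, byte[]> element, MultiOutputReceiver out) {
       if (element.getValue().length >= maxRequestSize) {
         // Too large to ever be appended; route to the failed-rows side output.
         out.get(OVERSIZED_ROWS).output(element);
       } else {
         out.get(VALID_ROWS).output(element);
       }
     }
   }

   Wired up with ParDo.of(new FilterOversizedRowsFn(limit)).withOutputTags(VALID_ROWS, TupleTagList.of(OVERSIZED_ROWS)), the oversized-rows output could feed the same failed-rows handling the sink already exposes, so the sharded-writes path would only ever see rows that can fit in a request.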