reuvenlax commented on code in PR #35478:
URL: https://github.com/apache/beam/pull/35478#discussion_r2301559694


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.java:
##########
@@ -889,31 +889,67 @@ public void process(
             // Given that we split
             // the ProtoRows iterable at 2MB and the max request size is 10MB, this scenario seems
             // nearly impossible.
-            LOG.error(
-                "A request containing more than one row is over the request size limit of "
-                    + maxRequestSize
-                    + ". This is unexpected. All rows in the request will be sent to the failed-rows PCollection.");
+            LOG.warn(
+                "A request containing more than one row is over the request size limit of {}. "
+                    + "This is unexpected. BigQueryIO will now split the request and send valid rows individually.",
+                maxRequestSize);
           }
+          // We will split the ProtoRows and send them in smaller batches.
+          ProtoRows.Builder nextRows = ProtoRows.newBuilder();
+          List<org.joda.time.Instant> nextTimestamps = Lists.newArrayList();
+          List<@Nullable TableRow> nextFailsafeTableRows = Lists.newArrayList();
+
           for (int i = 0; i < splitValue.getProtoRows().getSerializedRowsCount(); ++i) {
-            org.joda.time.Instant timestamp = splitValue.getTimestamps().get(i);
-            TableRow failedRow = splitValue.getFailsafeTableRows().get(i);
-            if (failedRow == null) {
-              ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i);
-              failedRow = appendClientInfo.get().toTableRow(rowBytes, Predicates.alwaysTrue());
+            ByteString rowBytes = splitValue.getProtoRows().getSerializedRows(i);
+            if (nextRows.build().getSerializedSize() + rowBytes.size() >= maxRequestSize
+                && nextRows.getSerializedRowsCount() > 0) {
+              AppendRowsContext context =
+                  new AppendRowsContext(
+                      element.getKey(), nextRows.build(), nextTimestamps, nextFailsafeTableRows);
+              contexts.add(context);
+              retryManager.addOperation(runOperation, onError, onSuccess, context);
+              recordsAppended.inc(nextRows.getSerializedRowsCount());
+              appendSizeDistribution.update(context.protoRows.getSerializedRowsCount());
+              ++numAppends;
+
+              nextRows = ProtoRows.newBuilder();
+              nextTimestamps = Lists.newArrayList();
+              nextFailsafeTableRows = Lists.newArrayList();
+            }
+            if (rowBytes.size() >= maxRequestSize) {

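The hunk above batches serialized rows under a byte budget before each append. A condensed, standalone sketch of that pattern follows; the class and method names (SizeBudgetedBatcher, batchBySize) are illustrative only, not code from this PR:

```java
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

public final class SizeBudgetedBatcher {

  // Group serialized rows into batches whose combined byte size stays under maxBatchBytes.
  // A row that alone exceeds the budget still ends up in a batch of its own, so the
  // caller can decide what to do with it (e.g. route it to a failed-rows output).
  public static List<List<ByteString>> batchBySize(Iterable<ByteString> rows, long maxBatchBytes) {
    List<List<ByteString>> batches = new ArrayList<>();
    List<ByteString> current = new ArrayList<>();
    long currentBytes = 0;
    for (ByteString row : rows) {
      // Flush the current batch if adding this row would push it over the budget.
      if (!current.isEmpty() && currentBytes + row.size() >= maxBatchBytes) {
        batches.add(current);
        current = new ArrayList<>();
        currentBytes = 0;
      }
      current.add(row);
      currentBytes += row.size();
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```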
Review Comment:
   I think filtering of too-large rows should happen earlier in the pipeline, in StorageApiConvertMessages.java.
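
   A minimal sketch of what that earlier filtering might look like: a DoFn that routes rows whose serialized payload already exceeds the request size limit to a separate output before they reach the sharded writes. The class name, the ByteString element type, and the tag names are assumptions for illustration, not the existing StorageApiConvertMessages code:

```java
import com.google.protobuf.ByteString;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical filter: anything at or above maxRequestSize goes to the oversized output.
public class OversizedRowFilterFn extends DoFn<ByteString, ByteString> {
  private final long maxRequestSize;
  private final TupleTag<ByteString> validRowsTag;
  private final TupleTag<ByteString> oversizedRowsTag;

  public OversizedRowFilterFn(
      long maxRequestSize,
      TupleTag<ByteString> validRowsTag,
      TupleTag<ByteString> oversizedRowsTag) {
    this.maxRequestSize = maxRequestSize;
    this.validRowsTag = validRowsTag;
    this.oversizedRowsTag = oversizedRowsTag;
  }

  @ProcessElement
  public void processElement(@Element ByteString rowBytes, MultiOutputReceiver out) {
    if (rowBytes.size() >= maxRequestSize) {
      // This row can never fit in a single append request; surface it to the
      // failed-rows handling instead of letting the writer discover it later.
      out.get(oversizedRowsTag).output(rowBytes);
    } else {
      out.get(validRowsTag).output(rowBytes);
    }
  }
}
```

   Wired up with ParDo.of(new OversizedRowFilterFn(...)).withOutputTags(validRowsTag, TupleTagList.of(oversizedRowsTag)), the oversized output could feed whatever failed-rows path the pipeline already exposes.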


