gemini-code-assist[bot] commented on code in PR #38768:
URL: https://github.com/apache/beam/pull/38768#discussion_r3340876671


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -378,34 +382,34 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest)
 
   @AutoValue
   abstract static class PendingRequest {
-
-    private static PendingRequest create(
-        String computationId, WorkItemCommitRequest request, Consumer<CommitStatus> onDone) {
-      return new AutoValue_GrpcCommitWorkStream_PendingRequest(computationId, request, onDone);
+    static PendingRequest create(
+        String computationId,
+        long shardingKey,
+        ByteString serializedCommit,
+        StreamingCommitRequestChunk.CommitType commitType,
+        Consumer<CommitStatus> onDone) {
+      return new AutoValue_GrpcCommitWorkStream_PendingRequest(
+          computationId, shardingKey, serializedCommit, commitType, onDone);
     }
 
     abstract String computationId();
 
-    abstract WorkItemCommitRequest request();
+    abstract long shardingKey();
+
+    abstract ByteString serializedCommit();
+
+    abstract StreamingCommitRequestChunk.CommitType commitType();
 
     abstract Consumer<CommitStatus> onDone();
 
     private long getBytes() {
-      return (long) request().getSerializedSize() + computationId().length();
-    }
-
-    private ByteString serializedCommit() {
-      return request().toByteString();
+      return (long) serializedCommit().size() + computationId().length();
     }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   ### Performance Optimization: Defer Serialization to the Stream Writer Thread
   
   Currently, `PendingRequest` eagerly serializes the commit request to a `ByteString` on the committer thread (inside `commitWorkItem` and `commitMultiKeyWorkItem`). Under high throughput, paying that serialization cost on the committer thread for every commit can become a performance bottleneck.
   
   By holding the request message directly in `PendingRequest` as a `MessageLite` (in the worker, the vendored `org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.MessageLite`) and serializing it lazily in a method annotated with AutoValue's `@Memoized` (`com.google.auto.value.extension.memoized.Memoized`), we can:
   1. Defer the serialization overhead to the stream writer thread, when `serializedCommit()` is actually called.
   2. Ensure that serialization is performed only once and the result cached, even if `serializedCommit()` is accessed multiple times.
   
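   ```java
       // Assumes imports of the vendored
       // org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.{ByteString, MessageLite}
       // and of com.google.auto.value.extension.memoized.Memoized.
       static PendingRequest create(
           String computationId,
           long shardingKey,
           MessageLite request,
           StreamingCommitRequestChunk.CommitType commitType,
           Consumer<CommitStatus> onDone) {
         return new AutoValue_GrpcCommitWorkStream_PendingRequest(
             computationId, shardingKey, request, commitType, onDone);
       }
   
       abstract String computationId();
   
       abstract long shardingKey();
   
       abstract MessageLite request();
   
       abstract StreamingCommitRequestChunk.CommitType commitType();
   
       abstract Consumer<CommitStatus> onDone();
   
       private long getBytes() {
         // getSerializedSize() computes the size without building a ByteString, so
         // byte accounting does not force serialization before it is needed.
         return (long) request().getSerializedSize() + computationId().length();
       }
   
       // Serialized on the first call (typically on the stream writer thread), then cached.
       @Memoized
       ByteString serializedCommit() {
         return request().toByteString();
       }
   ```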
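   To illustrate the serialize-late, serialize-once behaviour without depending on AutoValue or protobuf, here is a minimal standalone sketch. `LazySerializationDemo`, `LazyPendingRequest`, and the `stream-writer` thread name are hypothetical; the memoization is hand-rolled with double-checked locking on a `volatile` field rather than generated by `@Memoized`, and a `byte[]` supplier stands in for the protobuf message.

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.function.Supplier;

   public class LazySerializationDemo {

     /** Hypothetical stand-in for PendingRequest: keeps the payload unserialized. */
     static final class LazyPendingRequest {
       private final Supplier<byte[]> serializer;
       // Memoized bytes; volatile so the first computed result is safely published.
       private volatile byte[] cached;

       LazyPendingRequest(Supplier<byte[]> serializer) {
         this.serializer = serializer;
       }

       /** Serializes on the first call only; later calls return the cached bytes. */
       byte[] serializedCommit() {
         byte[] result = cached;
         if (result == null) {
           synchronized (this) {
             result = cached;
             if (result == null) {
               result = serializer.get();
               cached = result;
             }
           }
         }
         return result;
       }
     }

     public static void main(String[] args) throws Exception {
       AtomicInteger serializations = new AtomicInteger();
       AtomicReference<String> serializingThread = new AtomicReference<>();

       // "Committer" thread (main): build the request without serializing it.
       LazyPendingRequest request =
           new LazyPendingRequest(
               () -> {
                 serializations.incrementAndGet();
                 serializingThread.set(Thread.currentThread().getName());
                 return "commit-payload".getBytes(StandardCharsets.UTF_8);
               });
       System.out.println("serializations after enqueue: " + serializations.get());

       // "Stream writer" thread: the first access triggers serialization.
       ExecutorService writer =
           Executors.newSingleThreadExecutor(r -> new Thread(r, "stream-writer"));
       try {
         Future<Integer> sent = writer.submit(() -> request.serializedCommit().length);
         int length = sent.get();
         request.serializedCommit(); // Cached: no second serialization.
         System.out.println(
             "serialized " + length + " bytes on " + serializingThread.get()
                 + ", total serializations: " + serializations.get());
       } finally {
         writer.shutdown();
       }
     }
   }
   ```

   The committer thread only captures the payload; the writer thread pays the serialization cost once, and any later `serializedCommit()` call returns the same cached array.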



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -462,7 +466,34 @@ public boolean commitWorkItem(
         return false;
       }
 
-      PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
+      PendingRequest request =
+          PendingRequest.create(
+              computation,
+              commitRequest.getShardingKey(),
+              commitRequest.toByteString(),
+              StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY,
+              onDone);
+      add(idGenerator.incrementAndGet(), request);
+      return true;
+    }
+
+    @Override
+    public boolean commitMultiKeyWorkItem(
+        String computation,
+        Windmill.MultiKeyWorkItemCommitRequest commitRequest,
+        Consumer<CommitStatus> onDone) {
+      if (!canAccept(commitRequest.getSerializedSize() + computation.length())) {
+        return false;
+      }
+      Preconditions.checkArgument(commitRequest.getRequestsCount() > 0);
+      PendingRequest request =
+          PendingRequest.create(
+              computation,
+              // Any key in the batch for routing
+              commitRequest.getRequests(0).getShardingKey(),
+              commitRequest.toByteString(),
+              StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_MULTI_KEY,
+              onDone);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Pass the `commitRequest` directly to `PendingRequest.create` to support lazy serialization.
   
   ```suggestion
         PendingRequest request =
             PendingRequest.create(
                 computation,
                 // Any key in the batch for routing
                 commitRequest.getRequests(0).getShardingKey(),
                 commitRequest,
                 StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_MULTI_KEY,
                 onDone);
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -462,7 +466,34 @@ public boolean commitWorkItem(
         return false;
       }
 
-      PendingRequest request = PendingRequest.create(computation, commitRequest, onDone);
+      PendingRequest request =
+          PendingRequest.create(
+              computation,
+              commitRequest.getShardingKey(),
+              commitRequest.toByteString(),
+              StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY,
+              onDone);

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Pass the `commitRequest` directly to `PendingRequest.create` to support lazy serialization.
   
   ```suggestion
         PendingRequest request =
             PendingRequest.create(
                 computation,
                 commitRequest.getShardingKey(),
                 commitRequest,
                 StreamingCommitRequestChunk.CommitType.COMMIT_TYPE_SINGLE_KEY,
                 onDone);
   ```



-- 
This is an automated message from the Apache Git Service.