gemini-code-assist[bot] commented on code in PR #39085:
URL: https://github.com/apache/beam/pull/39085#discussion_r3466849490


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,48 @@ public void onResponse(StreamingCommitResponse response) {
       failureHandler.throwIfNonEmpty();
     }
 
-    @Override
     @SuppressWarnings("ReferenceEquality")
+    private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+      return streamAndRequest.handler == this;
+    }
+
+    @Override
     public boolean hasPendingRequests() {
-      return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+      return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
     }
 
     @Override
+    @SuppressWarnings("ReferenceEquality")
     public void onDone(Status status) {
+      if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+        // Remove the requests that have exceeded the retry time so they are not retried.
+        long startTimeRetryThresholdNanos = System.nanoTime() - maxRetryDuration.toNanos();
+        Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+        int keptRequests = 0, removedRequests = 0;
+        while (iterator.hasNext()) {
+          StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+          PendingRequest pendingRequest = streamAndRequest.request;
+          if (!belongsToThisHandler(streamAndRequest)
+              || pendingRequest.getStartTimeNanos() > startTimeRetryThresholdNanos) {
+            ++keptRequests;
+            continue;
+          }
+          ++removedRequests;
+          try {
+            pendingRequest.completeWithStatus(CommitStatus.ABORTED);
+            iterator.remove();
+          } catch (RuntimeException e) {
+            // Ignore exceptions and retry the commit.
+            LOG.warn("Exception while aborting commit due to retry timeout.", e);
+          }
+        }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   There are two issues in this retry timeout cleanup block:
   
   1. **Robustness & Memory Leak**: If `pendingRequest.completeWithStatus(CommitStatus.ABORTED)` throws a `RuntimeException`, `iterator.remove()` is skipped. The timed-out request then stays in the `pending` map. As a result, it is retried when the stream reconnects and may leak memory. The entry should be removed through the iterator before the callback is invoked.
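   2. **`System.nanoTime()` Comparison Safety**: `System.nanoTime()` counts from a fixed but arbitrary origin. That origin may lie in the future, so values may be negative. Comparing absolute readings with `>` therefore gives the wrong answer if the subtraction `System.nanoTime() - maxRetryDuration.toNanos()` overflows. The `System.nanoTime()` Javadoc recommends comparing differences instead (`t1 - t0 < 0`, not `t1 < t0`). Checking `System.nanoTime() - pendingRequest.getStartTimeNanos() < maxRetryDurationNanos` is safe against this overflow. It is also more readable, because it directly expresses the elapsed time.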
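The following standalone sketch shows the overflow case from point 2. The class and method names are hypothetical. The clock readings are contrived values just above `Long.MIN_VALUE`, passed in directly rather than read from `System.nanoTime()`, so that the threshold subtraction wraps around. Only one second has elapsed against a ten-second budget, so the request should be kept.

```java
public class NanoTimeComparisonDemo {
  // Absolute-threshold check, as in the PR: keep if start > now - maxRetry.
  static boolean keepAbsolute(long startNanos, long nowNanos, long maxRetryNanos) {
    long thresholdNanos = nowNanos - maxRetryNanos; // can overflow and wrap to a large positive value
    return startNanos > thresholdNanos;
  }

  // Elapsed-time check, as suggested: keep if now - start < maxRetry.
  static boolean keepElapsed(long startNanos, long nowNanos, long maxRetryNanos) {
    return nowNanos - startNanos < maxRetryNanos;
  }

  public static void main(String[] args) {
    long maxRetryNanos = 10_000_000_000L; // 10-second retry budget
    // Hypothetical clock readings near Long.MIN_VALUE; only 1 second has elapsed.
    long startNanos = Long.MIN_VALUE + 1_000_000_000L;
    long nowNanos = startNanos + 1_000_000_000L;
    System.out.println("absolute: " + keepAbsolute(startNanos, nowNanos, maxRetryNanos));
    System.out.println("elapsed: " + keepElapsed(startNanos, nowNanos, maxRetryNanos));
  }
}
```

Run as is, this should print `absolute: false` followed by `elapsed: true`. The absolute-threshold check drops a request that is well within its retry budget, while the elapsed-time check keeps it.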
   
   ```java
         if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
           // Remove the requests that have exceeded the retry time so they are not retried.
           long maxRetryDurationNanos = maxRetryDuration.toNanos();
           Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
           int keptRequests = 0, removedRequests = 0;
           while (iterator.hasNext()) {
             StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
             PendingRequest pendingRequest = streamAndRequest.request;
             if (!belongsToThisHandler(streamAndRequest)
                 || System.nanoTime() - pendingRequest.getStartTimeNanos() < maxRetryDurationNanos) {
               ++keptRequests;
               continue;
             }
             ++removedRequests;
             // Remove before invoking the callback so a throwing callback cannot leave the entry behind.
             iterator.remove();
             try {
               pendingRequest.completeWithStatus(CommitStatus.ABORTED);
             } catch (RuntimeException e) {
               // Already removed from pending, so the failure is logged rather than retried.
               LOG.warn("Exception while aborting commit due to retry timeout.", e);
             }
           }
   ```
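The following self-contained sketch shows the remove-then-complete ordering from the suggestion. It makes several simplifying assumptions:

- `Request` is a hypothetical stand-in for `PendingRequest`.
- A plain `Consumer<String>` callback replaces `completeWithStatus`.
- The handler-ownership check is omitted.
- The clock reading is passed in rather than read from `System.nanoTime()`.
- Errors go to `System.err` instead of `LOG`.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

public class RetryTimeoutCleanupSketch {
  // Hypothetical stand-in for PendingRequest: a start time plus a completion callback.
  static final class Request {
    final long startTimeNanos;
    final Consumer<String> onComplete; // may throw, as completeWithStatus might

    Request(long startTimeNanos, Consumer<String> onComplete) {
      this.startTimeNanos = startTimeNanos;
      this.onComplete = onComplete;
    }
  }

  /** Aborts every request older than maxRetryNanos and returns how many were removed. */
  static int abortTimedOut(Map<Long, Request> pending, long nowNanos, long maxRetryNanos) {
    int removed = 0;
    Iterator<Map.Entry<Long, Request>> iterator = pending.entrySet().iterator();
    while (iterator.hasNext()) {
      Map.Entry<Long, Request> entry = iterator.next();
      long key = entry.getKey(); // read before removal; the entry is undefined afterwards
      Request request = entry.getValue();
      // Overflow-safe check: compare elapsed time, not absolute clock readings.
      if (nowNanos - request.startTimeNanos < maxRetryNanos) {
        continue;
      }
      // Remove first, so a throwing callback cannot leave the entry in the map.
      iterator.remove();
      removed++;
      try {
        request.onComplete.accept("ABORTED");
      } catch (RuntimeException e) {
        System.err.println("Exception while aborting request " + key + ": " + e);
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    long maxRetryNanos = 10_000_000_000L; // 10-second retry budget
    long nowNanos = 100_000_000_000L; // hypothetical clock reading
    Map<Long, Request> pending = new LinkedHashMap<>();
    // Started 20 s ago and its callback throws: it must still be removed.
    pending.put(1L, new Request(nowNanos - 20_000_000_000L, status -> {
      throw new IllegalStateException("callback failed");
    }));
    // Started 1 s ago: within the budget, so it is kept.
    pending.put(2L, new Request(nowNanos - 1_000_000_000L, status -> {}));

    int removed = abortTimedOut(pending, nowNanos, maxRetryNanos);
    System.out.println("removed=" + removed + " remaining=" + pending.keySet());
  }
}
```

In `main`, the first request has exceeded its retry budget and its callback throws, yet it is still gone from the map afterwards. The program should print `removed=1 remaining=[2]` on stdout, with the caught exception reported on stderr.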



-- 
This is an automated message from the Apache Git Service.