gemini-code-assist[bot] commented on code in PR #39085:
URL: https://github.com/apache/beam/pull/39085#discussion_r3466139826


##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -196,6 +196,13 @@ public interface DataflowStreamingPipelineOptions extends PipelineOptions {
 
   void setStuckCommitDurationMillis(int value);
 
+  @Description(
+      "Retry commits on stream errors until this much time has elapsed since the commit was scheduled. If zero, retry forever.")
+  @Default.InstanceFactory(CommitWorkStreamRetryTimeoutMillisFactory.class)
+  long getCommitWorkStreamRetryTimeoutMillis();
+
+  void getCommitWorkStreamRetryTimeoutMillis(long value);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
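   The setter method is incorrectly named
`getCommitWorkStreamRetryTimeoutMillis` instead of
`setCommitWorkStreamRetryTimeoutMillis`. This violates JavaBean naming
conventions: `PipelineOptionsFactory` expects every getter to have a matching
`set...` method, so this interface will be rejected during options validation,
which breaks `PipelineOptions` serialization and command-line parsing.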
   
   ```suggestion
     void setCommitWorkStreamRetryTimeoutMillis(long value);
   ```
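
   The following is a minimal, self-contained sketch of the JavaBean rule this comment relies on: every no-argument `getXxx()` should be paired with a `setXxx(T)` that takes the getter's return type. `SetterCheck` and `gettersWithoutSetters` are hypothetical names, and the check only approximates the kind of validation `PipelineOptionsFactory` performs; it is not Beam's code.

   ```java
   import java.lang.reflect.Method;
   import java.util.ArrayList;
   import java.util.List;

   public class SetterCheck {
     // Mirrors the interface in the diff: the "setter" reuses the getter's name.
     interface BrokenOptions {
       long getCommitWorkStreamRetryTimeoutMillis();

       void getCommitWorkStreamRetryTimeoutMillis(long value);
     }

     // Mirrors the suggested fix.
     interface FixedOptions {
       long getCommitWorkStreamRetryTimeoutMillis();

       void setCommitWorkStreamRetryTimeoutMillis(long value);
     }

     /** Returns the names of no-argument getters that have no matching one-argument setter. */
     static List<String> gettersWithoutSetters(Class<?> iface) {
       List<String> missing = new ArrayList<>();
       for (Method m : iface.getMethods()) {
         // Only bean getters count: getXxx() with no parameters.
         if (!m.getName().startsWith("get") || m.getParameterCount() != 0) {
           continue;
         }
         String setterName = "set" + m.getName().substring(3);
         try {
           // The setter must take exactly the getter's return type.
           iface.getMethod(setterName, m.getReturnType());
         } catch (NoSuchMethodException e) {
           missing.add(m.getName());
         }
       }
       return missing;
     }

     public static void main(String[] args) {
       System.out.println("broken: " + gettersWithoutSetters(BrokenOptions.class));
       System.out.println("fixed: " + gettersWithoutSetters(FixedOptions.class));
     }
   }
   ```

   The misnamed overload `getCommitWorkStreamRetryTimeoutMillis(long)` takes a parameter, so it is not treated as a getter, and the real getter is left without a setter.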



##########
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowStreamingPipelineOptions.java:
##########
@@ -343,4 +350,15 @@ public Boolean create(PipelineOptions options) {
       return ExperimentalOptions.hasExperiment(options, "enable_windmill_service_direct_path");
     }
   }
+
+  /** defaults to false unless one of the experiment is set. */

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The Javadoc comment states that the option `defaults to false`, but the 
factory actually returns a `Long` representing 30 minutes in milliseconds (or 
`0L` if disabled). Update the comment to accurately describe the default value.
   
   ```suggestion
     /** Defaults to 30 minutes unless the disable_commit_retry_timeout experiment is set. */
   ```
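
   A plain-Java sketch of the default described above, assuming the factory only checks whether the `disable_commit_retry_timeout` experiment (the name used in the suggestion) is present. `RetryTimeoutDefault` and `defaultRetryTimeoutMillis` are hypothetical; the real factory implements Beam's `DefaultValueFactory` and reads experiments from `PipelineOptions`. It shows why a boolean-sounding Javadoc is misleading: the result is a count of milliseconds.

   ```java
   import java.time.Duration;
   import java.util.Collections;
   import java.util.List;

   public class RetryTimeoutDefault {
     // Experiment name taken from the suggested Javadoc; an assumption, not verified against the PR.
     static final String DISABLE_EXPERIMENT = "disable_commit_retry_timeout";

     /** Hypothetical stand-in for CommitWorkStreamRetryTimeoutMillisFactory.create(options). */
     static long defaultRetryTimeoutMillis(List<String> experiments) {
       if (experiments.contains(DISABLE_EXPERIMENT)) {
         return 0L; // per the option's @Description, zero means "retry forever"
       }
       return Duration.ofMinutes(30).toMillis(); // 30 minutes = 1,800,000 ms
     }

     public static void main(String[] args) {
       System.out.println(defaultRetryTimeoutMillis(Collections.emptyList()));
       System.out.println(defaultRetryTimeoutMillis(Collections.singletonList(DISABLE_EXPERIMENT)));
     }
   }
   ```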



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,50 @@ public void onResponse(StreamingCommitResponse response) {
       failureHandler.throwIfNonEmpty();
     }
 
-    @Override
     @SuppressWarnings("ReferenceEquality")
+    private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+      return streamAndRequest.handler == this;
+    }
+
+    @Override
     public boolean hasPendingRequests() {
-      return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+      return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
     }
 
     @Override
+    @SuppressWarnings("ReferenceEquality")
     public void onDone(Status status) {
+      if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+        // Remove the requests that have exceeded the retry time so they are not retried.
+        long startTimeRetryThresholdMsec = System.currentTimeMillis() - maxRetryDuration.toMillis();
+        Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+        int keptRequests = 0, removedRequests = 0;
+        while (iterator.hasNext()) {
+          StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+          PendingRequest pendingRequest = streamAndRequest.request;
+          if (!belongsToThisHandler(streamAndRequest)
+              || pendingRequest.getStartTimeMillis() > startTimeRetryThresholdMsec) {
+            ++keptRequests;
+            continue;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `System.currentTimeMillis()` to measure elapsed time or timeouts is
susceptible to system clock adjustments (e.g., NTP syncs or manual changes),
which can move the wall clock backward or forward and make requests expire too
early or too late. Use a monotonic clock such as `System.nanoTime()` to
calculate elapsed time instead.
   
   ```suggestion
           long maxRetryNanos = maxRetryDuration.toNanos();
           long nowNanos = System.nanoTime();
           Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
           int keptRequests = 0, removedRequests = 0;
           while (iterator.hasNext()) {
             StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
             PendingRequest pendingRequest = streamAndRequest.request;
             if (!belongsToThisHandler(streamAndRequest)
                 || (nowNanos - pendingRequest.getStartTimeNanos()) <= maxRetryNanos) {
               ++keptRequests;
               continue;
   ```
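
   A minimal sketch of the monotonic approach, assuming the clock is injected as a `LongSupplier` so the logic can be tested with a fake clock (the suggestion itself calls `System.nanoTime()` directly). `RetryDeadline`, `start`, and `isExpired` are hypothetical names. `nanoTime()` readings have no relation to wall-clock time, so only the difference between two readings is meaningful.

   ```java
   import java.time.Duration;
   import java.util.function.LongSupplier;

   public class RetryDeadline {
     // Source of monotonic nanoseconds; System::nanoTime in production.
     private final LongSupplier nanoClock;
     private final long maxRetryNanos;

     RetryDeadline(LongSupplier nanoClock, Duration maxRetryDuration) {
       this.nanoClock = nanoClock;
       this.maxRetryNanos = maxRetryDuration.toNanos();
     }

     /** Captures a start time. Only differences between two readings are meaningful. */
     long start() {
       return nanoClock.getAsLong();
     }

     /**
      * Returns true once more than the retry budget has elapsed since startNanos. Subtracting
      * first (instead of comparing raw readings) stays correct even if the counter overflows,
      * as the System.nanoTime() Javadoc recommends.
      */
     boolean isExpired(long startNanos) {
       return nanoClock.getAsLong() - startNanos > maxRetryNanos;
     }

     public static void main(String[] args) {
       RetryDeadline deadline = new RetryDeadline(System::nanoTime, Duration.ofMinutes(30));
       long start = deadline.start();
       System.out.println("expired? " + deadline.isExpired(start));
     }
   }
   ```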



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -224,14 +228,50 @@ public void onResponse(StreamingCommitResponse response) {
       failureHandler.throwIfNonEmpty();
     }
 
-    @Override
     @SuppressWarnings("ReferenceEquality")
+    private boolean belongsToThisHandler(StreamAndRequest streamAndRequest) {
+      return streamAndRequest.handler == this;
+    }
+
+    @Override
     public boolean hasPendingRequests() {
-      return pending.entrySet().stream().anyMatch(e -> e.getValue().handler == this);
+      return pending.entrySet().stream().anyMatch(e -> belongsToThisHandler(e.getValue()));
     }
 
     @Override
+    @SuppressWarnings("ReferenceEquality")
     public void onDone(Status status) {
+      if (maxRetryDuration.compareTo(Duration.ZERO) > 0) {
+        // Remove the requests that have exceeded the retry time so they are not retried.
+        long startTimeRetryThresholdMsec = System.currentTimeMillis() - maxRetryDuration.toMillis();
+        Iterator<Map.Entry<Long, StreamAndRequest>> iterator = pending.entrySet().iterator();
+        int keptRequests = 0, removedRequests = 0;
+        while (iterator.hasNext()) {
+          StreamAndRequest streamAndRequest = checkNotNull(iterator.next().getValue());
+          PendingRequest pendingRequest = streamAndRequest.request;
+          if (!belongsToThisHandler(streamAndRequest)
+              || pendingRequest.getStartTimeMillis() > startTimeRetryThresholdMsec) {
+            ++keptRequests;
+            continue;
+          }
+          ++removedRequests;
+          try {
+            pendingRequest.completeWithStatus(CommitStatus.ABORTED);
+          } catch (RuntimeException e) {
+            // Catch possible exceptions to ensure that an exception for one commit does not prevent
+            // other commits from being processed. Aggregate all the failures to throw after

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The comment mentions aggregating all failures to throw after processing, but 
the catch block only logs a warning and does not aggregate or rethrow any 
exceptions. Update the comment to reflect the actual behavior.
   
   ```suggestion
               // Catch possible exceptions to ensure that an exception for one commit does not prevent
               // other commits from being processed.
   ```
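
   A simplified sketch of the behavior the corrected comment describes: expired requests are removed and completed with an aborted status, and a failure in one callback is only logged (not aggregated or rethrown), so the remaining requests are still processed. `AbortExpired`, `Pending`, and `abortExpired` are hypothetical; a `String` stands in for `CommitStatus`, and `java.util.logging` stands in for the worker's logger.

   ```java
   import java.util.Iterator;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.function.Consumer;
   import java.util.logging.Logger;

   public class AbortExpired {
     private static final Logger LOG = Logger.getLogger(AbortExpired.class.getName());

     /** Simplified stand-in for PendingRequest; a String plays the role of CommitStatus. */
     static final class Pending {
       final long startNanos;
       final Consumer<String> onDone;

       Pending(long startNanos, Consumer<String> onDone) {
         this.startNanos = startNanos;
         this.onDone = onDone;
       }
     }

     /** Removes requests older than the retry budget, completing each with "ABORTED". */
     static int abortExpired(Map<Long, Pending> pending, long nowNanos, long maxRetryNanos) {
       int removed = 0;
       Iterator<Map.Entry<Long, Pending>> it = pending.entrySet().iterator();
       while (it.hasNext()) {
         Map.Entry<Long, Pending> entry = it.next();
         long id = entry.getKey();
         Pending request = entry.getValue();
         if (nowNanos - request.startNanos <= maxRetryNanos) {
           continue; // still inside the retry window; keep it so it is retried
         }
         it.remove();
         ++removed;
         try {
           request.onDone.accept("ABORTED");
         } catch (RuntimeException e) {
           // Isolate callback failures so one bad commit cannot block the rest.
           // The failure is only logged; nothing is aggregated or rethrown.
           LOG.warning("Failed to abort commit " + id + ": " + e);
         }
       }
       return removed;
     }

     public static void main(String[] args) {
       Map<Long, Pending> pending = new LinkedHashMap<>();
       pending.put(1L, new Pending(0L, status -> System.out.println("commit 1: " + status)));
       pending.put(2L, new Pending(0L, status -> {
         throw new IllegalStateException("callback failed");
       }));
       pending.put(3L, new Pending(0L, status -> System.out.println("commit 3: " + status)));
       pending.put(4L, new Pending(95L, status -> System.out.println("commit 4: " + status)));
       int removed = abortExpired(pending, 100L, 10L);
       System.out.println(removed + " removed, still pending: " + pending.keySet());
     }
   }
   ```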



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -376,34 +416,46 @@ private void issueMultiChunkRequest(long id, PendingRequest pendingRequest)
     }
   }
 
-  @AutoValue
-  abstract static class PendingRequest {
+  private static class PendingRequest {
+    private final String computationId;
+    private final WorkItemCommitRequest request;
+    private final Consumer<CommitStatus> onDone;
+    private final long startTimeMillis;
 
-    private static PendingRequest create(
+    private PendingRequest(
         String computationId, WorkItemCommitRequest request, Consumer<CommitStatus> onDone) {
-      return new AutoValue_GrpcCommitWorkStream_PendingRequest(computationId, request, onDone);
+      this.computationId = computationId;
+      this.request = request;
+      this.onDone = onDone;
+      this.startTimeMillis = System.currentTimeMillis();
     }
 
-    abstract String computationId();
+    String getComputationId() {
+      return computationId;
+    }
 
-    abstract WorkItemCommitRequest request();
+    WorkItemCommitRequest getRequest() {
+      return request;
+    }
 
-    abstract Consumer<CommitStatus> onDone();
+    long getStartTimeMillis() {
+      return startTimeMillis;
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   To support monotonic time tracking using `System.nanoTime()`, update 
`PendingRequest` to store and expose `startTimeNanos` instead of 
`startTimeMillis`.
   
   ```java
       private final long startTimeNanos;
   
       private PendingRequest(
           String computationId, WorkItemCommitRequest request, Consumer<CommitStatus> onDone) {
         this.computationId = computationId;
         this.request = request;
         this.onDone = onDone;
         this.startTimeNanos = System.nanoTime();
       }
   
       String getComputationId() {
         return computationId;
       }
   
       WorkItemCommitRequest getRequest() {
         return request;
       }
   
       long getStartTimeNanos() {
         return startTimeNanos;
       }
   ```



-- 
This is an automated message from the Apache Git Service.