gemini-code-assist[bot] commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3285138426


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,32 +228,54 @@ public String summaryHtml() {
     }
   }
 
-  private void executeMonitorHeld(Runnable work, long workBytes) {
+  private void executeMonitorHeld(ExecutableWork work, long workBytes) {
     bytesOutstanding += workBytes;
     ++elementsOutstanding;
     monitor.leave();
 
+    executor.execute(
+        () -> {
+          // Any execution exception thrown by work.run() propagates uncaught, triggering
+          // the default JVM UncaughtExceptionHandler which immediately crashes/terminates
+          // the JVM. Since the process exits immediately, reclaiming resource budgets in
+          // this JVM is unnecessary. Furthermore, since a failed execution does not return
+          // a WorkResult, we do not have a good/accurate fallback value to decrement.
+          WorkResult result = work.run();
+          decrementCounters(result);
+        });

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `executeMonitorHeld` method for `ExecutableWork` has no error handling for task submission or execution failures, so `elementsOutstanding` and `bytesOutstanding` can leak.
   
   1. If `executor.execute()` throws a `RejectedExecutionException` (e.g., during worker shutdown), the counters incremented just before the call are never decremented, and that capacity is lost for the lifetime of the JVM.
   2. If `work.run()` throws an exception and the JVM does not terminate immediately (e.g., the uncaught exception handler handles the exception but the process continues), the counters also leak.
   
   This implementation should be made consistent with the `Runnable` version of `executeMonitorHeld` (lines 248-272), which handles both scenarios. Even if a JVM crash is expected on work failure, a defensive decrement (using the initial `workBytes` as a fallback) is safer.
   
   ```java
       try {
         executor.execute(
             () -> {
               WorkResult result = null;
               try {
                 result = work.run();
               } finally {
                 // Any exception thrown by work.run() propagates uncaught and is expected to
                 // trigger the default JVM UncaughtExceptionHandler, which terminates the JVM.
                 // A failed execution does not return a WorkResult, so we fall back to
                 // decrementing the initial budget to prevent leaks if the JVM does not crash.
                 decrementCounters(
                     result != null ? result : WorkResult.create(1, workBytes));
               }
             });
       } catch (Throwable e) {
         // If the execute() call threw an exception, decrement counters here.
         decrementCounters(WorkResult.create(1, workBytes));
         throw ExceptionUtils.propagate(e);
       }
   ```
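
   To make the two leak paths concrete, here is a minimal standalone sketch. It does not use Beam's classes: `BudgetLeakSketch`, `submitUnguarded`, `submitGuarded`, `leakAfterRejection`, and `leakAfterTaskFailure` are hypothetical names, a single `AtomicLong` stands in for `bytesOutstanding`, and the pool installs a no-op uncaught exception handler to model a JVM that keeps running after a work failure.

   ```java
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.RejectedExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicLong;

   /** Hypothetical stand-in for the byte budget; this is not Beam's BoundedQueueExecutor. */
   class BudgetLeakSketch {
     final AtomicLong bytesOutstanding = new AtomicLong();
     final ExecutorService executor;

     BudgetLeakSketch(ExecutorService executor) {
       this.executor = executor;
     }

     /** Unguarded shape: reserve, submit, release only on the success path. */
     void submitUnguarded(Runnable work, long bytes) {
       bytesOutstanding.addAndGet(bytes);
       executor.execute(
           () -> {
             work.run();
             bytesOutstanding.addAndGet(-bytes);
           });
     }

     /** Guarded shape: release in finally, and also when execute() itself throws. */
     void submitGuarded(Runnable work, long bytes) {
       bytesOutstanding.addAndGet(bytes);
       try {
         executor.execute(
             () -> {
               try {
                 work.run();
               } finally {
                 bytesOutstanding.addAndGet(-bytes);
               }
             });
       } catch (RuntimeException e) {
         bytesOutstanding.addAndGet(-bytes);
         throw e;
       }
     }

     private static ExecutorService quietPool() {
       // Assumption: swallow uncaught task exceptions so the JVM keeps running.
       return Executors.newSingleThreadExecutor(
           r -> {
             Thread t = new Thread(r);
             t.setUncaughtExceptionHandler((thread, error) -> {});
             return t;
           });
     }

     /** Scenario 1: bytes still reserved after submitting to an already shut-down pool. */
     static long leakAfterRejection(boolean guarded) {
       ExecutorService pool = quietPool();
       pool.shutdown();
       BudgetLeakSketch sketch = new BudgetLeakSketch(pool);
       try {
         if (guarded) {
           sketch.submitGuarded(() -> {}, 100);
         } else {
           sketch.submitUnguarded(() -> {}, 100);
         }
       } catch (RejectedExecutionException expected) {
         // The default AbortPolicy rejects tasks submitted after shutdown().
       }
       return sketch.bytesOutstanding.get();
     }

     /** Scenario 2: bytes still reserved after the task throws and the JVM survives. */
     static long leakAfterTaskFailure(boolean guarded) {
       ExecutorService pool = quietPool();
       BudgetLeakSketch sketch = new BudgetLeakSketch(pool);
       Runnable failing =
           () -> {
             throw new IllegalStateException("simulated work failure");
           };
       if (guarded) {
         sketch.submitGuarded(failing, 100);
       } else {
         sketch.submitUnguarded(failing, 100);
       }
       pool.shutdown();
       try {
         if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
           throw new IllegalStateException("pool did not terminate");
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(e);
       }
       return sketch.bytesOutstanding.get();
     }

     public static void main(String[] args) {
       System.out.println("rejection, unguarded: " + leakAfterRejection(false));
       System.out.println("rejection, guarded:   " + leakAfterRejection(true));
       System.out.println("failure, unguarded:   " + leakAfterTaskFailure(false));
       System.out.println("failure, guarded:     " + leakAfterTaskFailure(true));
     }
   }
   ```

   Under these assumptions, the unguarded shape leaves 100 bytes reserved in both scenarios, while the guarded shape returns the budget to 0. Whether the second scenario can occur in the worker depends on whether the installed uncaught exception handler always terminates the process.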


