gemini-code-assist[bot] commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3291532998


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,47 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
             try {
               work.run();
             } finally {
-              decrementCounters(workBytes);
+              decrementCounters(1, 0L);
             }
           });
-    } catch (RuntimeException e) {
-      // If the execute() call threw an exception, decrement counters here.
-      decrementCounters(workBytes);
-      throw e;
+    } catch (Throwable e) {
+      decrementCounters(1, 0L);
+      throw ExceptionUtils.propagate(e);
+    }
+  }
+
+  @VisibleForTesting
+  BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+    return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+  }
+
+  /**
+   * Poll additional work to be executed inline inside with the current execute(ExecutableWork work,
+   * long workBytes) call. It is the responsibility of the caller to execute or discard the returned
+   * ExecutableWork. Budget for the returned work is released when the execute() call finishes.
+   *
+   * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+   */
+  public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle handle) {
+    BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Casting `handle` directly to `BoundedQueueExecutorWorkHandleImpl` is unsafe.
   Although it is currently the only implementation, `BoundedQueueExecutorWorkHandle`
   is a public interface, so passing any other implementation throws a
   `ClassCastException`. The method should also handle a null `handle`. An
   `instanceof` check covers both cases, because `instanceof` evaluates to false
   for null:
   
   ```java
       if (!(handle instanceof BoundedQueueExecutorWorkHandleImpl)) {
         return Optional.empty();
       }
       BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle;
   ```
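
To illustrate the guard suggested above, here is a minimal, self-contained sketch. `WorkHandle`, `WorkHandleImpl`, and `pollBudget` are hypothetical stand-ins, not the PR's actual types; the sketch only shows that a single `instanceof` check rejects both null and foreign implementations without throwing.

```java
import java.util.Optional;

// Hypothetical stand-ins for the PR's interface and implementation (names are assumptions).
interface WorkHandle {}

final class WorkHandleImpl implements WorkHandle {
  final int elements;

  WorkHandleImpl(int elements) {
    this.elements = elements;
  }
}

final class PollWorkSketch {
  // Returns the handle's element budget only if the handle is the expected implementation.
  static Optional<Integer> pollBudget(WorkHandle handle) {
    // instanceof evaluates to false for null, so this one check covers
    // both a null handle and an unexpected implementation.
    if (!(handle instanceof WorkHandleImpl)) {
      return Optional.empty();
    }
    WorkHandleImpl internal = (WorkHandleImpl) handle;
    return Optional.of(internal.elements);
  }

  public static void main(String[] args) {
    System.out.println(pollBudget(new WorkHandleImpl(3))); // Optional[3]
    System.out.println(pollBudget(null));                  // Optional.empty
    System.out.println(pollBudget(new WorkHandle() {}));   // Optional.empty
  }
}
```

Whether silently returning `Optional.empty()` or failing fast with a descriptive exception is the better behavior for an unexpected handle is a design choice for the PR author; the sketch only demonstrates the former.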



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -221,8 +244,87 @@ public String summaryHtml() {
     }
   }
 
-  private void executeMonitorHeld(Runnable work, long workBytes) {
+  class BoundedQueueExecutorWorkHandleImpl
+      implements BoundedQueueExecutorWorkHandle, AutoCloseable {
+
+    private int elements;
+    private long bytes;
+    private boolean closed = false;
+
+    private BoundedQueueExecutorWorkHandleImpl(int elements, long bytes) {
+      this.elements = elements;
+      this.bytes = bytes;
+    }
+
+    public synchronized void addBudget(int elements, long bytes) {
+      Preconditions.checkState(!closed, "Cannot add budget to a closed WorkBudgetHandle");
+      this.elements += elements;
+      this.bytes += bytes;
+    }
+
+    public synchronized void cancel() {
+      this.closed = true;
+    }
+
+    @Override
+    public synchronized void close() {
+      Preconditions.checkArgument(!closed);
+      closed = true;
+      decrementCounters(this.elements, this.bytes);
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `close()` method should be idempotent. The `AutoCloseable` contract does
   not strictly require this, but it strongly encourages it. Currently, a second
   call to `close()` throws an `IllegalArgumentException` from the
   `Preconditions.checkArgument(!closed)` check. This is a problem because
   `QueuedWork.run()` calls `handle.close()` in a `finally` block: if the user
   code has already closed the handle, the exception thrown from `finally`
   replaces any original exception from the work, which is then lost.
   
   ```suggestion
       @Override
       public synchronized void close() {
         if (closed) {
           return;
         }
         closed = true;
         decrementCounters(this.elements, this.bytes);
       }
   ```
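
The masking effect described above can be reproduced in isolation. The following is a sketch under stated assumptions: `StrictHandle`, `IdempotentHandle`, and `CloseSketch` are illustrative names, not classes from the PR, and the `releases` counter merely stands in for `decrementCounters(...)`.

```java
// Hypothetical handle whose close() rejects a second call, like the code under review.
final class StrictHandle implements AutoCloseable {
  private boolean closed = false;

  @Override
  public synchronized void close() {
    if (closed) {
      throw new IllegalArgumentException("already closed");
    }
    closed = true;
  }
}

// Hypothetical handle whose close() is idempotent, like the suggested change.
final class IdempotentHandle implements AutoCloseable {
  private boolean closed = false;
  private int releases = 0;

  @Override
  public synchronized void close() {
    if (closed) {
      return; // second and later calls are no-ops
    }
    closed = true;
    releases++; // stands in for decrementCounters(...)
  }

  synchronized int releases() {
    return releases;
  }
}

final class CloseSketch {
  // Mimics work that closes its handle, fails, and is then closed again in a finally block.
  static String run(AutoCloseable handle) {
    try {
      try {
        handle.close(); // user code closes the handle early
        throw new IllegalStateException("work failed");
      } finally {
        handle.close(); // the caller closes it again
      }
    } catch (Exception e) {
      return e.getClass().getSimpleName() + ": " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(run(new StrictHandle()));     // IllegalArgumentException: already closed
    System.out.println(run(new IdempotentHandle())); // IllegalStateException: work failed
  }
}
```

With the strict handle, the caller only ever sees the `IllegalArgumentException` from the second `close()`; with the idempotent handle, the original failure propagates and the budget is released exactly once.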



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.