gemini-code-assist[bot] commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3291547255


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
             try {
               work.run();
             } finally {
-              decrementCounters(workBytes);
+              decrementCounters(1, 0L);
             }
           });
-    } catch (RuntimeException e) {
-      // If the execute() call threw an exception, decrement counters here.
-      decrementCounters(workBytes);
-      throw e;
+    } catch (Throwable e) {
+      decrementCounters(1, 0L);
+      throw ExceptionUtils.propagate(e);
+    }
+  }
+
+  @VisibleForTesting
+  BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+    return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+  }
+
+  /**
+   * Poll additional work to be executed inline inside with the current execute(ExecutableWork work,
+   * long workBytes) call. It is the responsibility of the caller to execute or discard the returned
+   * ExecutableWork. Budget for the returned work is released when the execute() call finishes.
+   *
+   * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+   */
+  public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle handle) {
+    Preconditions.checkArgument(handle instanceof BoundedQueueExecutorWorkHandleImpl);
+    BoundedQueueExecutorWorkHandleImpl internalHandle = (BoundedQueueExecutorWorkHandleImpl) handle;
+    while (true) {
+      Runnable runnable = executor.getQueue().poll();
+      if (runnable == null) {
+        return Optional.empty();
+      }
+      if (runnable instanceof QueuedWork) {
+        QueuedWork queuedWork = (QueuedWork) runnable;
+        queuedWork.cancelHandle();
+        internalHandle.addBudget(1, queuedWork.getWorkBytes());
+        return Optional.of(queuedWork.getWork());
+      }
+      // Pop and execute standard callbacks immediately on the calling thread to drain the queue
+      runnable.run();
     }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `pollWork` method runs every non-`QueuedWork` item it dequeues (such as
callbacks submitted via `forceExecute(Runnable)`) directly on the calling
thread while it searches for the next `QueuedWork`. This is a significant
change in behavior: these tasks were previously always executed by the
executor's thread pool. If a callback is long-running or performs blocking
operations, it delays both the completion of the current work item and the
release of that item's budget. Draining is necessary to reach items further
back in the `LinkedBlockingQueue`, but its impact on worker thread latency
should be monitored.
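
To make the concern concrete, here is a minimal standalone sketch of the same polling pattern. It does not use the Beam classes: `InlineDrainSketch`, `TrackedWork`, and `pollTracked` are hypothetical names chosen for illustration, with `TrackedWork` standing in for `QueuedWork`. It only shows that a plain `Runnable` queued ahead of tracked work runs on whichever thread performs the poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class InlineDrainSketch {
  /** Hypothetical stand-in for the PR's QueuedWork; not the real Beam class. */
  static final class TrackedWork implements Runnable {
    final String name;

    TrackedWork(String name) {
      this.name = name;
    }

    @Override
    public void run() {}
  }

  /**
   * Same loop shape as pollWork in the diff: return the first TrackedWork found,
   * and run every other Runnable ahead of it on the calling thread.
   */
  static Optional<TrackedWork> pollTracked(BlockingQueue<Runnable> queue) {
    while (true) {
      Runnable next = queue.poll();
      if (next == null) {
        return Optional.empty();
      }
      if (next instanceof TrackedWork) {
        return Optional.of((TrackedWork) next);
      }
      // Any time spent here is added to the caller's own work item.
      next.run();
    }
  }

  public static void main(String[] args) {
    BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    List<String> ranOn = new ArrayList<>();

    // A plain callback queued ahead of the tracked work item.
    queue.add(() -> ranOn.add(Thread.currentThread().getName()));
    queue.add(new TrackedWork("work-1"));

    Optional<TrackedWork> polled = pollTracked(queue);
    System.out.println("callback ran on: " + ranOn);
    System.out.println("polled: " + polled.map(w -> w.name).orElse("none"));
  }
}
```

In this sketch the callback records the name of the polling thread, not a pool thread. One way to observe the latency effect in the real executor would be to time the inline `run()` calls, though whether that is worth the overhead is a judgment call for the PR author.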



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
