arunpandianp commented on code in PR #38592:
URL: https://github.com/apache/beam/pull/38592#discussion_r3291678760
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -232,21 +334,48 @@ private void executeMonitorHeld(Runnable work, long
workBytes) {
try {
work.run();
} finally {
- decrementCounters(workBytes);
+ decrementCounters(1, 0L);
}
});
- } catch (RuntimeException e) {
- // If the execute() call threw an exception, decrement counters here.
- decrementCounters(workBytes);
- throw e;
+ } catch (Throwable e) {
+ decrementCounters(1, 0L);
+ throw ExceptionUtils.propagate(e);
+ }
+ }
+
+ @VisibleForTesting
+ BoundedQueueExecutorWorkHandleImpl createEmptyBudgetHandle() {
+ return new BoundedQueueExecutorWorkHandleImpl(0, 0L);
+ }
+
+ /**
+ * Poll additional work to be executed inline inside with the current
execute(ExecutableWork work,
+ * long workBytes) call. It is the responsibility of the caller to execute
or discard the returned
+ * ExecutableWork. Budget for the returned work is released when the
execute() call finishes.
+ *
+ * @param handle the handle that was passed to ExecutableWork.executeWorkFn
+ */
+ public Optional<ExecutableWork> pollWork(BoundedQueueExecutorWorkHandle
handle) {
+ Preconditions.checkArgument(handle instanceof
BoundedQueueExecutorWorkHandleImpl);
+ BoundedQueueExecutorWorkHandleImpl internalHandle =
(BoundedQueueExecutorWorkHandleImpl) handle;
+ while (true) {
+ Runnable runnable = executor.getQueue().poll();
+ if (runnable == null) {
+ return Optional.empty();
+ }
+ if (runnable instanceof QueuedWork) {
+ QueuedWork queuedWork = (QueuedWork) runnable;
+ queuedWork.cancelHandle();
+ internalHandle.addBudget(1, queuedWork.getWorkBytes());
+ return Optional.of(queuedWork.getWork());
+ }
+ // Pop and execute standard callbacks immediately on the calling thread
to drain the queue
+ runnable.run();
}
Review Comment:
The getter is unused now. Planning to improve the logic before using it.
This change is just setting up the Handles and getters.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]