scwhittle commented on code in PR #33687:
URL: https://github.com/apache/beam/pull/33687#discussion_r1928385055
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -36,6 +38,9 @@ public class BoundedQueueExecutor {
// Used to guard elementsOutstanding and bytesOutstanding.
private final Monitor monitor = new Monitor();
+ private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
+ private final Object decrementQueueDrainLock = new Object();
+ private final AtomicBoolean decrementBatchOpen = new AtomicBoolean(false);
Review Comment:
At first I read decrementBatchOpen as "decrement batchopen", which was confusing.
Maybe rename it to "isDecrementBatchPending" or something similar to clarify?
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java:
##########
@@ -236,10 +241,37 @@ private void executeMonitorHeld(Runnable work, long workBytes) {
}
private void decrementCounters(long workBytes) {
- monitor.enter();
- --elementsOutstanding;
- bytesOutstanding -= workBytes;
- monitor.leave();
+ // All threads queue decrements and one thread grabs the monitor and updates
+ // counters. We do this to reduce contention on monitor which is locked by
+ // GetWork thread
+ decrementQueue.add(workBytes);
+ boolean submittedToExistingBatch = decrementBatchOpen.getAndSet(true);
+ if (submittedToExistingBatch) {
+ // There is already a thread about to drain the decrement queue
+ // Current thread does not need to drain.
+ return;
+ }
+ synchronized (decrementQueueDrainLock) {
+ decrementBatchOpen.set(false);
Review Comment:
Since it's a little subtle, how about a comment? Something like:
// By setting this to false here, we may allow another decrement to claim submission of the next batch and start waiting on decrementQueueDrainLock.
// However, this prevents races that would leave decrements in the queue unclaimed, and it ensures that at most one additional thread is blocked.
// This helps prevent the executor from creating threads over the limit if many were contending on the lock while their decrements were already applied.
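
To make the ordering argument concrete, here is a minimal, self-contained sketch of the batching pattern as I understand it. It is not the PR's code: the class name BatchedDecrementSketch, the increment method, and the accessors are hypothetical, a ReentrantLock stands in for the Guava Monitor, and the drain loop after decrementBatchOpen.set(false) is my assumption about what follows, since the hunk quoted above stops at that line.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the counter handling in BoundedQueueExecutor. */
class BatchedDecrementSketch {
  // Assumption: a ReentrantLock replaces Guava's Monitor to keep the sketch dependency-free.
  private final ReentrantLock monitor = new ReentrantLock();
  private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
  private final Object decrementQueueDrainLock = new Object();
  private final AtomicBoolean decrementBatchOpen = new AtomicBoolean(false);

  // Both counters are guarded by monitor.
  private int elementsOutstanding;
  private long bytesOutstanding;

  void increment(long workBytes) {
    monitor.lock();
    try {
      ++elementsOutstanding;
      bytesOutstanding += workBytes;
    } finally {
      monitor.unlock();
    }
  }

  void decrement(long workBytes) {
    decrementQueue.add(workBytes);
    if (decrementBatchOpen.getAndSet(true)) {
      // Another thread has claimed the pending batch and will drain this entry.
      return;
    }
    synchronized (decrementQueueDrainLock) {
      // Reopen claiming BEFORE draining: any entry added after this point gets its
      // own claimer, and any entry added before it is visible to the loop below.
      decrementBatchOpen.set(false);
      long bytes = 0;
      int elements = 0;
      Long next;
      while ((next = decrementQueue.poll()) != null) {
        bytes += next;
        ++elements;
      }
      if (elements == 0) {
        return; // A previous drainer already applied everything.
      }
      // Take the contended monitor once per batch instead of once per decrement.
      monitor.lock();
      try {
        elementsOutstanding -= elements;
        bytesOutstanding -= bytes;
      } finally {
        monitor.unlock();
      }
    }
  }

  int elementsOutstanding() {
    monitor.lock();
    try {
      return elementsOutstanding;
    } finally {
      monitor.unlock();
    }
  }

  long bytesOutstanding() {
    monitor.lock();
    try {
      return bytesOutstanding;
    } finally {
      monitor.unlock();
    }
  }
}
```

In this sketch at most one thread drains while at most one more waits on decrementQueueDrainLock; every other caller returns right after enqueueing. If the flag were instead cleared after the drain loop, an entry added between the last poll and the clear would see the flag still set, return early, and stay unclaimed in the queue.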