This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new e8aea3fb38b [Dataflow Streaming] Reduce contention on work submission (#33687)
e8aea3fb38b is described below

commit e8aea3fb38bed383cc9f8ed862c99461e0d5a7c1
Author: Arun Pandian <pandi...@google.com>
AuthorDate: Fri Jan 24 02:36:48 2025 -0800

    [Dataflow Streaming] Reduce contention on work submission (#33687)

    * [Dataflow Streaming] Reduce contention on work submission
---
 .../dataflow/worker/util/BoundedQueueExecutor.java | 47 ++++++++++++++++++++--
 1 file changed, 43 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9905c0ae5b5..dc611174b7e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.util;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
@@ -36,6 +38,9 @@ public class BoundedQueueExecutor {
 
   // Used to guard elementsOutstanding and bytesOutstanding.
   private final Monitor monitor = new Monitor();
+  private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
+  private final Object decrementQueueDrainLock = new Object();
+  private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
   private int elementsOutstanding = 0;
   private long bytesOutstanding = 0;
 
@@ -236,10 +241,44 @@ public class BoundedQueueExecutor {
   }
 
   private void decrementCounters(long workBytes) {
-    monitor.enter();
-    --elementsOutstanding;
-    bytesOutstanding -= workBytes;
-    monitor.leave();
+    // All threads queue decrements and one thread grabs the monitor and updates
+    // counters. We do this to reduce contention on monitor which is locked by
+    // GetWork thread
+    decrementQueue.add(workBytes);
+    boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
+    if (submittedToExistingBatch) {
+      // There is already a thread about to drain the decrement queue
+      // Current thread does not need to drain.
+      return;
+    }
+    synchronized (decrementQueueDrainLock) {
+      // By setting false here, we may allow another decrement to claim submission of the next batch
+      // and start waiting on the decrementQueueDrainLock.
+      //
+      // However this prevents races that would leave decrements in the queue and unclaimed and we
+      // are ensured there is at most one additional thread blocked. This helps prevent the executor
+      // from creating threads over the limit if many were contending on the lock while their
+      // decrements were already applied.
+      isDecrementBatchPending.set(false);
+      long bytesToDecrement = 0;
+      int elementsToDecrement = 0;
+      while (true) {
+        Long pollResult = decrementQueue.poll();
+        if (pollResult == null) {
+          break;
+        }
+        bytesToDecrement += pollResult;
+        ++elementsToDecrement;
+      }
+      if (elementsToDecrement == 0) {
+        return;
+      }
+
+      monitor.enter();
+      elementsOutstanding -= elementsToDecrement;
+      bytesOutstanding -= bytesToDecrement;
+      monitor.leave();
+    }
   }
 
   private long bytesAvailable() {