This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e8aea3fb38b [Dataflow Streaming] Reduce contention on work submission (#33687)
e8aea3fb38b is described below

commit e8aea3fb38bed383cc9f8ed862c99461e0d5a7c1
Author: Arun Pandian <pandi...@google.com>
AuthorDate: Fri Jan 24 02:36:48 2025 -0800

    [Dataflow Streaming] Reduce contention on work submission (#33687)
    
    * [Dataflow Streaming] Reduce contention on work submission
---
 .../dataflow/worker/util/BoundedQueueExecutor.java | 47 ++++++++++++++++++++--
 1 file changed, 43 insertions(+), 4 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index 9905c0ae5b5..dc611174b7e 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -17,10 +17,12 @@
  */
 package org.apache.beam.runners.dataflow.worker.util;
 
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.concurrent.GuardedBy;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Monitor.Guard;
@@ -36,6 +38,9 @@ public class BoundedQueueExecutor {
 
   // Used to guard elementsOutstanding and bytesOutstanding.
   private final Monitor monitor = new Monitor();
+  private final ConcurrentLinkedQueue<Long> decrementQueue = new ConcurrentLinkedQueue<>();
+  private final Object decrementQueueDrainLock = new Object();
+  private final AtomicBoolean isDecrementBatchPending = new AtomicBoolean(false);
   private int elementsOutstanding = 0;
   private long bytesOutstanding = 0;
 
@@ -236,10 +241,44 @@ public class BoundedQueueExecutor {
   }
 
   private void decrementCounters(long workBytes) {
-    monitor.enter();
-    --elementsOutstanding;
-    bytesOutstanding -= workBytes;
-    monitor.leave();
+    // All threads queue decrements and one thread grabs the monitor and updates
+    // counters. We do this to reduce contention on monitor which is locked by
+    // GetWork thread
+    decrementQueue.add(workBytes);
+    boolean submittedToExistingBatch = isDecrementBatchPending.getAndSet(true);
+    if (submittedToExistingBatch) {
+      // There is already a thread about to drain the decrement queue
+      // Current thread does not need to drain.
+      return;
+    }
+    synchronized (decrementQueueDrainLock) {
+      // By setting false here, we may allow another decrement to claim submission of the next batch
+      // and start waiting on the decrementQueueDrainLock.
+      //
+      // However this prevents races that would leave decrements in the queue and unclaimed and we
+      // are ensured there is at most one additional thread blocked. This helps prevent the executor
+      // from creating threads over the limit if many were contending on the lock while their
+      // decrements were already applied.
+      isDecrementBatchPending.set(false);
+      long bytesToDecrement = 0;
+      int elementsToDecrement = 0;
+      while (true) {
+        Long pollResult = decrementQueue.poll();
+        if (pollResult == null) {
+          break;
+        }
+        bytesToDecrement += pollResult;
+        ++elementsToDecrement;
+      }
+      if (elementsToDecrement == 0) {
+        return;
+      }
+
+      monitor.enter();
+      elementsOutstanding -= elementsToDecrement;
+      bytesOutstanding -= bytesToDecrement;
+      monitor.leave();
+    }
   }
 
   private long bytesAvailable() {

Reply via email to