This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 966d1fff903 [Dataflow Streaming] BoundedQueueExecutor: Add an experiment to use fair monitor (#34787)
966d1fff903 is described below

commit 966d1fff90350a863fbdd186517e1ecac16ae58c
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Apr 30 17:21:09 2025 +0000

    [Dataflow Streaming] BoundedQueueExecutor: Add an experiment to use fair monitor (#34787)
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 10 +++++++++-
 .../dataflow/worker/util/BoundedQueueExecutor.java       |  6 ++++--
 .../dataflow/worker/StreamingDataflowWorkerTest.java     | 12 ++++++++----
 .../dataflow/worker/util/BoundedQueueExecutorTest.java   | 16 +++++++++++++---
 .../processing/failures/WorkFailureProcessorTest.java    |  3 ++-
 .../windmill/work/refresh/ActiveWorkRefresherTest.java   |  3 ++-
 6 files changed, 38 insertions(+), 12 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index daca54fe158..fa428a6cb8f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -166,6 +166,9 @@ public final class StreamingDataflowWorker {
   private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api";
   private static final String 
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT =
       "streaming_engine_use_job_settings_for_heartbeat_pool";
+  // Experiment to make the monitor within BoundedQueueExecutor fair
+  public static final String 
BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT =
+      "windmill_bounded_queue_executor_use_fair_monitor";
 
   private final WindmillStateCache stateCache;
   private final StreamingWorkerStatusPages statusPages;
@@ -796,13 +799,16 @@ public final class StreamingDataflowWorker {
   }
 
   private static BoundedQueueExecutor 
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
+    boolean useFairMonitor =
+        DataflowRunner.hasExperiment(options, 
BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT);
     return new BoundedQueueExecutor(
         chooseMaxThreads(options),
         THREAD_EXPIRATION_TIME_SEC,
         TimeUnit.SECONDS,
         chooseMaxBundlesOutstanding(options),
         chooseMaxBytesOutstanding(options),
-        new 
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build());
+        new 
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
+        useFairMonitor);
   }
 
   public static void main(String[] args) throws Exception {
@@ -938,6 +944,7 @@ public final class StreamingDataflowWorker {
 
   @FunctionalInterface
   private interface StreamingWorkerStatusReporterFactory {
+
     StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker 
throttledTimeTracker);
   }
 
@@ -961,6 +968,7 @@ public final class StreamingDataflowWorker {
 
     @AutoValue.Builder
     abstract static class Builder {
+
       abstract Builder setConfigFetcher(ComputationConfig.Fetcher value);
 
       abstract Builder setComputationStateCache(ComputationStateCache value);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index dc611174b7e..9079c3cc69b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -37,7 +37,7 @@ public class BoundedQueueExecutor {
   private final long maximumBytesOutstanding;
 
   // Used to guard elementsOutstanding and bytesOutstanding.
-  private final Monitor monitor = new Monitor();
+  private final Monitor monitor;
   private final ConcurrentLinkedQueue<Long> decrementQueue = new 
ConcurrentLinkedQueue<>();
   private final Object decrementQueueDrainLock = new Object();
   private final AtomicBoolean isDecrementBatchPending = new 
AtomicBoolean(false);
@@ -65,8 +65,10 @@ public class BoundedQueueExecutor {
       TimeUnit unit,
       int maximumElementsOutstanding,
       long maximumBytesOutstanding,
-      ThreadFactory threadFactory) {
+      ThreadFactory threadFactory,
+      boolean useFairMonitor) {
     this.maximumPoolSize = initialMaximumPoolSize;
+    monitor = new Monitor(useFairMonitor);
     executor =
         new ThreadPoolExecutor(
             initialMaximumPoolSize,
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index c60228f336e..0cf1bb330da 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -2946,7 +2946,8 @@ public class StreamingDataflowWorkerTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build());
+                .build(),
+            /*useFairMonitor=*/ false);
 
     ComputationState computationState =
         new ComputationState(
@@ -3006,7 +3007,8 @@ public class StreamingDataflowWorkerTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build());
+                .build(),
+            /*useFairMonitor=*/ false);
 
     ComputationState computationState =
         new ComputationState(
@@ -3075,7 +3077,8 @@ public class StreamingDataflowWorkerTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build());
+                .build(),
+            /*useFairMonitor=*/ false);
 
     ComputationState computationState =
         new ComputationState(
@@ -3148,7 +3151,8 @@ public class StreamingDataflowWorkerTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build());
+                .build(),
+            /*useFairMonitor=*/ false);
 
     ComputationState computationState =
         new ComputationState(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index 9fce8cbdfa9..ca8bf9dba5d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -41,15 +43,22 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
 
 /** Unit tests for {@link 
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
 // TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
 // released (2.11.0)
 @SuppressWarnings("unused")
 public class BoundedQueueExecutorTest {
 
+  @Parameterized.Parameter public boolean useFairMonitor;
+
+  @Parameterized.Parameters(name = "useFairMonitor = {0}")
+  public static Collection<Object[]> useFairMonitor() {
+    return Arrays.asList(new Object[][] {{false}, {true}});
+  }
+
   private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
   private static final int DEFAULT_MAX_THREADS = 2;
   private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
@@ -101,7 +110,8 @@ public class BoundedQueueExecutorTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build());
+                .build(),
+            useFairMonitor);
   }
 
   @Test
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
index 77f3dcfeb1c..f55549f7e2d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
@@ -62,7 +62,8 @@ public class WorkFailureProcessorTest {
             new ThreadFactoryBuilder()
                 .setNameFormat("DataflowWorkUnits-%d")
                 .setDaemon(true)
-                .build());
+                .build(),
+            /*useFairMonitor=*/ false);
 
     return WorkFailureProcessor.forTesting(workExecutor, failureTracker, 
Optional::empty, clock, 0);
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
index 86980558208..115deccf6df 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
@@ -77,7 +77,8 @@ public class ActiveWorkRefresherTest {
         TimeUnit.SECONDS,
         1,
         10000000,
-        new 
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build());
+        new 
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
+        /*useFairMonitor=*/ false);
   }
 
   private static ComputationState createComputationState(int 
computationIdSuffix) {

Reply via email to