This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 966d1fff903 [Dataflow Streaming] BoundedQueueExecutor: Add an
experiment to use fair monitor (#34787)
966d1fff903 is described below
commit 966d1fff90350a863fbdd186517e1ecac16ae58c
Author: Arun Pandian <[email protected]>
AuthorDate: Wed Apr 30 17:21:09 2025 +0000
[Dataflow Streaming] BoundedQueueExecutor: Add an experiment to use fair
monitor (#34787)
---
.../runners/dataflow/worker/StreamingDataflowWorker.java | 10 +++++++++-
.../dataflow/worker/util/BoundedQueueExecutor.java | 6 ++++--
.../dataflow/worker/StreamingDataflowWorkerTest.java | 12 ++++++++----
.../dataflow/worker/util/BoundedQueueExecutorTest.java | 16 +++++++++++++---
.../processing/failures/WorkFailureProcessorTest.java | 3 ++-
.../windmill/work/refresh/ActiveWorkRefresherTest.java | 3 ++-
6 files changed, 38 insertions(+), 12 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index daca54fe158..fa428a6cb8f 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -166,6 +166,9 @@ public final class StreamingDataflowWorker {
private static final String BEAM_FN_API_EXPERIMENT = "beam_fn_api";
private static final String
STREAMING_ENGINE_USE_JOB_SETTINGS_FOR_HEARTBEAT_POOL_EXPERIMENT =
"streaming_engine_use_job_settings_for_heartbeat_pool";
+ // Experiment to make the monitor within BoundedQueueExecutor fair
+ public static final String
BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT =
+ "windmill_bounded_queue_executor_use_fair_monitor";
private final WindmillStateCache stateCache;
private final StreamingWorkerStatusPages statusPages;
@@ -796,13 +799,16 @@ public final class StreamingDataflowWorker {
}
private static BoundedQueueExecutor
createWorkUnitExecutor(DataflowWorkerHarnessOptions options) {
+ boolean useFairMonitor =
+ DataflowRunner.hasExperiment(options,
BOUNDED_QUEUE_EXECUTOR_USE_FAIR_MONITOR_EXPERIMENT);
return new BoundedQueueExecutor(
chooseMaxThreads(options),
THREAD_EXPIRATION_TIME_SEC,
TimeUnit.SECONDS,
chooseMaxBundlesOutstanding(options),
chooseMaxBytesOutstanding(options),
- new
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build());
+ new
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
+ useFairMonitor);
}
public static void main(String[] args) throws Exception {
@@ -938,6 +944,7 @@ public final class StreamingDataflowWorker {
@FunctionalInterface
private interface StreamingWorkerStatusReporterFactory {
+
StreamingWorkerStatusReporter createStatusReporter(ThrottledTimeTracker
throttledTimeTracker);
}
@@ -961,6 +968,7 @@ public final class StreamingDataflowWorker {
@AutoValue.Builder
abstract static class Builder {
+
abstract Builder setConfigFetcher(ComputationConfig.Fetcher value);
abstract Builder setComputationStateCache(ComputationStateCache value);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
index dc611174b7e..9079c3cc69b 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutor.java
@@ -37,7 +37,7 @@ public class BoundedQueueExecutor {
private final long maximumBytesOutstanding;
// Used to guard elementsOutstanding and bytesOutstanding.
- private final Monitor monitor = new Monitor();
+ private final Monitor monitor;
private final ConcurrentLinkedQueue<Long> decrementQueue = new
ConcurrentLinkedQueue<>();
private final Object decrementQueueDrainLock = new Object();
private final AtomicBoolean isDecrementBatchPending = new
AtomicBoolean(false);
@@ -65,8 +65,10 @@ public class BoundedQueueExecutor {
TimeUnit unit,
int maximumElementsOutstanding,
long maximumBytesOutstanding,
- ThreadFactory threadFactory) {
+ ThreadFactory threadFactory,
+ boolean useFairMonitor) {
this.maximumPoolSize = initialMaximumPoolSize;
+ monitor = new Monitor(useFairMonitor);
executor =
new ThreadPoolExecutor(
initialMaximumPoolSize,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index c60228f336e..0cf1bb330da 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -2946,7 +2946,8 @@ public class StreamingDataflowWorkerTest {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
- .build());
+ .build(),
+ /*useFairMonitor=*/ false);
ComputationState computationState =
new ComputationState(
@@ -3006,7 +3007,8 @@ public class StreamingDataflowWorkerTest {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
- .build());
+ .build(),
+ /*useFairMonitor=*/ false);
ComputationState computationState =
new ComputationState(
@@ -3075,7 +3077,8 @@ public class StreamingDataflowWorkerTest {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
- .build());
+ .build(),
+ /*useFairMonitor=*/ false);
ComputationState computationState =
new ComputationState(
@@ -3148,7 +3151,8 @@ public class StreamingDataflowWorkerTest {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
- .build());
+ .build(),
+ /*useFairMonitor=*/ false);
ComputationState computationState =
new ComputationState(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index 9fce8cbdfa9..ca8bf9dba5d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -24,6 +24,8 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -41,15 +43,22 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
/** Unit tests for {@link
org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor}. */
-@RunWith(JUnit4.class)
+@RunWith(Parameterized.class)
// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of errorprone is
// released (2.11.0)
@SuppressWarnings("unused")
public class BoundedQueueExecutorTest {
+ @Parameterized.Parameter public boolean useFairMonitor;
+
+ @Parameterized.Parameters(name = "useFairMonitor = {0}")
+ public static Collection<Object[]> useFairMonitor() {
+ return Arrays.asList(new Object[][] {{false}, {true}});
+ }
+
private static final long MAXIMUM_BYTES_OUTSTANDING = 10000000;
private static final int DEFAULT_MAX_THREADS = 2;
private static final int DEFAULT_THREAD_EXPIRATION_SEC = 60;
@@ -101,7 +110,8 @@ public class BoundedQueueExecutorTest {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
- .build());
+ .build(),
+ useFairMonitor);
}
@Test
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
index 77f3dcfeb1c..f55549f7e2d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
@@ -62,7 +62,8 @@ public class WorkFailureProcessorTest {
new ThreadFactoryBuilder()
.setNameFormat("DataflowWorkUnits-%d")
.setDaemon(true)
- .build());
+ .build(),
+ /*useFairMonitor=*/ false);
return WorkFailureProcessor.forTesting(workExecutor, failureTracker,
Optional::empty, clock, 0);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
index 86980558208..115deccf6df 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
@@ -77,7 +77,8 @@ public class ActiveWorkRefresherTest {
TimeUnit.SECONDS,
1,
10000000,
- new
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build());
+ new
ThreadFactoryBuilder().setNameFormat("DataflowWorkUnits-%d").setDaemon(true).build(),
+ /*useFairMonitor=*/ false);
}
private static ComputationState createComputationState(int
computationIdSuffix) {