This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 600bd612dc2 [Dataflow Streaming] Remove GetWorkBudgetRefresher which is unused and has a flaky test. (#37317)
600bd612dc2 is described below
commit 600bd612dc2ef495fef08675398361159f7a03df
Author: Sam Whittle <[email protected]>
AuthorDate: Mon Jan 19 11:27:56 2026 +0100
[Dataflow Streaming] Remove GetWorkBudgetRefresher which is unused and has a flaky test. (#37317)
---
.../work/budget/GetWorkBudgetRefresher.java | 133 ---------------------
.../work/budget/GetWorkBudgetRefresherTest.java | 127 --------------------
2 files changed, 260 deletions(-)
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
deleted file mode 100644
index d81c7d0593f..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresher.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.function.Supplier;
-import javax.annotation.concurrent.ThreadSafe;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Handles refreshing the budget either via triggered or scheduled execution using a {@link
- * java.util.concurrent.Phaser} to emulate publish/subscribe pattern.
- */
-@Internal
-@ThreadSafe
-public final class GetWorkBudgetRefresher {
- @VisibleForTesting public static final int SCHEDULED_BUDGET_REFRESH_MILLIS = 100;
- private static final int INITIAL_BUDGET_REFRESH_PHASE = 0;
- private static final String BUDGET_REFRESH_THREAD = "GetWorkBudgetRefreshThread";
- private static final Logger LOG = LoggerFactory.getLogger(GetWorkBudgetRefresher.class);
-
- private final AdvancingPhaser budgetRefreshTrigger;
- private final ExecutorService budgetRefreshExecutor;
- private final Supplier<Boolean> isBudgetRefreshPaused;
- private final Runnable redistributeBudget;
-
- public GetWorkBudgetRefresher(
- Supplier<Boolean> isBudgetRefreshPaused, Runnable redistributeBudget) {
- this.budgetRefreshTrigger = new AdvancingPhaser(1);
- this.budgetRefreshExecutor =
- Executors.newSingleThreadExecutor(
- new ThreadFactoryBuilder()
- .setNameFormat(BUDGET_REFRESH_THREAD)
- .setUncaughtExceptionHandler(
- (t, e) ->
- LOG.error(
- "{} failed due to uncaught exception during execution. ",
- t.getName(),
- e))
- .build());
- this.isBudgetRefreshPaused = isBudgetRefreshPaused;
- this.redistributeBudget = redistributeBudget;
- }
-
- @SuppressWarnings("FutureReturnValueIgnored")
- public void start() {
- budgetRefreshExecutor.submit(this::subscribeToRefreshBudget);
- }
-
- /** Publishes an event to trigger a budget refresh. */
- public void requestBudgetRefresh() {
- budgetRefreshTrigger.arrive();
- }
-
- public void stop() {
- budgetRefreshTrigger.arriveAndDeregister();
- // Put the budgetRefreshTrigger in a terminated state, #waitForBudgetRefreshEventWithTimeout
- // will subsequently return false, and #subscribeToRefreshBudget will return, completing the
- // task.
- budgetRefreshTrigger.forceTermination();
- budgetRefreshExecutor.shutdownNow();
- }
-
- private void subscribeToRefreshBudget() {
- int currentBudgetRefreshPhase = INITIAL_BUDGET_REFRESH_PHASE;
- // Runs forever until #stop is called.
- while (true) {
- currentBudgetRefreshPhase = waitForBudgetRefreshEventWithTimeout(currentBudgetRefreshPhase);
- // Phaser.awaitAdvanceInterruptibly(...) returns a negative value if the phaser is
- // terminated, else returns when either a budget refresh has been manually triggered or
- // SCHEDULED_BUDGET_REFRESH_MILLIS have passed.
- if (currentBudgetRefreshPhase < 0) {
- return;
- }
- // Budget refreshes are paused during endpoint updates.
- if (!isBudgetRefreshPaused.get()) {
- redistributeBudget.run();
- }
- }
- }
-
- /**
- * Waits for a budget refresh trigger event with a timeout. Returns the current phase of the
- * {@link #budgetRefreshTrigger}, to be used for following waits for the {@link
- * #budgetRefreshTrigger} to advance.
- *
- * <p>Budget refresh event is triggered when {@link #budgetRefreshTrigger} moves on from the given
- * currentBudgetRefreshPhase.
- */
- private int waitForBudgetRefreshEventWithTimeout(int currentBudgetRefreshPhase) {
- try {
- // Wait for budgetRefreshTrigger to advance FROM the current phase.
- return budgetRefreshTrigger.awaitAdvanceInterruptibly(
- currentBudgetRefreshPhase, SCHEDULED_BUDGET_REFRESH_MILLIS, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new BudgetRefreshException("Error occurred waiting for budget refresh.", e);
- } catch (TimeoutException ignored) {
- // Intentionally do nothing since we trigger the budget refresh on the timeout.
- }
-
- return currentBudgetRefreshPhase;
- }
-
- private static class BudgetRefreshException extends RuntimeException {
- private BudgetRefreshException(String msg, Throwable sourceException) {
- super(msg, sourceException);
- }
- }
-}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
deleted file mode 100644
index d3c00606726..00000000000
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
-
-import static com.google.common.truth.Truth.assertThat;
-import static org.junit.Assert.assertFalse;
-
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.Timeout;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class GetWorkBudgetRefresherTest {
- private static final int WAIT_BUFFER = 10;
- @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
-
- private GetWorkBudgetRefresher createBudgetRefresher(Runnable redistributeBudget) {
- return createBudgetRefresher(false, redistributeBudget);
- }
-
- private GetWorkBudgetRefresher createBudgetRefresher(
- boolean isBudgetRefreshPaused, Runnable redistributeBudget) {
- return new GetWorkBudgetRefresher(() -> isBudgetRefreshPaused, redistributeBudget);
- }
-
- @Test
- public void testStop_successfullyTerminates() throws InterruptedException {
- CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
- Runnable redistributeBudget = redistributeBudgetLatch::countDown;
- GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget);
- budgetRefresher.start();
- budgetRefresher.stop();
- budgetRefresher.requestBudgetRefresh();
- boolean redistributeBudgetRan =
- redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
- // Make sure that redistributeBudgetLatch.countDown() is never called.
- assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
- assertFalse(redistributeBudgetRan);
- }
-
- @Test
- public void testRequestBudgetRefresh_triggersBudgetRefresh() throws InterruptedException {
- CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
- Runnable redistributeBudget = redistributeBudgetLatch::countDown;
- GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget);
- budgetRefresher.start();
- budgetRefresher.requestBudgetRefresh();
- // Wait for redistribute budget to run.
- redistributeBudgetLatch.await();
- assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
- }
-
- @Test
- public void testScheduledBudgetRefresh() throws InterruptedException {
- CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
- Runnable redistributeBudget = redistributeBudgetLatch::countDown;
- GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget);
- budgetRefresher.start();
- // Wait for scheduled redistribute budget to run.
- redistributeBudgetLatch.await();
- assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
- }
-
- @Test
- public void testTriggeredAndScheduledBudgetRefresh_concurrent() throws InterruptedException {
- CountDownLatch redistributeBudgetLatch = new CountDownLatch(2);
- Runnable redistributeBudget = redistributeBudgetLatch::countDown;
- GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget);
- budgetRefresher.start();
- Thread budgetRefreshTriggerThread = new Thread(budgetRefresher::requestBudgetRefresh);
- budgetRefreshTriggerThread.start();
- budgetRefreshTriggerThread.join();
- // Wait for triggered and scheduled redistribute budget to run.
- redistributeBudgetLatch.await();
- assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
- }
-
- @Test
- public void testTriggeredBudgetRefresh_doesNotRunWhenBudgetRefreshPaused()
- throws InterruptedException {
- CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
- Runnable redistributeBudget = redistributeBudgetLatch::countDown;
- GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, redistributeBudget);
- budgetRefresher.start();
- budgetRefresher.requestBudgetRefresh();
- boolean redistributeBudgetRan =
- redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
- // Make sure that redistributeBudgetLatch.countDown() is never called.
- assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
- assertFalse(redistributeBudgetRan);
- }
-
- @Test
- public void testScheduledBudgetRefresh_doesNotRunWhenBudgetRefreshPaused()
- throws InterruptedException {
- CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
- Runnable redistributeBudget = redistributeBudgetLatch::countDown;
- GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, redistributeBudget);
- budgetRefresher.start();
- boolean redistributeBudgetRan =
- redistributeBudgetLatch.await(
- GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + WAIT_BUFFER,
- TimeUnit.MILLISECONDS);
- // Make sure that redistributeBudgetLatch.countDown() is never called.
- assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
- assertFalse(redistributeBudgetRan);
- }
-}