This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 14d25c3da76 Fix flaky streaming dataflow tests (#30572) 14d25c3da76 is described below commit 14d25c3da76e3864e746a1a13e0896e861fab607 Author: martin trieu <marti...@google.com> AuthorDate: Thu Mar 14 02:16:39 2024 -0700 Fix flaky streaming dataflow tests (#30572) * remove waiting/sleeping arbitrarily in tests since it is leading to flakiness --- .../dataflow/worker/StreamingDataflowWorker.java | 4 +- .../worker/windmill/state/WindmillStateCache.java | 30 +++++--- .../worker/StreamingModeExecutionContextTest.java | 2 +- .../dataflow/worker/WorkerCustomSourcesTest.java | 3 +- .../windmill/state/WindmillStateCacheTest.java | 2 +- .../windmill/state/WindmillStateInternalsTest.java | 2 +- .../work/budget/GetWorkBudgetRefresherTest.java | 84 ++++++++++++++-------- .../refresh/DispatchedActiveWorkRefresherTest.java | 31 +++++--- 8 files changed, 101 insertions(+), 57 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 7bc186af445..6f1bb0847bc 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -476,7 +476,7 @@ public class StreamingDataflowWorker { computationId -> Optional.ofNullable(computationMap.get(computationId)))), clientId, computationMap, - new WindmillStateCache(options.getWorkerCacheMb()), + WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()), createWorkUnitExecutor(options), IntrinsicMapTaskExecutorFactory.defaultFactory(), new DataflowWorkUnitClient(options, LOG), @@ -502,7 +502,7 @@ public class StreamingDataflowWorker { 
Supplier<Instant> clock, Function<String, ScheduledExecutorService> executorSupplier) { BoundedQueueExecutor boundedQueueExecutor = createWorkUnitExecutor(options); - WindmillStateCache stateCache = new WindmillStateCache(options.getWorkerCacheMb()); + WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); computationMap.putAll( createComputationMapForTesting(mapTasks, boundedQueueExecutor, stateCache::forComputation)); return new StreamingDataflowWorker( diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java index ba59d1ae814..0d4e7c6b645 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java @@ -42,7 +42,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Precondit import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Weigher; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MapMaker; import org.checkerframework.checker.nullness.qual.Nullable; @@ -56,6 +55,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; * thread at a time, so this is safe. 
*/ public class WindmillStateCache implements StatusDataProvider { + private static final int STATE_CACHE_CONCURRENCY_LEVEL = 4; // Convert Megabytes to bytes private static final long MEGABYTES = 1024 * 1024; // Estimate of overhead per StateId. @@ -72,20 +72,28 @@ public class WindmillStateCache implements StatusDataProvider { // Contains the current valid ForKey object. Entries in the cache are keyed by ForKey with pointer // equality so entries may be invalidated by creating a new key object, rendering the previous // entries inaccessible. They will be evicted through normal cache operation. - private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex = - new MapMaker().weakValues().concurrencyLevel(4).makeMap(); + private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex; private final long workerCacheBytes; // Copy workerCacheMb and convert to bytes. - public WindmillStateCache(long workerCacheMb) { - final Weigher<Weighted, Weighted> weigher = Weighers.weightedKeysAndValues(); - workerCacheBytes = workerCacheMb * MEGABYTES; - stateCache = + private WindmillStateCache( + long workerCacheMb, + ConcurrentMap<WindmillComputationKey, ForKey> keyIndex, + Cache<StateId, StateCacheEntry> stateCache) { + this.workerCacheBytes = workerCacheMb * MEGABYTES; + this.stateCache = stateCache; + this.keyIndex = keyIndex; + } + + public static WindmillStateCache ofSizeMbs(long workerCacheMb) { + return new WindmillStateCache( + workerCacheMb, + new MapMaker().weakValues().concurrencyLevel(STATE_CACHE_CONCURRENCY_LEVEL).makeMap(), CacheBuilder.newBuilder() - .maximumWeight(workerCacheBytes) + .maximumWeight(workerCacheMb * MEGABYTES) .recordStats() - .weigher(weigher) - .concurrencyLevel(4) - .build(); + .weigher(Weighers.weightedKeysAndValues()) + .concurrencyLevel(STATE_CACHE_CONCURRENCY_LEVEL) + .build()); } private EntryStats calculateEntryStats() { diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java index 18f1f4f7119..158fbee3753 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java @@ -107,7 +107,7 @@ public class StreamingModeExecutionContextTest { "computationId", new ReaderCache(Duration.standardMinutes(1), Executors.newCachedThreadPool()), stateNameMap, - new WindmillStateCache(options.getWorkerCacheMb()).forComputation("comp"), + WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()).forComputation("comp"), StreamingStepMetricsContainer.createRegistry(), new DataflowExecutionStateTracker( ExecutionStateSampler.newForTest(), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java index 66c10075742..d451ec093f7 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java @@ -948,7 +948,8 @@ public class WorkerCustomSourcesTest { "computationId", new ReaderCache(Duration.standardMinutes(1), Runnable::run), /*stateNameMap=*/ ImmutableMap.of(), - new WindmillStateCache(options.getWorkerCacheMb()).forComputation("computationId"), + WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()) + .forComputation("computationId"), 
StreamingStepMetricsContainer.createRegistry(), new DataflowExecutionStateTracker( ExecutionStateSampler.newForTest(), diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java index 26448d38137..1f4355b156b 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java @@ -148,7 +148,7 @@ public class WindmillStateCacheTest { @Before public void setUp() { options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); - cache = new WindmillStateCache(400); + cache = WindmillStateCache.ofSizeMbs(400); assertEquals(0, cache.getWeight()); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java index 19223124dda..d55a20e5517 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java @@ -206,7 +206,7 @@ public class WindmillStateInternalsTest { public void setUp() { MockitoAnnotations.initMocks(this); options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); - cache = new WindmillStateCache(options.getWorkerCacheMb()); + cache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()); 
resetUnderTest(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java index 101e111cb65..d3c00606726 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java @@ -17,89 +17,111 @@ */ package org.apache.beam.runners.dataflow.worker.windmill.work.budget; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoInteractions; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mockito; @RunWith(JUnit4.class) public class GetWorkBudgetRefresherTest { - @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private static final int WAIT_BUFFER = 10; - private final Runnable redistributeBudget = Mockito.mock(Runnable.class); + @Rule public transient Timeout globalTimeout = Timeout.seconds(600); - private GetWorkBudgetRefresher createBudgetRefresher() { - return createBudgetRefresher(false); + private GetWorkBudgetRefresher createBudgetRefresher(Runnable redistributeBudget) { + return createBudgetRefresher(false, redistributeBudget); } - private GetWorkBudgetRefresher createBudgetRefresher(Boolean isBudgetRefreshPaused) { + private GetWorkBudgetRefresher createBudgetRefresher( + 
boolean isBudgetRefreshPaused, Runnable redistributeBudget) { return new GetWorkBudgetRefresher(() -> isBudgetRefreshPaused, redistributeBudget); } @Test public void testStop_successfullyTerminates() throws InterruptedException { - GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(); + CountDownLatch redistributeBudgetLatch = new CountDownLatch(1); + Runnable redistributeBudget = redistributeBudgetLatch::countDown; + GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget); budgetRefresher.start(); budgetRefresher.stop(); budgetRefresher.requestBudgetRefresh(); - Thread.sleep(WAIT_BUFFER); - verifyNoInteractions(redistributeBudget); + boolean redistributeBudgetRan = + redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS); + // Make sure that redistributeBudgetLatch.countDown() is never called. + assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1); + assertFalse(redistributeBudgetRan); } @Test public void testRequestBudgetRefresh_triggersBudgetRefresh() throws InterruptedException { - GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(); + CountDownLatch redistributeBudgetLatch = new CountDownLatch(1); + Runnable redistributeBudget = redistributeBudgetLatch::countDown; + GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget); budgetRefresher.start(); budgetRefresher.requestBudgetRefresh(); - // Wait a bit for redistribute budget to run. - Thread.sleep(WAIT_BUFFER); - verify(redistributeBudget, times(1)).run(); + // Wait for redistribute budget to run. 
+ redistributeBudgetLatch.await(); + assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0); } @Test public void testScheduledBudgetRefresh() throws InterruptedException { - GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(); + CountDownLatch redistributeBudgetLatch = new CountDownLatch(1); + Runnable redistributeBudget = redistributeBudgetLatch::countDown; + GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget); budgetRefresher.start(); - Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + WAIT_BUFFER); - verify(redistributeBudget, times(1)).run(); + // Wait for scheduled redistribute budget to run. + redistributeBudgetLatch.await(); + assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0); } @Test public void testTriggeredAndScheduledBudgetRefresh_concurrent() throws InterruptedException { - GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(); + CountDownLatch redistributeBudgetLatch = new CountDownLatch(2); + Runnable redistributeBudget = redistributeBudgetLatch::countDown; + GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(redistributeBudget); budgetRefresher.start(); Thread budgetRefreshTriggerThread = new Thread(budgetRefresher::requestBudgetRefresh); budgetRefreshTriggerThread.start(); - Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + WAIT_BUFFER); budgetRefreshTriggerThread.join(); - - // Wait a bit for redistribute budget to run. - Thread.sleep(WAIT_BUFFER); - verify(redistributeBudget, times(2)).run(); + // Wait for triggered and scheduled redistribute budget to run. 
+ redistributeBudgetLatch.await(); + assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0); } @Test public void testTriggeredBudgetRefresh_doesNotRunWhenBudgetRefreshPaused() throws InterruptedException { - GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true); + CountDownLatch redistributeBudgetLatch = new CountDownLatch(1); + Runnable redistributeBudget = redistributeBudgetLatch::countDown; + GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, redistributeBudget); budgetRefresher.start(); budgetRefresher.requestBudgetRefresh(); - Thread.sleep(WAIT_BUFFER); - verifyNoInteractions(redistributeBudget); + boolean redistributeBudgetRan = + redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS); + // Make sure that redistributeBudgetLatch.countDown() is never called. + assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1); + assertFalse(redistributeBudgetRan); } @Test public void testScheduledBudgetRefresh_doesNotRunWhenBudgetRefreshPaused() throws InterruptedException { - GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true); + CountDownLatch redistributeBudgetLatch = new CountDownLatch(1); + Runnable redistributeBudget = redistributeBudgetLatch::countDown; + GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, redistributeBudget); budgetRefresher.start(); - Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + WAIT_BUFFER); - verifyNoInteractions(redistributeBudget); + boolean redistributeBudgetRan = + redistributeBudgetLatch.await( + GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + WAIT_BUFFER, + TimeUnit.MILLISECONDS); + // Make sure that redistributeBudgetLatch.countDown() is never called. 
+ assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1); + assertFalse(redistributeBudgetRan); } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java index 48e8dc160fe..31e35404258 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java @@ -18,10 +18,12 @@ package org.apache.beam.runners.dataflow.worker.windmill.work.refresh; import static com.google.common.truth.Truth.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.after; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import com.google.api.services.dataflow.model.MapTask; @@ -50,7 +52,6 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBa import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles; import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Test; @@ -192,11 +193,11 @@ public class DispatchedActiveWorkRefresherTest { } @Test - public void 
testInvalidateStuckCommits() { + public void testInvalidateStuckCommits() throws InterruptedException { int stuckCommitDurationMillis = 100; Table<ComputationState, Work, WindmillStateCache.ForComputation> computations = HashBasedTable.create(); - WindmillStateCache stateCache = new WindmillStateCache(100); + WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(100); ByteString key = ByteString.EMPTY; for (int i = 0; i < 5; i++) { WindmillStateCache.ForComputation perComputationStateCache = @@ -209,6 +210,19 @@ public class DispatchedActiveWorkRefresherTest { } TestClock fakeClock = new TestClock(Instant.now()); + CountDownLatch invalidateStuckCommitRan = new CountDownLatch(computations.size()); + + // Count down the latch every time to avoid waiting/sleeping arbitrarily. + for (ComputationState computation : computations.rowKeySet()) { + doAnswer( + invocation -> { + invocation.callRealMethod(); + invalidateStuckCommitRan.countDown(); + return null; + }) + .when(computation) + .invalidateStuckCommits(any(Instant.class)); + } ActiveWorkRefresher activeWorkRefresher = createActiveWorkRefresher( @@ -220,21 +234,20 @@ public class DispatchedActiveWorkRefresherTest { activeWorkRefresher.start(); fakeClock.advance(Duration.millis(stuckCommitDurationMillis)); - Uninterruptibles.sleepUninterruptibly(stuckCommitDurationMillis, TimeUnit.MILLISECONDS); + invalidateStuckCommitRan.await(); + activeWorkRefresher.stop(); for (Table.Cell<ComputationState, Work, WindmillStateCache.ForComputation> cell : computations.cellSet()) { ComputationState computation = cell.getRowKey(); Work work = cell.getColumnKey(); WindmillStateCache.ForComputation perComputationStateCache = cell.getValue(); - verify(perComputationStateCache, after((long) (stuckCommitDurationMillis * 1.5)).times(1)) + verify(perComputationStateCache, times(1)) .invalidate(eq(key), eq(work.getWorkItem().getShardingKey())); - verify(computation, after((long) (stuckCommitDurationMillis * 1.5)).times(1)) + 
verify(computation, times(1)) .completeWorkAndScheduleNextWorkForKey( eq(ShardedKey.create(key, work.getWorkItem().getShardingKey())), eq(work.id())); } - - activeWorkRefresher.stop(); } static class TestClock implements Clock {