This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 14d25c3da76 Fix flaky streaming dataflow tests (#30572)
14d25c3da76 is described below

commit 14d25c3da76e3864e746a1a13e0896e861fab607
Author: martin trieu <marti...@google.com>
AuthorDate: Thu Mar 14 02:16:39 2024 -0700

    Fix flaky streaming dataflow tests (#30572)
    
    * remove waiting/sleeping arbitratily in tests since it is leading to 
flakiness
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  4 +-
 .../worker/windmill/state/WindmillStateCache.java  | 30 +++++---
 .../worker/StreamingModeExecutionContextTest.java  |  2 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  3 +-
 .../windmill/state/WindmillStateCacheTest.java     |  2 +-
 .../windmill/state/WindmillStateInternalsTest.java |  2 +-
 .../work/budget/GetWorkBudgetRefresherTest.java    | 84 ++++++++++++++--------
 .../refresh/DispatchedActiveWorkRefresherTest.java | 31 +++++---
 8 files changed, 101 insertions(+), 57 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 7bc186af445..6f1bb0847bc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -476,7 +476,7 @@ public class StreamingDataflowWorker {
                 computationId -> 
Optional.ofNullable(computationMap.get(computationId)))),
         clientId,
         computationMap,
-        new WindmillStateCache(options.getWorkerCacheMb()),
+        WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()),
         createWorkUnitExecutor(options),
         IntrinsicMapTaskExecutorFactory.defaultFactory(),
         new DataflowWorkUnitClient(options, LOG),
@@ -502,7 +502,7 @@ public class StreamingDataflowWorker {
       Supplier<Instant> clock,
       Function<String, ScheduledExecutorService> executorSupplier) {
     BoundedQueueExecutor boundedQueueExecutor = 
createWorkUnitExecutor(options);
-    WindmillStateCache stateCache = new 
WindmillStateCache(options.getWorkerCacheMb());
+    WindmillStateCache stateCache = 
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
     computationMap.putAll(
         createComputationMapForTesting(mapTasks, boundedQueueExecutor, 
stateCache::forComputation));
     return new StreamingDataflowWorker(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
index ba59d1ae814..0d4e7c6b645 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCache.java
@@ -42,7 +42,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Precondit
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheStats;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Weigher;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.MapMaker;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
@@ -56,6 +55,7 @@ import org.checkerframework.checker.nullness.qual.Nullable;
  * thread at a time, so this is safe.
  */
 public class WindmillStateCache implements StatusDataProvider {
+  private static final int STATE_CACHE_CONCURRENCY_LEVEL = 4;
   // Convert Megabytes to bytes
   private static final long MEGABYTES = 1024 * 1024;
   // Estimate of overhead per StateId.
@@ -72,20 +72,28 @@ public class WindmillStateCache implements 
StatusDataProvider {
   // Contains the current valid ForKey object. Entries in the cache are keyed 
by ForKey with pointer
   // equality so entries may be invalidated by creating a new key object, 
rendering the previous
   // entries inaccessible. They will be evicted through normal cache operation.
-  private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex =
-      new MapMaker().weakValues().concurrencyLevel(4).makeMap();
+  private final ConcurrentMap<WindmillComputationKey, ForKey> keyIndex;
   private final long workerCacheBytes; // Copy workerCacheMb and convert to 
bytes.
 
-  public WindmillStateCache(long workerCacheMb) {
-    final Weigher<Weighted, Weighted> weigher = 
Weighers.weightedKeysAndValues();
-    workerCacheBytes = workerCacheMb * MEGABYTES;
-    stateCache =
+  private WindmillStateCache(
+      long workerCacheMb,
+      ConcurrentMap<WindmillComputationKey, ForKey> keyIndex,
+      Cache<StateId, StateCacheEntry> stateCache) {
+    this.workerCacheBytes = workerCacheMb * MEGABYTES;
+    this.stateCache = stateCache;
+    this.keyIndex = keyIndex;
+  }
+
+  public static WindmillStateCache ofSizeMbs(long workerCacheMb) {
+    return new WindmillStateCache(
+        workerCacheMb,
+        new 
MapMaker().weakValues().concurrencyLevel(STATE_CACHE_CONCURRENCY_LEVEL).makeMap(),
         CacheBuilder.newBuilder()
-            .maximumWeight(workerCacheBytes)
+            .maximumWeight(workerCacheMb * MEGABYTES)
             .recordStats()
-            .weigher(weigher)
-            .concurrencyLevel(4)
-            .build();
+            .weigher(Weighers.weightedKeysAndValues())
+            .concurrencyLevel(STATE_CACHE_CONCURRENCY_LEVEL)
+            .build());
   }
 
   private EntryStats calculateEntryStats() {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index 18f1f4f7119..158fbee3753 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -107,7 +107,7 @@ public class StreamingModeExecutionContextTest {
             "computationId",
             new ReaderCache(Duration.standardMinutes(1), 
Executors.newCachedThreadPool()),
             stateNameMap,
-            new 
WindmillStateCache(options.getWorkerCacheMb()).forComputation("comp"),
+            
WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb()).forComputation("comp"),
             StreamingStepMetricsContainer.createRegistry(),
             new DataflowExecutionStateTracker(
                 ExecutionStateSampler.newForTest(),
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index 66c10075742..d451ec093f7 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -948,7 +948,8 @@ public class WorkerCustomSourcesTest {
             "computationId",
             new ReaderCache(Duration.standardMinutes(1), Runnable::run),
             /*stateNameMap=*/ ImmutableMap.of(),
-            new 
WindmillStateCache(options.getWorkerCacheMb()).forComputation("computationId"),
+            WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb())
+                .forComputation("computationId"),
             StreamingStepMetricsContainer.createRegistry(),
             new DataflowExecutionStateTracker(
                 ExecutionStateSampler.newForTest(),
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
index 26448d38137..1f4355b156b 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateCacheTest.java
@@ -148,7 +148,7 @@ public class WindmillStateCacheTest {
   @Before
   public void setUp() {
     options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
-    cache = new WindmillStateCache(400);
+    cache = WindmillStateCache.ofSizeMbs(400);
     assertEquals(0, cache.getWeight());
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index 19223124dda..d55a20e5517 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -206,7 +206,7 @@ public class WindmillStateInternalsTest {
   public void setUp() {
     MockitoAnnotations.initMocks(this);
     options = PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class);
-    cache = new WindmillStateCache(options.getWorkerCacheMb());
+    cache = WindmillStateCache.ofSizeMbs(options.getWorkerCacheMb());
     resetUnderTest();
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
index 101e111cb65..d3c00606726 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/budget/GetWorkBudgetRefresherTest.java
@@ -17,89 +17,111 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.work.budget;
 
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoInteractions;
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertFalse;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
-import org.mockito.Mockito;
 
 @RunWith(JUnit4.class)
 public class GetWorkBudgetRefresherTest {
-  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private static final int WAIT_BUFFER = 10;
-  private final Runnable redistributeBudget = Mockito.mock(Runnable.class);
+  @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
 
-  private GetWorkBudgetRefresher createBudgetRefresher() {
-    return createBudgetRefresher(false);
+  private GetWorkBudgetRefresher createBudgetRefresher(Runnable 
redistributeBudget) {
+    return createBudgetRefresher(false, redistributeBudget);
   }
 
-  private GetWorkBudgetRefresher createBudgetRefresher(Boolean 
isBudgetRefreshPaused) {
+  private GetWorkBudgetRefresher createBudgetRefresher(
+      boolean isBudgetRefreshPaused, Runnable redistributeBudget) {
     return new GetWorkBudgetRefresher(() -> isBudgetRefreshPaused, 
redistributeBudget);
   }
 
   @Test
   public void testStop_successfullyTerminates() throws InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
     budgetRefresher.stop();
     budgetRefresher.requestBudgetRefresh();
-    Thread.sleep(WAIT_BUFFER);
-    verifyNoInteractions(redistributeBudget);
+    boolean redistributeBudgetRan =
+        redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
+    // Make sure that redistributeBudgetLatch.countDown() is never called.
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
+    assertFalse(redistributeBudgetRan);
   }
 
   @Test
   public void testRequestBudgetRefresh_triggersBudgetRefresh() throws 
InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
     budgetRefresher.requestBudgetRefresh();
-    // Wait a bit for redistribute budget to run.
-    Thread.sleep(WAIT_BUFFER);
-    verify(redistributeBudget, times(1)).run();
+    // Wait for redistribute budget to run.
+    redistributeBudgetLatch.await();
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
   }
 
   @Test
   public void testScheduledBudgetRefresh() throws InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
-    Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + 
WAIT_BUFFER);
-    verify(redistributeBudget, times(1)).run();
+    // Wait for scheduled redistribute budget to run.
+    redistributeBudgetLatch.await();
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
   }
 
   @Test
   public void testTriggeredAndScheduledBudgetRefresh_concurrent() throws 
InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher();
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(2);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = 
createBudgetRefresher(redistributeBudget);
     budgetRefresher.start();
     Thread budgetRefreshTriggerThread = new 
Thread(budgetRefresher::requestBudgetRefresh);
     budgetRefreshTriggerThread.start();
-    Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + 
WAIT_BUFFER);
     budgetRefreshTriggerThread.join();
-
-    // Wait a bit for redistribute budget to run.
-    Thread.sleep(WAIT_BUFFER);
-    verify(redistributeBudget, times(2)).run();
+    // Wait for triggered and scheduled redistribute budget to run.
+    redistributeBudgetLatch.await();
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(0);
   }
 
   @Test
   public void testTriggeredBudgetRefresh_doesNotRunWhenBudgetRefreshPaused()
       throws InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true);
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, 
redistributeBudget);
     budgetRefresher.start();
     budgetRefresher.requestBudgetRefresh();
-    Thread.sleep(WAIT_BUFFER);
-    verifyNoInteractions(redistributeBudget);
+    boolean redistributeBudgetRan =
+        redistributeBudgetLatch.await(WAIT_BUFFER, TimeUnit.MILLISECONDS);
+    // Make sure that redistributeBudgetLatch.countDown() is never called.
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
+    assertFalse(redistributeBudgetRan);
   }
 
   @Test
   public void testScheduledBudgetRefresh_doesNotRunWhenBudgetRefreshPaused()
       throws InterruptedException {
-    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true);
+    CountDownLatch redistributeBudgetLatch = new CountDownLatch(1);
+    Runnable redistributeBudget = redistributeBudgetLatch::countDown;
+    GetWorkBudgetRefresher budgetRefresher = createBudgetRefresher(true, 
redistributeBudget);
     budgetRefresher.start();
-    Thread.sleep(GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + 
WAIT_BUFFER);
-    verifyNoInteractions(redistributeBudget);
+    boolean redistributeBudgetRan =
+        redistributeBudgetLatch.await(
+            GetWorkBudgetRefresher.SCHEDULED_BUDGET_REFRESH_MILLIS + 
WAIT_BUFFER,
+            TimeUnit.MILLISECONDS);
+    // Make sure that redistributeBudgetLatch.countDown() is never called.
+    assertThat(redistributeBudgetLatch.getCount()).isEqualTo(1);
+    assertFalse(redistributeBudgetRan);
   }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java
index 48e8dc160fe..31e35404258 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/DispatchedActiveWorkRefresherTest.java
@@ -18,10 +18,12 @@
 package org.apache.beam.runners.dataflow.worker.windmill.work.refresh;
 
 import static com.google.common.truth.Truth.assertThat;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.after;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
 import com.google.api.services.dataflow.model.MapTask;
@@ -50,7 +52,6 @@ import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBa
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -192,11 +193,11 @@ public class DispatchedActiveWorkRefresherTest {
   }
 
   @Test
-  public void testInvalidateStuckCommits() {
+  public void testInvalidateStuckCommits() throws InterruptedException {
     int stuckCommitDurationMillis = 100;
     Table<ComputationState, Work, WindmillStateCache.ForComputation> 
computations =
         HashBasedTable.create();
-    WindmillStateCache stateCache = new WindmillStateCache(100);
+    WindmillStateCache stateCache = WindmillStateCache.ofSizeMbs(100);
     ByteString key = ByteString.EMPTY;
     for (int i = 0; i < 5; i++) {
       WindmillStateCache.ForComputation perComputationStateCache =
@@ -209,6 +210,19 @@ public class DispatchedActiveWorkRefresherTest {
     }
 
     TestClock fakeClock = new TestClock(Instant.now());
+    CountDownLatch invalidateStuckCommitRan = new 
CountDownLatch(computations.size());
+
+    // Count down the latch every time to avoid waiting/sleeping arbitrarily.
+    for (ComputationState computation : computations.rowKeySet()) {
+      doAnswer(
+              invocation -> {
+                invocation.callRealMethod();
+                invalidateStuckCommitRan.countDown();
+                return null;
+              })
+          .when(computation)
+          .invalidateStuckCommits(any(Instant.class));
+    }
 
     ActiveWorkRefresher activeWorkRefresher =
         createActiveWorkRefresher(
@@ -220,21 +234,20 @@ public class DispatchedActiveWorkRefresherTest {
 
     activeWorkRefresher.start();
     fakeClock.advance(Duration.millis(stuckCommitDurationMillis));
-    Uninterruptibles.sleepUninterruptibly(stuckCommitDurationMillis, 
TimeUnit.MILLISECONDS);
+    invalidateStuckCommitRan.await();
+    activeWorkRefresher.stop();
 
     for (Table.Cell<ComputationState, Work, WindmillStateCache.ForComputation> 
cell :
         computations.cellSet()) {
       ComputationState computation = cell.getRowKey();
       Work work = cell.getColumnKey();
       WindmillStateCache.ForComputation perComputationStateCache = 
cell.getValue();
-      verify(perComputationStateCache, after((long) (stuckCommitDurationMillis 
* 1.5)).times(1))
+      verify(perComputationStateCache, times(1))
           .invalidate(eq(key), eq(work.getWorkItem().getShardingKey()));
-      verify(computation, after((long) (stuckCommitDurationMillis * 
1.5)).times(1))
+      verify(computation, times(1))
           .completeWorkAndScheduleNextWorkForKey(
               eq(ShardedKey.create(key, work.getWorkItem().getShardingKey())), 
eq(work.id()));
     }
-
-    activeWorkRefresher.stop();
   }
 
   static class TestClock implements Clock {

Reply via email to