This is an automated email from the ASF dual-hosted git repository.
damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 468ae40dc8e Fix AsyncWrapperTest timeout and flakiness issues. (#38970)
468ae40dc8e is described below
commit 468ae40dc8e2424ba4acd957226e369c97d1e7b1
Author: tejasiyer-dev <[email protected]>
AuthorDate: Tue Jun 16 05:15:55 2026 -0700
Fix AsyncWrapperTest timeout and flakiness issues. (#38970)
* Fix AsyncWrapperTest timeout and flakiness issues.
* Fix AsyncWrapperTest to prevent new race conditions and premature
timeouts.
---
.../beam/sdk/transforms/AsyncWrapperTest.java | 43 +++++++++++-----------
1 file changed, 21 insertions(+), 22 deletions(-)
diff --git
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
index e95f586e884..183b1851459 100644
---
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
+++
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncWrapperTest.java
@@ -230,21 +230,20 @@ public class AsyncWrapperTest implements Serializable {
}
private void waitForEmpty(AsyncWrapper<?, ?, ?> asyncWrapper, int
timeoutSeconds) {
- int count = 0;
+ long limit = System.currentTimeMillis() + timeoutSeconds * 1000L;
while (!asyncWrapper.isEmpty()) {
+ if (System.currentTimeMillis() > limit) {
+ throw new RuntimeException("Timed out waiting for async dofn to be
empty");
+ }
try {
- Thread.sleep(1000);
+ Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
- count += 1;
- if (count > timeoutSeconds) {
- throw new RuntimeException("Timed out waiting for async dofn to be
empty");
- }
}
try {
- Thread.sleep(1000);
+ Thread.sleep(5);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -419,7 +418,7 @@ public class AsyncWrapperTest implements Serializable {
// execution task has not finished processing yet.
@Test
public void testLongItem() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(500);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn, 1, Duration.standardSeconds(5), null, null, null, null,
useThreadPool);
@@ -438,7 +437,7 @@ public class AsyncWrapperTest implements Serializable {
assertEquals(0, dofn.getProcessed());
assertEquals(1, fakeBagState.items.size());
- waitForEmpty(asyncWrapper, 20);
+ waitForEmpty(asyncWrapper, 2);
result =
asyncWrapper.commitFinishedItemsDirect(
@@ -538,7 +537,7 @@ public class AsyncWrapperTest implements Serializable {
// Identical elements should not spawn multiple concurrent background
executions.
@Test
public void testDuplicates() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(10);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn, 1, Duration.standardSeconds(5), null, null, null, null,
useThreadPool);
@@ -568,7 +567,7 @@ public class AsyncWrapperTest implements Serializable {
// has cleared are correctly tracked and processed.
@Test
public void testSlowDuplicates() {
- BasicDofn dofn = new BasicDofn(5000);
+ BasicDofn dofn = new BasicDofn(20);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn, 1, Duration.standardSeconds(5), null, null, null, null,
useThreadPool);
@@ -581,7 +580,7 @@ public class AsyncWrapperTest implements Serializable {
asyncWrapper.processDirect(msg, GlobalWindow.INSTANCE, Instant.now(),
fakeBagState, fakeTimer);
try {
- Thread.sleep(10000);
+ Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -610,7 +609,7 @@ public class AsyncWrapperTest implements Serializable {
// and decrement immediately upon execution completion.
@Test
public void testBufferCount() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(10);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn, 1, Duration.standardSeconds(5), null, null, null, null,
useThreadPool);
@@ -637,7 +636,7 @@ public class AsyncWrapperTest implements Serializable {
// the scheduler must block and delay submissions appropriately.
@Test
public void testBufferStopsAcceptingItems() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(500);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn,
@@ -670,7 +669,7 @@ public class AsyncWrapperTest implements Serializable {
}
try {
- Thread.sleep(200);
+ Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -707,7 +706,7 @@ public class AsyncWrapperTest implements Serializable {
// Verifies actively cancelled elements are cleanly dropped from the buffer
during throttling.
@Test
public void testBufferWithCancellation() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(10);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn, 1, Duration.standardSeconds(5), null, null, null, null,
useThreadPool);
@@ -746,7 +745,7 @@ public class AsyncWrapperTest implements Serializable {
// across multiple keys correctly under heavy multi-threaded load.
@Test
public void testLoadCorrectness() {
- BasicDofn dofn = new BasicDofn(1000);
+ BasicDofn dofn = new BasicDofn(10);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn,
@@ -791,14 +790,14 @@ public class AsyncWrapperTest implements Serializable {
timers.get(key));
}));
try {
- Thread.sleep(random.nextInt(200));
+ Thread.sleep(random.nextInt(2));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
- Thread.sleep(3000 + random.nextInt(2000));
+ Thread.sleep(1000 + random.nextInt(1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -834,7 +833,7 @@ public class AsyncWrapperTest implements Serializable {
}
}
try {
- Thread.sleep(1000 + random.nextInt(2000));
+ Thread.sleep(10 + random.nextInt(20));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
@@ -854,7 +853,7 @@ public class AsyncWrapperTest implements Serializable {
// must complete cleanly without thread or lock deadlocks.
@Test
public void testResetStateConcurrentTeardown() {
- BasicDofn dofn = new BasicDofn(500);
+ BasicDofn dofn = new BasicDofn(10);
AsyncWrapper<String, String, String> asyncWrapper =
new AsyncWrapper<>(
dofn, 1, Duration.standardSeconds(5), null, null, null, null,
useThreadPool);
@@ -867,7 +866,7 @@ public class AsyncWrapperTest implements Serializable {
KV.of("key1", "1"), GlobalWindow.INSTANCE, Instant.now(),
fakeBagState, fakeTimer);
try {
- Thread.sleep(50);
+ Thread.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}