This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-0.8 in repository https://gitbox.apache.org/repos/asf/fluss.git
commit 710d5912202cfc2124dfe12ba3bd0dc23f813a34 Author: Rion Williams <[email protected]> AuthorDate: Thu Oct 30 03:29:11 2025 -0500 [server] Added Retry Handler to Address Snapshotting Test Asynchrony (#1881) [server] Added Retry Handler to Address Snapshotting Test Asynchrony (cherry picked from commit 37f46ddd5413de94ce4407682990b38d7f5d56bd) --- .../apache/fluss/server/replica/ReplicaTest.java | 24 +++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java index bc1365ab1..51ab24347 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java @@ -503,7 +503,7 @@ final class ReplicaTest extends ReplicaTestBase { putRecordsToLeader(kvReplica, kvRecords); // trigger first snapshot - scheduledExecutorService.triggerNonPeriodicScheduledTask(); + triggerSnapshotTaskWithRetry(scheduledExecutorService, 5); kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0); // put more data and create second snapshot @@ -514,7 +514,7 @@ final class ReplicaTest extends ReplicaTestBase { putRecordsToLeader(kvReplica, kvRecords); // trigger second snapshot - scheduledExecutorService.triggerNonPeriodicScheduledTask(); + triggerSnapshotTaskWithRetry(scheduledExecutorService, 5); kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1); // put more data and create third snapshot (this will be the broken one) @@ -525,7 +525,7 @@ final class ReplicaTest extends ReplicaTestBase { putRecordsToLeader(kvReplica, kvRecords); // trigger third snapshot - scheduledExecutorService.triggerNonPeriodicScheduledTask(); + triggerSnapshotTaskWithRetry(scheduledExecutorService, 5); CompletedSnapshot snapshot2 = kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2); // verify that snapshot2 is the latest one before we break it @@ -766,4 +766,22 @@ final class ReplicaTest extends ReplicaTestBase { isScheduled = false; } } + + /** A helper function with support for retries for flaky triggering operations. */ + private static void triggerSnapshotTaskWithRetry( + ManuallyTriggeredScheduledExecutorService scheduledExecutorService, int maxRetries) + throws Exception { + for (int i = 0; i < maxRetries; i++) { + try { + scheduledExecutorService.triggerNonPeriodicScheduledTask(); + return; + } catch (java.util.NoSuchElementException e) { + if (i == maxRetries - 1) { + throw e; + } + + Thread.sleep(50); + } + } + } }
