This is an automated email from the ASF dual-hosted git repository.
ipolyzos pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 37f46ddd5 [server] Added Retry Handler to Address Snapshotting Test Asynchrony (#1881)
37f46ddd5 is described below
commit 37f46ddd5413de94ce4407682990b38d7f5d56bd
Author: Rion Williams <[email protected]>
AuthorDate: Thu Oct 30 03:29:11 2025 -0500
[server] Added Retry Handler to Address Snapshotting Test Asynchrony (#1881)
[server] Added Retry Handler to Address Snapshotting Test Asynchrony
---
.../apache/fluss/server/replica/ReplicaTest.java | 24 +++++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
index bc1365ab1..51ab24347 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTest.java
@@ -503,7 +503,7 @@ final class ReplicaTest extends ReplicaTestBase {
putRecordsToLeader(kvReplica, kvRecords);
// trigger first snapshot
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 0);
// put more data and create second snapshot
@@ -514,7 +514,7 @@ final class ReplicaTest extends ReplicaTestBase {
putRecordsToLeader(kvReplica, kvRecords);
// trigger second snapshot
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 1);
// put more data and create third snapshot (this will be the broken one)
@@ -525,7 +525,7 @@ final class ReplicaTest extends ReplicaTestBase {
putRecordsToLeader(kvReplica, kvRecords);
// trigger third snapshot
- scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ triggerSnapshotTaskWithRetry(scheduledExecutorService, 5);
CompletedSnapshot snapshot2 =
kvSnapshotStore.waitUntilSnapshotComplete(tableBucket, 2);
// verify that snapshot2 is the latest one before we break it
@@ -766,4 +766,22 @@ final class ReplicaTest extends ReplicaTestBase {
isScheduled = false;
}
}
+
+ /** A helper function with support for retries for flaky triggering operations. */
+ private static void triggerSnapshotTaskWithRetry(
+ ManuallyTriggeredScheduledExecutorService
scheduledExecutorService, int maxRetries)
+ throws Exception {
+ for (int i = 0; i < maxRetries; i++) {
+ try {
+ scheduledExecutorService.triggerNonPeriodicScheduledTask();
+ return;
+ } catch (java.util.NoSuchElementException e) {
+ if (i == maxRetries - 1) {
+ throw e;
+ }
+
+ Thread.sleep(50);
+ }
+ }
+ }
}