This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c5c9e58eb [server] Avoid ReplicaFetcher busy loop retry storm during leader election or bucket migration (#2075)
c5c9e58eb is described below

commit c5c9e58eb1f4c80b6cafc45dabc6f3b50b866c08
Author: Yang Wang <[email protected]>
AuthorDate: Tue Dec 2 19:45:04 2025 +0800

    [server] Avoid ReplicaFetcher busy loop retry storm during leader election or bucket migration (#2075)
---
 .../fluss/server/replica/ReplicaManager.java       | 36 ++++++++++++++++++++--
 .../replica/fetcher/ReplicaFetcherThread.java      |  1 +
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 206586bf1..5cb526a8e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -1304,14 +1304,27 @@ public class ReplicaManager {
         boolean errorReadingData = false;
         boolean hasFetchFromLocal = false;
         Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap = new 
HashMap<>();
+        Map<TableBucket, FetchLogResultForBucket> expectedErrorBuckets = new 
HashMap<>();
         for (Map.Entry<TableBucket, LogReadResult> logReadResultEntry : 
logReadResults.entrySet()) {
             TableBucket tb = logReadResultEntry.getKey();
             LogReadResult logReadResult = logReadResultEntry.getValue();
             FetchLogResultForBucket fetchLogResultForBucket =
                     logReadResult.getFetchLogResultForBucket();
             if (fetchLogResultForBucket.failed()) {
-                errorReadingData = true;
-                break;
+                // Check if this is an expected error (like 
NOT_LEADER_OR_FOLLOWER,
+                // UNKNOWN_TABLE_OR_BUCKET) which should not
+                // short-circuit the entire fetch request.
+                Errors error = fetchLogResultForBucket.getError().error();
+                if (isNonCriticalFetchError(error)) {
+                    // Expected errors should not prevent other buckets from 
being delayed.
+                    // Save the error bucket to be returned later, and 
continue processing others.
+                    expectedErrorBuckets.put(tb, fetchLogResultForBucket);
+                    continue;
+                } else {
+                    // Severe/unexpected error - short-circuit and return 
immediately
+                    errorReadingData = true;
+                    break;
+                }
             }
 
             if (!fetchLogResultForBucket.fetchFromRemote()) {
@@ -1347,7 +1360,11 @@ public class ReplicaManager {
                             params,
                             this,
                             fetchBucketStatusMap,
-                            responseCallback,
+                            delayedResponse -> {
+                                // Merge expected error buckets with delayed 
response
+                                delayedResponse.putAll(expectedErrorBuckets);
+                                responseCallback.accept(delayedResponse);
+                            },
                             serverMetricGroup);
 
             // try to complete the request immediately, otherwise put it into 
the
@@ -1361,6 +1378,19 @@ public class ReplicaManager {
         }
     }
 
+    /**
+     * Checks whether a per-bucket fetch error is an expected one that must not short-circuit the
+     * whole fetch request. Such errors are common during normal operations (e.g., leader changes,
+     * bucket migrations) and should not prevent other buckets from being delayed.
+     *
+     * @param error the error to check
+     * @return true only for NOT_LEADER_OR_FOLLOWER or UNKNOWN_TABLE_OR_BUCKET_EXCEPTION
+     */
+    private boolean isNonCriticalFetchError(Errors error) {
+        return error == Errors.NOT_LEADER_OR_FOLLOWER
+                || error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION;
+    }
+
     private void completeDelayedOperations(TableBucket tableBucket) {
         DelayedTableBucketKey delayedTableBucketKey = new 
DelayedTableBucketKey(tableBucket);
         delayedWriteManager.checkAndComplete(delayedTableBucketKey);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
index 398bfb6ff..eaf0c0050 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
@@ -280,6 +280,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
                                     "Remote server is not the leader for 
replica {}, which indicate "
                                             + "that the replica is being 
moved.",
                                     tableBucket);
+                            replicasWithError.add(tableBucket);
                             break;
                         default:
                             LOG.error(

Reply via email to