This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c5c9e58eb [server] Avoid ReplicaFetcher busy loop retry storm during leader election or bucket migration (#2075)
c5c9e58eb is described below
commit c5c9e58eb1f4c80b6cafc45dabc6f3b50b866c08
Author: Yang Wang <[email protected]>
AuthorDate: Tue Dec 2 19:45:04 2025 +0800
[server] Avoid ReplicaFetcher busy loop retry storm during leader election or bucket migration (#2075)
---
.../fluss/server/replica/ReplicaManager.java | 36 ++++++++++++++++++++--
.../replica/fetcher/ReplicaFetcherThread.java | 1 +
2 files changed, 34 insertions(+), 3 deletions(-)
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 206586bf1..5cb526a8e 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -1304,14 +1304,27 @@ public class ReplicaManager {
boolean errorReadingData = false;
boolean hasFetchFromLocal = false;
Map<TableBucket, FetchBucketStatus> fetchBucketStatusMap = new
HashMap<>();
+ Map<TableBucket, FetchLogResultForBucket> expectedErrorBuckets = new
HashMap<>();
for (Map.Entry<TableBucket, LogReadResult> logReadResultEntry :
logReadResults.entrySet()) {
TableBucket tb = logReadResultEntry.getKey();
LogReadResult logReadResult = logReadResultEntry.getValue();
FetchLogResultForBucket fetchLogResultForBucket =
logReadResult.getFetchLogResultForBucket();
if (fetchLogResultForBucket.failed()) {
- errorReadingData = true;
- break;
+ // Check if this is an expected error (like
NOT_LEADER_OR_FOLLOWER,
+ // UNKNOWN_TABLE_OR_BUCKET) which should not
+ // short-circuit the entire fetch request.
+ Errors error = fetchLogResultForBucket.getError().error();
+ if (isNonCriticalFetchError(error)) {
+ // Expected errors should not prevent other buckets from
being delayed.
+ // Save the error bucket to be returned later, and
continue processing others.
+ expectedErrorBuckets.put(tb, fetchLogResultForBucket);
+ continue;
+ } else {
+ // Severe/unexpected error - short-circuit and return
immediately
+ errorReadingData = true;
+ break;
+ }
}
if (!fetchLogResultForBucket.fetchFromRemote()) {
@@ -1347,7 +1360,11 @@ public class ReplicaManager {
params,
this,
fetchBucketStatusMap,
- responseCallback,
+ delayedResponse -> {
+ // Merge expected error buckets with delayed
response
+ delayedResponse.putAll(expectedErrorBuckets);
+ responseCallback.accept(delayedResponse);
+ },
serverMetricGroup);
// try to complete the request immediately, otherwise put it into
the
@@ -1361,6 +1378,19 @@ public class ReplicaManager {
}
}
+ /**
+ * Checks whether a per-bucket fetch error is an expected, non-critical error that should not
+ * short-circuit the entire fetch request. These errors are common during normal operations
+ * (e.g., leader changes, bucket migrations) and should not prevent other buckets from being
+ * delayed.
+ *
+ * <p>Only {@link Errors#NOT_LEADER_OR_FOLLOWER} and
+ * {@link Errors#UNKNOWN_TABLE_OR_BUCKET_EXCEPTION} are treated as non-critical. The fetch path
+ * treats every other error as critical and short-circuits the request.
+ *
+ * @param error the error to check
+ * @return true if the error is non-critical and should not short-circuit the fetch request;
+ *     false otherwise
+ */
+ private boolean isNonCriticalFetchError(Errors error) {
+ return error == Errors.NOT_LEADER_OR_FOLLOWER
+ || error == Errors.UNKNOWN_TABLE_OR_BUCKET_EXCEPTION;
+ }
+
+
private void completeDelayedOperations(TableBucket tableBucket) {
DelayedTableBucketKey delayedTableBucketKey = new
DelayedTableBucketKey(tableBucket);
delayedWriteManager.checkAndComplete(delayedTableBucketKey);
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
index 398bfb6ff..eaf0c0050 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/fetcher/ReplicaFetcherThread.java
@@ -280,6 +280,7 @@ final class ReplicaFetcherThread extends ShutdownableThread {
"Remote server is not the leader for
replica {}, which indicate "
+ "that the replica is being
moved.",
tableBucket);
+ replicasWithError.add(tableBucket);
break;
default:
LOG.error(