This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 20e87cc48 [server] Fix ISR state leak causing permanent deadlock during rolling upgrades (#2147)
20e87cc48 is described below

commit 20e87cc48c577ce626d000d9f2d4bfb7fdcd40da
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 13 09:50:04 2025 +0800

    [server] Fix ISR state leak causing permanent deadlock during rolling upgrades (#2147)
---
 .../fluss/server/replica/AdjustIsrManager.java     | 44 ++++++++++++++--------
 .../org/apache/fluss/server/replica/Replica.java   |  2 +-
 .../server/coordinator/TestCoordinatorGateway.java | 11 ++++++
 .../fluss/server/replica/AdjustIsrManagerTest.java | 29 ++++++++++++++
 4 files changed, 70 insertions(+), 16 deletions(-)

diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
index d3b5d63c9..8ca3b9053 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
@@ -23,7 +23,6 @@ import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
 import org.apache.fluss.rpc.messages.AdjustIsrResponse;
-import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.utils.MapUtils;
@@ -102,7 +101,15 @@ public class AdjustIsrManager {
             // Copy current unsent ISRs but don't remove from the map, they 
get cleared in the
             // response callback.
             List<AdjustIsrItem> adjustIsrItemList = new 
ArrayList<>(unsentAdjustIsrMap.values());
-            sendAdjustIsrRequest(adjustIsrItemList);
+            try {
+                sendAdjustIsrRequest(adjustIsrItemList);
+            } catch (Throwable e) {
+                // Caught any top level exceptions
+                handleAdjustIsrRequestError(adjustIsrItemList, e);
+                // If we received a top-level error from the coordinator, retry
+                // the request in near future.
+                scheduler.scheduleOnce("send-adjust-isr", 
this::maybePropagateIsrAdjust, 50);
+            }
         }
     }
 
@@ -123,30 +130,37 @@ public class AdjustIsrManager {
                 .adjustIsr(adjustIsrRequest)
                 .whenComplete(
                         (response, exception) -> {
-                            Errors errors;
                             try {
-                                if (exception != null) {
-                                    errors = Errors.forException(exception);
+                                if (null != exception) {
+                                    
handleAdjustIsrRequestError(adjustIsrItemList, exception);
+                                    // If we received a top-level error from 
the coordinator, retry
+                                    // the request in near future.
+                                    scheduler.scheduleOnce(
+                                            "send-adjust-isr", 
this::maybePropagateIsrAdjust, 50);
+                                    return;
                                 } else {
                                     handleAdjustIsrResponse(response, 
adjustIsrItemList);
-                                    errors = Errors.NONE;
                                 }
                             } finally {
                                 // clear the flag so future requests can 
proceed.
                                 clearInFlightRequest();
                             }
-
-                            if (errors == Errors.NONE) {
-                                maybePropagateIsrAdjust();
-                            } else {
-                                // If we received a top-level error from the 
coordinator, retry
-                                // the request in near future.
-                                scheduler.scheduleOnce(
-                                        "send-adjust-isr", 
this::maybePropagateIsrAdjust, 50);
-                            }
+                            maybePropagateIsrAdjust();
                         });
     }
 
+    private void handleAdjustIsrRequestError(
+            List<AdjustIsrItem> adjustIsrItemList, Throwable error) {
+        LOG.error("Request adjust isr failed.", error);
+        adjustIsrItemList.forEach(
+                item -> {
+                    AdjustIsrItem adjustIsrItem = 
unsentAdjustIsrMap.remove(item.tableBucket);
+                    if (null != adjustIsrItem) {
+                        adjustIsrItem.future.completeExceptionally(error);
+                    }
+                });
+    }
+
     private void handleAdjustIsrResponse(
             AdjustIsrResponse response, List<AdjustIsrItem> adjustIsrItemList) 
{
         Map<TableBucket, AdjustIsrResultForBucket> resultForBucketMap =
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index d8c88cbd7..1a4b6d906 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1577,7 +1577,7 @@ public final class Replica {
                                             // We don't know what happened on 
the coordinator server
                                             // exactly, but we do know this 
response is out of date,
                                             // so we ignore it.
-                                            LOG.debug(
+                                            LOG.warn(
                                                     "Ignoring failed ISR 
update to {} for bucket {} since we have already updated state to {}",
                                                     proposedIsrState,
                                                     tableBucket,
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index ea20e62f9..be15c4d15 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.coordinator;
 
 import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.IneligibleReplicaException;
+import org.apache.fluss.exception.NetworkException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -91,6 +92,7 @@ import 
org.apache.fluss.server.entity.CommitRemoteLogManifestData;
 import org.apache.fluss.server.zk.ZooKeeperClient;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
 import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
+import org.apache.fluss.utils.concurrent.FutureUtils;
 
 import javax.annotation.Nullable;
 
@@ -115,6 +117,7 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
     public final AtomicBoolean commitRemoteLogManifestFail = new 
AtomicBoolean(false);
     public final Map<TableBucket, Integer> currentLeaderEpoch = new 
HashMap<>();
     private Set<Integer> shutdownTabletServers;
+    private boolean networkIssueEnable = false;
 
     public TestCoordinatorGateway() {
         this(null);
@@ -239,6 +242,10 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
 
     @Override
     public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest 
request) {
+        if (networkIssueEnable) {
+            return FutureUtils.completedExceptionally(new 
NetworkException("Mock network issue."));
+        }
+
         Map<TableBucket, LeaderAndIsr> adjustIsrData = 
getAdjustIsrData(request);
         List<AdjustIsrResultForBucket> resultForBuckets = new ArrayList<>();
 
@@ -372,4 +379,8 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
     public void setShutdownTabletServers(Set<Integer> shutdownTabletServers) {
         this.shutdownTabletServers = shutdownTabletServers;
     }
+
+    public void setNetworkIssueEnable(boolean networkIssueEnable) {
+        this.networkIssueEnable = networkIssueEnable;
+    }
 }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
index 3f8623569..248e7929e 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.server.replica;
 
+import org.apache.fluss.exception.NetworkException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
 import org.apache.fluss.server.zk.data.LeaderAndIsr;
@@ -28,6 +29,7 @@ import java.util.Arrays;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link AdjustIsrManager}. */
 class AdjustIsrManagerTest {
@@ -56,4 +58,31 @@ class AdjustIsrManagerTest {
         assertThat(result)
                 .isEqualTo(new LeaderAndIsr(tabletServerId, 0, 
Arrays.asList(1, 2, 3), 0, 2));
     }
+
+    @Test
+    void testSubmitPropagatesRpcLevelErrorAndAllowsRetry() throws Exception {
+        int tabletServerId = 0;
+        TestCoordinatorGateway coordinatorGateway = new 
TestCoordinatorGateway();
+        // Make all AdjustIsr requests fail with NetworkException.
+        coordinatorGateway.setNetworkIssueEnable(true);
+        AdjustIsrManager adjustIsrManager =
+                new AdjustIsrManager(new FlussScheduler(1), 
coordinatorGateway, tabletServerId);
+
+        // The RPC-level exception is propagated to the submit future.
+        TableBucket tb = new TableBucket(150001L, 0);
+        List<Integer> currentIsr = Arrays.asList(1, 2);
+        LeaderAndIsr adjustIsr = new LeaderAndIsr(tabletServerId, 0, 
currentIsr, 0, 0);
+        assertThatThrownBy(() -> adjustIsrManager.submit(tb, adjustIsr).get())
+                .rootCause()
+                .isInstanceOf(NetworkException.class)
+                .hasMessage("Mock network issue.");
+
+        coordinatorGateway.setNetworkIssueEnable(false);
+
+        // After the network issue is cleared, a retry should not be blocked 
by any previous
+        // submit.
+        LeaderAndIsr result = adjustIsrManager.submit(tb, adjustIsr).get();
+        assertThat(result)
+                .isEqualTo(new LeaderAndIsr(tabletServerId, 0, 
Arrays.asList(1, 2), 0, 1));
+    }
 }

Reply via email to