This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 20e87cc48 [server] Fix ISR state leak causing permanent deadlock
during rolling upgrades (#2147)
20e87cc48 is described below
commit 20e87cc48c577ce626d000d9f2d4bfb7fdcd40da
Author: Yang Wang <[email protected]>
AuthorDate: Sat Dec 13 09:50:04 2025 +0800
[server] Fix ISR state leak causing permanent deadlock during rolling
upgrades (#2147)
---
.../fluss/server/replica/AdjustIsrManager.java | 44 ++++++++++++++--------
.../org/apache/fluss/server/replica/Replica.java | 2 +-
.../server/coordinator/TestCoordinatorGateway.java | 11 ++++++
.../fluss/server/replica/AdjustIsrManagerTest.java | 29 ++++++++++++++
4 files changed, 70 insertions(+), 16 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
index d3b5d63c9..8ca3b9053 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/AdjustIsrManager.java
@@ -23,7 +23,6 @@ import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
import org.apache.fluss.rpc.messages.AdjustIsrResponse;
-import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.server.entity.AdjustIsrResultForBucket;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.utils.MapUtils;
@@ -102,7 +101,15 @@ public class AdjustIsrManager {
// Copy current unsent ISRs but don't remove from the map, they
get cleared in the
// response callback.
List<AdjustIsrItem> adjustIsrItemList = new
ArrayList<>(unsentAdjustIsrMap.values());
- sendAdjustIsrRequest(adjustIsrItemList);
+ try {
+ sendAdjustIsrRequest(adjustIsrItemList);
+ } catch (Throwable e) {
+ // Catch any top-level exception thrown while sending the request.
+ handleAdjustIsrRequestError(adjustIsrItemList, e);
+ // If we received a top-level error from the coordinator, retry
+ // the request in the near future.
+ scheduler.scheduleOnce("send-adjust-isr",
this::maybePropagateIsrAdjust, 50);
+ }
}
}
@@ -123,30 +130,37 @@ public class AdjustIsrManager {
.adjustIsr(adjustIsrRequest)
.whenComplete(
(response, exception) -> {
- Errors errors;
try {
- if (exception != null) {
- errors = Errors.forException(exception);
+ if (null != exception) {
+
handleAdjustIsrRequestError(adjustIsrItemList, exception);
+ // If we received a top-level error from
the coordinator, retry
+ // the request in the near future.
+ scheduler.scheduleOnce(
+ "send-adjust-isr",
this::maybePropagateIsrAdjust, 50);
+ return;
} else {
handleAdjustIsrResponse(response,
adjustIsrItemList);
- errors = Errors.NONE;
}
} finally {
// clear the flag so future requests can
proceed.
clearInFlightRequest();
}
-
- if (errors == Errors.NONE) {
- maybePropagateIsrAdjust();
- } else {
- // If we received a top-level error from the
coordinator, retry
- // the request in near future.
- scheduler.scheduleOnce(
- "send-adjust-isr",
this::maybePropagateIsrAdjust, 50);
- }
+ maybePropagateIsrAdjust();
});
}
+ private void handleAdjustIsrRequestError(
+ List<AdjustIsrItem> adjustIsrItemList, Throwable error) {
+ LOG.error("Request adjust isr failed.", error);
+ adjustIsrItemList.forEach(
+ item -> {
+ AdjustIsrItem adjustIsrItem =
unsentAdjustIsrMap.remove(item.tableBucket);
+ if (null != adjustIsrItem) {
+ adjustIsrItem.future.completeExceptionally(error);
+ }
+ });
+ }
+
private void handleAdjustIsrResponse(
AdjustIsrResponse response, List<AdjustIsrItem> adjustIsrItemList)
{
Map<TableBucket, AdjustIsrResultForBucket> resultForBucketMap =
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index d8c88cbd7..1a4b6d906 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1577,7 +1577,7 @@ public final class Replica {
// We don't know what happened on
the coordinator server
// exactly, but we do know this
response is out of date,
// so we ignore it.
- LOG.debug(
+ LOG.warn(
"Ignoring failed ISR
update to {} for bucket {} since we have already updated state to {}",
proposedIsrState,
tableBucket,
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index ea20e62f9..be15c4d15 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -19,6 +19,7 @@ package org.apache.fluss.server.coordinator;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.IneligibleReplicaException;
+import org.apache.fluss.exception.NetworkException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -91,6 +92,7 @@ import
org.apache.fluss.server.entity.CommitRemoteLogManifestData;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.server.zk.data.RemoteLogManifestHandle;
+import org.apache.fluss.utils.concurrent.FutureUtils;
import javax.annotation.Nullable;
@@ -115,6 +117,7 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
public final AtomicBoolean commitRemoteLogManifestFail = new
AtomicBoolean(false);
public final Map<TableBucket, Integer> currentLeaderEpoch = new
HashMap<>();
private Set<Integer> shutdownTabletServers;
+ private boolean networkIssueEnable = false;
public TestCoordinatorGateway() {
this(null);
@@ -239,6 +242,10 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
@Override
public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest
request) {
+ if (networkIssueEnable) {
+ return FutureUtils.completedExceptionally(new
NetworkException("Mock network issue."));
+ }
+
Map<TableBucket, LeaderAndIsr> adjustIsrData =
getAdjustIsrData(request);
List<AdjustIsrResultForBucket> resultForBuckets = new ArrayList<>();
@@ -372,4 +379,8 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
public void setShutdownTabletServers(Set<Integer> shutdownTabletServers) {
this.shutdownTabletServers = shutdownTabletServers;
}
+
+ public void setNetworkIssueEnable(boolean networkIssueEnable) {
+ this.networkIssueEnable = networkIssueEnable;
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
index 3f8623569..248e7929e 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.fluss.server.replica;
+import org.apache.fluss.exception.NetworkException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.server.coordinator.TestCoordinatorGateway;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
@@ -28,6 +29,7 @@ import java.util.Arrays;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link AdjustIsrManager}. */
class AdjustIsrManagerTest {
@@ -56,4 +58,31 @@ class AdjustIsrManagerTest {
assertThat(result)
.isEqualTo(new LeaderAndIsr(tabletServerId, 0,
Arrays.asList(1, 2, 3), 0, 2));
}
+
+ @Test
+ void testSubmitPropagatesRpcLevelErrorAndAllowsRetry() throws Exception {
+ int tabletServerId = 0;
+ TestCoordinatorGateway coordinatorGateway = new
TestCoordinatorGateway();
+ // Make all AdjustIsr requests fail with NetworkException.
+ coordinatorGateway.setNetworkIssueEnable(true);
+ AdjustIsrManager adjustIsrManager =
+ new AdjustIsrManager(new FlussScheduler(1),
coordinatorGateway, tabletServerId);
+
+ // The RPC-level exception is propagated to the submit future.
+ TableBucket tb = new TableBucket(150001L, 0);
+ List<Integer> currentIsr = Arrays.asList(1, 2);
+ LeaderAndIsr adjustIsr = new LeaderAndIsr(tabletServerId, 0,
currentIsr, 0, 0);
+ assertThatThrownBy(() -> adjustIsrManager.submit(tb, adjustIsr).get())
+ .rootCause()
+ .isInstanceOf(NetworkException.class)
+ .hasMessage("Mock network issue.");
+
+ coordinatorGateway.setNetworkIssueEnable(false);
+
+ // After the network issue is cleared, a retry should not be blocked
by any previous
+ // submit.
+ LeaderAndIsr result = adjustIsrManager.submit(tb, adjustIsr).get();
+ assertThat(result)
+ .isEqualTo(new LeaderAndIsr(tabletServerId, 0,
Arrays.asList(1, 2), 0, 1));
+ }
}