This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 640f40c6d [server] Coordinator should reject the AdjustIsr request if the adjusted isr contains shutdown TabletServers (#1734)
640f40c6d is described below

commit 640f40c6d7cda1da80e9d5a25ce37a5d12e66190
Author: yunhong <[email protected]>
AuthorDate: Tue Sep 23 11:25:48 2025 +0800

    [server] Coordinator should reject the AdjustIsr request if the adjusted isr contains shutdown TabletServers (#1734)
---
 .../exception/IneligibleReplicaException.java      | 28 ++++++++++++++++++
 .../java/org/apache/fluss/rpc/protocol/Errors.java |  7 ++++-
 .../coordinator/CoordinatorEventProcessor.java     | 15 ++++++++++
 .../org/apache/fluss/server/replica/Replica.java   |  1 +
 .../apache/fluss/server/zk/ZooKeeperClient.java    |  5 ++++
 .../server/coordinator/TestCoordinatorGateway.java | 29 ++++++++++++++++++-
 .../apache/fluss/server/replica/AdjustIsrTest.java | 33 +++++++++++++++++++++-
 7 files changed, 115 insertions(+), 3 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/IneligibleReplicaException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/IneligibleReplicaException.java
new file mode 100644
index 000000000..e21736b3a
--- /dev/null
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/IneligibleReplicaException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.exception;
+
+/** Exception for ineligible replica. */
+public class IneligibleReplicaException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public IneligibleReplicaException(String message) {
+        super(message);
+    }
+}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
index 631047eac..5ec892a69 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java
@@ -28,6 +28,7 @@ import org.apache.fluss.exception.DatabaseNotExistException;
 import org.apache.fluss.exception.DuplicateSequenceException;
 import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.FencedTieringEpochException;
+import org.apache.fluss.exception.IneligibleReplicaException;
 import org.apache.fluss.exception.InvalidColumnProjectionException;
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -217,7 +218,11 @@ public enum Errors {
     LAKE_SNAPSHOT_NOT_EXIST(
             53, "The lake snapshot is not exist.", 
LakeTableSnapshotNotExistException::new),
     LAKE_TABLE_ALREADY_EXIST(
-            54, "The lake table already exists.", 
LakeTableAlreadyExistException::new);
+            54, "The lake table already exists.", 
LakeTableAlreadyExistException::new),
+    INELIGIBLE_REPLICA_EXCEPTION(
+            55,
+            "The new ISR contains at least one ineligible replica.",
+            IneligibleReplicaException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index bab477d20..3f0ee3cbc 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -25,6 +25,7 @@ import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.exception.IneligibleReplicaException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
 import org.apache.fluss.exception.InvalidUpdateVersionException;
 import org.apache.fluss.exception.TabletServerNotAvailableException;
@@ -1026,6 +1027,20 @@ public class CoordinatorEventProcessor implements 
EventProcessor {
                 // that this node is not the leader.
                 throw new InvalidUpdateVersionException(
                         "The request bucket epoch in adjust isr request is 
lower than current bucket epoch in coordinator.");
+            } else {
+                // Check that every replica in the new ISR is eligible, i.e. the ISR doesn't
+                // contain any tabletServer that is no longer alive (e.g. shutting down).
+                Set<Integer> ineligibleReplicas = new 
HashSet<>(newLeaderAndIsr.isr());
+                
ineligibleReplicas.removeAll(coordinatorContext.liveTabletServerSet());
+                if (!ineligibleReplicas.isEmpty()) {
+                    String errorMsg =
+                            String.format(
+                                    "Rejecting adjustIsr request for table 
bucket %s because it "
+                                            + "specified ineligible replicas 
%s in the new ISR %s",
+                                    tableBucket, ineligibleReplicas, 
newLeaderAndIsr.isr());
+                    LOG.info(errorMsg);
+                    throw new IneligibleReplicaException(errorMsg);
+                }
             }
         }
     }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index 90c1630f8..e4e25de80 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -1649,6 +1649,7 @@ public final class Replica {
         failedIsrUpdates.inc();
         switch (error) {
             case OPERATION_NOT_ATTEMPTED_EXCEPTION:
+            case INELIGIBLE_REPLICA_EXCEPTION:
                 // Care must be taken when resetting to the last committed 
state since we may not
                 // know in general whether the request was applied or not 
taking into account
                 // retries and controller changes which might have occurred 
before we received the
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
index 98654972a..7173d8cdf 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
@@ -408,6 +408,10 @@ public class ZooKeeperClient implements AutoCloseable {
                 .forPath(bucketsParentPath);
 
         for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
+            LOG.info(
+                    "Batch Register {} for bucket {} in Zookeeper.",
+                    info.getLeaderAndIsr(),
+                    info.getTableBucket());
             byte[] data = LeaderAndIsrZNode.encode(info.getLeaderAndIsr());
             // create direct parent node
             CuratorOp parentNodeCreate =
@@ -487,6 +491,7 @@ public class ZooKeeperClient implements AutoCloseable {
             TableBucket tableBucket = entry.getKey();
             LeaderAndIsr leaderAndIsr = entry.getValue();
 
+            LOG.info("Batch Update {} for bucket {} in Zookeeper.", 
leaderAndIsr, tableBucket);
             String path = LeaderAndIsrZNode.path(tableBucket);
             byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr);
             CuratorOp updateOp = 
zkClient.transactionOp().setData().forPath(path, data);
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 38e3f9ea1..5ef93cd71 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.server.coordinator;
 
 import org.apache.fluss.exception.FencedLeaderEpochException;
+import org.apache.fluss.exception.IneligibleReplicaException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.AdjustIsrRequest;
@@ -89,8 +90,10 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -105,6 +108,7 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
     private final @Nullable ZooKeeperClient zkClient;
     public final AtomicBoolean commitRemoteLogManifestFail = new 
AtomicBoolean(false);
     public final Map<TableBucket, Integer> currentLeaderEpoch = new 
HashMap<>();
+    private Set<Integer> shutdownTabletServers;
 
     public TestCoordinatorGateway() {
         this(null);
@@ -112,6 +116,7 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
 
     public TestCoordinatorGateway(ZooKeeperClient zkClient) {
         this.zkClient = zkClient;
+        this.shutdownTabletServers = new HashSet<>();
     }
 
     @Override
@@ -230,7 +235,12 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
                 (tb, leaderAndIsr) -> {
                     Integer currentLeaderEpoch = 
this.currentLeaderEpoch.getOrDefault(tb, 0);
                     int requestLeaderEpoch = leaderAndIsr.leaderEpoch();
-
+                    Set<Integer> ineligibleReplicas = new HashSet<>();
+                    for (int replica : leaderAndIsr.isr()) {
+                        if (shutdownTabletServers.contains(replica)) {
+                            ineligibleReplicas.add(replica);
+                        }
+                    }
                     AdjustIsrResultForBucket adjustIsrResultForBucket;
                     if (requestLeaderEpoch < currentLeaderEpoch) {
                         adjustIsrResultForBucket =
@@ -239,6 +249,19 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
                                         ApiError.fromThrowable(
                                                 new FencedLeaderEpochException(
                                                         "request leader epoch 
is fenced.")));
+                    } else if (!ineligibleReplicas.isEmpty()) {
+                        adjustIsrResultForBucket =
+                                new AdjustIsrResultForBucket(
+                                        tb,
+                                        ApiError.fromThrowable(
+                                                new IneligibleReplicaException(
+                                                        String.format(
+                                                                "Rejecting 
adjustIsr request for table bucket %s because it "
+                                                                        + 
"specified ineligible replicas %s in the new ISR %s",
+                                                                tb,
+                                                                
ineligibleReplicas,
+                                                                
leaderAndIsr))));
+
                     } else {
                         adjustIsrResultForBucket =
                                 new AdjustIsrResultForBucket(
@@ -322,4 +345,8 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
     public void setCurrentLeaderEpoch(TableBucket tableBucket, int 
leaderEpoch) {
         currentLeaderEpoch.put(tableBucket, leaderEpoch);
     }
+
+    public void setShutdownTabletServers(Set<Integer> shutdownTabletServers) {
+        this.shutdownTabletServers = shutdownTabletServers;
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
index f90561485..64eade999 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/AdjustIsrTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.server.replica;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.FencedLeaderEpochException;
+import org.apache.fluss.exception.IneligibleReplicaException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.rpc.entity.ProduceLogResultForBucket;
 import org.apache.fluss.server.entity.FetchReqInfo;
@@ -118,7 +119,7 @@ public class AdjustIsrTest extends ReplicaTestBase {
     }
 
     @Test
-    void testSubmitShrinkIsrAsLeaderFenced() throws Exception {
+    void testSubmitShrinkIsrAsLeaderFenced() {
         // replica set is 1,2,3 , isr set is 1,2,3.
         TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
         makeLogTableAsLeader(tb, Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 
3), false);
@@ -144,4 +145,34 @@ public class AdjustIsrTest extends ReplicaTestBase {
                 .isInstanceOf(FencedLeaderEpochException.class)
                 .hasMessageContaining("request leader epoch is fenced.");
     }
+
+    @Test
+    void testSubmitShrinkIsrAsServerAlreadyShutdown() {
+        // replica set is 1,2,3 , isr set is 1,2,3.
+        TableBucket tb = new TableBucket(DATA1_TABLE_ID, 1);
+        makeLogTableAsLeader(tb, Arrays.asList(1, 2, 3), Arrays.asList(1, 2, 
3), false);
+
+        Replica replica = replicaManager.getReplicaOrException(tb);
+        assertThat(replica.getIsr()).containsExactlyInAnyOrder(1, 2, 3);
+
+        // Prepare an ISR shrink, mimicking what Replica#maybeShrinkIsr() does.
+        IsrState.PendingShrinkIsrState pendingShrinkIsrState =
+                replica.prepareIsrShrink(
+                        new IsrState.CommittedIsrState(Arrays.asList(1, 2, 3)),
+                        Arrays.asList(1, 2),
+                        Collections.singletonList(3));
+
+        // Mark tabletServer-2 as shut down so the shrunk ISR [1, 2] contains an ineligible replica.
+        
testCoordinatorGateway.setShutdownTabletServers(Collections.singleton(2));
+        assertThatThrownBy(
+                        () ->
+                                replica.submitAdjustIsr(pendingShrinkIsrState)
+                                        .get(1, TimeUnit.MINUTES))
+                .rootCause()
+                .isInstanceOf(IneligibleReplicaException.class)
+                .hasMessage(
+                        "Rejecting adjustIsr request for table bucket "
+                                + "TableBucket{tableId=150001, bucket=1} 
because it specified ineligible replicas [2] "
+                                + "in the new ISR LeaderAndIsr{leader=1, 
leaderEpoch=0, isr=[1, 2], coordinatorEpoch=0, bucketEpoch=0}");
+    }
 }

Reply via email to