This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 79d9dbbe5 [server] TabletServer support controlled shutdown (#1159)
79d9dbbe5 is described below
commit 79d9dbbe54d6aec61f113e026cecbbf03287c588
Author: yunhong <[email protected]>
AuthorDate: Sun Sep 21 15:40:24 2025 +0800
[server] TabletServer support controlled shutdown (#1159)
---
.../TabletServerNotAvailableException.java | 27 ++---
.../fluss/rpc/gateway/CoordinatorGateway.java | 7 ++
.../org/apache/fluss/rpc/protocol/ApiKeys.java | 3 +-
fluss-rpc/src/main/proto/FlussApi.proto | 8 ++
.../java/org/apache/fluss/server/ServerBase.java | 1 +
.../server/coordinator/CoordinatorContext.java | 24 ++++-
.../coordinator/CoordinatorEventProcessor.java | 78 ++++++++++++++
.../coordinator/CoordinatorRequestBatch.java | 2 +-
.../server/coordinator/CoordinatorService.java | 17 ++++
.../coordinator/event/ControlledShutdownEvent.java | 50 +++++++++
.../ReplicaLeaderElectionAlgorithms.java | 85 +++++++++++++++-
...hms.java => ReplicaLeaderElectionStrategy.java} | 21 +---
.../statemachine/ReplicaStateMachine.java | 9 +-
.../statemachine/TableBucketStateMachine.java | 112 ++++++++++++---------
.../apache/fluss/server/tablet/TabletServer.java | 70 ++++++++++++-
.../fluss/server/utils/ServerRpcMessageUtils.java | 11 ++
.../apache/fluss/server/zk/data/LeaderAndIsr.java | 29 ++++--
.../coordinator/CoordinatorEventProcessorTest.java | 9 +-
.../server/coordinator/CoordinatorTestUtils.java | 5 +-
.../server/coordinator/TestCoordinatorGateway.java | 8 ++
.../ReplicaLeaderElectionAlgorithmsTest.java | 112 +++++++++++++++++++++
.../statemachine/ReplicaStateMachineTest.java | 4 +-
.../statemachine/TableBucketStateMachineTest.java | 79 ++++++++++++---
...ITCase.java => TabletServerShutdownITCase.java} | 52 ++++++++--
24 files changed, 693 insertions(+), 130 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java
similarity index 52%
copy from
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
copy to
fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java
index d0f6b1835..320daeff6 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
+++
b/fluss-common/src/main/java/org/apache/fluss/exception/TabletServerNotAvailableException.java
@@ -15,23 +15,18 @@
* limitations under the License.
*/
-package org.apache.fluss.server.coordinator.statemachine;
+package org.apache.fluss.exception;
-import java.util.List;
-import java.util.Optional;
+import org.apache.fluss.annotation.PublicEvolving;
-/** The algorithms to elect the replica leader. */
-public class ReplicaLeaderElectionAlgorithms {
- public static Optional<Integer> defaultReplicaLeaderElection(
- List<Integer> assignments, List<Integer> aliveReplicas,
List<Integer> isr) {
- // currently, we always use the first replica in assignment, which
also in aliveReplicas and
- // isr as the leader replica.
- for (int assignment : assignments) {
- if (aliveReplicas.contains(assignment) &&
isr.contains(assignment)) {
- return Optional.of(assignment);
- }
- }
-
- return Optional.empty();
+/**
+ * Thrown when a TabletServer is not available, e.g. it is unknown to the coordinator.
+ *
+ * @since 0.8
+ */
+@PublicEvolving
+public class TabletServerNotAvailableException extends ApiException {
+ public TabletServerNotAvailableException(String message) {
+ super(message);
}
}
diff --git
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
index a089a324e..a78c9bdbd 100644
---
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
+++
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
@@ -26,6 +26,8 @@ import
org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest;
import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse;
import org.apache.fluss.rpc.protocol.ApiKeys;
@@ -78,4 +80,9 @@ public interface CoordinatorGateway extends RpcGateway,
AdminGateway {
@RPC(api = ApiKeys.LAKE_TIERING_HEARTBEAT)
CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
LakeTieringHeartbeatRequest request);
+
+ /** Requests a controlled shutdown of a tabletServer; returns the buckets it still leads. */
+ @RPC(api = ApiKeys.CONTROLLED_SHUTDOWN)
+ CompletableFuture<ControlledShutdownResponse> controlledShutdown(
+ ControlledShutdownRequest request);
}
diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
index 0bc2d494a..8526581ae 100644
--- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java
@@ -70,7 +70,8 @@ public enum ApiKeys {
CREATE_ACLS(1039, 0, 0, PUBLIC),
LIST_ACLS(1040, 0, 0, PUBLIC),
DROP_ACLS(1041, 0, 0, PUBLIC),
- LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
+ LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
+ CONTROLLED_SHUTDOWN(1043, 0, 0, PRIVATE);
private static final Map<Integer, ApiKeys> ID_TO_TYPE =
Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto
b/fluss-rpc/src/main/proto/FlussApi.proto
index 19e401095..200b8b520 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -532,6 +532,14 @@ message LakeTieringHeartbeatResponse {
repeated PbHeartbeatRespForTable failed_table_resp = 5;
}
+message ControlledShutdownRequest {
+ required int32 tablet_server_id = 1; // id of the TabletServer to shut down in a controlled way
+ required int32 tablet_server_epoch = 2; // epoch of that TabletServer; fencing stale senders is a TODO (#1153)
+}
+
+message ControlledShutdownResponse {
+ repeated PbTableBucket remaining_leader_buckets = 1; // buckets still led by the TabletServer after leader migration
+}
// --------------- Inner classes ----------------
message PbApiVersion {
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
index 99c1e105f..4acbafd09 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java
@@ -115,6 +115,7 @@ public abstract class ServerBase implements
AutoCloseableAsync, FatalErrorHandle
public void start() throws Exception {
try {
addShutDownHook();
+
// at first, we need to initialize the file system
pluginManager =
PluginUtils.createPluginManagerFromRootFolder(conf);
FileSystem.initialize(conf, pluginManager);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index aa372f56a..4cb988967 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -67,6 +67,7 @@ public class CoordinatorContext {
private final Map<TableBucketReplica, Integer> failDeleteNumbers = new
HashMap<>();
private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
+ private final Set<Integer> shuttingDownTabletServers = new HashSet<>();
// a map from the table bucket to the state of the bucket.
private final Map<TableBucket, BucketState> bucketStates = new HashMap<>();
@@ -114,6 +115,24 @@ public class CoordinatorContext {
return liveTabletServers;
}
+ public Set<Integer> liveTabletServerSet() {
+ Set<Integer> aliveServers = new HashSet<>(); // new copy per call: live servers not in controlled shutdown
+ for (Integer serverId : liveTabletServers.keySet()) {
+ if (!shuttingDownTabletServers.contains(serverId)) {
+ aliveServers.add(serverId);
+ }
+ }
+ return aliveServers;
+ }
+
+ public Set<Integer> shuttingDownTabletServers() {
+ return shuttingDownTabletServers; // mutable on purpose: callers add/remove server ids directly
+ }
+
+ public Set<Integer> liveOrShuttingDownTabletServers() {
+ return Collections.unmodifiableSet(liveTabletServers.keySet()); // read-only view; shutting-down servers stay registered until removed
+ }
+
@VisibleForTesting
public void setLiveTabletServers(List<ServerInfo> servers) {
liveTabletServers.clear();
@@ -136,8 +155,8 @@ public class CoordinatorContext {
this.liveTabletServers.remove(serverId);
}
- public boolean isReplicaAndServerOnline(int serverId, TableBucket
tableBucket) {
- return liveTabletServers.containsKey(serverId)
+ public boolean isReplicaOnline(int serverId, TableBucket tableBucket) {
+ return liveTabletServerSet().contains(serverId)
&& !replicasOnOffline
.getOrDefault(serverId, Collections.emptySet())
.contains(tableBucket);
@@ -636,5 +655,6 @@ public class CoordinatorContext {
clearTablesState();
// clear the live tablet servers
liveTabletServers.clear();
+ shuttingDownTabletServers.clear();
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
index 66d52d9fc..bab477d20 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
@@ -27,6 +27,7 @@ import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.FlussRuntimeException;
import org.apache.fluss.exception.InvalidCoordinatorException;
import org.apache.fluss.exception.InvalidUpdateVersionException;
+import org.apache.fluss.exception.TabletServerNotAvailableException;
import org.apache.fluss.exception.UnknownTableOrBucketException;
import org.apache.fluss.metadata.PhysicalTablePath;
import org.apache.fluss.metadata.TableBucket;
@@ -38,6 +39,7 @@ import org.apache.fluss.rpc.messages.AdjustIsrResponse;
import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.PbCommitLakeTableSnapshotRespForTable;
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.server.coordinator.event.AccessContextEvent;
@@ -45,6 +47,7 @@ import
org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
+import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
import org.apache.fluss.server.coordinator.event.CoordinatorEvent;
import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
import org.apache.fluss.server.coordinator.event.CreatePartitionEvent;
@@ -72,6 +75,7 @@ import
org.apache.fluss.server.kv.snapshot.CompletedSnapshotStore;
import org.apache.fluss.server.metadata.CoordinatorMetadataCache;
import org.apache.fluss.server.metadata.ServerInfo;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
+import org.apache.fluss.server.utils.ServerRpcMessageUtils;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.BucketAssignment;
import org.apache.fluss.server.zk.data.LakeTableSnapshot;
@@ -104,6 +108,7 @@ import java.util.stream.Collectors;
import static
org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
import static
org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OfflineReplica;
import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.OnlineReplica;
import static
org.apache.fluss.server.coordinator.statemachine.ReplicaState.ReplicaDeletionStarted;
@@ -529,6 +534,11 @@ public class CoordinatorEventProcessor implements
EventProcessor {
completeFromCallable(
commitLakeTableSnapshotEvent.getRespCallback(),
() ->
tryProcessCommitLakeTableSnapshot(commitLakeTableSnapshotEvent));
+ } else if (event instanceof ControlledShutdownEvent) {
+ ControlledShutdownEvent controlledShutdownEvent =
(ControlledShutdownEvent) event;
+ completeFromCallable(
+ controlledShutdownEvent.getRespCallback(),
+ () ->
tryProcessControlledShutdown(controlledShutdownEvent));
} else if (event instanceof AccessContextEvent) {
AccessContextEvent<?> accessContextEvent = (AccessContextEvent<?>)
event;
processAccessContext(accessContextEvent);
@@ -865,6 +875,7 @@ public class CoordinatorEventProcessor implements
EventProcessor {
LOG.info("Tablet server failure callback for {}.", tabletServerId);
coordinatorContext.removeOfflineBucketInServer(tabletServerId);
coordinatorContext.removeLiveTabletServer(tabletServerId);
+ coordinatorContext.shuttingDownTabletServers().remove(tabletServerId);
coordinatorChannelManager.removeTabletServer(tabletServerId);
// Here, we will first update alive tabletServer info for all
tabletServers and
@@ -1165,6 +1176,73 @@ public class CoordinatorEventProcessor implements
EventProcessor {
return response;
}
+ private ControlledShutdownResponse tryProcessControlledShutdown(
+ ControlledShutdownEvent controlledShutdownEvent) {
+ ControlledShutdownResponse response = new ControlledShutdownResponse();
+
+ // TODO: check tabletServerEpoch to reject controlled shutdown requests sent by a stale
+ // tabletServer instance. Tracked by https://github.com/alibaba/fluss/issues/1153
+ int tabletServerEpoch = controlledShutdownEvent.getTabletServerEpoch();
+ int tabletServerId = controlledShutdownEvent.getTabletServerId();
+ LOG.info(
+ "Try to process controlled shutdown for tabletServer: {} of tabletServer epoch: {}",
+ tabletServerId,
+ tabletServerEpoch);
+
+ if (!coordinatorContext.liveOrShuttingDownTabletServers().contains(tabletServerId)) {
+ throw new TabletServerNotAvailableException(
+ "TabletServer " + tabletServerId + " is not available.");
+ }
+
+ // Mark as shutting down so it is excluded from the live set and from leader/isr notifications.
+ coordinatorContext.shuttingDownTabletServers().add(tabletServerId);
+ LOG.debug(
+ "All shutting down tabletServers: {}",
+ coordinatorContext.shuttingDownTabletServers());
+ LOG.debug("All live tabletServers: {}", coordinatorContext.liveTabletServerSet());
+
+ List<TableBucketReplica> replicasToActOn =
+ coordinatorContext.replicasOnTabletServer(tabletServerId).stream()
+ .filter(
+ replica -> {
+ TableBucket tableBucket = replica.getTableBucket();
+ return !coordinatorContext.getAssignment(tableBucket).isEmpty()
+ && coordinatorContext
+ .getBucketLeaderAndIsr(tableBucket)
+ .isPresent()
+ && !coordinatorContext.isToBeDeleted(tableBucket);
+ })
+ .collect(Collectors.toList());
+
+ Set<TableBucket> bucketsLedByServer = new HashSet<>();
+ Set<TableBucketReplica> replicasFollowedByServer = new HashSet<>();
+ for (TableBucketReplica replica : replicasToActOn) {
+ TableBucket tableBucket = replica.getTableBucket();
+ if (replica.getReplica()
+ == coordinatorContext.getBucketLeaderAndIsr(tableBucket).get().leader()) {
+ bucketsLedByServer.add(tableBucket);
+ } else {
+ replicasFollowedByServer.add(replica);
+ }
+ }
+
+ // Buckets led by this server get a new leader chosen among the other alive isr replicas.
+ tableBucketStateMachine.handleStateChange(
+ bucketsLedByServer, OnlineBucket, CONTROLLED_SHUTDOWN_ELECTION);
+ // TODO: decide whether a stop request needs to be sent to the old leader.
+
+ // For buckets where this server is a follower, update the isr in ZK and notify the leader.
+ replicaStateMachine.handleStateChanges(replicasFollowedByServer, OfflineReplica);
+
+ // Return the buckets still led by this tabletServer after leader migration, e.g. when no
+ // other eligible replica could take over leadership.
+ response.addAllRemainingLeaderBuckets(
+ coordinatorContext.getBucketsWithLeaderIn(tabletServerId).stream()
+ .map(ServerRpcMessageUtils::fromTableBucket)
+ .collect(Collectors.toList()));
+ return response;
+ }
+
private void validateFencedEvent(FencedCoordinatorEvent event) {
TableBucket tb = event.getTableBucket();
if (coordinatorContext.getTablePathById(tb.getTableId()) == null) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
index 504b2b439..de6778d35 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java
@@ -211,7 +211,7 @@ public class CoordinatorRequestBatch {
List<Integer> bucketReplicas,
LeaderAndIsr leaderAndIsr) {
tabletServers.stream()
- .filter(s -> s >= 0)
+ .filter(s -> s >= 0 &&
!coordinatorContext.shuttingDownTabletServers().contains(s))
.forEach(
id -> {
Map<TableBucket, PbNotifyLeaderAndIsrReqForBucket>
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 5f87937a2..dd3eb62ab 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -49,6 +49,8 @@ import
org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateAclsResponse;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -86,6 +88,7 @@ import
org.apache.fluss.server.coordinator.event.AdjustIsrReceivedEvent;
import org.apache.fluss.server.coordinator.event.CommitKvSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitLakeTableSnapshotEvent;
import org.apache.fluss.server.coordinator.event.CommitRemoteLogManifestEvent;
+import org.apache.fluss.server.coordinator.event.ControlledShutdownEvent;
import org.apache.fluss.server.coordinator.event.EventManager;
import org.apache.fluss.server.entity.CommitKvSnapshotData;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
@@ -575,6 +578,20 @@ public final class CoordinatorService extends
RpcServiceBase implements Coordina
return CompletableFuture.completedFuture(heartbeatResponse);
}
+ @Override
+ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
+ ControlledShutdownRequest request) {
+ CompletableFuture<ControlledShutdownResponse> response = new
CompletableFuture<>(); // completed when the queued ControlledShutdownEvent is processed
+ eventManagerSupplier
+ .get()
+ .put(
+ new ControlledShutdownEvent(
+ request.getTabletServerId(),
+ request.getTabletServerEpoch(),
+ response));
+ return response;
+ }
+
private void validateHeartbeatRequest(
PbHeartbeatReqForTable heartbeatReqForTable, int currentEpoch) {
if (heartbeatReqForTable.getCoordinatorEpoch() != currentEpoch) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java
new file mode 100644
index 000000000..7f93d0805
--- /dev/null
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/event/ControlledShutdownEvent.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.event;
+
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
+
+import java.util.concurrent.CompletableFuture;
+
+/** Coordinator event carrying a controlled shutdown request for a TabletServer. */
+public class ControlledShutdownEvent implements CoordinatorEvent {
+ private final int tabletServerId;
+ private final int tabletServerEpoch; // epoch of the TabletServer as reported in the shutdown request
+ private final CompletableFuture<ControlledShutdownResponse> respCallback; // completed once the event is processed
+
+ public ControlledShutdownEvent(
+ int tabletServerId,
+ int tabletServerEpoch,
+ CompletableFuture<ControlledShutdownResponse> respCallback) {
+ this.tabletServerId = tabletServerId;
+ this.tabletServerEpoch = tabletServerEpoch;
+ this.respCallback = respCallback;
+ }
+
+ public int getTabletServerId() {
+ return tabletServerId;
+ }
+
+ public int getTabletServerEpoch() {
+ return tabletServerEpoch;
+ }
+
+ public CompletableFuture<ControlledShutdownResponse> getRespCallback() {
+ return respCallback;
+ }
+}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
index d0f6b1835..c7c1aa07a 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
@@ -17,21 +17,100 @@
package org.apache.fluss.server.coordinator.statemachine;
+import
org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** The algorithms to elect the replica leader. */
public class ReplicaLeaderElectionAlgorithms {
- public static Optional<Integer> defaultReplicaLeaderElection(
- List<Integer> assignments, List<Integer> aliveReplicas,
List<Integer> isr) {
+
+ /**
+ * Initial replica leader election for a newly created bucket.
+ *
+ * @param assignments the assignments
+ * @param aliveReplicas the alive replicas
+ * @param coordinatorEpoch the coordinator epoch
+ * @return the election result, or empty if no assigned replica is alive
+ */
+ public static Optional<ElectionResult> initReplicaLeaderElection(
+ List<Integer> assignments, List<Integer> aliveReplicas, int
coordinatorEpoch) {
+ // currently, we always use the first replica in assignment which is also in aliveReplicas
+ // as the leader; all alive replicas form the initial isr.
+ for (int assignment : assignments) {
+ if (aliveReplicas.contains(assignment)) {
+ return Optional.of(
+ new ElectionResult(
+ aliveReplicas,
+ new LeaderAndIsr(
+ assignment, 0, aliveReplicas,
coordinatorEpoch, 0)));
+ }
+ }
+
+ return Optional.empty();
+ }
+
+ /**
+ * Default replica leader election, e.g. electing a new leader when the current leader is offline.
+ *
+ * @param assignments the assignments
+ * @param aliveReplicas the alive replicas
+ * @param leaderAndIsr the original leaderAndIsr
+ * @return the election result, or empty if no assigned replica is both alive and in the isr
+ */
+ public static Optional<ElectionResult> defaultReplicaLeaderElection(
+ List<Integer> assignments, List<Integer> aliveReplicas,
LeaderAndIsr leaderAndIsr) {
// currently, we always use the first replica in assignment, which
also in aliveReplicas and
// isr as the leader replica.
+ List<Integer> isr = leaderAndIsr.isr();
for (int assignment : assignments) {
if (aliveReplicas.contains(assignment) &&
isr.contains(assignment)) {
- return Optional.of(assignment);
+ return Optional.of(
+ new ElectionResult(
+ aliveReplicas,
leaderAndIsr.newLeaderAndIsr(assignment, isr)));
}
}
return Optional.empty();
}
+
+ /**
+ * Replica leader election that moves leadership away from tabletServers being shut down.
+ *
+ * @param assignments the assignments
+ * @param aliveReplicas the alive replicas
+ * @param leaderAndIsr the original leaderAndIsr
+ * @param shutdownTabletServers tabletServers being shut down; never elected, removed from isr and alive replicas
+ * @return the election result, or empty if no eligible replica is found
+ */
+ public static Optional<ElectionResult>
controlledShutdownReplicaLeaderElection(
+ List<Integer> assignments,
+ List<Integer> aliveReplicas,
+ LeaderAndIsr leaderAndIsr,
+ Set<Integer> shutdownTabletServers) {
+ List<Integer> originIsr = leaderAndIsr.isr();
+ Set<Integer> isrSet = new HashSet<>(originIsr);
+ for (Integer id : assignments) {
+ if (aliveReplicas.contains(id)
+ && isrSet.contains(id)
+ && !shutdownTabletServers.contains(id)) {
+ Set<Integer> newAliveReplicas = new HashSet<>(aliveReplicas); // note: HashSet does not keep aliveReplicas order
+ newAliveReplicas.removeAll(shutdownTabletServers);
+ List<Integer> newIsr =
+ originIsr.stream()
+ .filter(replica ->
!shutdownTabletServers.contains(replica))
+ .collect(Collectors.toList());
+ return Optional.of(
+ new ElectionResult(
+ new ArrayList<>(newAliveReplicas),
+ leaderAndIsr.newLeaderAndIsr(id, newIsr)));
+ }
+ }
+ return Optional.empty();
+ }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java
similarity index 55%
copy from
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
copy to
fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java
index d0f6b1835..faff47a42 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithms.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionStrategy.java
@@ -17,21 +17,8 @@
package org.apache.fluss.server.coordinator.statemachine;
-import java.util.List;
-import java.util.Optional;
-
-/** The algorithms to elect the replica leader. */
-public class ReplicaLeaderElectionAlgorithms {
- public static Optional<Integer> defaultReplicaLeaderElection(
- List<Integer> assignments, List<Integer> aliveReplicas,
List<Integer> isr) {
- // currently, we always use the first replica in assignment, which
also in aliveReplicas and
- // isr as the leader replica.
- for (int assignment : assignments) {
- if (aliveReplicas.contains(assignment) &&
isr.contains(assignment)) {
- return Optional.of(assignment);
- }
- }
-
- return Optional.empty();
- }
+/** The strategies to elect the replica leader, see {@link ReplicaLeaderElectionAlgorithms}. */
+public enum ReplicaLeaderElectionStrategy {
+ DEFAULT_ELECTION, // normal election, e.g. when the current leader goes offline
+ CONTROLLED_SHUTDOWN_ELECTION // moves leadership off tabletServers being shut down in a controlled way
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
index be77a43e5..235f4151d 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java
@@ -107,7 +107,7 @@ public class ReplicaStateMachine {
for (Integer replica : replicas) {
TableBucketReplica tableBucketReplica =
new TableBucketReplica(tableBucket, replica);
- if (coordinatorContext.isReplicaAndServerOnline(replica,
tableBucket)) {
+ if (coordinatorContext.isReplicaOnline(replica, tableBucket)) {
coordinatorContext.putReplicaState(
tableBucketReplica, ReplicaState.OnlineReplica);
onlineReplicas.add(tableBucketReplica);
@@ -419,7 +419,7 @@ public class ReplicaStateMachine {
TableBucket tableBucket = tableBucketReplica.getTableBucket();
int replicaId = tableBucketReplica.getReplica();
- LeaderAndIsr leaderAndIsr = null;
+ LeaderAndIsr leaderAndIsr;
if (toUpdateLeaderAndIsrList.get(tableBucket) != null) {
leaderAndIsr = toUpdateLeaderAndIsrList.get(tableBucket);
} else {
@@ -451,7 +451,10 @@ public class ReplicaStateMachine {
: leaderAndIsr.isr().stream()
.filter(id -> id != replicaId)
.collect(Collectors.toList());
- LeaderAndIsr adjustLeaderAndIsr =
leaderAndIsr.newLeaderAndIsr(newLeader, newIsr);
+ LeaderAndIsr adjustLeaderAndIsr =
+ newLeader == LeaderAndIsr.NO_LEADER
+ ? leaderAndIsr.newLeaderAndIsr(newLeader, newIsr)
+ : leaderAndIsr.newLeaderAndIsr(newIsr);
adjustedLeaderAndIsr.put(tableBucketReplica, adjustLeaderAndIsr);
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
index d1b9958cb..33e278f44 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java
@@ -40,6 +40,12 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.controlledShutdownReplicaLeaderElection;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.initReplicaLeaderElection;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.DEFAULT_ELECTION;
+
/* This file is based on source code of Apache Kafka Project
(https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
* additional information regarding copyright ownership. */
@@ -85,7 +91,7 @@ public class TableBucketStateMachine {
.map(
leaderAndIsr -> {
// ONLINE if the leader is alive,
otherwise, it's OFFLINE
- if
(coordinatorContext.isReplicaAndServerOnline(
+ if (coordinatorContext.isReplicaOnline(
leaderAndIsr.leader(),
tableBucket)) {
return BucketState.OnlineBucket;
} else {
@@ -115,6 +121,13 @@ public class TableBucketStateMachine {
}
public void handleStateChange(Set<TableBucket> tableBuckets, BucketState
targetState) {
+ handleStateChange(tableBuckets, targetState, DEFAULT_ELECTION); // keeps the pre-existing election behavior for callers
+ }
+
+ public void handleStateChange(
+ Set<TableBucket> tableBuckets,
+ BucketState targetState,
+ ReplicaLeaderElectionStrategy replicaLeaderElectionStrategy) {
try {
coordinatorRequestBatch.newBatch();
@@ -123,7 +136,7 @@ public class TableBucketStateMachine {
batchHandleOnlineChangeAndInitLeader(tableBuckets);
} else {
for (TableBucket tableBucket : tableBuckets) {
- doHandleStateChange(tableBucket, targetState);
+ doHandleStateChange(tableBucket, targetState,
replicaLeaderElectionStrategy);
}
}
coordinatorRequestBatch.sendRequestToTabletServers(
@@ -175,8 +188,12 @@ public class TableBucketStateMachine {
*
* @param tableBucket The table bucket that is to do state change
* @param targetState the target state that is to change to
+ * @param replicaLeaderElectionStrategy the strategy to choose a new leader
*/
- private void doHandleStateChange(TableBucket tableBucket, BucketState
targetState) {
+ private void doHandleStateChange(
+ TableBucket tableBucket,
+ BucketState targetState,
+ ReplicaLeaderElectionStrategy replicaLeaderElectionStrategy) {
coordinatorContext.putBucketStateIfNotExists(tableBucket,
BucketState.NonExistentBucket);
if (!checkValidTableBucketStateChange(tableBucket, targetState)) {
return;
@@ -224,7 +241,8 @@ public class TableBucketStateMachine {
// current state is Online or Offline
// not new bucket, we then need to update leader/epoch for
the bucket
Optional<ElectionResult> optionalElectionResult =
- electNewLeaderForTableBuckets(tableBucket);
+ electNewLeaderForTableBuckets(
+ tableBucket,
replicaLeaderElectionStrategy);
if (!optionalElectionResult.isPresent()) {
logFailedStateChange(tableBucket, currentState,
targetState);
} else {
@@ -389,10 +407,7 @@ public class TableBucketStateMachine {
// filter out the live servers
List<Integer> liveServers =
assignedServers.stream()
- .filter(
- (server) ->
-
coordinatorContext.isReplicaAndServerOnline(
- server, tableBucket))
+ .filter((server) ->
coordinatorContext.isReplicaOnline(server, tableBucket))
.collect(Collectors.toList());
// todo, consider this case, may reassign with other servers?
if (liveServers.isEmpty()) {
@@ -413,23 +428,16 @@ public class TableBucketStateMachine {
}
// For the case that the table bucket has been initialized, we use all
the live assigned
// servers as inSyncReplica set.
- List<Integer> isr = liveServers;
- Optional<Integer> leaderOpt =
- ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
- assignedServers, liveServers, isr);
- if (!leaderOpt.isPresent()) {
+ Optional<ElectionResult> resultOpt =
+ initReplicaLeaderElection(
+ assignedServers, liveServers,
coordinatorContext.getCoordinatorEpoch());
+ if (!resultOpt.isPresent()) {
LOG.error(
"The leader election for table bucket {} is empty.",
stringifyBucket(tableBucket));
return Optional.empty();
}
- int leader = leaderOpt.get();
-
- // Register the initial leader and isr.
- LeaderAndIsr leaderAndIsr =
- new LeaderAndIsr(leader, 0, isr,
coordinatorContext.getCoordinatorEpoch(), 0);
-
- return Optional.of(new ElectionResult(liveServers, leaderAndIsr));
+ return resultOpt;
}
private List<RegisterTableBucketLeadAndIsrInfo>
tryRegisterLeaderAndIsrOneByOne(
@@ -449,7 +457,8 @@ public class TableBucketStateMachine {
return registerSuccessList;
}
- private Optional<ElectionResult> electNewLeaderForTableBuckets(TableBucket
tableBucket) {
+ private Optional<ElectionResult> electNewLeaderForTableBuckets(
+ TableBucket tableBucket, ReplicaLeaderElectionStrategy
electionStrategy) {
LeaderAndIsr leaderAndIsr;
try {
leaderAndIsr = zooKeeperClient.getLeaderAndIsr(tableBucket).get();
@@ -469,7 +478,7 @@ public class TableBucketStateMachine {
}
// re-election
Optional<ElectionResult> optionalElectionResult =
- leaderForOffline(tableBucket, leaderAndIsr);
+ electLeader(tableBucket, leaderAndIsr, electionStrategy);
if (!optionalElectionResult.isPresent()) {
LOG.error(
"The result of elect leader for table bucket {} is empty.",
@@ -564,19 +573,24 @@ public class TableBucketStateMachine {
}
/**
- * Elect a new leader for new or offline bucket, it'll always elect one
from the live replicas
- * in isr set.
+ * Elect a new leader for bucket, it'll always elect one from the live
replicas in isr set.
+ *
+ * <p>The elect cases including:
+ *
+ * <ol>
+ * <li>new or offline bucket
+ * <li>tabletServer controlled shutdown
+ * </ol>
*/
- private Optional<ElectionResult> leaderForOffline(
- TableBucket tableBucket, LeaderAndIsr leaderAndIsr) {
+ private Optional<ElectionResult> electLeader(
+ TableBucket tableBucket,
+ LeaderAndIsr leaderAndIsr,
+ ReplicaLeaderElectionStrategy electionStrategy) {
List<Integer> assignment =
coordinatorContext.getAssignment(tableBucket);
// filter out the live servers
List<Integer> liveReplicas =
assignment.stream()
- .filter(
- replica ->
-
coordinatorContext.isReplicaAndServerOnline(
- replica, tableBucket))
+ .filter(replica ->
coordinatorContext.isReplicaOnline(replica, tableBucket))
.collect(Collectors.toList());
// we'd like use the first live replica as the new leader
if (liveReplicas.isEmpty()) {
@@ -584,29 +598,27 @@ public class TableBucketStateMachine {
return Optional.empty();
}
- Optional<Integer> leaderOpt =
- ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection(
- assignment, liveReplicas, leaderAndIsr.isr());
- if (!leaderOpt.isPresent()) {
+ Optional<ElectionResult> resultOpt = Optional.empty();
+ if (electionStrategy == DEFAULT_ELECTION) {
+ resultOpt = defaultReplicaLeaderElection(assignment, liveReplicas,
leaderAndIsr);
+ } else if (electionStrategy == CONTROLLED_SHUTDOWN_ELECTION) {
+ Set<Integer> shuttingDownTabletServers =
coordinatorContext.shuttingDownTabletServers();
+ resultOpt =
+ controlledShutdownReplicaLeaderElection(
+ assignment, liveReplicas, leaderAndIsr,
shuttingDownTabletServers);
+ }
+
+ if (!resultOpt.isPresent()) {
LOG.error(
"The leader election for table bucket {} is empty.",
stringifyBucket(tableBucket));
return Optional.empty();
}
-
- // get the updated leader and isr
- LeaderAndIsr newLeaderAndIsr =
- new LeaderAndIsr(
- leaderOpt.get(),
- leaderAndIsr.leaderEpoch() + 1,
- leaderAndIsr.isr(),
- coordinatorContext.getCoordinatorEpoch(),
- leaderAndIsr.bucketEpoch() + 1);
-
- return Optional.of(new ElectionResult(liveReplicas, newLeaderAndIsr));
+ return resultOpt;
}
- private static class ElectionResult {
+ /** The result of leader election. */
+ public static class ElectionResult {
private final List<Integer> liveReplicas;
private final LeaderAndIsr leaderAndIsr;
@@ -614,5 +626,13 @@ public class TableBucketStateMachine {
this.liveReplicas = liveReplicas;
this.leaderAndIsr = leaderAndIsr;
}
+
+ public List<Integer> getLiveReplicas() {
+ return liveReplicas;
+ }
+
+ public LeaderAndIsr getLeaderAndIsr() {
+ return leaderAndIsr;
+ }
}
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 0e593c2c8..63121dbc3 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -24,11 +24,14 @@ import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.exception.InvalidServerRackInfoException;
+import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.GatewayClientProxy;
import org.apache.fluss.rpc.RpcClient;
import org.apache.fluss.rpc.RpcServer;
import org.apache.fluss.rpc.gateway.CoordinatorGateway;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
import org.apache.fluss.rpc.netty.server.RequestsMetrics;
import org.apache.fluss.server.ServerBase;
@@ -68,6 +71,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS;
+import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket;
/**
* Tablet server implementation. The tablet server is responsible to manage
the log tablet and kv
@@ -79,6 +83,10 @@ public class TabletServer extends ServerBase {
private static final Logger LOG =
LoggerFactory.getLogger(TabletServer.class);
+ // TODO, maybe need to make it configurable
+ private static final int CONTROLLED_SHUTDOWN_MAX_RETRIES = 3;
+ private static final long CONTROLLED_SHUTDOWN_RETRY_INTERVAL_MS = 1000L;
+
private final int serverId;
/**
@@ -144,6 +152,9 @@ public class TabletServer extends ServerBase {
@Nullable
private Authorizer authorizer;
+ @GuardedBy("lock")
+ private CoordinatorGateway coordinatorGateway;
+
public TabletServer(Configuration conf) {
this(conf, SystemClock.getInstance());
}
@@ -206,7 +217,7 @@ public class TabletServer extends ServerBase {
new ClientMetricGroup(metricRegistry, SERVER_NAME + "-" +
serverId);
this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
- CoordinatorGateway coordinatorGateway =
+ this.coordinatorGateway =
GatewayClientProxy.createGatewayProxy(
() ->
metadataCache.getCoordinatorServer(interListenerName),
rpcClient,
@@ -261,6 +272,9 @@ public class TabletServer extends ServerBase {
@Override
protected CompletableFuture<Result> closeAsync(Result result) {
if (isShutDown.compareAndSet(false, true)) {
+
+ controlledShutDown();
+
CompletableFuture<Void> serviceShutdownFuture = stopServices();
serviceShutdownFuture.whenComplete(
@@ -408,6 +422,60 @@ public class TabletServer extends ServerBase {
}
}
+ private void controlledShutDown() {
+ LOG.info("Starting controlled shutdown.");
+
+ // We request the CoordinatorServer to do a controlled shutdown. On
failure, we backoff for
+ // a period of time and try again for a number of retries. If all the
attempt fails, we
+ // simply force the shutdown.
+ boolean shutdownSucceeded = false;
+ int remainingRetries = CONTROLLED_SHUTDOWN_MAX_RETRIES;
+ while (!shutdownSucceeded && remainingRetries > 0) {
+ remainingRetries--;
+
+ ControlledShutdownRequest controlledShutdownRequest =
+ new ControlledShutdownRequest()
+ .setTabletServerId(serverId)
+ .setTabletServerEpoch(-1); // TODO, set correct
tabletServer epoch.
+ try {
+ ControlledShutdownResponse response =
+
coordinatorGateway.controlledShutdown(controlledShutdownRequest).get();
+ if (response.getRemainingLeaderBucketsCount() > 0) {
+ List<TableBucket> remainingLeaderBuckets = new
ArrayList<>();
+ response.getRemainingLeaderBucketsList()
+ .forEach(
+ pbTableBucket ->
+ remainingLeaderBuckets.add(
+
toTableBucket(pbTableBucket)));
+ LOG.warn(
+ "TabletServer {} is still the leader for the
following buckets: {} after Controlled Shutdown",
+ serverId,
+ remainingLeaderBuckets);
+ } else {
+ shutdownSucceeded = true;
+ }
+ } catch (Exception e) {
+ LOG.warn("Failed to do controlled shutdown: {}",
e.getMessage());
+ // do nothing and retry.
+ }
+
+ if (!shutdownSucceeded && remainingRetries > 0) {
+ try {
+ Thread.sleep(CONTROLLED_SHUTDOWN_RETRY_INTERVAL_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ LOG.info("Retrying controlled shutdown ({} retries
remaining).", remainingRetries);
+ }
+ }
+
+ if (!shutdownSucceeded) {
+ LOG.warn(
+ "Proceeding to do an unclean shutdown as all the
controlled shutdown attempts failed.");
+ }
+ }
+
@Override
protected String getServerName() {
return SERVER_NAME;
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
index 823c2bb52..88126f266 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java
@@ -218,6 +218,17 @@ public class ServerRpcMessageUtils {
protoTableBucket.getBucketId());
}
+ public static PbTableBucket fromTableBucket(TableBucket tableBucket) {
+ PbTableBucket pbTableBucket =
+ new PbTableBucket()
+ .setTableId(tableBucket.getTableId())
+ .setBucketId(tableBucket.getBucket());
+ if (tableBucket.getPartitionId() != null) {
+ pbTableBucket.setPartitionId(tableBucket.getPartitionId());
+ }
+ return pbTableBucket;
+ }
+
public static ServerNode toServerNode(PbServerNode pbServerNode,
ServerType serverType) {
return new ServerNode(
pbServerNode.getNodeId(),
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
index 6b6f8bc7a..251248138 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/LeaderAndIsr.java
@@ -70,8 +70,27 @@ public class LeaderAndIsr {
this.bucketEpoch = bucketEpoch;
}
+ /**
+ * Create a new LeaderAndIsr with the given leader and isr, which means
the leader changes.
+ *
+ * @param newLeader the new leader replica id
+ * @param newIsr the new isr
+ * @return the new LeaderAndIsr
+ */
public LeaderAndIsr newLeaderAndIsr(int newLeader, List<Integer> newIsr) {
- return new LeaderAndIsr(newLeader, leaderEpoch, newIsr,
coordinatorEpoch, bucketEpoch + 1);
+ return new LeaderAndIsr(
+ newLeader, leaderEpoch + 1, newIsr, coordinatorEpoch,
bucketEpoch + 1);
+ }
+
+ /**
+ * Create a new LeaderAndIsr with the given isr, which means only the isr
changes, but the
+ * leader remains the same.
+ *
+ * @param newIsr the new isr
+ * @return the new LeaderAndIsr
+ */
+ public LeaderAndIsr newLeaderAndIsr(List<Integer> newIsr) {
+ return new LeaderAndIsr(leader, leaderEpoch, newIsr, coordinatorEpoch,
bucketEpoch + 1);
}
public int leader() {
@@ -98,14 +117,6 @@ public class LeaderAndIsr {
return bucketEpoch;
}
- public boolean equalsAllowStalePartitionEpoch(LeaderAndIsr other) {
- return leader == other.leader
- && leaderEpoch == other.leaderEpoch
- && coordinatorEpoch == other.coordinatorEpoch
- && isr.equals(other.isr)
- && bucketEpoch <= other.bucketEpoch;
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
index 09a833e41..79047d8b7 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java
@@ -315,8 +315,7 @@ class CoordinatorEventProcessorTest {
client.registerTabletServer(newlyServerId, tabletServerRegistration);
// retry until the tablet server register event is been handled
- retryVerifyContext(
- ctx ->
assertThat(ctx.getLiveTabletServers()).containsKey(newlyServerId));
+ retryVerifyContext(ctx ->
assertThat(ctx.liveTabletServerSet()).contains(newlyServerId));
initCoordinatorChannel();
// verify the context has the exact tablet server
@@ -360,7 +359,7 @@ class CoordinatorEventProcessorTest {
// retry until the server has been removed from coordinator context
retryVerifyContext(
- ctx ->
assertThat(ctx.getLiveTabletServers()).doesNotContainKey(newlyServerId));
+ ctx ->
assertThat(ctx.liveTabletServerSet()).doesNotContain(newlyServerId));
// check replica state
// all replicas should be online but the replica in the down server
@@ -397,8 +396,7 @@ class CoordinatorEventProcessorTest {
// assume the server that comes again
zookeeperClient.registerTabletServer(newlyServerId,
tabletServerRegistration);
// retry until the server has been added to coordinator context
- retryVerifyContext(
- ctx ->
assertThat(ctx.getLiveTabletServers()).containsKey(newlyServerId));
+ retryVerifyContext(ctx ->
assertThat(ctx.liveTabletServerSet()).contains(newlyServerId));
// make sure the bucket that remains in offline should be online again
// since the server become online
@@ -812,7 +810,6 @@ class CoordinatorEventProcessorTest {
assertThat(ctx.getBucketLeaderAndIsr(tableBucket))
.contains(
leaderAndIsr.newLeaderAndIsr(
-
leaderAndIsr.leader(),
leaderAndIsr.isr()))));
// verify the response
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
index 9d7337ae7..4636a341d 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorTestUtils.java
@@ -43,7 +43,7 @@ public class CoordinatorTestUtils {
TestCoordinatorChannelManager testCoordinatorChannelManager) {
Map<Integer, TabletServerGateway> gateways =
makeTabletServerGateways(
- coordinatorContext.getLiveTabletServers().keySet(),
Collections.emptySet());
+ coordinatorContext.liveTabletServerSet(),
Collections.emptySet());
testCoordinatorChannelManager.setGateways(gateways);
}
@@ -52,8 +52,7 @@ public class CoordinatorTestUtils {
TestCoordinatorChannelManager testCoordinatorChannelManager,
Set<Integer> failServers) {
Map<Integer, TabletServerGateway> gateways =
- makeTabletServerGateways(
- coordinatorContext.getLiveTabletServers().keySet(),
failServers);
+
makeTabletServerGateways(coordinatorContext.liveTabletServerSet(), failServers);
testCoordinatorChannelManager.setGateways(gateways);
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 1f7509738..38e3f9ea1 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -30,6 +30,8 @@ import
org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotResponse;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestRequest;
import org.apache.fluss.rpc.messages.CommitRemoteLogManifestResponse;
+import org.apache.fluss.rpc.messages.ControlledShutdownRequest;
+import org.apache.fluss.rpc.messages.ControlledShutdownResponse;
import org.apache.fluss.rpc.messages.CreateAclsRequest;
import org.apache.fluss.rpc.messages.CreateAclsResponse;
import org.apache.fluss.rpc.messages.CreateDatabaseRequest;
@@ -296,6 +298,12 @@ public class TestCoordinatorGateway implements
CoordinatorGateway {
throw new UnsupportedOperationException();
}
+ @Override
+ public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
+ ControlledShutdownRequest request) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest
request) {
throw new UnsupportedOperationException();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java
new file mode 100644
index 000000000..a530a1929
--- /dev/null
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaLeaderElectionAlgorithmsTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator.statemachine;
+
+import
org.apache.fluss.server.coordinator.statemachine.TableBucketStateMachine.ElectionResult;
+import org.apache.fluss.server.zk.data.LeaderAndIsr;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.controlledShutdownReplicaLeaderElection;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.defaultReplicaLeaderElection;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionAlgorithms.initReplicaLeaderElection;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ReplicaLeaderElectionAlgorithms}. */
+public class ReplicaLeaderElectionAlgorithmsTest {
+
+ @Test
+ void testInitReplicaLeaderElection() {
+ List<Integer> assignments = Arrays.asList(2, 4);
+ List<Integer> liveReplicas = Collections.singletonList(4);
+
+ Optional<ElectionResult> leaderElectionResultOpt =
+ initReplicaLeaderElection(assignments, liveReplicas, 0);
+ assertThat(leaderElectionResultOpt.isPresent()).isTrue();
+ ElectionResult leaderElectionResult = leaderElectionResultOpt.get();
+
assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(4);
+
assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(4);
+
assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(4);
+ }
+
+ @Test
+ void testDefaultReplicaLeaderElection() {
+ List<Integer> assignments = Arrays.asList(2, 4);
+ List<Integer> liveReplicas = Arrays.asList(2, 4);
+ LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(4, 0,
Arrays.asList(2, 4), 0, 0);
+
+ Optional<ElectionResult> leaderElectionResultOpt =
+ defaultReplicaLeaderElection(assignments, liveReplicas,
originLeaderAndIsr);
+ assertThat(leaderElectionResultOpt.isPresent()).isTrue();
+ ElectionResult leaderElectionResult = leaderElectionResultOpt.get();
+
assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(2,
4);
+
assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(2);
+
assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(2,
4);
+ }
+
+ @Test
+ void testControlledShutdownReplicaLeaderElection() {
+ List<Integer> assignments = Arrays.asList(2, 4);
+ List<Integer> liveReplicas = Arrays.asList(2, 4);
+ LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(2, 0,
Arrays.asList(2, 4), 0, 0);
+ Set<Integer> shutdownTabletServers = Collections.singleton(2);
+
+ Optional<ElectionResult> leaderElectionResultOpt =
+ controlledShutdownReplicaLeaderElection(
+ assignments, liveReplicas, originLeaderAndIsr,
shutdownTabletServers);
+ assertThat(leaderElectionResultOpt.isPresent()).isTrue();
+ ElectionResult leaderElectionResult = leaderElectionResultOpt.get();
+
assertThat(leaderElectionResult.getLiveReplicas()).containsExactlyInAnyOrder(4);
+
assertThat(leaderElectionResult.getLeaderAndIsr().leader()).isEqualTo(4);
+
assertThat(leaderElectionResult.getLeaderAndIsr().isr()).containsExactlyInAnyOrder(4);
+ }
+
+ @Test
+ void testControlledShutdownReplicaLeaderElectionLastIsrShuttingDown() {
+ List<Integer> assignments = Arrays.asList(2, 4);
+ List<Integer> liveReplicas = Arrays.asList(2, 4);
+ LeaderAndIsr originLeaderAndIsr =
+ new LeaderAndIsr(2, 0, Collections.singletonList(2), 0, 0);
+ Set<Integer> shutdownTabletServers = Collections.singleton(2);
+
+ Optional<ElectionResult> leaderElectionResultOpt =
+ controlledShutdownReplicaLeaderElection(
+ assignments, liveReplicas, originLeaderAndIsr,
shutdownTabletServers);
+ assertThat(leaderElectionResultOpt).isEmpty();
+ }
+
+ @Test
+ void
testControlledShutdownPartitionLeaderElectionAllIsrSimultaneouslyShutdown() {
+ List<Integer> assignments = Arrays.asList(2, 4);
+ List<Integer> liveReplicas = Arrays.asList(2, 4);
+ LeaderAndIsr originLeaderAndIsr = new LeaderAndIsr(2, 0,
Arrays.asList(2, 4), 0, 0);
+ Set<Integer> shutdownTabletServers = new HashSet<>(Arrays.asList(2,
4));
+
+ Optional<ElectionResult> leaderElectionResultOpt =
+ controlledShutdownReplicaLeaderElection(
+ assignments, liveReplicas, originLeaderAndIsr,
shutdownTabletServers);
+ assertThat(leaderElectionResultOpt).isEmpty();
+ }
+}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
index 4c3cf1ab4..454ec5de4 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java
@@ -223,7 +223,7 @@ class ReplicaStateMachineTest {
replicaStateMachine.handleStateChanges(replicas, OfflineReplica);
leaderAndIsr =
coordinatorContext.getBucketLeaderAndIsr(tableBucket).get();
assertThat(leaderAndIsr)
- .isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 0,
Arrays.asList(2), 0, 3));
+ .isEqualTo(new LeaderAndIsr(LeaderAndIsr.NO_LEADER, 3,
Arrays.asList(2), 0, 3));
}
@Test
@@ -274,7 +274,7 @@ class ReplicaStateMachineTest {
assertThat(leaderAndIsr)
.isEqualTo(
new LeaderAndIsr(
- LeaderAndIsr.NO_LEADER, 0,
Collections.singletonList(0), 0, 3));
+ LeaderAndIsr.NO_LEADER, 1,
Collections.singletonList(0), 0, 3));
}
private void toReplicaDeletionStartedState(
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
index 9009765e4..6cee33fbe 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java
@@ -53,14 +53,19 @@ import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
+import java.util.List;
import java.util.Optional;
import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
+import static
org.apache.fluss.server.coordinator.CoordinatorTestUtils.createServers;
+import static
org.apache.fluss.server.coordinator.CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess;
import static
org.apache.fluss.server.coordinator.statemachine.BucketState.NewBucket;
import static
org.apache.fluss.server.coordinator.statemachine.BucketState.NonExistentBucket;
import static
org.apache.fluss.server.coordinator.statemachine.BucketState.OfflineBucket;
import static
org.apache.fluss.server.coordinator.statemachine.BucketState.OnlineBucket;
+import static
org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElectionStrategy.CONTROLLED_SHUTDOWN_ELECTION;
import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
@@ -121,9 +126,8 @@ class TableBucketStateMachineTest {
coordinatorContext.putTablePath(t1Id, TablePath.of("db1", "t1"));
coordinatorContext.putTablePath(t2Id, TablePath.of("db1", "t2"));
- coordinatorContext.setLiveTabletServers(
- CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 3)));
- CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
+ coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0,
1, 3)));
+ makeSendLeaderAndStopRequestAlwaysSuccess(
coordinatorContext, testCoordinatorChannelManager);
// set assignments
coordinatorContext.updateBucketReplicaAssignment(t1b0,
Arrays.asList(0, 1));
@@ -203,9 +207,8 @@ class TableBucketStateMachineTest {
assertThat(coordinatorContext.getBucketState(tableBucket)).isEqualTo(NewBucket);
// now, we set 3 live servers
- coordinatorContext.setLiveTabletServers(
- CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 2)));
- CoordinatorTestUtils.makeSendLeaderAndStopRequestAlwaysSuccess(
+ coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0,
1, 2)));
+ makeSendLeaderAndStopRequestAlwaysSuccess(
coordinatorContext, testCoordinatorChannelManager);
// change to online again
@@ -217,8 +220,7 @@ class TableBucketStateMachineTest {
// case2: assuming the leader replica fail(we remove it to server
list),
// we need elect another replica,
- coordinatorContext.setLiveTabletServers(
- CoordinatorTestUtils.createServers(Arrays.asList(1, 2)));
+ coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(1,
2)));
tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket),
OnlineBucket);
// check state is online
@@ -230,8 +232,7 @@ class TableBucketStateMachineTest {
// case4: the leader replica fail, but non replicas is available
coordinatorContext.putBucketState(tableBucket, OfflineBucket);
- coordinatorContext.setLiveTabletServers(
- CoordinatorTestUtils.createServers(Collections.emptyList()));
+
coordinatorContext.setLiveTabletServers(createServers(Collections.emptyList()));
tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket),
OnlineBucket);
// the state will still be offline
assertThat(coordinatorContext.getBucketState(tableBucket)).isEqualTo(OfflineBucket);
@@ -266,8 +267,7 @@ class TableBucketStateMachineTest {
coordinatorContext, coordinatorRequestBatch,
zookeeperClient);
eventManager.start();
- coordinatorContext.setLiveTabletServers(
- CoordinatorTestUtils.createServers(Arrays.asList(0, 1, 2)));
+ coordinatorContext.setLiveTabletServers(createServers(Arrays.asList(0,
1, 2)));
CoordinatorTestUtils.makeSendLeaderAndStopRequestFailContext(
coordinatorContext, testCoordinatorChannelManager,
Sets.newHashSet(0, 2));
// init a table bucket assignment to coordinator context
@@ -319,6 +319,61 @@ class TableBucketStateMachineTest {
assertThat(coordinatorContext.getBucketState(tableBucket0)).isNull();
}
+ @Test
+ void testStateChangeForTabletServerControlledShutdown() {
+ TableBucketStateMachine tableBucketStateMachine =
createTableBucketStateMachine();
+ long tableId = 7;
+ TablePath fakeTablePath = TablePath.of("db1", "t2");
+ TableBucket tb = new TableBucket(tableId, 0);
+
+ // init coordinator context.
+ coordinatorContext.putTableInfo(
+ TableInfo.of(
+ fakeTablePath,
+ tableId,
+ 0,
+ DATA1_TABLE_DESCRIPTOR,
+ System.currentTimeMillis(),
+ System.currentTimeMillis()));
+ coordinatorContext.putTablePath(tableId, fakeTablePath);
+ coordinatorContext.updateBucketReplicaAssignment(tb, Arrays.asList(0,
1, 2));
+ coordinatorContext.putBucketState(tb, NewBucket);
+
+ List<Integer> aliveServers = Arrays.asList(0, 1, 2);
+ coordinatorContext.setLiveTabletServers(createServers(aliveServers));
+ makeSendLeaderAndStopRequestAlwaysSuccess(
+ coordinatorContext, testCoordinatorChannelManager);
+
+ // check state is online.
+ tableBucketStateMachine.handleStateChange(Collections.singleton(tb),
OnlineBucket);
+
assertThat(coordinatorContext.getBucketState(tb)).isEqualTo(OnlineBucket);
+ assertThat(coordinatorContext.liveTabletServerSet())
+ .containsExactlyInAnyOrderElementsOf(aliveServers);
+ assertThat(coordinatorContext.shuttingDownTabletServers()).isEmpty();
+ assertThat(coordinatorContext.liveOrShuttingDownTabletServers())
+ .containsExactlyInAnyOrderElementsOf(aliveServers);
+
+ int oldLeader =
coordinatorContext.getBucketLeaderAndIsr(tb).get().leader();
+ aliveServers =
+ aliveServers.stream().filter(s -> s !=
oldLeader).collect(Collectors.toList());
+
+ // trigger controlled shutdown for oldLeader.
+ coordinatorContext.shuttingDownTabletServers().add(oldLeader);
+ assertThat(coordinatorContext.liveTabletServerSet())
+ .containsExactlyInAnyOrderElementsOf(aliveServers);
+ assertThat(coordinatorContext.shuttingDownTabletServers())
+ .containsExactlyInAnyOrder(oldLeader);
+ assertThat(coordinatorContext.liveOrShuttingDownTabletServers())
+ .containsExactlyInAnyOrder(0, 1, 2);
+
+ // handle state change for controlled shutdown.
+ tableBucketStateMachine.handleStateChange(
+ Collections.singleton(tb), OnlineBucket,
CONTROLLED_SHUTDOWN_ELECTION);
+
assertThat(coordinatorContext.getBucketState(tb)).isEqualTo(OnlineBucket);
+ assertThat(coordinatorContext.getBucketLeaderAndIsr(tb).get().leader())
+ .isNotEqualTo(oldLeader);
+ }
+
private TableBucketStateMachine createTableBucketStateMachine() {
return new TableBucketStateMachine(
coordinatorContext, coordinatorRequestBatch, zookeeperClient);
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java
similarity index 77%
rename from
fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java
rename to
fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java
index 8388259a5..2102df6d1 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerFailOverITCase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServerShutdownITCase.java
@@ -51,9 +51,8 @@ import static
org.apache.fluss.testutils.common.CommonTestUtils.retry;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/** The ITCase for tablet server failover. */
-class TabletServerFailOverITCase {
-
+/** The ITCase for tabletServer shutdown (controlled shutdown). */
+public class TabletServerShutdownITCase {
@RegisterExtension
public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
FlussClusterExtension.builder().setNumOfTabletServers(3).build();
@@ -114,7 +113,42 @@ class TabletServerFailOverITCase {
}
@Test
- void testKillServers() throws Exception {
+    void testControlledShutdown() throws Exception {
+        FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
+        // Replication factor 3 on a 3-server cluster, so other replicas remain to take over
+        // leadership once the current leader is shut down.
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("a", DataTypes.INT()).build())
+                        .distributedBy(1)
+                        .property(ConfigOptions.TABLE_REPLICATION_FACTOR, 3)
+                        .build();
+        TablePath tablePath = TablePath.of("test_shutdown", "test_controlled_shutdown");
+        long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath, tableDescriptor);
+        TableBucket tb = new TableBucket(tableId, 0);
+
+        LeaderAndIsr leaderAndIsr = FLUSS_CLUSTER_EXTENSION.waitLeaderAndIsrReady(tb);
+        int leader = leaderAndIsr.leader();
+        ZooKeeperClient zkClient = FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
+
+        // Stop the tablet server hosting the bucket leader; this goes through controlled shutdown.
+        FLUSS_CLUSTER_EXTENSION.stopTabletServer(leader);
+        try {
+            // A new leader should be elected. Assert on the unwrapped leader id: comparing the
+            // Optional produced by map(...) against an int is never equal, which would make the
+            // check pass vacuously (even if the leader did not change or the node is missing).
+            retry(
+                    Duration.ofMinutes(1),
+                    () ->
+                            assertThat(zkClient.getLeaderAndIsr(tb))
+                                    .map(LeaderAndIsr::leader)
+                                    .hasValueSatisfying(
+                                            newLeader ->
+                                                    assertThat(newLeader).isNotEqualTo(leader)));
+        } finally {
+            // Restart the stopped server even if the assertion fails, so later tests in this
+            // class (which assert 3 tablet servers up front) are not affected.
+            FLUSS_CLUSTER_EXTENSION.startTabletServer(leader, true);
+        }
+    }
+
+ @Test
+ void testControlledShutdownRetriesFailover() throws Exception {
+        // Tests the scenario where the controlled shutdown request is retried but keeps failing
+        // because no new leader can be elected. In that case the controlled shutdown eventually
+        // falls back to an uncontrolled shutdown.
FLUSS_CLUSTER_EXTENSION.assertHasTabletServerNumber(3);
TableDescriptor tableDescriptor =
TableDescriptor.builder()
@@ -122,7 +156,7 @@ class TabletServerFailOverITCase {
.distributedBy(1)
.property(ConfigOptions.TABLE_REPLICATION_FACTOR, 2)
.build();
- TablePath tablePath = TablePath.of("test_failover",
"test_kill_servers");
+ TablePath tablePath = TablePath.of("test_failover",
"test_controlled_shutdown_failed");
long tableId = createTable(FLUSS_CLUSTER_EXTENSION, tablePath,
tableDescriptor);
TableBucket tb = new TableBucket(tableId, 0);
@@ -132,20 +166,22 @@ class TabletServerFailOverITCase {
isr.remove(Integer.valueOf(leader));
int follower = isr.get(0);
- // let's kil follower
+        // Stop the follower; this goes through controlled shutdown.
FLUSS_CLUSTER_EXTENSION.stopTabletServer(follower);
ZooKeeperClient zkClient =
FLUSS_CLUSTER_EXTENSION.getZooKeeperClient();
// the follower should be removed from isr
LeaderAndIsr expectedLeaderAndIsr1 =
- leaderAndIsr.newLeaderAndIsr(leader,
Collections.singletonList(leader));
+
leaderAndIsr.newLeaderAndIsr(Collections.singletonList(leader));
retry(
Duration.ofMinutes(1),
() ->
assertThat(zkClient.getLeaderAndIsr(tb).get())
.isEqualTo(expectedLeaderAndIsr1));
- // kill the leader again
+        // Stop the leader. Only one replica remains in the ISR, so no new leader can be elected
+        // while handling the controlled shutdown request; the controlled shutdown therefore
+        // eventually falls back to an uncontrolled shutdown.
FLUSS_CLUSTER_EXTENSION.stopTabletServer(leader);
// should be no leader