This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/fluss.git
commit c49b84859f9bb8c80a22319c9c54ec65d3fe18ee Author: Jark Wu <[email protected]> AuthorDate: Tue Mar 31 18:06:32 2026 +0800 [server] Improve the implementation of Coordinator HA --- .../exception/NotCoordinatorLeaderException.java | 6 +- .../fluss/rpc/gateway/CoordinatorGateway.java | 3 + .../rpc/netty/server/FlussRequestHandler.java | 15 +++ .../server/coordinator/CoordinatorContext.java | 2 +- .../coordinator/CoordinatorLeaderElection.java | 77 +++++-------- .../server/coordinator/CoordinatorServer.java | 127 +++++++++------------ .../server/coordinator/CoordinatorService.java | 75 +----------- .../coordinator/RequireCoordinatorLeader.java | 48 -------- .../org/apache/fluss/server/zk/data/ZkData.java | 2 +- ...java => CoordinatorHighAvailabilityITCase.java} | 2 +- .../coordinator/CoordinatorServerElectionTest.java | 12 +- .../server/coordinator/TestCoordinatorGateway.java | 5 + .../server/testutils/FlussClusterExtension.java | 2 - 13 files changed, 123 insertions(+), 253 deletions(-) diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java index a3f65aaf4..faa2f5587 100644 --- a/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java +++ b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java @@ -17,7 +17,11 @@ package org.apache.fluss.exception; -/** Exception thrown when a request is sent to a stand by coordinator server. since: 0.9 */ +/** + * Exception thrown when a request is sent to a stand by coordinator server. + * + * @since 1.0 + */ public class NotCoordinatorLeaderException extends ApiException { private static final long serialVersionUID = 1L; diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java index 92508f856..fc509ca30 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java @@ -40,6 +40,9 @@ import java.util.concurrent.CompletableFuture; /** The entry point of RPC gateway interface for coordinator server. */ public interface CoordinatorGateway extends RpcGateway, AdminGateway { + /** Checks if the current server is the leader. */ + boolean isLeader(); + /** * AdjustIsr request to adjust (expend or shrink) the ISR set for request table bucket. * diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java index 21f3672a2..2ff5e0691 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java @@ -17,8 +17,11 @@ package org.apache.fluss.rpc.netty.server; +import org.apache.fluss.exception.NotCoordinatorLeaderException; import org.apache.fluss.rpc.RpcGatewayService; +import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.messages.ApiMessage; +import org.apache.fluss.rpc.protocol.ApiKeys; import org.apache.fluss.rpc.protocol.ApiMethod; import org.apache.fluss.rpc.protocol.RequestType; @@ -35,9 +38,11 @@ public class FlussRequestHandler implements RequestHandler<FlussRequest> { private static final Logger LOG = LoggerFactory.getLogger(FlussRequestHandler.class); private final RpcGatewayService service; + private final boolean isCoordinator; public FlussRequestHandler(RpcGatewayService service) { this.service = service; + this.isCoordinator = service instanceof CoordinatorGateway; } @Override @@ -58,6 +63,16 @@ public class FlussRequestHandler implements RequestHandler<FlussRequest> { request.isInternal(), request.getAddress(), request.getPrincipal())); + // check if the coordinator server is the current leader if the API is a coordinator + // TODO: we should only check coordinator APIs instead of all APIs + if (isCoordinator && api.getApiKey() != ApiKeys.API_VERSIONS) { + if (!((CoordinatorGateway) service).isLeader()) { + request.fail( + new NotCoordinatorLeaderException( + "This coordinator server is not the current leader.")); + return; + } + } // invoke the corresponding method on RpcGateway instance. CompletableFuture<?> responseFuture = (CompletableFuture<?>) api.getMethod().invoke(service, message); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index 2b821b02d..57ed11528 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -710,10 +710,10 @@ public class CoordinatorContext { tablesToBeDeleted.clear(); coordinatorEpoch = 0; clearTablesState(); - // clear the live tablet servers liveTabletServers.clear(); shuttingDownTabletServers.clear(); serverTags.clear(); + liveCoordinatorServers.clear(); } public int getTotalPartitionCount() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java index 9f09df6af..d226123b5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java @@ -53,17 +53,15 @@ public class CoordinatorLeaderElection implements AutoCloseable { private final String serverId; private final LeaderLatch leaderLatch; private final AtomicBoolean isLeader = new AtomicBoolean(false); - private final CompletableFuture<Void> leaderReadyFuture = new CompletableFuture<>(); - // Single-threaded executor to run leader init callbacks outside Curator's EventThread. + // Cached thread pool to run leader init/cleanup callbacks outside Curator's EventThread. // Curator's LeaderLatchListener callbacks run on its internal EventThread; performing // synchronous ZK operations there causes deadlock because ZK response dispatch also - // needs that same thread. + // needs that same thread. A cached pool is used because these callbacks are transient + // (only run during leadership transitions), so idle threads can be reclaimed. private final ExecutorService leaderCallbackExecutor; // Tracks the pending cleanup task so that init can wait for it to complete. private final AtomicReference<CompletableFuture<Void>> pendingCleanup = new AtomicReference<>(CompletableFuture.completedFuture(null)); - private volatile Runnable initLeaderServices; - private volatile Consumer<Throwable> cleanupLeaderServices; public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { this.serverId = serverId; @@ -73,34 +71,35 @@ public class CoordinatorLeaderElection implements AutoCloseable { ZkData.CoordinatorElectionZNode.path(), String.valueOf(serverId)); this.leaderCallbackExecutor = - Executors.newSingleThreadExecutor( + Executors.newCachedThreadPool( r -> { Thread t = new Thread(r, "coordinator-leader-callback-" + serverId); + // Daemon threads ensure the JVM can exit even if close() is not + // called. Orderly shutdown is handled by close() -> shutdownNow(). t.setDaemon(true); return t; }); } /** - * Starts the leader election process asynchronously. The returned future completes when this - * server becomes the leader for the first time and initializes the leader services. + * Starts the leader election process asynchronously. * * <p>After the first election, the server will continue to participate in future elections. * When re-elected as leader, the initLeaderServices callback will be invoked again. * * @param initLeaderServices the callback to initialize leader services once elected * @param cleanupLeaderServices the callback to clean up leader services when losing leadership - * @return a CompletableFuture that completes when this server becomes leader for the first time */ - public CompletableFuture<Void> startElectLeaderAsync( + public void startElectLeaderAsync( Runnable initLeaderServices, Consumer<Throwable> cleanupLeaderServices) { - this.initLeaderServices = initLeaderServices; - this.cleanupLeaderServices = cleanupLeaderServices; - leaderLatch.addListener( new LeaderLatchListener() { @Override public void isLeader() { + // return if already marked as leader to avoid duplicate init calls + if (isLeader.get()) { + return; + } LOG.info("Coordinator server {} has become the leader.", serverId); // Capture the pending cleanup future at this point so that // init waits for it before proceeding. @@ -124,15 +123,15 @@ public class CoordinatorLeaderElection implements AutoCloseable { } try { initLeaderServices.run(); - leaderReadyFuture.complete(null); } catch (Exception e) { LOG.error( "Failed to initialize leader services for server {}", serverId, e); - leaderReadyFuture.completeExceptionally(e); } }); + // Set leader flag before init completes, so when zk found this leader, the + // leader can accept requests isLeader.set(true); } @@ -142,31 +141,27 @@ public class CoordinatorLeaderElection implements AutoCloseable { LOG.warn( "Coordinator server {} has lost the leadership, cleaning up leader services.", serverId); - // Run cleanup on a separate daemon thread (NOT on the - // leaderCallbackExecutor) to avoid blocking init tasks. + // Submit cleanup to leaderCallbackExecutor. The cached thread + // pool can spawn a new thread even if init is still running. // The cleanup completion is tracked via pendingCleanup so // that subsequent init waits for it. CompletableFuture<Void> cleanupFuture = new CompletableFuture<>(); pendingCleanup.set(cleanupFuture); - Thread cleanupThread = - new Thread( - () -> { - try { - if (cleanupLeaderServices != null) { - cleanupLeaderServices.accept(null); - } - } catch (Exception e) { - LOG.error( - "Failed to cleanup leader services for server {}", - serverId, - e); - } finally { - cleanupFuture.complete(null); - } - }, - "coordinator-leader-cleanup-" + serverId); - cleanupThread.setDaemon(true); - cleanupThread.start(); + leaderCallbackExecutor.execute( + () -> { + try { + if (cleanupLeaderServices != null) { + cleanupLeaderServices.accept(null); + } + } catch (Exception e) { + LOG.error( + "Failed to cleanup leader services for server {}", + serverId, + e); + } finally { + cleanupFuture.complete(null); + } + }); } } }); @@ -176,11 +171,7 @@ public class CoordinatorLeaderElection implements AutoCloseable { LOG.info("Coordinator server {} started leader election.", serverId); } catch (Exception e) { LOG.error("Failed to start LeaderLatch for server {}", serverId, e); - leaderReadyFuture.completeExceptionally( - new RuntimeException("Leader election start failed", e)); } - - return leaderReadyFuture; } @Override @@ -196,12 +187,6 @@ public class CoordinatorLeaderElection implements AutoCloseable { } leaderCallbackExecutor.shutdownNow(); - - // Complete the future exceptionally if it hasn't been completed yet - if (!leaderReadyFuture.isDone()) { - leaderReadyFuture.completeExceptionally( - new RuntimeException("Leader election closed for server " + serverId)); - } } public boolean isLeader() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 2262b971b..55ba87108 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -152,9 +152,6 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private CoordinatorLeaderElection coordinatorLeaderElection; - @GuardedBy("lock") - private CompletableFuture<Void> leaderElectionFuture; - @GuardedBy("lock") private KvSnapshotLeaseManager kvSnapshotLeaseManager; @@ -182,28 +179,26 @@ public class CoordinatorServer extends ServerBase { electCoordinatorLeaderAsync(); } - private CompletableFuture<Void> electCoordinatorLeaderAsync() throws Exception { + private void electCoordinatorLeaderAsync() throws Exception { initCoordinatorStandby(); // start election (coordinatorLeaderElection is created inside initCoordinatorStandby // after zkClient is initialized) - this.leaderElectionFuture = - coordinatorLeaderElection.startElectLeaderAsync( - () -> { - try { - initCoordinatorLeader(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - (Throwable t) -> { - try { - cleanupCoordinatorLeader(); - } catch (Exception e) { - LOG.error("Failed to cleanup coordinator leader services", e); - } - }); - return leaderElectionFuture; + coordinatorLeaderElection.startElectLeaderAsync( + () -> { + try { + initCoordinatorLeader(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, + (Throwable t) -> { + try { + cleanupCoordinatorLeader(); + } catch (Exception e) { + LOG.error("Failed to cleanup coordinator leader services", e); + } + }); } protected void initCoordinatorStandby() throws Exception { @@ -421,72 +416,53 @@ public class CoordinatorServer extends ServerBase { } private void registerCoordinatorServer() throws Exception { - long startTime = System.currentTimeMillis(); - List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints(); - CoordinatorAddress coordinatorAddress = - new CoordinatorAddress( - this.serverId, Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf)); - - // we need to retry to register since although - // zkClient reconnect, the ephemeral node may still exist - // for a while time, retry to wait the ephemeral node removed - // see ZOOKEEPER-2985 - while (true) { - try { - zkClient.registerCoordinatorServer(coordinatorAddress); - break; - } catch (KeeperException.NodeExistsException nodeExistsException) { - long elapsedTime = System.currentTimeMillis() - startTime; - if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) { - LOG.error( - "Coordinator Server register to Zookeeper exceeded total retry time of {} ms. " - + "Aborting registration attempts.", - ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS); - throw nodeExistsException; - } - - LOG.warn( - "Coordinator server already registered in Zookeeper. " - + "retrying register after {} ms....", - ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); - try { - Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); - } catch (InterruptedException interruptedException) { - Thread.currentThread().interrupt(); - break; - } - } - } + CoordinatorAddress coordinatorAddress = buildCoordinatorAddress(); + registerToZookeeperWithRetry( + "coordinator server", () -> zkClient.registerCoordinatorServer(coordinatorAddress)); } private void registerCoordinatorLeader() throws Exception { - long startTime = System.currentTimeMillis(); + CoordinatorAddress coordinatorAddress = buildCoordinatorAddress(); + registerToZookeeperWithRetry( + "coordinator leader", () -> zkClient.registerCoordinatorLeader(coordinatorAddress)); + } + + private CoordinatorAddress buildCoordinatorAddress() { List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints(); - CoordinatorAddress coordinatorAddress = - new CoordinatorAddress( - this.serverId, Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf)); - - // we need to retry to register since although - // zkClient reconnect, the ephemeral node may still exist - // for a while time, retry to wait the ephemeral node removed - // see ZOOKEEPER-2985 + return new CoordinatorAddress( + this.serverId, Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf)); + } + + /** + * Registers to ZooKeeper with retry logic to handle the case where the ephemeral node may still + * exist for a while after ZK client reconnects. + * + * @param description a description of the registration for logging + * @param registration the registration action to perform + * @see <a href="https://issues.apache.org/jira/browse/ZOOKEEPER-2985">ZOOKEEPER-2985</a> + */ + private void registerToZookeeperWithRetry(String description, ThrowingRunnable registration) + throws Exception { + long startTime = System.currentTimeMillis(); while (true) { try { - zkClient.registerCoordinatorLeader(coordinatorAddress); + registration.run(); break; } catch (KeeperException.NodeExistsException nodeExistsException) { long elapsedTime = System.currentTimeMillis() - startTime; if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) { LOG.error( - "Coordinator Server register to Zookeeper exceeded total retry time of {} ms. " + "Registering {} to Zookeeper exceeded total retry time of {} ms. " + "Aborting registration attempts.", + description, ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS); throw nodeExistsException; } LOG.warn( - "Coordinator server already registered in Zookeeper. " - + "retrying register after {} ms....", + "Node for {} already exists in Zookeeper. " + + "Retrying register after {} ms....", + description, ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); try { Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS); @@ -498,6 +474,12 @@ public class CoordinatorServer extends ServerBase { } } + /** A functional interface for actions that may throw checked exceptions. */ + @FunctionalInterface + private interface ThrowingRunnable { + void run() throws Exception; + } + private void createDefaultDatabase() { MetadataManager metadataManager = new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader); @@ -686,11 +668,6 @@ public class CoordinatorServer extends ServerBase { return coordinatorService; } - @VisibleForTesting - public CompletableFuture<Void> getLeaderElectionFuture() { - return leaderElectionFuture; - } - @Override protected String getServerName() { return SERVER_NAME; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 7f9d92c77..aaf2460cf 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -35,7 +35,6 @@ import org.apache.fluss.exception.InvalidDatabaseException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.exception.LakeTableAlreadyExistException; import org.apache.fluss.exception.NonPrimaryKeyTableException; -import org.apache.fluss.exception.NotCoordinatorLeaderException; import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotPartitionedException; @@ -294,19 +293,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return "coordinator"; } - /** - * Checks whether this coordinator server is the current leader. - * - * <p>This method should be called at the beginning of every method annotated with {@link - * RequireCoordinatorLeader} to guard against requests being processed by a standby coordinator. - * - * @throws NotCoordinatorLeaderException if this server is not the current coordinator leader - */ - private void checkLeader() { - if (!coordinatorLeaderElection.isLeader()) { - throw new NotCoordinatorLeaderException( - "This coordinator server is not the current leader."); - } + @Override + public boolean isLeader() { + return coordinatorLeaderElection.isLeader(); } @Override @@ -367,10 +356,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina } } - @RequireCoordinatorLeader @Override public CompletableFuture<CreateDatabaseResponse> createDatabase(CreateDatabaseRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.CREATE, Resource.cluster()); } @@ -393,7 +380,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } - @RequireCoordinatorLeader @Override public CompletableFuture<AlterDatabaseResponse> alterDatabase(AlterDatabaseRequest request) { String databaseName = request.getDatabaseName(); @@ -438,7 +424,6 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina @Override public CompletableFuture<DropDatabaseResponse> dropDatabase(DropDatabaseRequest request) { - checkLeader(); authorizeDatabase(OperationType.DROP, request.getDatabaseName()); DropDatabaseResponse response = new DropDatabaseResponse(); metadataManager.dropDatabase( @@ -446,10 +431,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } - @RequireCoordinatorLeader @Override public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) { - checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); tablePath.validate(); authorizeDatabase(OperationType.CREATE, tablePath.getDatabaseName()); @@ -518,10 +501,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(new CreateTableResponse()); } - @RequireCoordinatorLeader @Override public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request) { - checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); tablePath.validate(); authorizeTable(OperationType.ALTER, tablePath); @@ -680,10 +661,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return Boolean.parseBoolean(dataLakeEnabledValue); } - @RequireCoordinatorLeader @Override public CompletableFuture<DropTableResponse> dropTable(DropTableRequest request) { - checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); authorizeTable(OperationType.DROP, tablePath); @@ -692,11 +671,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } - @RequireCoordinatorLeader @Override public CompletableFuture<CreatePartitionResponse> createPartition( CreatePartitionRequest request) { - checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); authorizeTable(OperationType.WRITE, tablePath); @@ -738,10 +715,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } - @RequireCoordinatorLeader @Override public CompletableFuture<DropPartitionResponse> dropPartition(DropPartitionRequest request) { - checkLeader(); TablePath tablePath = toTablePath(request.getTablePath()); authorizeTable(OperationType.WRITE, tablePath); @@ -762,10 +737,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(response); } - @RequireCoordinatorLeader @Override public CompletableFuture<MetadataResponse> metadata(MetadataRequest request) { - checkLeader(); String listenerName = currentListenerName(); Session session = currentSession(); @@ -784,9 +757,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return metadataResponseAccessContextEvent.getResultFuture(); } - @RequireCoordinatorLeader public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) { - checkLeader(); CompletableFuture<AdjustIsrResponse> response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -794,11 +765,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<CommitKvSnapshotResponse> commitKvSnapshot( CommitKvSnapshotRequest request) { - checkLeader(); CompletableFuture<CommitKvSnapshotResponse> response = new CompletableFuture<>(); // parse completed snapshot from request byte[] completedSnapshotBytes = request.getCompletedSnapshot(); @@ -813,11 +782,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<CommitRemoteLogManifestResponse> commitRemoteLogManifest( CommitRemoteLogManifestRequest request) { - checkLeader(); CompletableFuture<CommitRemoteLogManifestResponse> response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -827,10 +794,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<CreateAclsResponse> createAcls(CreateAclsRequest request) { - checkLeader(); if (authorizer == null) { throw new SecurityDisabledException("No Authorizer is configured."); } @@ -839,10 +804,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(makeCreateAclsResponse(aclCreateResults)); } - @RequireCoordinatorLeader @Override public CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request) { - checkLeader(); if (authorizer == null) { throw new SecurityDisabledException("No Authorizer is configured."); } @@ -851,11 +814,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(makeDropAclsResponse(aclDeleteResults)); } - @RequireCoordinatorLeader @Override public CompletableFuture<PrepareLakeTableSnapshotResponse> prepareLakeTableSnapshot( PrepareLakeTableSnapshotRequest request) { - checkLeader(); CompletableFuture<PrepareLakeTableSnapshotResponse> future = new CompletableFuture<>(); boolean ignorePreviousBucketOffsets = request.hasIgnorePreviousTableOffsets() && request.isIgnorePreviousTableOffsets(); @@ -904,11 +865,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return future; } - @RequireCoordinatorLeader @Override public CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot( CommitLakeTableSnapshotRequest request) { - checkLeader(); CompletableFuture<CommitLakeTableSnapshotResponse> response = new CompletableFuture<>(); eventManagerSupplier .get() @@ -918,11 +877,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat( LakeTieringHeartbeatRequest request) { - checkLeader(); LakeTieringHeartbeatResponse heartbeatResponse = new LakeTieringHeartbeatResponse(); int currentCoordinatorEpoch = coordinatorEpochSupplier.get(); heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch); @@ -996,11 +953,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return CompletableFuture.completedFuture(heartbeatResponse); } - @RequireCoordinatorLeader @Override public CompletableFuture<ControlledShutdownResponse> controlledShutdown( ControlledShutdownRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); } @@ -1016,11 +971,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease( AcquireKvSnapshotLeaseRequest request) { - checkLeader(); for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable : request.getSnapshotsToLeasesList()) { long tableId = kvSnapshotLeaseForTable.getTableId(); @@ -1052,11 +1005,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } - @RequireCoordinatorLeader @Override public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease( ReleaseKvSnapshotLeaseRequest request) { - checkLeader(); for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) { long tableId = tableBucket.getTableId(); if (authorizer != null) { @@ -1087,11 +1038,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } - @RequireCoordinatorLeader @Override public CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease( DropKvSnapshotLeaseRequest request) { - checkLeader(); String leaseId = request.getLeaseId(); // Capture session before entering async block since currentSession() is thread-local Session session = authorizer != null ? currentSession() : null; @@ -1128,11 +1077,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } - @RequireCoordinatorLeader @Override public CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs( AlterClusterConfigsRequest request) { - checkLeader(); CompletableFuture<AlterClusterConfigsResponse> future = new CompletableFuture<>(); List<PbAlterConfig> infos = request.getAlterConfigsList(); if (infos.isEmpty()) { @@ -1173,10 +1120,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return future; } - @RequireCoordinatorLeader @Override public CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); } @@ -1194,11 +1139,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<RemoveServerTagResponse> removeServerTag( RemoveServerTagRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster()); } @@ -1216,10 +1159,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); } @@ -1233,11 +1174,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress( ListRebalanceProgressRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster()); } @@ -1252,11 +1191,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina return response; } - @RequireCoordinatorLeader @Override public CompletableFuture<CancelRebalanceResponse> cancelRebalance( CancelRebalanceRequest request) { - checkLeader(); if (authorizer != null) { authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster()); } @@ -1367,11 +1304,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina // Producer Offset Management APIs (for Exactly-Once Semantics) // ================================================================================== - @RequireCoordinatorLeader @Override public CompletableFuture<RegisterProducerOffsetsResponse> registerProducerOffsets( RegisterProducerOffsetsRequest request) { - checkLeader(); // Authorization: require WRITE permission on all tables in the request if (authorizer != null) { for (PbProducerTableOffsets tableOffsets : request.getTableOffsetsList()) { @@ -1415,11 +1350,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } - @RequireCoordinatorLeader @Override public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets( GetProducerOffsetsRequest request) { - checkLeader(); String producerId = request.getProducerId(); // Capture session before entering async block since currentSession() is thread-local Session session = authorizer != null ? currentSession() : null; @@ -1476,11 +1409,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ioExecutor); } - @RequireCoordinatorLeader @Override public CompletableFuture<DeleteProducerOffsetsResponse> deleteProducerOffsets( DeleteProducerOffsetsRequest request) { - checkLeader(); // Capture session before entering async block since currentSession() is thread-local Session session = authorizer != null ? currentSession() : null; return CompletableFuture.supplyAsync( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java deleted file mode 100644 index cbf0178a3..000000000 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.server.coordinator; - -import java.lang.annotation.Documented; -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -/** - * Annotation to mark RPC methods that require the coordinator server to be the current leader - * before execution. - * - * <p>When a method annotated with {@code @RequireCoordinatorLeader} is invoked, it will first check - * whether this coordinator server is the current leader via {@link - * CoordinatorLeaderElection#isLeader()}. If the server is not the leader, a {@link - * org.apache.fluss.exception.NotCoordinatorLeaderException} will be thrown immediately, and the - * actual method body will not be executed. - * - * <p>Usage example: - * - * <pre>{@code - * @RequireCoordinatorLeader - * public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) { - * // method body only executes when this server is the coordinator leader - * } - * }</pre> - */ -@Documented -@Target(ElementType.METHOD) -@Retention(RetentionPolicy.RUNTIME) -public @interface RequireCoordinatorLeader {} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 6e007fdfc..32b5a7c01 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -349,7 +349,7 @@ public final class ZkData { /** * The znode for the active coordinator leader. The znode path is: * - * <p>/coordinators/leader + * <p>/coordinators/active */ public static final class CoordinatorLeaderZNode { public static String path() { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java similarity index 99% rename from fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java rename to fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java index 66cc39332..9fa4f2ad2 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java @@ -68,7 +68,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; * (initialize leader services) -> lose leadership (become standby) -> re-participate in election -> * become leader again. */ -class CoordinatorHAITCase { +class CoordinatorHighAvailabilityITCase { @RegisterExtension public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java index ad89f57ad..cf832a955 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java @@ -34,6 +34,7 @@ import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Objects; +import java.util.Optional; import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil; import static org.assertj.core.api.Assertions.assertThat; @@ -141,12 +142,11 @@ class CoordinatorServerElectionTest { public void waitUntilCoordinatorServerReelected(CoordinatorAddress originAddress) { waitUntil( - () -> - zookeeperClient.getCoordinatorLeaderAddress().isPresent() - && !zookeeperClient - .getCoordinatorLeaderAddress() - .get() - .equals(originAddress), + () -> { + Optional<CoordinatorAddress> address = + zookeeperClient.getCoordinatorLeaderAddress(); + return address.isPresent() && !address.get().equals(originAddress); + }, Duration.ofMinutes(1), "Fail to wait coordinator server reelected"); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index 6fc51fc44..f55846a7f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -271,6 +271,11 @@ public class TestCoordinatorGateway implements CoordinatorGateway { return null; } + @Override + public boolean isLeader() { + return true; + } + @Override public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) { if (networkIssueEnable) { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 8ce9f6a7c..37d3f6c46 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -944,8 +944,6 @@ public final class FlussClusterExtension } public void waitUntilCoordinatorServerElected() throws Exception { - coordinatorServer.getLeaderElectionFuture().get(); - waitUntil( () -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(), Duration.ofSeconds(10),
