This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit c49b84859f9bb8c80a22319c9c54ec65d3fe18ee
Author: Jark Wu <[email protected]>
AuthorDate: Tue Mar 31 18:06:32 2026 +0800

    [server] Improve the implementation of Coordinator HA
---
 .../exception/NotCoordinatorLeaderException.java   |   6 +-
 .../fluss/rpc/gateway/CoordinatorGateway.java      |   3 +
 .../rpc/netty/server/FlussRequestHandler.java      |  15 +++
 .../server/coordinator/CoordinatorContext.java     |   2 +-
 .../coordinator/CoordinatorLeaderElection.java     |  77 +++++--------
 .../server/coordinator/CoordinatorServer.java      | 127 +++++++++------------
 .../server/coordinator/CoordinatorService.java     |  75 +-----------
 .../coordinator/RequireCoordinatorLeader.java      |  48 --------
 .../org/apache/fluss/server/zk/data/ZkData.java    |   2 +-
 ...java => CoordinatorHighAvailabilityITCase.java} |   2 +-
 .../coordinator/CoordinatorServerElectionTest.java |  12 +-
 .../server/coordinator/TestCoordinatorGateway.java |   5 +
 .../server/testutils/FlussClusterExtension.java    |   2 -
 13 files changed, 123 insertions(+), 253 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
 
b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
index a3f65aaf4..faa2f5587 100644
--- 
a/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
+++ 
b/fluss-common/src/main/java/org/apache/fluss/exception/NotCoordinatorLeaderException.java
@@ -17,7 +17,11 @@
 
 package org.apache.fluss.exception;
 
-/** Exception thrown when a request is sent to a stand by coordinator server. 
since: 0.9 */
+/**
+ * Exception thrown when a request is sent to a standby coordinator server.
+ *
+ * @since 1.0
+ */
 public class NotCoordinatorLeaderException extends ApiException {
 
     private static final long serialVersionUID = 1L;
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
index 92508f856..fc509ca30 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/CoordinatorGateway.java
@@ -40,6 +40,9 @@ import java.util.concurrent.CompletableFuture;
 /** The entry point of RPC gateway interface for coordinator server. */
 public interface CoordinatorGateway extends RpcGateway, AdminGateway {
 
+    /** Checks if the current server is the leader. */
+    boolean isLeader();
+
     /**
      * AdjustIsr request to adjust (expend or shrink) the ISR set for request 
table bucket.
      *
diff --git 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java
 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java
index 21f3672a2..2ff5e0691 100644
--- 
a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java
+++ 
b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/FlussRequestHandler.java
@@ -17,8 +17,11 @@
 
 package org.apache.fluss.rpc.netty.server;
 
+import org.apache.fluss.exception.NotCoordinatorLeaderException;
 import org.apache.fluss.rpc.RpcGatewayService;
+import org.apache.fluss.rpc.gateway.CoordinatorGateway;
 import org.apache.fluss.rpc.messages.ApiMessage;
+import org.apache.fluss.rpc.protocol.ApiKeys;
 import org.apache.fluss.rpc.protocol.ApiMethod;
 import org.apache.fluss.rpc.protocol.RequestType;
 
@@ -35,9 +38,11 @@ public class FlussRequestHandler implements 
RequestHandler<FlussRequest> {
     private static final Logger LOG = 
LoggerFactory.getLogger(FlussRequestHandler.class);
 
     private final RpcGatewayService service;
+    private final boolean isCoordinator;
 
     public FlussRequestHandler(RpcGatewayService service) {
         this.service = service;
+        this.isCoordinator = service instanceof CoordinatorGateway;
     }
 
     @Override
@@ -58,6 +63,16 @@ public class FlussRequestHandler implements 
RequestHandler<FlussRequest> {
                             request.isInternal(),
                             request.getAddress(),
                             request.getPrincipal()));
+            // If this handler serves a coordinator, reject the request unless the coordinator
+            // is the current leader (API_VERSIONS is exempt).
+            // TODO: we should only check coordinator APIs instead of all APIs
+            if (isCoordinator && api.getApiKey() != ApiKeys.API_VERSIONS) {
+                if (!((CoordinatorGateway) service).isLeader()) {
+                    request.fail(
+                            new NotCoordinatorLeaderException(
+                                    "This coordinator server is not the 
current leader."));
+                    return;
+                }
+            }
             // invoke the corresponding method on RpcGateway instance.
             CompletableFuture<?> responseFuture =
                     (CompletableFuture<?>) api.getMethod().invoke(service, 
message);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
index 2b821b02d..57ed11528 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
@@ -710,10 +710,10 @@ public class CoordinatorContext {
         tablesToBeDeleted.clear();
         coordinatorEpoch = 0;
         clearTablesState();
-        // clear the live tablet servers
         liveTabletServers.clear();
         shuttingDownTabletServers.clear();
         serverTags.clear();
+        liveCoordinatorServers.clear();
     }
 
     public int getTotalPartitionCount() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
index 9f09df6af..d226123b5 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
@@ -53,17 +53,15 @@ public class CoordinatorLeaderElection implements 
AutoCloseable {
     private final String serverId;
     private final LeaderLatch leaderLatch;
     private final AtomicBoolean isLeader = new AtomicBoolean(false);
-    private final CompletableFuture<Void> leaderReadyFuture = new 
CompletableFuture<>();
-    // Single-threaded executor to run leader init callbacks outside Curator's 
EventThread.
+    // Cached thread pool to run leader init/cleanup callbacks outside 
Curator's EventThread.
     // Curator's LeaderLatchListener callbacks run on its internal 
EventThread; performing
     // synchronous ZK operations there causes deadlock because ZK response 
dispatch also
-    // needs that same thread.
+    // needs that same thread. A cached pool is used because these callbacks 
are transient
+    // (only run during leadership transitions), so idle threads can be 
reclaimed.
     private final ExecutorService leaderCallbackExecutor;
     // Tracks the pending cleanup task so that init can wait for it to 
complete.
     private final AtomicReference<CompletableFuture<Void>> pendingCleanup =
             new AtomicReference<>(CompletableFuture.completedFuture(null));
-    private volatile Runnable initLeaderServices;
-    private volatile Consumer<Throwable> cleanupLeaderServices;
 
     public CoordinatorLeaderElection(ZooKeeperClient zkClient, String 
serverId) {
         this.serverId = serverId;
@@ -73,34 +71,35 @@ public class CoordinatorLeaderElection implements 
AutoCloseable {
                         ZkData.CoordinatorElectionZNode.path(),
                         String.valueOf(serverId));
         this.leaderCallbackExecutor =
-                Executors.newSingleThreadExecutor(
+                Executors.newCachedThreadPool(
                         r -> {
                             Thread t = new Thread(r, 
"coordinator-leader-callback-" + serverId);
+                            // Daemon threads ensure the JVM can exit even if 
close() is not
+                            // called. Orderly shutdown is handled by close() 
-> shutdownNow().
                             t.setDaemon(true);
                             return t;
                         });
     }
 
     /**
-     * Starts the leader election process asynchronously. The returned future 
completes when this
-     * server becomes the leader for the first time and initializes the leader 
services.
+     * Starts the leader election process asynchronously.
      *
      * <p>After the first election, the server will continue to participate in 
future elections.
      * When re-elected as leader, the initLeaderServices callback will be 
invoked again.
      *
      * @param initLeaderServices the callback to initialize leader services 
once elected
      * @param cleanupLeaderServices the callback to clean up leader services 
when losing leadership
-     * @return a CompletableFuture that completes when this server becomes 
leader for the first time
      */
-    public CompletableFuture<Void> startElectLeaderAsync(
+    public void startElectLeaderAsync(
             Runnable initLeaderServices, Consumer<Throwable> 
cleanupLeaderServices) {
-        this.initLeaderServices = initLeaderServices;
-        this.cleanupLeaderServices = cleanupLeaderServices;
-
         leaderLatch.addListener(
                 new LeaderLatchListener() {
                     @Override
                     public void isLeader() {
+                        // return if already marked as leader to avoid 
duplicate init calls
+                        if (isLeader.get()) {
+                            return;
+                        }
                         LOG.info("Coordinator server {} has become the 
leader.", serverId);
                         // Capture the pending cleanup future at this point so 
that
                         // init waits for it before proceeding.
@@ -124,15 +123,15 @@ public class CoordinatorLeaderElection implements 
AutoCloseable {
                                     }
                                     try {
                                         initLeaderServices.run();
-                                        leaderReadyFuture.complete(null);
                                     } catch (Exception e) {
                                         LOG.error(
                                                 "Failed to initialize leader 
services for server {}",
                                                 serverId,
                                                 e);
-                                        
leaderReadyFuture.completeExceptionally(e);
                                     }
                                 });
+                        // Set the leader flag before init completes so that, once ZK reports this
+                        // server as the leader, it can accept requests.
                         isLeader.set(true);
                     }
 
@@ -142,31 +141,27 @@ public class CoordinatorLeaderElection implements 
AutoCloseable {
                             LOG.warn(
                                     "Coordinator server {} has lost the 
leadership, cleaning up leader services.",
                                     serverId);
-                            // Run cleanup on a separate daemon thread (NOT on 
the
-                            // leaderCallbackExecutor) to avoid blocking init 
tasks.
+                            // Submit cleanup to leaderCallbackExecutor. The 
cached thread
+                            // pool can spawn a new thread even if init is 
still running.
                             // The cleanup completion is tracked via 
pendingCleanup so
                             // that subsequent init waits for it.
                             CompletableFuture<Void> cleanupFuture = new 
CompletableFuture<>();
                             pendingCleanup.set(cleanupFuture);
-                            Thread cleanupThread =
-                                    new Thread(
-                                            () -> {
-                                                try {
-                                                    if (cleanupLeaderServices 
!= null) {
-                                                        
cleanupLeaderServices.accept(null);
-                                                    }
-                                                } catch (Exception e) {
-                                                    LOG.error(
-                                                            "Failed to cleanup 
leader services for server {}",
-                                                            serverId,
-                                                            e);
-                                                } finally {
-                                                    
cleanupFuture.complete(null);
-                                                }
-                                            },
-                                            "coordinator-leader-cleanup-" + 
serverId);
-                            cleanupThread.setDaemon(true);
-                            cleanupThread.start();
+                            leaderCallbackExecutor.execute(
+                                    () -> {
+                                        try {
+                                            if (cleanupLeaderServices != null) 
{
+                                                
cleanupLeaderServices.accept(null);
+                                            }
+                                        } catch (Exception e) {
+                                            LOG.error(
+                                                    "Failed to cleanup leader 
services for server {}",
+                                                    serverId,
+                                                    e);
+                                        } finally {
+                                            cleanupFuture.complete(null);
+                                        }
+                                    });
                         }
                     }
                 });
@@ -176,11 +171,7 @@ public class CoordinatorLeaderElection implements 
AutoCloseable {
             LOG.info("Coordinator server {} started leader election.", 
serverId);
         } catch (Exception e) {
             LOG.error("Failed to start LeaderLatch for server {}", serverId, 
e);
-            leaderReadyFuture.completeExceptionally(
-                    new RuntimeException("Leader election start failed", e));
         }
-
-        return leaderReadyFuture;
     }
 
     @Override
@@ -196,12 +187,6 @@ public class CoordinatorLeaderElection implements 
AutoCloseable {
         }
 
         leaderCallbackExecutor.shutdownNow();
-
-        // Complete the future exceptionally if it hasn't been completed yet
-        if (!leaderReadyFuture.isDone()) {
-            leaderReadyFuture.completeExceptionally(
-                    new RuntimeException("Leader election closed for server " 
+ serverId));
-        }
     }
 
     public boolean isLeader() {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 2262b971b..55ba87108 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -152,9 +152,6 @@ public class CoordinatorServer extends ServerBase {
     @GuardedBy("lock")
     private CoordinatorLeaderElection coordinatorLeaderElection;
 
-    @GuardedBy("lock")
-    private CompletableFuture<Void> leaderElectionFuture;
-
     @GuardedBy("lock")
     private KvSnapshotLeaseManager kvSnapshotLeaseManager;
 
@@ -182,28 +179,26 @@ public class CoordinatorServer extends ServerBase {
         electCoordinatorLeaderAsync();
     }
 
-    private CompletableFuture<Void> electCoordinatorLeaderAsync() throws 
Exception {
+    private void electCoordinatorLeaderAsync() throws Exception {
         initCoordinatorStandby();
 
         // start election (coordinatorLeaderElection is created inside 
initCoordinatorStandby
         // after zkClient is initialized)
-        this.leaderElectionFuture =
-                coordinatorLeaderElection.startElectLeaderAsync(
-                        () -> {
-                            try {
-                                initCoordinatorLeader();
-                            } catch (Exception e) {
-                                throw new RuntimeException(e);
-                            }
-                        },
-                        (Throwable t) -> {
-                            try {
-                                cleanupCoordinatorLeader();
-                            } catch (Exception e) {
-                                LOG.error("Failed to cleanup coordinator 
leader services", e);
-                            }
-                        });
-        return leaderElectionFuture;
+        coordinatorLeaderElection.startElectLeaderAsync(
+                () -> {
+                    try {
+                        initCoordinatorLeader();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                },
+                (Throwable t) -> {
+                    try {
+                        cleanupCoordinatorLeader();
+                    } catch (Exception e) {
+                        LOG.error("Failed to cleanup coordinator leader 
services", e);
+                    }
+                });
     }
 
     protected void initCoordinatorStandby() throws Exception {
@@ -421,72 +416,53 @@ public class CoordinatorServer extends ServerBase {
     }
 
     private void registerCoordinatorServer() throws Exception {
-        long startTime = System.currentTimeMillis();
-        List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
-        CoordinatorAddress coordinatorAddress =
-                new CoordinatorAddress(
-                        this.serverId, 
Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf));
-
-        // we need to retry to register since although
-        // zkClient reconnect, the ephemeral node may still exist
-        // for a while time, retry to wait the ephemeral node removed
-        // see ZOOKEEPER-2985
-        while (true) {
-            try {
-                zkClient.registerCoordinatorServer(coordinatorAddress);
-                break;
-            } catch (KeeperException.NodeExistsException nodeExistsException) {
-                long elapsedTime = System.currentTimeMillis() - startTime;
-                if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
-                    LOG.error(
-                            "Coordinator Server register to Zookeeper exceeded 
total retry time of {} ms. "
-                                    + "Aborting registration attempts.",
-                            ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
-                    throw nodeExistsException;
-                }
-
-                LOG.warn(
-                        "Coordinator server already registered in Zookeeper. "
-                                + "retrying register after {} ms....",
-                        ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
-                try {
-                    Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
-                } catch (InterruptedException interruptedException) {
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-        }
+        CoordinatorAddress coordinatorAddress = buildCoordinatorAddress();
+        registerToZookeeperWithRetry(
+                "coordinator server", () -> 
zkClient.registerCoordinatorServer(coordinatorAddress));
     }
 
     private void registerCoordinatorLeader() throws Exception {
-        long startTime = System.currentTimeMillis();
+        CoordinatorAddress coordinatorAddress = buildCoordinatorAddress();
+        registerToZookeeperWithRetry(
+                "coordinator leader", () -> 
zkClient.registerCoordinatorLeader(coordinatorAddress));
+    }
+
+    private CoordinatorAddress buildCoordinatorAddress() {
         List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
-        CoordinatorAddress coordinatorAddress =
-                new CoordinatorAddress(
-                        this.serverId, 
Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf));
-
-        // we need to retry to register since although
-        // zkClient reconnect, the ephemeral node may still exist
-        // for a while time, retry to wait the ephemeral node removed
-        // see ZOOKEEPER-2985
+        return new CoordinatorAddress(
+                this.serverId, Endpoint.loadAdvertisedEndpoints(bindEndpoints, 
conf));
+    }
+
+    /**
+     * Registers to ZooKeeper with retry logic to handle the case where the 
ephemeral node may still
+     * exist for a while after ZK client reconnects.
+     *
+     * @param description a description of the registration for logging
+     * @param registration the registration action to perform
+     * @see <a 
href="https://issues.apache.org/jira/browse/ZOOKEEPER-2985">ZOOKEEPER-2985</a>
+     */
+    private void registerToZookeeperWithRetry(String description, 
ThrowingRunnable registration)
+            throws Exception {
+        long startTime = System.currentTimeMillis();
         while (true) {
             try {
-                zkClient.registerCoordinatorLeader(coordinatorAddress);
+                registration.run();
                 break;
             } catch (KeeperException.NodeExistsException nodeExistsException) {
                 long elapsedTime = System.currentTimeMillis() - startTime;
                 if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
                     LOG.error(
-                            "Coordinator Server register to Zookeeper exceeded 
total retry time of {} ms. "
+                            "Registering {} to Zookeeper exceeded total retry 
time of {} ms. "
                                     + "Aborting registration attempts.",
+                            description,
                             ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
                     throw nodeExistsException;
                 }
 
                 LOG.warn(
-                        "Coordinator server already registered in Zookeeper. "
-                                + "retrying register after {} ms....",
+                        "Node for {} already exists in Zookeeper. "
+                                + "Retrying register after {} ms....",
+                        description,
                         ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
                 try {
                     Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
@@ -498,6 +474,12 @@ public class CoordinatorServer extends ServerBase {
         }
     }
 
+    /** A functional interface for actions that may throw checked exceptions. 
*/
+    @FunctionalInterface
+    private interface ThrowingRunnable {
+        void run() throws Exception;
+    }
+
     private void createDefaultDatabase() {
         MetadataManager metadataManager =
                 new MetadataManager(zkClient, conf, lakeCatalogDynamicLoader);
@@ -686,11 +668,6 @@ public class CoordinatorServer extends ServerBase {
         return coordinatorService;
     }
 
-    @VisibleForTesting
-    public CompletableFuture<Void> getLeaderElectionFuture() {
-        return leaderElectionFuture;
-    }
-
     @Override
     protected String getServerName() {
         return SERVER_NAME;
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 7f9d92c77..aaf2460cf 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -35,7 +35,6 @@ import org.apache.fluss.exception.InvalidDatabaseException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.exception.LakeTableAlreadyExistException;
 import org.apache.fluss.exception.NonPrimaryKeyTableException;
-import org.apache.fluss.exception.NotCoordinatorLeaderException;
 import org.apache.fluss.exception.SecurityDisabledException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
@@ -294,19 +293,9 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return "coordinator";
     }
 
-    /**
-     * Checks whether this coordinator server is the current leader.
-     *
-     * <p>This method should be called at the beginning of every method 
annotated with {@link
-     * RequireCoordinatorLeader} to guard against requests being processed by 
a standby coordinator.
-     *
-     * @throws NotCoordinatorLeaderException if this server is not the current 
coordinator leader
-     */
-    private void checkLeader() {
-        if (!coordinatorLeaderElection.isLeader()) {
-            throw new NotCoordinatorLeaderException(
-                    "This coordinator server is not the current leader.");
-        }
+    @Override
+    public boolean isLeader() {
+        return coordinatorLeaderElection.isLeader();
     }
 
     @Override
@@ -367,10 +356,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         }
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreateDatabaseResponse> 
createDatabase(CreateDatabaseRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.CREATE, 
Resource.cluster());
         }
@@ -393,7 +380,6 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AlterDatabaseResponse> 
alterDatabase(AlterDatabaseRequest request) {
         String databaseName = request.getDatabaseName();
@@ -438,7 +424,6 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
     @Override
     public CompletableFuture<DropDatabaseResponse> 
dropDatabase(DropDatabaseRequest request) {
-        checkLeader();
         authorizeDatabase(OperationType.DROP, request.getDatabaseName());
         DropDatabaseResponse response = new DropDatabaseResponse();
         metadataManager.dropDatabase(
@@ -446,10 +431,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreateTableResponse> 
createTable(CreateTableRequest request) {
-        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         tablePath.validate();
         authorizeDatabase(OperationType.CREATE, tablePath.getDatabaseName());
@@ -518,10 +501,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(new CreateTableResponse());
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest 
request) {
-        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         tablePath.validate();
         authorizeTable(OperationType.ALTER, tablePath);
@@ -680,10 +661,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return Boolean.parseBoolean(dataLakeEnabledValue);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropTableResponse> dropTable(DropTableRequest 
request) {
-        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.DROP, tablePath);
 
@@ -692,11 +671,9 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreatePartitionResponse> createPartition(
             CreatePartitionRequest request) {
-        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.WRITE, tablePath);
 
@@ -738,10 +715,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropPartitionResponse> 
dropPartition(DropPartitionRequest request) {
-        checkLeader();
         TablePath tablePath = toTablePath(request.getTablePath());
         authorizeTable(OperationType.WRITE, tablePath);
 
@@ -762,10 +737,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(response);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<MetadataResponse> metadata(MetadataRequest 
request) {
-        checkLeader();
         String listenerName = currentListenerName();
         Session session = currentSession();
 
@@ -784,9 +757,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return metadataResponseAccessContextEvent.getResultFuture();
     }
 
-    @RequireCoordinatorLeader
     public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest 
request) {
-        checkLeader();
         CompletableFuture<AdjustIsrResponse> response = new 
CompletableFuture<>();
         eventManagerSupplier
                 .get()
@@ -794,11 +765,9 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CommitKvSnapshotResponse> commitKvSnapshot(
             CommitKvSnapshotRequest request) {
-        checkLeader();
         CompletableFuture<CommitKvSnapshotResponse> response = new 
CompletableFuture<>();
         // parse completed snapshot from request
         byte[] completedSnapshotBytes = request.getCompletedSnapshot();
@@ -813,11 +782,9 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CommitRemoteLogManifestResponse> 
commitRemoteLogManifest(
             CommitRemoteLogManifestRequest request) {
-        checkLeader();
         CompletableFuture<CommitRemoteLogManifestResponse> response = new 
CompletableFuture<>();
         eventManagerSupplier
                 .get()
@@ -827,10 +794,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CreateAclsResponse> createAcls(CreateAclsRequest 
request) {
-        checkLeader();
         if (authorizer == null) {
             throw new SecurityDisabledException("No Authorizer is 
configured.");
         }
@@ -839,10 +804,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(makeCreateAclsResponse(aclCreateResults));
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request) {
-        checkLeader();
         if (authorizer == null) {
             throw new SecurityDisabledException("No Authorizer is configured.");
         }
@@ -851,11 +814,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(makeDropAclsResponse(aclDeleteResults));
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<PrepareLakeTableSnapshotResponse> prepareLakeTableSnapshot(
             PrepareLakeTableSnapshotRequest request) {
-        checkLeader();
         CompletableFuture<PrepareLakeTableSnapshotResponse> future = new CompletableFuture<>();
         boolean ignorePreviousBucketOffsets =
                 request.hasIgnorePreviousTableOffsets() && request.isIgnorePreviousTableOffsets();
@@ -904,11 +865,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return future;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CommitLakeTableSnapshotResponse> commitLakeTableSnapshot(
             CommitLakeTableSnapshotRequest request) {
-        checkLeader();
         CompletableFuture<CommitLakeTableSnapshotResponse> response = new CompletableFuture<>();
         eventManagerSupplier
                 .get()
@@ -918,11 +877,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<LakeTieringHeartbeatResponse> lakeTieringHeartbeat(
             LakeTieringHeartbeatRequest request) {
-        checkLeader();
         LakeTieringHeartbeatResponse heartbeatResponse = new LakeTieringHeartbeatResponse();
         int currentCoordinatorEpoch = coordinatorEpochSupplier.get();
         heartbeatResponse.setCoordinatorEpoch(currentCoordinatorEpoch);
@@ -996,11 +953,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return CompletableFuture.completedFuture(heartbeatResponse);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<ControlledShutdownResponse> controlledShutdown(
             ControlledShutdownRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster());
         }
@@ -1016,11 +971,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AcquireKvSnapshotLeaseResponse> acquireKvSnapshotLease(
             AcquireKvSnapshotLeaseRequest request) {
-        checkLeader();
         for (PbKvSnapshotLeaseForTable kvSnapshotLeaseForTable :
                 request.getSnapshotsToLeasesList()) {
             long tableId = kvSnapshotLeaseForTable.getTableId();
@@ -1052,11 +1005,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<ReleaseKvSnapshotLeaseResponse> releaseKvSnapshotLease(
             ReleaseKvSnapshotLeaseRequest request) {
-        checkLeader();
         for (PbTableBucket tableBucket : request.getBucketsToReleasesList()) {
             long tableId = tableBucket.getTableId();
             if (authorizer != null) {
@@ -1087,11 +1038,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DropKvSnapshotLeaseResponse> dropKvSnapshotLease(
             DropKvSnapshotLeaseRequest request) {
-        checkLeader();
         String leaseId = request.getLeaseId();
         // Capture session before entering async block since currentSession() is thread-local
         Session session = authorizer != null ? currentSession() : null;
@@ -1128,11 +1077,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AlterClusterConfigsResponse> alterClusterConfigs(
             AlterClusterConfigsRequest request) {
-        checkLeader();
         CompletableFuture<AlterClusterConfigsResponse> future = new CompletableFuture<>();
         List<PbAlterConfig> infos = request.getAlterConfigsList();
         if (infos.isEmpty()) {
@@ -1173,10 +1120,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return future;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<AddServerTagResponse> addServerTag(AddServerTagRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster());
         }
@@ -1194,11 +1139,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<RemoveServerTagResponse> removeServerTag(
             RemoveServerTagRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.ALTER, Resource.cluster());
         }
@@ -1216,10 +1159,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<RebalanceResponse> rebalance(RebalanceRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster());
         }
@@ -1233,11 +1174,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<ListRebalanceProgressResponse> listRebalanceProgress(
             ListRebalanceProgressRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.DESCRIBE, Resource.cluster());
         }
@@ -1252,11 +1191,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         return response;
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<CancelRebalanceResponse> cancelRebalance(
             CancelRebalanceRequest request) {
-        checkLeader();
         if (authorizer != null) {
             authorizer.authorize(currentSession(), OperationType.WRITE, Resource.cluster());
         }
@@ -1367,11 +1304,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
     // Producer Offset Management APIs (for Exactly-Once Semantics)
     // ==================================================================================
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<RegisterProducerOffsetsResponse> registerProducerOffsets(
             RegisterProducerOffsetsRequest request) {
-        checkLeader();
         // Authorization: require WRITE permission on all tables in the request
         if (authorizer != null) {
             for (PbProducerTableOffsets tableOffsets : request.getTableOffsetsList()) {
@@ -1415,11 +1350,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<GetProducerOffsetsResponse> getProducerOffsets(
             GetProducerOffsetsRequest request) {
-        checkLeader();
         String producerId = request.getProducerId();
         // Capture session before entering async block since currentSession() is thread-local
         Session session = authorizer != null ? currentSession() : null;
@@ -1476,11 +1409,9 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
                 ioExecutor);
     }
 
-    @RequireCoordinatorLeader
     @Override
     public CompletableFuture<DeleteProducerOffsetsResponse> deleteProducerOffsets(
             DeleteProducerOffsetsRequest request) {
-        checkLeader();
         // Capture session before entering async block since currentSession() is thread-local
         Session session = authorizer != null ? currentSession() : null;
         return CompletableFuture.supplyAsync(
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java
deleted file mode 100644
index cbf0178a3..000000000
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/RequireCoordinatorLeader.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.fluss.server.coordinator;
-
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Annotation to mark RPC methods that require the coordinator server to be the current leader
- * before execution.
- *
- * <p>When a method annotated with {@code @RequireCoordinatorLeader} is invoked, it will first check
- * whether this coordinator server is the current leader via {@link
- * CoordinatorLeaderElection#isLeader()}. If the server is not the leader, a {@link
- * org.apache.fluss.exception.NotCoordinatorLeaderException} will be thrown immediately, and the
- * actual method body will not be executed.
- *
- * <p>Usage example:
- *
- * <pre>{@code
- * @RequireCoordinatorLeader
- * public CompletableFuture<CreateTableResponse> createTable(CreateTableRequest request) {
- *     // method body only executes when this server is the coordinator leader
- * }
- * }</pre>
- */
-@Documented
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-public @interface RequireCoordinatorLeader {}
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
index 6e007fdfc..32b5a7c01 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
@@ -349,7 +349,7 @@ public final class ZkData {
     /**
      * The znode for the active coordinator leader. The znode path is:
      *
-     * <p>/coordinators/leader
+     * <p>/coordinators/active
      */
     public static final class CoordinatorLeaderZNode {
         public static String path() {
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
similarity index 99%
rename from fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java
rename to fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
index 66cc39332..9fa4f2ad2 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHAITCase.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java
@@ -68,7 +68,7 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
  * (initialize leader services) -> lose leadership (become standby) -> re-participate in election ->
  * become leader again.
  */
-class CoordinatorHAITCase {
+class CoordinatorHighAvailabilityITCase {
 
     @RegisterExtension
     public static final AllCallbackWrapper<ZooKeeperExtension> ZOO_KEEPER_EXTENSION_WRAPPER =
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java
index ad89f57ad..cf832a955 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java
@@ -34,6 +34,7 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -141,12 +142,11 @@ class CoordinatorServerElectionTest {
 
     public void waitUntilCoordinatorServerReelected(CoordinatorAddress originAddress) {
         waitUntil(
-                () ->
-                        zookeeperClient.getCoordinatorLeaderAddress().isPresent()
-                                && !zookeeperClient
-                                        .getCoordinatorLeaderAddress()
-                                        .get()
-                                        .equals(originAddress),
+                () -> {
+                    Optional<CoordinatorAddress> address =
+                            zookeeperClient.getCoordinatorLeaderAddress();
+                    return address.isPresent() && !address.get().equals(originAddress);
+                },
                 Duration.ofMinutes(1),
                 "Fail to wait coordinator server reelected");
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
index 6fc51fc44..f55846a7f 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -271,6 +271,11 @@ public class TestCoordinatorGateway implements CoordinatorGateway {
         return null;
     }
 
+    @Override
+    public boolean isLeader() {
+        return true;
+    }
+
     @Override
     public CompletableFuture<AdjustIsrResponse> adjustIsr(AdjustIsrRequest request) {
         if (networkIssueEnable) {
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
index 8ce9f6a7c..37d3f6c46 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
@@ -944,8 +944,6 @@ public final class FlussClusterExtension
     }
 
     public void waitUntilCoordinatorServerElected() throws Exception {
-        coordinatorServer.getLeaderElectionFuture().get();
-
         waitUntil(
                 () -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(),
                 Duration.ofSeconds(10),


Reply via email to