This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a4d1bf0 [coordinator] add retry for Coordinator Server register to zk 
when reboot (#1307)
2a4d1bf0 is described below

commit 2a4d1bf0aba5a49b4a3c3ad511faa0dc38800de4
Author: xiaozhou <[email protected]>
AuthorDate: Sat Jul 12 16:29:57 2025 +0800

    [coordinator] add retry for Coordinator Server register to zk when reboot 
(#1307)
---
 .../java/com/alibaba/fluss/server/ServerBase.java  |  3 +
 .../server/coordinator/CoordinatorServer.java      | 77 ++++++++++------------
 .../alibaba/fluss/server/tablet/TabletServer.java  |  3 -
 3 files changed, 37 insertions(+), 46 deletions(-)

diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
index 1c477b78..d2f6fc4d 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
@@ -61,6 +61,9 @@ public abstract class ServerBase implements 
AutoCloseableAsync, FatalErrorHandle
 
     private static final Duration INITIALIZATION_SHUTDOWN_TIMEOUT = 
Duration.ofSeconds(30L);
 
+    protected static final long ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS = 60 * 
1000L;
+    protected static final long ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS = 3 * 
1000L;
+
     protected final Configuration conf;
 
     protected FileSystem remoteFileSystem;
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
index 227d05d3..e54fc9ee 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.GuardedBy;
 
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -208,7 +207,9 @@ public class CoordinatorServer extends ServerBase {
             rpcServer.start();
 
             registerCoordinatorLeader();
-            registerZookeeperClientReconnectedListener();
+            // register the coordinator server again when the session is re-initialized
+            ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
+                    zkClient, this::registerCoordinatorLeader, this);
 
             this.clientMetricGroup = new ClientMetricGroup(metricRegistry, 
SERVER_NAME);
             this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
@@ -281,52 +282,42 @@ public class CoordinatorServer extends ServerBase {
     }
 
     private void registerCoordinatorLeader() throws Exception {
+        long startTime = System.currentTimeMillis();
         List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
         CoordinatorAddress coordinatorAddress =
                 new CoordinatorAddress(
                         this.serverId, 
Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf));
-        zkClient.registerCoordinatorLeader(coordinatorAddress);
-    }
 
-    private void registerZookeeperClientReconnectedListener() {
-        ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
-                zkClient,
-                () -> {
-                    // we need to retry to register since although
-                    // zkClient reconnect, the ephemeral node may still exist
-                    // for a while time, retry to wait the ephemeral node 
removed
-                    // see ZOOKEEPER-2985
-                    long startTime = System.currentTimeMillis();
-                    long retryWaitIntervalMs = 
Duration.ofSeconds(3).toMillis();
-                    long retryTotalWaitTimeMs = 
Duration.ofMinutes(1).toMillis();
-                    while (true) {
-                        try {
-                            this.registerCoordinatorLeader();
-                            break;
-                        } catch (KeeperException.NodeExistsException 
nodeExistsException) {
-                            long elapsedTime = System.currentTimeMillis() - 
startTime;
-                            if (elapsedTime >= retryTotalWaitTimeMs) {
-                                LOG.error(
-                                        "Coordinator Server register to 
Zookeeper exceeded total retry time of {} ms. "
-                                                + "Aborting registration 
attempts.",
-                                        retryTotalWaitTimeMs);
-                                throw nodeExistsException;
-                            }
-
-                            LOG.warn(
-                                    "Coordinator server already registered in 
Zookeeper. "
-                                            + "retrying register after {} 
ms....",
-                                    retryWaitIntervalMs);
-                            try {
-                                Thread.sleep(retryWaitIntervalMs);
-                            } catch (InterruptedException 
interruptedException) {
-                                Thread.currentThread().interrupt();
-                                break;
-                            }
-                        }
-                    }
-                },
-                this);
+        // We need to retry the registration: even though the zkClient has
+        // reconnected, the previous ephemeral node may still exist for a
+        // while, so keep retrying until that ephemeral node has been removed.
+        // See ZOOKEEPER-2985.
+        while (true) {
+            try {
+                zkClient.registerCoordinatorLeader(coordinatorAddress);
+                break;
+            } catch (KeeperException.NodeExistsException nodeExistsException) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
+                    LOG.error(
+                            "Coordinator Server register to Zookeeper exceeded 
total retry time of {} ms. "
+                                    + "Aborting registration attempts.",
+                            ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
+                    throw nodeExistsException;
+                }
+
+                LOG.warn(
+                        "Coordinator server already registered in Zookeeper. "
+                                + "retrying register after {} ms....",
+                        ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+                try {
+                    Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+                } catch (InterruptedException interruptedException) {
+                    Thread.currentThread().interrupt();
+                    break;
+                }
+            }
+        }
     }
 
     private void createDefaultDatabase() {
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
index 006152d0..76a75eb5 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
@@ -79,9 +79,6 @@ public class TabletServer extends ServerBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(TabletServer.class);
 
-    private static final long ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS = 60 * 
1000L;
-    private static final long ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS = 3 * 1000L;
-
     private final int serverId;
 
     /**

Reply via email to