This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 2a4d1bf0 [coordinator] add retry for Coordinator Server register to zk when reboot (#1307)
2a4d1bf0 is described below
commit 2a4d1bf0aba5a49b4a3c3ad511faa0dc38800de4
Author: xiaozhou <[email protected]>
AuthorDate: Sat Jul 12 16:29:57 2025 +0800
[coordinator] add retry for Coordinator Server register to zk when reboot (#1307)
---
.../java/com/alibaba/fluss/server/ServerBase.java | 3 +
.../server/coordinator/CoordinatorServer.java | 77 ++++++++++------------
.../alibaba/fluss/server/tablet/TabletServer.java | 3 -
3 files changed, 37 insertions(+), 46 deletions(-)
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
index 1c477b78..d2f6fc4d 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
@@ -61,6 +61,9 @@ public abstract class ServerBase implements AutoCloseableAsync, FatalErrorHandle
private static final Duration INITIALIZATION_SHUTDOWN_TIMEOUT =
Duration.ofSeconds(30L);
+ protected static final long ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS = 60 *
1000L;
+ protected static final long ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS = 3 *
1000L;
+
protected final Configuration conf;
protected FileSystem remoteFileSystem;
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
index 227d05d3..e54fc9ee 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
@@ -56,7 +56,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -208,7 +207,9 @@ public class CoordinatorServer extends ServerBase {
rpcServer.start();
registerCoordinatorLeader();
- registerZookeeperClientReconnectedListener();
+ // when init session, register coordinator server again
+ ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
+ zkClient, this::registerCoordinatorLeader, this);
this.clientMetricGroup = new ClientMetricGroup(metricRegistry,
SERVER_NAME);
this.rpcClient = RpcClient.create(conf, clientMetricGroup, true);
@@ -281,52 +282,42 @@ public class CoordinatorServer extends ServerBase {
}
private void registerCoordinatorLeader() throws Exception {
+ long startTime = System.currentTimeMillis();
List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
CoordinatorAddress coordinatorAddress =
new CoordinatorAddress(
this.serverId,
Endpoint.loadAdvertisedEndpoints(bindEndpoints, conf));
- zkClient.registerCoordinatorLeader(coordinatorAddress);
- }
- private void registerZookeeperClientReconnectedListener() {
- ZooKeeperUtils.registerZookeeperClientReInitSessionListener(
- zkClient,
- () -> {
- // we need to retry to register since although
- // zkClient reconnect, the ephemeral node may still exist
- // for a while time, retry to wait the ephemeral node
removed
- // see ZOOKEEPER-2985
- long startTime = System.currentTimeMillis();
- long retryWaitIntervalMs =
Duration.ofSeconds(3).toMillis();
- long retryTotalWaitTimeMs =
Duration.ofMinutes(1).toMillis();
- while (true) {
- try {
- this.registerCoordinatorLeader();
- break;
- } catch (KeeperException.NodeExistsException
nodeExistsException) {
- long elapsedTime = System.currentTimeMillis() -
startTime;
- if (elapsedTime >= retryTotalWaitTimeMs) {
- LOG.error(
- "Coordinator Server register to
Zookeeper exceeded total retry time of {} ms. "
- + "Aborting registration
attempts.",
- retryTotalWaitTimeMs);
- throw nodeExistsException;
- }
-
- LOG.warn(
- "Coordinator server already registered in
Zookeeper. "
- + "retrying register after {}
ms....",
- retryWaitIntervalMs);
- try {
- Thread.sleep(retryWaitIntervalMs);
- } catch (InterruptedException
interruptedException) {
- Thread.currentThread().interrupt();
- break;
- }
- }
- }
- },
- this);
+ // we need to retry to register since although
+ // zkClient reconnect, the ephemeral node may still exist
+ // for a while time, retry to wait the ephemeral node removed
+ // see ZOOKEEPER-2985
+ while (true) {
+ try {
+ zkClient.registerCoordinatorLeader(coordinatorAddress);
+ break;
+ } catch (KeeperException.NodeExistsException nodeExistsException) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (elapsedTime >= ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS) {
+ LOG.error(
+ "Coordinator Server register to Zookeeper exceeded
total retry time of {} ms. "
+ + "Aborting registration attempts.",
+ ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS);
+ throw nodeExistsException;
+ }
+
+ LOG.warn(
+ "Coordinator server already registered in Zookeeper. "
+ + "retrying register after {} ms....",
+ ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+ try {
+ Thread.sleep(ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS);
+ } catch (InterruptedException interruptedException) {
+ Thread.currentThread().interrupt();
+ break;
+ }
+ }
+ }
}
private void createDefaultDatabase() {
diff --git a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
index 006152d0..76a75eb5 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
@@ -79,9 +79,6 @@ public class TabletServer extends ServerBase {
private static final Logger LOG =
LoggerFactory.getLogger(TabletServer.class);
- private static final long ZOOKEEPER_REGISTER_TOTAL_WAIT_TIME_MS = 60 *
1000L;
- private static final long ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS = 3 * 1000L;
-
private final int serverId;
/**