This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 1c903df95 ZOOKEEPER-4923: Add timeout to control brand-new session
establishment
1c903df95 is described below
commit 1c903df950553610b3c67f2edda61f40205c5cc7
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Dec 16 06:39:07 2025 +0800
ZOOKEEPER-4923: Add timeout to control brand-new session establishment
Reviewers: anmolnar, cnauroth, anmolnar
Author: kezhuw
Closes #2253 from kezhuw/ZOOKEEPER-4923-brand-new-session-timeout-control
---
.../main/java/org/apache/zookeeper/ClientCnxn.java | 40 ++++++++++++++++++-
.../main/java/org/apache/zookeeper/ZooKeeper.java | 3 ++
.../apache/zookeeper/client/ZooKeeperBuilder.java | 17 ++++++++
.../apache/zookeeper/client/ZooKeeperOptions.java | 11 ++++++
.../zookeeper/ClientCnxnSocketFragilityTest.java | 10 +++--
.../apache/zookeeper/ClientRequestTimeoutTest.java | 4 ++
.../apache/zookeeper/test/SessionTimeoutTest.java | 46 ++++++++++++++++++++++
7 files changed, 127 insertions(+), 4 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index 7663e27f6..020f9408a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -168,6 +168,8 @@ static class AuthData {
private final int sessionTimeout;
+ private final long newSessionTimeout;
+
private final ZKWatchManager watchManager;
private long sessionId;
@@ -398,6 +400,36 @@ public ClientCnxn(
long sessionId,
byte[] sessionPasswd,
boolean canBeReadOnly
+ ) throws IOException {
+ this(hostProvider, sessionTimeout, Long.MAX_VALUE, clientConfig,
defaultWatcher, clientCnxnSocket, sessionId, sessionPasswd, canBeReadOnly);
+ }
+
+ /**
+ * Creates a connection object. The actual network connect doesn't get
+ * established until needed. The start() instance method must be called
+ * after construction.
+ *
+ * @param hostProvider the list of ZooKeeper servers to connect to
+ * @param sessionTimeout the timeout for connections.
+ * @param newSessionTimeout the timeout before giving up brand-new session
establishment.
+ * @param clientConfig the client configuration.
+ * @param defaultWatcher default watcher for this connection
+ * @param clientCnxnSocket the socket implementation used (e.g. NIO/Netty)
+ * @param sessionId session id if re-establishing session
+ * @param sessionPasswd session passwd if re-establishing session
+ * @param canBeReadOnly whether the connection is allowed to go to
read-only mode in case of partitioning
+ * @throws IOException in cases of broken network
+ */
+ public ClientCnxn(
+ HostProvider hostProvider,
+ int sessionTimeout,
+ long newSessionTimeout,
+ ZKClientConfig clientConfig,
+ Watcher defaultWatcher,
+ ClientCnxnSocket clientCnxnSocket,
+ long sessionId,
+ byte[] sessionPasswd,
+ boolean canBeReadOnly
) throws IOException {
this.hostProvider = hostProvider;
this.sessionTimeout = sessionTimeout;
@@ -413,6 +445,7 @@ public ClientCnxn(
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
this.expirationTimeout = sessionTimeout * 4 / 3;
+ this.newSessionTimeout = newSessionTimeout == 0 ? expirationTimeout :
newSessionTimeout;
this.sendThread = new SendThread(clientCnxnSocket);
this.eventThread = new EventThread();
@@ -1192,7 +1225,12 @@ public void run() {
to = connectTimeout - clientCnxnSocket.getIdleSend();
}
- int expiration = sessionId == 0 ? Integer.MAX_VALUE :
expirationTimeout - clientCnxnSocket.getIdleRecv();
+ long expiration;
+ if (sessionId == 0) {
+ expiration = newSessionTimeout -
clientCnxnSocket.getIdleRecv();
+ } else {
+ expiration = expirationTimeout -
clientCnxnSocket.getIdleRecv();
+ }
if (expiration <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from
server in %dms for session id 0x%s",
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
index b3b5af454..7533e01a9 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java
@@ -697,6 +697,7 @@ public ZooKeeper(
ClientCnxn createConnection(
HostProvider hostProvider,
int sessionTimeout,
+ long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
@@ -707,6 +708,7 @@ ClientCnxn createConnection(
return new ClientCnxn(
hostProvider,
sessionTimeout,
+ newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
@@ -1148,6 +1150,7 @@ public ZooKeeper(ZooKeeperOptions options) throws
IOException {
cnxn = createConnection(
hostProvider,
sessionTimeout,
+ options.getNewSessionTimeoutMs(),
this.clientConfig,
watcher,
getClientCnxnSocket(),
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
index f484dcfee..03b5f4c4e 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperBuilder.java
@@ -39,6 +39,7 @@
public class ZooKeeperBuilder {
private final String connectString;
private final Duration sessionTimeout;
+ private Duration newSessionTimeout = Duration.ofSeconds(Long.MAX_VALUE,
999_999_999L);
private Function<Collection<InetSocketAddress>, HostProvider> hostProvider;
private Watcher defaultWatcher;
private boolean canBeReadOnly = false;
@@ -117,6 +118,21 @@ public ZooKeeperBuilder withSession(long sessionId, byte[]
sessionPasswd) {
return this;
}
+ /**
+ * Specifies timeout to establish a brand-new session.
+ *
+ * @param timeout timeout to get {@link
org.apache.zookeeper.Watcher.Event.KeeperState#Expired} in establishing a
+ * brand-new session. {@code
Duration.ofSeconds(Long.MAX_VALUE, 999_999_999L)}, which is the default,
+ * means endless retry until connected. {@code
Duration.ZERO} means a sensible value deduced from
+ * specified session timeout, currently, it is approximate
{@code sessionTimeout * 4 / 3}.
+ * @return this
+ * @since 3.10.0
+ */
+ public ZooKeeperBuilder withNewSessionTimeout(Duration timeout) {
+ this.newSessionTimeout = timeout;
+ return this;
+ }
+
/**
* Specifies the client config used to construct ZooKeeper instances.
*
@@ -143,6 +159,7 @@ public ZooKeeperOptions toOptions() {
return new ZooKeeperOptions(
connectString,
sessionTimeout,
+ newSessionTimeout,
defaultWatcher,
hostProvider,
canBeReadOnly,
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
index 52a173ebf..2605cb0f8 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/client/ZooKeeperOptions.java
@@ -33,6 +33,7 @@
public class ZooKeeperOptions {
private final String connectString;
private final Duration sessionTimeout;
+ private final Duration newSessionTimeout;
private final Watcher defaultWatcher;
private final Function<Collection<InetSocketAddress>, HostProvider>
hostProvider;
private final boolean canBeReadOnly;
@@ -42,6 +43,7 @@ public class ZooKeeperOptions {
ZooKeeperOptions(String connectString,
Duration sessionTimeout,
+ Duration newSessionTimeout,
Watcher defaultWatcher,
Function<Collection<InetSocketAddress>, HostProvider>
hostProvider,
boolean canBeReadOnly,
@@ -50,6 +52,7 @@ public class ZooKeeperOptions {
ZKClientConfig clientConfig) {
this.connectString = connectString;
this.sessionTimeout = sessionTimeout;
+ this.newSessionTimeout = newSessionTimeout;
this.hostProvider = hostProvider;
this.defaultWatcher = defaultWatcher;
this.canBeReadOnly = canBeReadOnly;
@@ -66,6 +69,14 @@ public int getSessionTimeoutMs() {
return (int) Long.min(Integer.MAX_VALUE, sessionTimeout.toMillis());
}
+ public long getNewSessionTimeoutMs() {
+ try {
+ return newSessionTimeout.toMillis();
+ } catch (ArithmeticException ignored) {
+ return Long.MAX_VALUE;
+ }
+ }
+
public Watcher getDefaultWatcher() {
return defaultWatcher;
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
index 2b70a599d..27c07fe90 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -96,7 +96,7 @@ public void testSocketClosedAfterFailure() throws Exception {
BusyServer server = new BusyServer();
ZooKeeper zk = new ZooKeeper(server.getHostPort(), (int)
sessionTimeout.toMillis(), null) {
@Override
- ClientCnxn createConnection(HostProvider hostProvider, int
sessionTimeout, ZKClientConfig clientConfig, Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd,
boolean canBeReadOnly) throws IOException {
+ ClientCnxn createConnection(HostProvider hostProvider, int
sessionTimeout, long newSessionTimeout, ZKClientConfig clientConfig, Watcher
defaultWatcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[]
sessionPasswd, boolean canBeReadOnly) throws IOException {
ClientCnxnSocketNIO socket = spy((ClientCnxnSocketNIO)
clientCnxnSocket);
doAnswer(mock -> {
@@ -110,7 +110,7 @@ ClientCnxn createConnection(HostProvider hostProvider, int
sessionTimeout, ZKCli
}).when(socket).createSock();
nioSelector.set(socket.getSelector());
- return super.createConnection(hostProvider,
sessionTimeout, clientConfig, defaultWatcher, socket, sessionId, sessionPasswd,
canBeReadOnly);
+ return super.createConnection(hostProvider,
sessionTimeout, newSessionTimeout, clientConfig, defaultWatcher, socket,
sessionId, sessionPasswd, canBeReadOnly);
}
}) {
@@ -328,6 +328,7 @@ class CustomClientCnxn extends ClientCnxn {
public CustomClientCnxn(
HostProvider hostProvider,
int sessionTimeout,
+ long newSessionTimeout,
ZKClientConfig zkClientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
@@ -338,6 +339,7 @@ public CustomClientCnxn(
super(
hostProvider,
sessionTimeout,
+ newSessionTimeout,
zkClientConfig,
defaultWatcher,
clientCnxnSocket,
@@ -403,6 +405,7 @@ public boolean isAlive() {
ClientCnxn createConnection(
HostProvider hostProvider,
int sessionTimeout,
+ long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
@@ -415,6 +418,7 @@ ClientCnxn createConnection(
ClientCnxnSocketFragilityTest.this.cnxn = new CustomClientCnxn(
hostProvider,
sessionTimeout,
+ newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
@@ -424,4 +428,4 @@ ClientCnxn createConnection(
return ClientCnxnSocketFragilityTest.this.cnxn;
}
}
-}
\ No newline at end of file
+}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
index 93f801cab..a05825ce2 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -225,6 +225,7 @@ class CustomClientCnxn extends ClientCnxn {
CustomClientCnxn(
HostProvider hostProvider,
int sessionTimeout,
+ long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
@@ -235,6 +236,7 @@ class CustomClientCnxn extends ClientCnxn {
super(
hostProvider,
sessionTimeout,
+ newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
@@ -286,6 +288,7 @@ public CustomZooKeeper(String connectString, int
sessionTimeout, Watcher watcher
ClientCnxn createConnection(
HostProvider hostProvider,
int sessionTimeout,
+ long newSessionTimeout,
ZKClientConfig clientConfig,
Watcher defaultWatcher,
ClientCnxnSocket clientCnxnSocket,
@@ -296,6 +299,7 @@ ClientCnxn createConnection(
return new CustomClientCnxn(
hostProvider,
sessionTimeout,
+ newSessionTimeout,
clientConfig,
defaultWatcher,
clientCnxnSocket,
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
index 86659ba70..5688d9f55 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
@@ -19,6 +19,7 @@
package org.apache.zookeeper.test;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -27,6 +28,7 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
+import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -40,6 +42,7 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZooKeeperBuilder;
import org.apache.zookeeper.common.BusyServer;
import org.apache.zookeeper.common.Time;
import org.junit.jupiter.api.BeforeEach;
@@ -176,6 +179,49 @@ public void testSessionExpirationWhenNoServerUp() throws
Exception {
assertThrows(TimeoutException.class, () -> watcher.expired.get(3 *
sessionTimeout, TimeUnit.MILLISECONDS));
assertThrows(KeeperException.ConnectionLossException.class, () ->
zk.exists("/", null));
}
+
+ // when: try to establish a brand-new session using builder with
default newSessionTimeout
+ watcher.reset();
+ try (ZooKeeper zk = new ZooKeeperBuilder(hostPort,
Duration.ofMillis(sessionTimeout))
+ .withDefaultWatcher(watcher)
+ .build()) {
+ // then: never Expired
+ assertThrows(TimeoutException.class, () -> watcher.expired.get(3 *
sessionTimeout, TimeUnit.MILLISECONDS));
+ assertThrows(KeeperException.ConnectionLossException.class, () ->
zk.exists("/", null));
+ }
+
+ // when: try to establish a brand-new session using builder with
Duration.ZERO newSessionTimeout
+ watcher.reset();
+ long start = Time.currentElapsedTime();
+ try (ZooKeeper zk = new ZooKeeperBuilder(hostPort,
Duration.ofMillis(sessionTimeout))
+ .withDefaultWatcher(watcher)
+ .withNewSessionTimeout(Duration.ZERO)
+ .build()) {
+ // then: get Expired after some delay
+ watcher.expired.join();
+ long elapsed = Time.currentElapsedTime() - start;
+ assertThat(elapsed, greaterThan((long) sessionTimeout));
+ assertThat(elapsed, lessThan(sessionTimeout * 10L));
+ // then: future request will get SessionExpiredException
+ assertThrows(KeeperException.SessionExpiredException.class, () ->
zk.exists("/", null));
+ }
+
+ // when: try to establish a brand-new session using builder with
custom newSessionTimeout
+ watcher.reset();
+ start = Time.currentElapsedTime();
+ Duration newSessionTimeout = Duration.ofMillis(300);
+ try (ZooKeeper zk = new ZooKeeperBuilder(hostPort,
Duration.ofMillis(30000))
+ .withDefaultWatcher(watcher)
+ .withNewSessionTimeout(newSessionTimeout)
+ .build()) {
+ // then: get Expired after newSessionTimeout
+ watcher.expired.join();
+ long elapsed = Time.currentElapsedTime() - start;
+ assertThat(elapsed,
greaterThanOrEqualTo(newSessionTimeout.toMillis()));
+ assertThat(elapsed, lessThan(newSessionTimeout.toMillis() * 10));
+ // then: future request will get SessionExpiredException
+ assertThrows(KeeperException.SessionExpiredException.class, () ->
zk.exists("/", null));
+ }
}
@Test