This is an automated email from the ASF dual-hosted git repository.
tison pushed a commit to branch branch-3.9.4
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9.4 by this push:
new 311239892 ZOOKEEPER-4921: Retry endlessly to establish a brand-new
session (#2265)
311239892 is described below
commit 311239892255df52929cca1a336968026c586541
Author: Kezhu Wang <[email protected]>
AuthorDate: Tue Jun 17 11:47:09 2025 +0800
ZOOKEEPER-4921: Retry endlessly to establish a brand-new session (#2265)
This partially rolls back ZOOKEEPER-4508 to stay consistent with versions
prior to 3.9.3 (exclusive), so as to maintain compatibility with third-party
libraries.
Refs: ZOOKEEPER-4508, ZOOKEEPER-4921, ZOOKEEPER-4923 and
https://lists.apache.org/thread/nfb9z7rhgglbjzfxvg4z2m3pks53b3c1
---
.../main/java/org/apache/zookeeper/ClientCnxn.java | 2 +-
.../apache/zookeeper/test/SessionTimeoutTest.java | 65 +++++++++++++++-------
2 files changed, 47 insertions(+), 20 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index 2d50acb01..d51313cd3 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -1241,7 +1241,7 @@ public void run() {
to = connectTimeout - clientCnxnSocket.getIdleSend();
}
- int expiration = expirationTimeout -
clientCnxnSocket.getIdleRecv();
+ int expiration = sessionId == 0 ? Integer.MAX_VALUE :
expirationTimeout - clientCnxnSocket.getIdleRecv();
if (expiration <= 0) {
String warnInfo = String.format(
"Client session timed out, have not heard from
server in %dms for session id 0x%s",
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
index 7a59f5eb9..9f5943f68 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
@@ -18,6 +18,9 @@
package org.apache.zookeeper.test;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -31,12 +34,15 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.TestableZooKeeper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -54,6 +60,21 @@ public void setUp() throws Exception {
zk = createClient();
}
+ private static class ExpiredWatcher implements Watcher {
+ public volatile CompletableFuture<Void> expired = new
CompletableFuture<>();
+
+ synchronized void reset() {
+ expired = new CompletableFuture<>();
+ }
+
+ @Override
+ public synchronized void process(WatchedEvent event) {
+ if (event.getState() == Event.KeeperState.Expired) {
+ expired.complete(null);
+ }
+ }
+ }
+
private static class BusyServer implements AutoCloseable {
private final ServerSocket server;
private final Socket client;
@@ -143,17 +164,24 @@ public void testSessionExpirationAfterAllServerDown()
throws Exception {
// stop client also to gain less distraction
zk.close();
- // small connection timeout to gain quick ci feedback
- int sessionTimeout = 3000;
- CompletableFuture<Void> expired = new CompletableFuture<>();
+ // given: established session
+ int sessionTimeout = 3000; // small connection timeout to gain quick
ci feedback
+ ExpiredWatcher watcher = new ExpiredWatcher();
zk = createClient(new CountdownWatcher(), hostPort, sessionTimeout);
- zk.register(event -> {
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
- expired.complete(null);
- }
- });
+ zk.register(watcher);
+
+ // when: all server down
+ long start = Time.currentElapsedTime();
+ zk.sync("/"); // touch timeout counts
stopServer();
- expired.join();
+
+ // then: get Expired after session timeout
+ watcher.expired.join();
+ long elapsed = Time.currentElapsedTime() - start;
+ assertThat(elapsed, greaterThanOrEqualTo((long)
zk.getSessionTimeout()));
+ assertThat(elapsed, lessThan(zk.getSessionTimeout() * 10L));
+
+ // then: future request will get SessionExpiredException
assertThrows(KeeperException.SessionExpiredException.class, () ->
zk.exists("/", null));
}
@@ -162,18 +190,17 @@ public void testSessionExpirationWhenNoServerUp() throws
Exception {
// stop client also to gain less distraction
zk.close();
+ // given: unavailable cluster
stopServer();
- // small connection timeout to gain quick ci feedback
- int sessionTimeout = 3000;
- CompletableFuture<Void> expired = new CompletableFuture<>();
- new TestableZooKeeper(hostPort, sessionTimeout, event -> {
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
- expired.complete(null);
- }
- });
- expired.join();
- assertThrows(KeeperException.SessionExpiredException.class, () ->
zk.exists("/", null));
+ // when: try to establish a brand-new session
+ int sessionTimeout = 300; // small connection timeout to gain quick ci
feedback
+ ExpiredWatcher watcher = new ExpiredWatcher();
+ try (ZooKeeper zk = new ZooKeeper(hostPort, sessionTimeout, watcher)) {
+ // then: never Expired
+ assertThrows(TimeoutException.class, () -> watcher.expired.get(3 *
sessionTimeout, TimeUnit.MILLISECONDS));
+ assertThrows(KeeperException.ConnectionLossException.class, () ->
zk.exists("/", null));
+ }
}
@Test