This is an automated email from the ASF dual-hosted git repository.
cnauroth pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.9 by this push:
new c67611222 ZOOKEEPER-4909: Fix exceeded request timeout in case of
spurious wakeup
c67611222 is described below
commit c67611222a61247342ad7cc6dbf76739c7f59c3e
Author: luozongle01 <[email protected]>
AuthorDate: Tue Mar 25 00:38:10 2025 +0000
ZOOKEEPER-4909: Fix exceeded request timeout in case of spurious wakeup
Closes #2237
Co-authored-by: Kezhu Wang <[email protected]>
Signed-off-by: Kezhu Wang <[email protected]>
Signed-off-by: Chris Nauroth <[email protected]>
(cherry picked from commit 9cc30438a4d4f8e55042b064edc95eeb1c8ac9a6)
---
.../main/java/org/apache/zookeeper/ClientCnxn.java | 18 +--
.../apache/zookeeper/ClientRequestTimeoutTest.java | 131 +++++++++++++++++++++
2 files changed, 141 insertions(+), 8 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index 8974bdb19..2d50acb01 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -1589,14 +1589,16 @@ public ReplyHeader submitRequest(
* Wait for request completion with timeout.
*/
private void waitForPacketFinish(ReplyHeader r, Packet packet) throws
InterruptedException {
- long waitStartTime = Time.currentElapsedTime();
- while (!packet.finished) {
- packet.wait(requestTimeout);
- if (!packet.finished && ((Time.currentElapsedTime() -
waitStartTime) >= requestTimeout)) {
- LOG.error("Timeout error occurred for the packet '{}'.",
packet);
- r.setErr(Code.REQUESTTIMEOUT.intValue());
- break;
- }
+ long remainingTime = requestTimeout;
+ while (!packet.finished && remainingTime > 0) {
+ long waitStartTime = Time.currentElapsedTime();
+ packet.wait(remainingTime);
+ remainingTime -= (Time.currentElapsedTime() - waitStartTime);
+ }
+
+ if (!packet.finished) {
+ LOG.error("Timeout error occurred for the packet '{}'.", packet);
+ r.setErr(Code.REQUESTTIMEOUT.intValue());
}
}
diff --git
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
index ecf39273e..a22ea79b2 100644
---
a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
+++
b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -19,13 +19,21 @@
package org.apache.zookeeper;
import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.jute.Record;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.test.ClientBase;
import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -37,6 +45,9 @@ public class ClientRequestTimeoutTest extends
QuorumPeerTestBase {
private static final int SERVER_COUNT = 3;
private boolean dropPacket = false;
private int dropPacketType = ZooDefs.OpCode.create;
+ private boolean capturePacket = false;
+ private int capturePacketType = ZooDefs.OpCode.create;
+ private ClientCnxn.Packet capturedPacket = null;
@Test
@Timeout(value = 120)
@@ -94,6 +105,105 @@ public void testClientRequestTimeout() throws Exception {
}
}
+ @Test
+ void testClientRequestTimeoutTime() throws Exception {
+ long requestTimeout = TimeUnit.SECONDS.toMillis(5);
+ System.setProperty("zookeeper.request.timeout",
Long.toString(requestTimeout));
+
+ CustomZooKeeper zk = null;
+ int clientPort = PortAssignment.unique();
+ MainThread mainThread = new MainThread(0, clientPort, "", false);
+ mainThread.start();
+ try {
+ assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort,
CONNECTION_TIMEOUT),
+ "waiting for server 0 being up");
+
+ CountdownWatcher watch = new CountdownWatcher();
+ zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}),
ClientBase.CONNECTION_TIMEOUT, watch);
+ watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+ dropPacket = true;
+ dropPacketType = ZooDefs.OpCode.create;
+
+ String data = "originalData";
+ long startTime = Time.currentElapsedTime();
+ try {
+ zk.create("/testClientRequestTimeout", data.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+ fail("KeeperException is expected.");
+ } catch (KeeperException exception) {
+ long cost = Time.currentElapsedTime() - startTime;
+ assertEquals(KeeperException.Code.REQUESTTIMEOUT,
exception.code());
+ LOG.info("testClientRequestTimeoutTime cost:{}", cost);
+ assertThat(cost, greaterThanOrEqualTo(requestTimeout));
+ assertThat(cost, lessThan(requestTimeout + 500));
+ }
+ } finally {
+ mainThread.shutdown();
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
+
+ @Test
+ void testClientRequestTimeoutTimeSimulatingSpuriousWakeup() throws
Exception {
+ long requestTimeout = TimeUnit.SECONDS.toMillis(5);
+ System.setProperty("zookeeper.request.timeout",
Long.toString(requestTimeout));
+
+ CustomZooKeeper zk = null;
+ int clientPort = PortAssignment.unique();
+ MainThread mainThread = new MainThread(0, clientPort, "", false);
+ mainThread.start();
+ try {
+ assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort,
CONNECTION_TIMEOUT),
+ "waiting for server 0 being up");
+
+ CountdownWatcher watch = new CountdownWatcher();
+ zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}),
ClientBase.CONNECTION_TIMEOUT, watch);
+ watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+ dropPacket = true;
+ dropPacketType = ZooDefs.OpCode.create;
+ capturePacket = true;
+ capturePacketType = ZooDefs.OpCode.create;
+
+ // Simulating spurious wakeup
+ new Thread(() -> {
+ try {
+ TimeUnit.MILLISECONDS.sleep(requestTimeout / 2);
+ if (capturedPacket != null) {
+ synchronized (capturedPacket) {
+ capturedPacket.notifyAll();
+ }
+ }
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }).start();
+
+ String data = "originalData";
+ long startTime = Time.currentElapsedTime();
+ try {
+ zk.create("/testClientRequestTimeout", data.getBytes(),
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+ fail("KeeperException is expected.");
+ } catch (KeeperException exception) {
+ long cost = Time.currentElapsedTime() - startTime;
+ assertEquals(KeeperException.Code.REQUESTTIMEOUT,
exception.code());
+ LOG.info("testClientRequestTimeoutTimeSimulatingSpuriousWakeup
cost:{}", cost);
+ assertThat(cost, greaterThanOrEqualTo(requestTimeout));
+ assertThat(cost, lessThan(requestTimeout + 500));
+ }
+ } finally {
+ capturePacket = false;
+ capturedPacket = null;
+ mainThread.shutdown();
+ if (zk != null) {
+ zk.close();
+ }
+ }
+ }
+
/**
* @return connection string in the form of
* 127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
@@ -141,6 +251,27 @@ public void finishPacket(Packet p) {
super.finishPacket(p);
}
+ @Override
+ public Packet queuePacket(
+ RequestHeader h,
+ ReplyHeader r,
+ Record request,
+ Record response,
+ AsyncCallback cb,
+ String clientPath,
+ String serverPath,
+ Object ctx,
+ ZooKeeper.WatchRegistration watchRegistration,
+ WatchDeregistration watchDeregistration) {
+ Packet packet = super.queuePacket(h, r, request, response, cb,
clientPath, serverPath,
+ ctx, watchRegistration, watchDeregistration);
+
+ if (capturePacket && h != null && h.getType() ==
capturePacketType) {
+ capturedPacket = packet;
+ }
+ return packet;
+ }
+
}
class CustomZooKeeper extends ZooKeeper {