This is an automated email from the ASF dual-hosted git repository.

cnauroth pushed a commit to branch branch-3.9
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/branch-3.9 by this push:
     new c67611222 ZOOKEEPER-4909: Fix exceeded request timeout in case of spurious wakeup
c67611222 is described below

commit c67611222a61247342ad7cc6dbf76739c7f59c3e
Author: luozongle01 <[email protected]>
AuthorDate: Tue Mar 25 00:38:10 2025 +0000

    ZOOKEEPER-4909: Fix exceeded request timeout in case of spurious wakeup
    
    Closes #2237
    
    Co-authored-by: Kezhu Wang <[email protected]>
    Signed-off-by: Kezhu Wang <[email protected]>
    Signed-off-by: Chris Nauroth <[email protected]>
    (cherry picked from commit 9cc30438a4d4f8e55042b064edc95eeb1c8ac9a6)
---
 .../main/java/org/apache/zookeeper/ClientCnxn.java |  18 +--
 .../apache/zookeeper/ClientRequestTimeoutTest.java | 131 +++++++++++++++++++++
 2 files changed, 141 insertions(+), 8 deletions(-)

diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index 8974bdb19..2d50acb01 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -1589,14 +1589,16 @@ public ReplyHeader submitRequest(
      * Wait for request completion with timeout.
      */
     private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
-        long waitStartTime = Time.currentElapsedTime();
-        while (!packet.finished) {
-            packet.wait(requestTimeout);
-            if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) {
-                LOG.error("Timeout error occurred for the packet '{}'.", packet);
-                r.setErr(Code.REQUESTTIMEOUT.intValue());
-                break;
-            }
+        long remainingTime = requestTimeout;
+        while (!packet.finished && remainingTime > 0) {
+            long waitStartTime = Time.currentElapsedTime();
+            packet.wait(remainingTime);
+            remainingTime -= (Time.currentElapsedTime() - waitStartTime);
+        }
+
+        if (!packet.finished) {
+            LOG.error("Timeout error occurred for the packet '{}'.", packet);
+            r.setErr(Code.REQUESTTIMEOUT.intValue());
         }
     }
 
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
index ecf39273e..a22ea79b2 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java
@@ -19,13 +19,21 @@
 package org.apache.zookeeper;
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.jute.Record;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -37,6 +45,9 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
     private static final int SERVER_COUNT = 3;
     private boolean dropPacket = false;
     private int dropPacketType = ZooDefs.OpCode.create;
+    private boolean capturePacket = false;
+    private int capturePacketType = ZooDefs.OpCode.create;
+    private ClientCnxn.Packet capturedPacket = null;
 
     @Test
     @Timeout(value = 120)
@@ -94,6 +105,105 @@ public void testClientRequestTimeout() throws Exception {
         }
     }
 
+    @Test
+    void testClientRequestTimeoutTime() throws Exception {
+        long requestTimeout = TimeUnit.SECONDS.toMillis(5);
+        System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout));
+
+        CustomZooKeeper zk = null;
+        int clientPort = PortAssignment.unique();
+        MainThread mainThread = new MainThread(0, clientPort, "", false);
+        mainThread.start();
+        try {
+            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
+                    "waiting for server 0 being up");
+
+            CountdownWatcher watch = new CountdownWatcher();
+            zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch);
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            dropPacket = true;
+            dropPacketType = ZooDefs.OpCode.create;
+
+            String data = "originalData";
+            long startTime = Time.currentElapsedTime();
+            try {
+                zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+                fail("KeeperException is expected.");
+            } catch (KeeperException exception) {
+                long cost = Time.currentElapsedTime() - startTime;
+                assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code());
+                LOG.info("testClientRequestTimeoutTime cost:{}", cost);
+                assertThat(cost, greaterThanOrEqualTo(requestTimeout));
+                assertThat(cost, lessThan(requestTimeout + 500));
+            }
+        } finally {
+            mainThread.shutdown();
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+
+    @Test
+    void testClientRequestTimeoutTimeSimulatingSpuriousWakeup() throws Exception {
+        long requestTimeout = TimeUnit.SECONDS.toMillis(5);
+        System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout));
+
+        CustomZooKeeper zk = null;
+        int clientPort = PortAssignment.unique();
+        MainThread mainThread = new MainThread(0, clientPort, "", false);
+        mainThread.start();
+        try {
+            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
+                    "waiting for server 0 being up");
+
+            CountdownWatcher watch = new CountdownWatcher();
+            zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch);
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            dropPacket = true;
+            dropPacketType = ZooDefs.OpCode.create;
+            capturePacket = true;
+            capturePacketType = ZooDefs.OpCode.create;
+
+            // Simulating spurious wakeup
+            new Thread(() -> {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(requestTimeout / 2);
+                    if (capturedPacket != null) {
+                        synchronized (capturedPacket) {
+                            capturedPacket.notifyAll();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).start();
+
+            String data = "originalData";
+            long startTime = Time.currentElapsedTime();
+            try {
+                zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+                fail("KeeperException is expected.");
+            } catch (KeeperException exception) {
+                long cost = Time.currentElapsedTime() - startTime;
+                assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code());
+                LOG.info("testClientRequestTimeoutTimeSimulatingSpuriousWakeup cost:{}", cost);
+                assertThat(cost, greaterThanOrEqualTo(requestTimeout));
+                assertThat(cost, lessThan(requestTimeout + 500));
+            }
+        } finally {
+            capturePacket = false;
+            capturedPacket = null;
+            mainThread.shutdown();
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
     /**
      * @return connection string in the form of
      *         127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
@@ -141,6 +251,27 @@ public void finishPacket(Packet p) {
             super.finishPacket(p);
         }
 
+        @Override
+        public Packet queuePacket(
+                RequestHeader h,
+                ReplyHeader r,
+                Record request,
+                Record response,
+                AsyncCallback cb,
+                String clientPath,
+                String serverPath,
+                Object ctx,
+                ZooKeeper.WatchRegistration watchRegistration,
+                WatchDeregistration watchDeregistration) {
+            Packet packet = super.queuePacket(h, r, request, response, cb, clientPath, serverPath,
+                    ctx, watchRegistration, watchDeregistration);
+
+            if (capturePacket && h != null && h.getType() == capturePacketType) {
+                capturedPacket = packet;
+            }
+            return packet;
+        }
+
     }
 
     class CustomZooKeeper extends ZooKeeper {

Reply via email to