This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new e8e141b21 ZOOKEEPER-4736: Fix nio socket fd leak if network service is down
e8e141b21 is described below
commit e8e141b21f3a07797958c74053762048c7a3a0bf
Author: lchqlchq <[email protected]>
AuthorDate: Fri Oct 31 04:21:09 2025 +0800
ZOOKEEPER-4736: Fix nio socket fd leak if network service is down
Reviewers: kezhuw, anmolnar
Author: lchqlchq
Closes #2047 from lchqlchq/fd
---
.../main/java/org/apache/zookeeper/ClientCnxn.java | 13 +++++-
.../org/apache/zookeeper/ClientCnxnSocketNIO.java | 6 +++
.../zookeeper/ClientCnxnSocketFragilityTest.java | 46 +++++++++++++++++++++
.../org/apache/zookeeper/common/BusyServer.java | 48 ++++++++++++++++++++++
.../apache/zookeeper/test/SessionTimeoutTest.java | 27 +-----------
5 files changed, 113 insertions(+), 27 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
index ed03359f7..7663e27f6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java
@@ -1289,6 +1289,17 @@ public void run() {
"SendThread exited loop for session: 0x" +
Long.toHexString(getSessionId()));
}
+ /**
+ * Closes the socket of the ongoing connection by calling
+ * {@code clientCnxnSocket.testableCloseSocket()}.
+ *
+ * <p>{@code submitRequest} calls this instead of {@code cleanAndNotifyState()}
+ * when a request fails with {@code REQUESTTIMEOUT}. {@code cleanAndNotifyState()}
+ * is documented as not thread-safe and only to be called inside {@link SendThread}.
+ * NOTE(review): presumably {@code submitRequest} runs outside the send thread, and
+ * the send thread then notices the closed socket and does its own cleanup and
+ * state notification — confirm.
+ *
+ * <p>An {@link IOException} from closing is logged at debug level and otherwise ignored.
+ */
+ private void abortConnection() {
+ try {
+ clientCnxnSocket.testableCloseSocket();
+ } catch (IOException e) {
+ // Best-effort close: a failure is only logged; submitRequest still returns its reply.
+ LOG.debug("Fail to close ongoing socket", e);
+ }
+ }
+
+ /**
+ * This is not thread-safe and should only be called inside {@link SendThread}.
+ */
private void cleanAndNotifyState() {
cleanup();
if (state.isAlive()) {
@@ -1531,7 +1542,7 @@ public ReplyHeader submitRequest(
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
- sendThread.cleanAndNotifyState();
+ sendThread.abortConnection();
}
return r;
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
index ea58b857e..e39bee118 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java
@@ -209,6 +209,12 @@ void cleanup() {
} catch (IOException e) {
LOG.debug("Ignoring exception during channel close", e);
}
+ try {
+ selector.wakeup();
+ selector.selectNow();
+ } catch (IOException e) {
+ LOG.debug("Ignoring exception during selecting of cancelled
socket", e);
+ }
}
try {
Thread.sleep(100);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
index 54426f0b6..2b70a599d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketFragilityTest.java
@@ -18,20 +18,32 @@
package org.apache.zookeeper;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.SocketException;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.time.Duration;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.client.HostProvider;
import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.BusyServer;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
import org.apache.zookeeper.test.ClientBase;
@@ -75,6 +87,40 @@ private void closeZookeeper(ZooKeeper zk) {
});
}
+    /**
+     * Regression test for ZOOKEEPER-4736: sockets of failed connection attempts must not leak.
+     *
+     * <p>The client's {@code createSock()} is stubbed so that every channel it returns throws
+     * {@link SocketException} from {@code connect}, simulating a host whose network service is
+     * down. After waiting five session timeouts, the client's NIO selector must hold at most one
+     * registered key. In other words, channels from earlier failed attempts have been closed and
+     * deregistered instead of accumulating.
+     */
+    @Test
+    public void testSocketClosedAfterFailure() throws Exception {
+        // Deliberately not named "sessionTimeout", which the createConnection override below
+        // uses for its int parameter; the old name shadowed this local inside the override.
+        final Duration timeout = Duration.ofMillis(1000);
+        final AtomicReference<Selector> nioSelector = new AtomicReference<>();
+        try (
+            // given: busy server
+            BusyServer server = new BusyServer();
+            ZooKeeper zk = new ZooKeeper(server.getHostPort(), (int) timeout.toMillis(), null) {
+                @Override
+                ClientCnxn createConnection(HostProvider hostProvider, int sessionTimeout,
+                        ZKClientConfig clientConfig, Watcher defaultWatcher,
+                        ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd,
+                        boolean canBeReadOnly) throws IOException {
+                    ClientCnxnSocketNIO socket = spy((ClientCnxnSocketNIO) clientCnxnSocket);
+
+                    doAnswer(invocation -> {
+                        SocketChannel channel = spy((SocketChannel) invocation.callRealMethod());
+                        // when: connect throws an exception
+                        //
+                        // This can happen when the system's network service is unavailable,
+                        // for example after "ifdown eth0" or "service network stop".
+                        doThrow(new SocketException("Network is unreachable"))
+                                .when(channel).connect(any());
+                        return channel;
+                    }).when(socket).createSock();
+
+                    nioSelector.set(socket.getSelector());
+                    return super.createConnection(hostProvider, sessionTimeout, clientConfig,
+                            defaultWatcher, socket, sessionId, sessionPasswd, canBeReadOnly);
+                }
+            }) {
+
+            Thread.sleep(timeout.toMillis() * 5);
+
+            // then: sockets of failed connections are closed, so at most one registered socket
+            assertThat(nioSelector.get().keys().size(), lessThanOrEqualTo(1));
+        }
+    }
+
@Test
public void testClientCnxnSocketFragility() throws Exception {
System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET,
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java b/zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java
new file mode 100644
index 000000000..c2eece3d2
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/common/BusyServer.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.common;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+
+/**
+ * A TCP listener on the IPv4 loopback address that never serves a connection,
+ * for tests that need a ZooKeeper server address which does not answer.
+ *
+ * <p>The listening socket is bound to {@code 127.0.0.1} on an ephemeral port
+ * with a backlog of 1, and the constructor connects one client socket of its
+ * own to it. Nothing ever calls {@code accept()} or reads from a socket.
+ * NOTE(review): the self-connection presumably fills the single backlog slot
+ * so that further connection attempts stall. How a full backlog behaves is
+ * OS-dependent, so confirm on the platforms the tests run on.
+ */
+public class BusyServer implements AutoCloseable {
+    private static final String LOOPBACK_HOST = "127.0.0.1";
+
+    private final ServerSocket server;
+    private final Socket client;
+
+    /**
+     * Binds the listening socket and connects the placeholder client to it.
+     *
+     * @throws IOException if binding or connecting fails; the listening socket
+     *         is closed before the exception propagates, so nothing leaks
+     */
+    public BusyServer() throws IOException {
+        this.server = new ServerSocket(0, 1, InetAddress.getByName(LOOPBACK_HOST));
+        Socket connected;
+        try {
+            connected = new Socket(LOOPBACK_HOST, server.getLocalPort());
+        } catch (IOException | RuntimeException e) {
+            // The caller never receives an instance it could close, so release
+            // the already-bound listener here instead of leaking it.
+            try {
+                server.close();
+            } catch (IOException suppressed) {
+                e.addSuppressed(suppressed);
+            }
+            throw e;
+        }
+        this.client = connected;
+    }
+
+    /** Returns the ephemeral port the listening socket is bound to. */
+    public int getLocalPort() {
+        return server.getLocalPort();
+    }
+
+    /** Returns {@code "127.0.0.1:<port>"}, usable as a ZooKeeper connect string. */
+    public String getHostPort() {
+        // Plain concatenation: String.format("%d") would use the default locale's digits.
+        return LOOPBACK_HOST + ":" + getLocalPort();
+    }
+
+    /**
+     * Closes the placeholder client and then the listening socket. Both are
+     * closed even if the first close fails; a second failure is attached to
+     * the first as a suppressed exception.
+     *
+     * @throws IOException if closing either socket fails
+     */
+    @Override
+    public void close() throws IOException {
+        // try-with-resources closes resources in reverse declaration order
+        // (client, then server) and closes each even if the other fails.
+        try (ServerSocket listener = server; Socket connection = client) {
+            // nothing else to do: releasing the two sockets is the whole job
+        }
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
index 9f5943f68..86659ba70 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTimeoutTest.java
@@ -27,8 +27,6 @@
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import java.io.IOException;
-import java.net.ServerSocket;
-import java.net.Socket;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -42,6 +40,7 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.common.BusyServer;
import org.apache.zookeeper.common.Time;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -75,30 +74,6 @@ public synchronized void process(WatchedEvent event) {
}
}
- private static class BusyServer implements AutoCloseable {
- private final ServerSocket server;
- private final Socket client;
-
- public BusyServer() throws IOException {
- this.server = new ServerSocket(0, 1);
- this.client = new Socket("127.0.0.1", server.getLocalPort());
- }
-
- public int getLocalPort() {
- return server.getLocalPort();
- }
-
- public String getHostPort() {
- return String.format("127.0.0.1:%d", getLocalPort());
- }
-
- @Override
- public void close() throws Exception {
- client.close();
- server.close();
- }
- }
-
@Test
public void testSessionExpiration() throws InterruptedException,
KeeperException {
final CountDownLatch expirationLatch = new CountDownLatch(1);