This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new cf3c05d66 [CELEBORN-2068] TransportClientFactory should close channel
explicitly to avoid resource leak for timeout or failure
cf3c05d66 is described below
commit cf3c05d6683eb1f96895478ba3e9c2668f3aaca2
Author: SteNicholas <[email protected]>
AuthorDate: Fri Jul 18 17:50:08 2025 +0800
[CELEBORN-2068] TransportClientFactory should close channel explicitly to
avoid resource leak for timeout or failure
### What changes were proposed in this pull request?
`TransportClientFactory` should explicitly close the channel when a connection
attempt times out or fails, to avoid leaking the channel's resources.
### Why are the changes needed?
`TransportClientFactory#internalCreateClient` risks leaking the channel when a
connection attempt times out or fails, because it throws without first closing
the channel.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested.
Closes #3368 from SteNicholas/CELEBORN-2068.
Authored-by: SteNicholas <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../common/network/client/TransportClientFactory.java | 16 ++++++++++++----
1 file changed, 12 insertions(+), 4 deletions(-)
diff --git a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
index aff94a802..f9bd73355 100644
--- a/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
+++ b/common/src/main/java/org/apache/celeborn/common/network/client/TransportClientFactory.java
@@ -287,7 +287,6 @@ public class TransportClientFactory implements Closeable {
}
final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
- final AtomicReference<Channel> channelRef = new AtomicReference<>();
bootstrap.handler(
new ChannelInitializer<SocketChannel>() {
@@ -295,7 +294,6 @@ public class TransportClientFactory implements Closeable {
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler =
context.initializePipeline(ch, decoder, true);
clientRef.set(clientHandler.getClient());
- channelRef.set(ch);
}
});
@@ -311,9 +309,11 @@ public class TransportClientFactory implements Closeable {
throw new IOException(String.format("Failed to connect to %s",
address), cf.cause());
}
} else if (!cf.await(connectTimeoutMs)) {
+ closeChannel(cf);
throw new CelebornIOException(
String.format("Connecting to %s timed out (%s ms)", address,
connectTimeoutMs));
} else if (cf.cause() != null) {
+ closeChannel(cf);
throw new CelebornIOException(String.format("Failed to connect to %s",
address), cf.cause());
}
if (context.sslEncryptionEnabled()) {
@@ -333,12 +333,12 @@ public class TransportClientFactory implements Closeable {
"failed to complete TLS handshake to {}",
address,
handshakeFuture.cause());
- cf.channel().close();
+ closeChannel(cf);
}
}
});
if (!future.await(connectionTimeoutMs)) {
- cf.channel().close();
+ closeChannel(cf);
throw new IOException(
String.format("Failed to connect to %s within connection timeout",
address));
}
@@ -398,4 +398,12 @@ public class TransportClientFactory implements Closeable {
public TransportContext getContext() {
return context;
}
+
+ private void closeChannel(ChannelFuture channelFuture) {
+ try {
+ channelFuture.channel().close();
+ } catch (Exception e) {
+ logger.warn("Failed to close channel", e);
+ }
+ }
}