This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8fea6dcd7ba IGNITE-25669 Fix Netty buffer leak in TcpClientChannel (#6512)
8fea6dcd7ba is described below
commit 8fea6dcd7bab0536e83769588e79fefa8e4de98f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Sep 1 16:27:03 2025 +0300
IGNITE-25669 Fix Netty buffer leak in TcpClientChannel (#6512)
Close unpacker if asyncContinuationExecutor throws an exception.
---
.../ignite/internal/client/TcpClientChannel.java | 86 +++++++++++++---------
.../apache/ignite/client/ConfigurationTest.java | 22 ++++++
2 files changed, 75 insertions(+), 33 deletions(-)
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 2472d0e1e4f..50a00ea8283 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -412,14 +412,12 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
}
- // Handle the response in the async continuation pool.
- return fut.handleAsync((unpacker, err) -> {
- if (err != null) {
- throw sneakyThrow(ViewUtils.ensurePublicException(err));
- }
-
- return complete(payloadReader, notificationFut, unpacker);
- }, asyncContinuationExecutor);
+ // Handle the response in the async continuation pool with completeAsync.
+ return fut
+ .thenCompose(unpacker -> completeAsync(payloadReader,
notificationFut, unpacker))
+ .exceptionally(err -> {
+ throw
sneakyThrow(ViewUtils.ensurePublicException(err));
+ });
} catch (Throwable t) {
log.warn("Failed to send request [id=" + id + ", op=" + opCode +
", remoteAddress=" + cfg.getAddress() + "]: "
+ t.getMessage(), t);
@@ -434,6 +432,31 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
}
}
+ private <T> CompletableFuture<T> completeAsync(
+ @Nullable PayloadReader<T> payloadReader,
+ @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
+ ClientMessageUnpacker unpacker
+ ) {
+ try {
+ CompletableFuture<T> resFut = new CompletableFuture<>();
+
+ // Use asyncContinuationExecutor explicitly to close unpacker if the executor throws.
+ // With handleAsync et al we can't close the unpacker in that case.
+ asyncContinuationExecutor.execute(() -> {
+ try {
+ resFut.complete(complete(payloadReader, notificationFut,
unpacker));
+ } catch (Throwable t) {
+
resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
+ }
+ });
+
+ return resFut;
+ } catch (Throwable t) {
+ unpacker.close();
+ return failedFuture(ViewUtils.ensurePublicException(t));
+ }
+ }
+
/**
* Completes the request future.
*
@@ -636,24 +659,16 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
});
return fut
- .handleAsync((unpacker, err) -> {
- if (err != null) {
- if (err instanceof TimeoutException || err.getCause()
instanceof TimeoutException) {
- metrics.handshakesFailedTimeoutIncrement();
- throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout",
endpoint(), err);
- }
-
- metrics.handshakesFailedIncrement();
- throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(),
err);
+ .thenCompose(unpacker -> completeAsync(r ->
handshakeRes(r.in()), null, unpacker))
+ .exceptionally(err -> {
+ if (err instanceof TimeoutException || err.getCause()
instanceof TimeoutException) {
+ metrics.handshakesFailedTimeoutIncrement();
+ throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout",
endpoint(), err);
}
- try {
- return complete(r -> handshakeRes(r.in()), null,
unpacker);
- } catch (Throwable th) {
- metrics.handshakesFailedIncrement();
- throw new
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(),
th);
- }
- }, asyncContinuationExecutor);
+ metrics.handshakesFailedIncrement();
+ throw new IgniteClientConnectionException(CONNECTION_ERR,
"Handshake error", endpoint(), err);
+ });
}
/**
@@ -823,17 +838,22 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
// Add reference count before jumping onto another thread.
unpacker.retain();
- asyncContinuationExecutor.execute(() -> {
- try {
- if (!fut.complete(new PayloadInputChannel(this, unpacker,
null))) {
+ try {
+ asyncContinuationExecutor.execute(() -> {
+ try {
+ if (!fut.complete(new PayloadInputChannel(this, unpacker,
null))) {
+ unpacker.close();
+ }
+ } catch (Throwable e) {
unpacker.close();
- }
- } catch (Throwable e) {
- unpacker.close();
- log.error("Failed to handle server notification
[remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
- }
- });
+ log.error("Failed to handle server notification
[remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
+ }
+ });
+ } catch (Throwable t) {
+ unpacker.close();
+ throw t;
+ }
}
void checkTimeouts(long now) {
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index a5834b52d9e..b306462360f 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -30,8 +30,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import org.apache.ignite.client.IgniteClient.Builder;
import org.apache.ignite.lang.IgniteException;
import org.junit.jupiter.api.Test;
@@ -174,4 +176,24 @@ public class ConfigurationTest extends AbstractClientTest {
executor.shutdown();
}
}
+
+ @Test
+ public void testAsyncContinuationExecutorException() {
+ AtomicLong reqId = new AtomicLong();
+
+ Builder clientBuilder = IgniteClient.builder()
+ .addresses("localhost:" + serverPort)
+ .asyncContinuationExecutor(command -> {
+ if (reqId.incrementAndGet() > 2) {
+ throw new RuntimeException("bad executor");
+ }
+
+ command.run();
+ });
+
+ try (var client = clientBuilder.build()) {
+ IgniteException ex = assertThrows(IgniteException.class, () ->
client.tables().tables());
+ assertThat(ex.getMessage(), containsString("bad executor"));
+ }
+ }
}