This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8fea6dcd7ba IGNITE-25669 Fix Netty buffer leak in TcpClientChannel 
(#6512)
8fea6dcd7ba is described below

commit 8fea6dcd7bab0536e83769588e79fefa8e4de98f
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Sep 1 16:27:03 2025 +0300

    IGNITE-25669 Fix Netty buffer leak in TcpClientChannel (#6512)
    
    Close unpacker if asyncContinuationExecutor throws an exception.
---
 .../ignite/internal/client/TcpClientChannel.java   | 86 +++++++++++++---------
 .../apache/ignite/client/ConfigurationTest.java    | 22 ++++++
 2 files changed, 75 insertions(+), 33 deletions(-)

diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 2472d0e1e4f..50a00ea8283 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -412,14 +412,12 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
                 }
             }
 
-            // Handle the response in the async continuation pool.
-            return fut.handleAsync((unpacker, err) -> {
-                if (err != null) {
-                    throw sneakyThrow(ViewUtils.ensurePublicException(err));
-                }
-
-                return complete(payloadReader, notificationFut, unpacker);
-            }, asyncContinuationExecutor);
+            // Handle the response in the async continuation pool with 
completeAsync.
+            return fut
+                    .thenCompose(unpacker -> completeAsync(payloadReader, 
notificationFut, unpacker))
+                    .exceptionally(err -> {
+                        throw 
sneakyThrow(ViewUtils.ensurePublicException(err));
+                    });
         } catch (Throwable t) {
             log.warn("Failed to send request [id=" + id + ", op=" + opCode + 
", remoteAddress=" + cfg.getAddress() + "]: "
                     + t.getMessage(), t);
@@ -434,6 +432,31 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         }
     }
 
+    private <T> CompletableFuture<T> completeAsync(
+            @Nullable PayloadReader<T> payloadReader,
+            @Nullable CompletableFuture<PayloadInputChannel> notificationFut,
+            ClientMessageUnpacker unpacker
+    ) {
+        try {
+            CompletableFuture<T> resFut = new CompletableFuture<>();
+
+            // Use asyncContinuationExecutor explicitly to close unpacker if 
the executor throws.
+            // With handleAsync et al we can't close the unpacker in that case.
+            asyncContinuationExecutor.execute(() -> {
+                try {
+                    resFut.complete(complete(payloadReader, notificationFut, 
unpacker));
+                } catch (Throwable t) {
+                    
resFut.completeExceptionally(ViewUtils.ensurePublicException(t));
+                }
+            });
+
+            return resFut;
+        } catch (Throwable t) {
+            unpacker.close();
+            return failedFuture(ViewUtils.ensurePublicException(t));
+        }
+    }
+
     /**
      * Completes the request future.
      *
@@ -636,24 +659,16 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         });
 
         return fut
-                .handleAsync((unpacker, err) -> {
-                    if (err != null) {
-                        if (err instanceof TimeoutException || err.getCause() 
instanceof TimeoutException) {
-                            metrics.handshakesFailedTimeoutIncrement();
-                            throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", 
endpoint(), err);
-                        }
-
-                        metrics.handshakesFailedIncrement();
-                        throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(), 
err);
+                .thenCompose(unpacker -> completeAsync(r -> 
handshakeRes(r.in()), null, unpacker))
+                .exceptionally(err -> {
+                    if (err instanceof TimeoutException || err.getCause() 
instanceof TimeoutException) {
+                        metrics.handshakesFailedTimeoutIncrement();
+                        throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake timeout", 
endpoint(), err);
                     }
 
-                    try {
-                        return complete(r -> handshakeRes(r.in()), null, 
unpacker);
-                    } catch (Throwable th) {
-                        metrics.handshakesFailedIncrement();
-                        throw new 
IgniteClientConnectionException(CONNECTION_ERR, "Handshake error", endpoint(), 
th);
-                    }
-                }, asyncContinuationExecutor);
+                    metrics.handshakesFailedIncrement();
+                    throw new IgniteClientConnectionException(CONNECTION_ERR, 
"Handshake error", endpoint(), err);
+                });
     }
 
     /**
@@ -823,17 +838,22 @@ class TcpClientChannel implements ClientChannel, 
ClientMessageHandler, ClientCon
         // Add reference count before jumping onto another thread.
         unpacker.retain();
 
-        asyncContinuationExecutor.execute(() -> {
-            try {
-                if (!fut.complete(new PayloadInputChannel(this, unpacker, 
null))) {
+        try {
+            asyncContinuationExecutor.execute(() -> {
+                try {
+                    if (!fut.complete(new PayloadInputChannel(this, unpacker, 
null))) {
+                        unpacker.close();
+                    }
+                } catch (Throwable e) {
                     unpacker.close();
-                }
-            } catch (Throwable e) {
-                unpacker.close();
 
-                log.error("Failed to handle server notification 
[remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
-            }
-        });
+                    log.error("Failed to handle server notification 
[remoteAddress=" + cfg.getAddress() + "]: " + e.getMessage(), e);
+                }
+            });
+        } catch (Throwable t) {
+            unpacker.close();
+            throw t;
+        }
     }
 
     void checkTimeouts(long now) {
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java 
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
index a5834b52d9e..b306462360f 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/ConfigurationTest.java
@@ -30,8 +30,10 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Test;
 
@@ -174,4 +176,24 @@ public class ConfigurationTest extends AbstractClientTest {
             executor.shutdown();
         }
     }
+
+    @Test
+    public void testAsyncContinuationExecutorException() {
+        AtomicLong reqId = new AtomicLong();
+
+        Builder clientBuilder = IgniteClient.builder()
+                .addresses("localhost:" + serverPort)
+                .asyncContinuationExecutor(command -> {
+                    if (reqId.incrementAndGet() > 2) {
+                        throw new RuntimeException("bad executor");
+                    }
+
+                    command.run();
+                });
+
+        try (var client = clientBuilder.build()) {
+            IgniteException ex = assertThrows(IgniteException.class, () -> 
client.tables().tables());
+            assertThat(ex.getMessage(), containsString("bad executor"));
+        }
+    }
 }

Reply via email to