This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch ignite-24968-poc
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit c7d34f32db4b6785fa4d6aeedbc3e7bf4ac1f307
Author: Pavel Tupitsyn <[email protected]>
AuthorDate: Mon Apr 7 17:22:56 2025 +0300

    IGNITE-24968 Fix SQL cancellation
---
 .../ignite/internal/client/PayloadOutputChannel.java      | 15 +++++++++++++++
 .../apache/ignite/internal/client/TcpClientChannel.java   |  1 +
 .../org/apache/ignite/internal/client/sql/ClientSql.java  |  9 +++++----
 3 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index 6c86281e49e..7e7a81327e9 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -32,6 +32,8 @@ public class PayloadOutputChannel implements AutoCloseable {
     /** Client request ID. */
     private final long requestId;
 
+    private Runnable onSent;
+
     /**
      * Constructor.
      *
@@ -77,4 +79,17 @@ public class PayloadOutputChannel implements AutoCloseable {
     public void close() {
         out.close();
     }
+
+    /**
+     * Sets the action to be executed when the payload is sent.
+     *
+     * @param onSent Action to be executed.
+     */
+    public void onSent(Runnable onSent) {
+        this.onSent = onSent;
+    }
+
+    public Runnable onSent() {
+        return onSent;
+    }
 }
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index a9c6e2b26c6..74a84b9b069 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -379,6 +379,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon
                     onDisconnected(ex);
                 } else {
                     metrics.requestsSentIncrement();
+                    payloadCh.onSent().run();
                 }
             });
 
diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index b693440988e..ceb2dd9e478 100644
--- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
+import org.apache.ignite.internal.client.ClientChannel;
 import org.apache.ignite.internal.client.PayloadOutputChannel;
 import org.apache.ignite.internal.client.PayloadReader;
 import org.apache.ignite.internal.client.PayloadWriter;
@@ -264,7 +265,7 @@ public class ClientSql implements IgniteSql {
             w.out().packLong(ch.observableTimestamp().get().longValue());
 
             if (cancellationToken != null) {
-                addCancelAction(cancellationToken, w.requestId());
+                addCancelAction(cancellationToken, w.requestId(), w.clientChannel());
             }
         };
 
@@ -351,21 +352,21 @@ public class ClientSql implements IgniteSql {
             w.out().packLong(ch.observableTimestamp().get().longValue());
 
             if (cancellationToken != null) {
-                addCancelAction(cancellationToken, w.requestId());
+                w.onSent(() -> addCancelAction(cancellationToken, w.requestId(), w.clientChannel()));
             }
         };
 
         return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
     }
 
-    private void addCancelAction(CancellationToken cancellationToken, long correlationToken) {
+    private static void addCancelAction(CancellationToken cancellationToken, long correlationToken, ClientChannel reqCh) {
         CompletableFuture<Void> cancelFuture = new CompletableFuture<>();
 
         if (CancelHandleHelper.isCancelled(cancellationToken)) {
             throw new SqlException(Sql.EXECUTION_CANCELLED_ERR, "The query was cancelled while executing.");
         }
 
-        Runnable cancelAction = () -> ch.serviceAsync(ClientOp.SQL_CANCEL_EXEC, w -> w.out().packLong(correlationToken), null)
+        Runnable cancelAction = () -> reqCh.serviceAsync(ClientOp.SQL_CANCEL_EXEC, w -> w.out().packLong(correlationToken), null)
                 .whenComplete((r, e) -> {
                     if (e != null) {
                         cancelFuture.completeExceptionally(e);

Reply via email to