This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch ignite-24968-poc in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit c7d34f32db4b6785fa4d6aeedbc3e7bf4ac1f307 Author: Pavel Tupitsyn <[email protected]> AuthorDate: Mon Apr 7 17:22:56 2025 +0300 IGNITE-24968 Fix SQL cancellation --- .../ignite/internal/client/PayloadOutputChannel.java | 15 +++++++++++++++ .../apache/ignite/internal/client/TcpClientChannel.java | 1 + .../org/apache/ignite/internal/client/sql/ClientSql.java | 9 +++++---- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java index 6c86281e49e..7e7a81327e9 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java @@ -32,6 +32,8 @@ public class PayloadOutputChannel implements AutoCloseable { /** Client request ID. */ private final long requestId; + private Runnable onSent; + /** * Constructor. * @@ -77,4 +79,17 @@ public class PayloadOutputChannel implements AutoCloseable { public void close() { out.close(); } + + /** + * Sets the action to be executed when the payload is sent. + * + * @param onSent Action to be executed. + */ + public void onSent(Runnable onSent) { + this.onSent = onSent; + } + + public Runnable onSent() { + return onSent; + } } diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java index a9c6e2b26c6..74a84b9b069 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java @@ -379,6 +379,7 @@ class TcpClientChannel implements ClientChannel, ClientMessageHandler, ClientCon onDisconnected(ex); } else { metrics.requestsSentIncrement(); + payloadCh.onSent().run(); } }); diff --git a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java index b693440988e..ceb2dd9e478 100644 --- a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java +++ b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java @@ -28,6 +28,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder; +import org.apache.ignite.internal.client.ClientChannel; import org.apache.ignite.internal.client.PayloadOutputChannel; import org.apache.ignite.internal.client.PayloadReader; import org.apache.ignite.internal.client.PayloadWriter; @@ -264,7 +265,7 @@ public class ClientSql implements IgniteSql { w.out().packLong(ch.observableTimestamp().get().longValue()); if (cancellationToken != null) { - addCancelAction(cancellationToken, w.requestId()); + addCancelAction(cancellationToken, w.requestId(), w.clientChannel()); } }; @@ -351,21 +352,21 @@ public class ClientSql implements IgniteSql { w.out().packLong(ch.observableTimestamp().get().longValue()); if (cancellationToken != null) { - addCancelAction(cancellationToken, w.requestId()); + w.onSent(() -> addCancelAction(cancellationToken, w.requestId(), w.clientChannel())); } }; return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null); } - private void addCancelAction(CancellationToken cancellationToken, long correlationToken) { + private static void addCancelAction(CancellationToken cancellationToken, long correlationToken, ClientChannel reqCh) { CompletableFuture<Void> cancelFuture = new CompletableFuture<>(); if (CancelHandleHelper.isCancelled(cancellationToken)) { throw new SqlException(Sql.EXECUTION_CANCELLED_ERR, "The query was cancelled while executing."); } - Runnable cancelAction = () -> ch.serviceAsync(ClientOp.SQL_CANCEL_EXEC, w -> w.out().packLong(correlationToken), null) + Runnable cancelAction = () -> reqCh.serviceAsync(ClientOp.SQL_CANCEL_EXEC, w -> w.out().packLong(correlationToken), null) .whenComplete((r, e) -> { if (e != null) { cancelFuture.completeExceptionally(e);
