This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new a6769a76a80 IGNITE-27855 Add JobExecutionContext#cancellationToken 
(#7922)
a6769a76a80 is described below

commit a6769a76a8042f3727f83a186df3c02661162a1b
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Fri Apr 3 16:50:01 2026 +0300

    IGNITE-27855 Add JobExecutionContext#cancellationToken (#7922)
---
 .../apache/ignite/compute/JobExecutionContext.java |  9 +++
 .../apache/ignite/client/fakes/FakeCompute.java    |  4 +-
 .../ignite/internal/compute/ItComputeBaseTest.java | 67 ++++++++++++++++++++++
 .../org/example/jobs/embedded/NestedSleepJob.java  | 40 +++++++++++++
 .../embedded/SqlQueryWithCancellationTokenJob.java | 46 +++++++++++++++
 .../example/jobs/standalone/NestedSleepJob.java    | 47 +++++++++++++++
 .../SqlQueryWithCancellationTokenJob.java          | 46 +++++++++++++++
 .../ignite/internal/compute/ComputeUtils.java      | 21 +++++++
 .../internal/compute/JobExecutionContextImpl.java  | 18 ++++--
 .../compute/executor/ComputeExecutorImpl.java      |  7 ++-
 .../compute/executor/JobExecutionInternal.java     | 16 ++++--
 .../internal/compute/queue/QueueExecutionImpl.java |  4 +-
 .../compute/task/TaskExecutionInternal.java        |  8 +--
 .../compute/JobExecutionContextImplTest.java       | 21 +++++--
 .../compute/executor/ComputeExecutorTest.java      | 35 +++++++++++
 15 files changed, 360 insertions(+), 29 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java 
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index 5f275ea15ac..d645ab1ca04 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.compute;
 import java.util.Collection;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.deployment.DeploymentUnitInfo;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.table.partition.Partition;
 import org.jetbrains.annotations.Nullable;
 
@@ -49,6 +50,14 @@ public interface JobExecutionContext {
      */
     @Nullable Partition partition();
 
+    /**
+     * Cancellation token that is canceled when the job is canceled. Can be 
passed to SQL queries and other operations
+     * that accept {@link CancellationToken} to propagate cancellation from 
the job to those operations.
+     *
+     * @return Cancellation token associated with this job.
+     */
+    CancellationToken cancellationToken();
+
     /**
      * Collection of deployment units associated with this job execution.
      *
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index cddd07268cc..e21975e3c0e 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import org.apache.ignite.Ignite;
@@ -75,6 +74,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.PublicClusterNodeImpl;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
@@ -136,7 +136,7 @@ public class FakeCompute implements IgniteComputeInternal {
             Class<ComputeJob<Object, Object>> jobClass = 
ComputeUtils.jobClass(jobClassLoader, jobClassName);
             ComputeJob<Object, Object> job = 
ComputeUtils.instantiateJob(jobClass);
             CompletableFuture<Object> jobFut = job.executeAsync(
-                    new JobExecutionContextImpl(ignite, new AtomicBoolean(), 
jobClassLoader, null),
+                    new JobExecutionContextImpl(ignite, CancelHandle.create(), 
jobClassLoader, null),
                     SharedComputeUtils.unmarshalArg(executionContext.arg(), 
null, null));
 
             return jobExecution(jobFut != null ? jobFut : 
nullCompletedFuture());
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index a554ea93c7a..02043f31c41 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -37,6 +37,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_CANCELLED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.both;
@@ -90,6 +91,7 @@ import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.SqlException;
 import org.apache.ignite.table.QualifiedName;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
@@ -943,6 +945,57 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         return unwrapIgniteImpl(node(0)).observableTimeTracker().get();
     }
 
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void cancellationTokenPropagatesSqlCancellation(boolean local) {
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        // Submit a job that runs a long-running SQL query with 
context.cancellationToken().
+        // The SQL query registers on the token, so when the job is canceled, 
the query is canceled too.
+        JobExecution<Void> execution = submit(
+                JobTarget.node(clusterNode(executeNode)),
+                sqlQueryWithCancellationTokenJob(),
+                cancelHandle.token(),
+                null
+        );
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        // Cancel the job — the cancellation token propagates to the SQL query.
+        cancelHandle.cancel();
+
+        // The SQL query throws SqlException with EXECUTION_CANCELLED_ERR, 
which is treated as cancellation.
+        assertThat(execution.resultAsync(), 
willThrow(sqlCancelledException()));
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+    }
+
+    @ParameterizedTest(name = "local: {0}")
+    @ValueSource(booleans = {true, false})
+    void nestedJobCancellationPropagatesToOuterJob(boolean local) {
+        Ignite executeNode = local ? node(0) : node(1);
+
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        // The outer job submits a nested CancelAwareSleepJob with 
context.cancellationToken().
+        // When the outer job is canceled, the token propagates to the inner 
job.
+        JobExecution<Void> execution = submit(
+                JobTarget.node(clusterNode(executeNode)),
+                nestedSleepJob(),
+                cancelHandle.token(),
+                Long.MAX_VALUE
+        );
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        cancelHandle.cancel();
+
+        // The inner job throws CancellationException, which is wrapped as 
ComputeException(COMPUTE_JOB_CANCELLED_ERR).
+        // The outer job propagates this — isCancellationException recognizes 
the error code → CANCELED.
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+    }
+
     JobDescriptor.Builder<Object, String> toStringJobBuilder() {
         return JobDescriptor.builder(jobClassName("ToStringJob"));
     }
@@ -967,6 +1020,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         return JobDescriptor.<Long, 
Void>builder(jobClassName("SilentSleepJob")).units(units()).build();
     }
 
+    private JobDescriptor<Long, Void> nestedSleepJob() {
+        return JobDescriptor.<Long, 
Void>builder(jobClassName("NestedSleepJob")).units(units()).build();
+    }
+
     private JobDescriptor<Long, Void> cancelAwareSleepJob() {
         return JobDescriptor.<Long, 
Void>builder(jobClassName("CancelAwareSleepJob")).units(units()).build();
     }
@@ -991,6 +1048,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         return JobDescriptor.<Void, 
String>builder(jobClassName("AsyncDelayedCompleteJob")).units(units()).build();
     }
 
+    private JobDescriptor<Void, Void> sqlQueryWithCancellationTokenJob() {
+        return JobDescriptor.<Void, 
Void>builder(jobClassName("SqlQueryWithCancellationTokenJob")).units(units()).build();
+    }
+
     private JobDescriptor<Tuple, Integer> tupleJob() {
         return JobDescriptor.<Tuple, 
Integer>builder(jobClassName("TupleJob")).units(units()).build();
     }
@@ -1019,4 +1080,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
                                 .or(instanceOf(CancellationException.class))
                 );
     }
+
+    private static Matcher<Exception> sqlCancelledException() {
+        return traceableException(SqlException.class)
+                .withCode(is(EXECUTION_CANCELLED_ERR))
+                .withMessage(is("The query was cancelled while executing."));
+    }
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/NestedSleepJob.java
 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/NestedSleepJob.java
new file mode 100644
index 00000000000..64455d3cab2
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/NestedSleepJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobTarget;
+
+/**
+ * Compute job that submits a nested {@link CancelAwareSleepJob} using the 
context's cancellation token.
+ * When this job is canceled, the token propagates cancellation to the inner 
job.
+ */
+public class NestedSleepJob implements ComputeJob<Long, Void> {
+    @Override
+    public CompletableFuture<Void> executeAsync(JobExecutionContext context, 
Long timeout) {
+        return context.ignite().compute().executeAsync(
+                JobTarget.anyNode(context.ignite().clusterNodes()),
+                JobDescriptor.builder(CancelAwareSleepJob.class).build(),
+                timeout,
+                context.cancellationToken()
+        );
+    }
+}
diff --git 
a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/SqlQueryWithCancellationTokenJob.java
 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/SqlQueryWithCancellationTokenJob.java
new file mode 100644
index 00000000000..29a5105918f
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/SqlQueryWithCancellationTokenJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.Transaction;
+
+/** Compute job that executes a long-running SQL query with the context's 
cancellation token. */
+public class SqlQueryWithCancellationTokenJob implements ComputeJob<Void, 
Void> {
+    @Override
+    public CompletableFuture<Void> executeAsync(JobExecutionContext context, 
Void arg) {
+        // Execute a long-running SQL query using the job's cancellation token.
+        // When the job is cancelled, the cancellation token propagates to the 
SQL query.
+        return context.ignite().sql()
+                .executeAsync((Transaction) null, context.cancellationToken(),
+                        "SELECT * FROM system_range(0, 10000000000)")
+                .thenCompose(SqlQueryWithCancellationTokenJob::drainPages)
+                .thenApply(v -> null);
+    }
+
+    private static CompletableFuture<Void> drainPages(AsyncResultSet<?> rs) {
+        if (!rs.hasMorePages()) {
+            return rs.closeAsync();
+        }
+
+        return 
rs.fetchNextPage().thenCompose(SqlQueryWithCancellationTokenJob::drainPages);
+    }
+}
diff --git 
a/modules/compute/src/jobs/java/org/example/jobs/standalone/NestedSleepJob.java 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/NestedSleepJob.java
new file mode 100644
index 00000000000..6ef22627048
--- /dev/null
+++ 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/NestedSleepJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.deployment.DeploymentUnit;
+
+/**
+ * Compute job that submits a nested {@link CancelAwareSleepJob} using the 
context's cancellation token.
+ * When this job is canceled, the token propagates cancellation to the inner 
job.
+ */
+public class NestedSleepJob implements ComputeJob<Long, Void> {
+    @Override
+    public CompletableFuture<Void> executeAsync(JobExecutionContext context, 
Long timeout) {
+        List<DeploymentUnit> units = context.deploymentUnits().stream()
+                .map(info -> new DeploymentUnit(info.name(), info.version()))
+                .collect(Collectors.toList());
+
+        return context.ignite().compute().executeAsync(
+                JobTarget.anyNode(context.ignite().clusterNodes()),
+                
JobDescriptor.builder(CancelAwareSleepJob.class).units(units).build(),
+                timeout,
+                context.cancellationToken()
+        );
+    }
+}
diff --git 
a/modules/compute/src/jobs/java/org/example/jobs/standalone/SqlQueryWithCancellationTokenJob.java
 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/SqlQueryWithCancellationTokenJob.java
new file mode 100644
index 00000000000..ebfd008f515
--- /dev/null
+++ 
b/modules/compute/src/jobs/java/org/example/jobs/standalone/SqlQueryWithCancellationTokenJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.Transaction;
+
+/** Compute job that executes a long-running SQL query with the context's 
cancellation token. */
+public class SqlQueryWithCancellationTokenJob implements ComputeJob<Void, 
Void> {
+    @Override
+    public CompletableFuture<Void> executeAsync(JobExecutionContext context, 
Void arg) {
+        // Execute a long-running SQL query using the job's cancellation token.
+        // When the job is cancelled, the cancellation token propagates to the 
SQL query.
+        return context.ignite().sql()
+                .executeAsync((Transaction) null, context.cancellationToken(),
+                        "SELECT * FROM system_range(0, 10000000000)")
+                .thenCompose(SqlQueryWithCancellationTokenJob::drainPages)
+                .thenApply(v -> null);
+    }
+
+    private static CompletableFuture<Void> drainPages(AsyncResultSet<?> rs) {
+        if (!rs.hasMorePages()) {
+            return rs.closeAsync();
+        }
+
+        return 
rs.fetchNextPage().thenCompose(SqlQueryWithCancellationTokenJob::drainPages);
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 3b16c59dd9f..3e24c708a14 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ER
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_CANCELLED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.MARSHALLING_TYPE_MISMATCH_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
 
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
@@ -52,6 +53,7 @@ import 
org.apache.ignite.internal.compute.message.JobStatesResponse;
 import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
 import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.TraceableException;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.table.DataStreamerReceiver;
 import org.apache.ignite.table.Tuple;
@@ -339,6 +341,25 @@ public class ComputeUtils {
         });
     }
 
+    /**
+     * Checks if the throwable indicates a cancellation. Recognizes {@link 
CancellationException}
+     * and domain-specific exceptions with cancellation error codes (SQL 
EXECUTION_CANCELLED_ERR, Compute COMPUTE_JOB_CANCELLED_ERR).
+     */
+    public static boolean isCancellationException(Throwable throwable) {
+        Throwable cause = unwrapCause(throwable);
+
+        if (cause instanceof CancellationException) {
+            return true;
+        }
+
+        if (cause instanceof TraceableException) {
+            int code = ((TraceableException) cause).code();
+            return code == EXECUTION_CANCELLED_ERR || code == 
COMPUTE_JOB_CANCELLED_ERR;
+        }
+
+        return false;
+    }
+
     private static Throwable mapToComputeException(Throwable origin) {
         if (origin instanceof IgniteException || origin instanceof 
IgniteCheckedException) {
             return origin;
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
index 0e0f59da1e0..db41ab28026 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.compute;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.deployment.DeploymentUnitInfo;
 import org.apache.ignite.internal.deployunit.DisposableDeploymentUnit;
 import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
 import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
 import org.apache.ignite.table.partition.Partition;
 import org.jetbrains.annotations.Nullable;
 
@@ -36,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
 public class JobExecutionContextImpl implements JobExecutionContext {
     private final Ignite ignite;
 
-    private final AtomicBoolean isInterrupted;
+    private final CancelHandle cancelHandle;
 
     private final UnitsClassLoader classLoader;
 
@@ -48,18 +49,18 @@ public class JobExecutionContextImpl implements 
JobExecutionContext {
      * Constructor.
      *
      * @param ignite Ignite instance.
-     * @param isInterrupted Interrupted flag.
+     * @param cancelHandle Cancel handle for this job.
      * @param classLoader Job class loader.
      * @param partition Partition associated with this job.
      */
     public JobExecutionContextImpl(
             Ignite ignite,
-            AtomicBoolean isInterrupted,
+            CancelHandle cancelHandle,
             UnitsClassLoader classLoader,
             @Nullable Partition partition
     ) {
         this.ignite = ignite;
-        this.isInterrupted = isInterrupted;
+        this.cancelHandle = cancelHandle;
         this.classLoader = classLoader;
         this.partition = partition;
         this.deploymentUnits = new Lazy<>(this::initDeploymentUnits);
@@ -72,7 +73,12 @@ public class JobExecutionContextImpl implements 
JobExecutionContext {
 
     @Override
     public boolean isCancelled() {
-        return isInterrupted.get();
+        return cancelHandle.isCancelled();
+    }
+
+    @Override
+    public CancellationToken cancellationToken() {
+        return cancelHandle.token();
     }
 
     @Override
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 0620f170081..2beecd72d76 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.marshalling.Marshaller;
 import org.jetbrains.annotations.Nullable;
 
@@ -132,8 +133,8 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
         assert executorService != null;
 
         Ignite scopedIgnite = createIgniteForJob(arg);
-        AtomicBoolean isInterrupted = new AtomicBoolean();
-        JobExecutionContext context = new 
JobExecutionContextImpl(scopedIgnite, isInterrupted, classLoader, 
options.partition());
+        CancelHandle cancelHandle = CancelHandle.create();
+        JobExecutionContext context = new 
JobExecutionContextImpl(scopedIgnite, cancelHandle, classLoader, 
options.partition());
 
         metadataBuilder
                 .jobClassName(jobClassName)
@@ -151,7 +152,7 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
                 metadataBuilder
         );
 
-        return new JobExecutionInternal<>(execution, isInterrupted, null, 
false, localNode);
+        return new JobExecutionInternal<>(execution, cancelHandle, null, 
false, localNode);
     }
 
     /**
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index cfb9f6c7d45..04ffbdafa01 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -18,12 +18,12 @@
 package org.apache.ignite.internal.compute.executor;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.queue.QueueExecution;
 import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.lang.CancelHandle;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
@@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
 public class JobExecutionInternal<R> implements MarshallerProvider<R> {
     private final QueueExecution<ComputeJobDataHolder> execution;
 
-    private final AtomicBoolean isInterrupted;
+    private final CancelHandle cancelHandle;
 
     private final Marshaller<R, byte[]> marshaller;
 
@@ -48,20 +48,20 @@ public class JobExecutionInternal<R> implements 
MarshallerProvider<R> {
      * Constructor.
      *
      * @param execution Internal execution state.
-     * @param isInterrupted Flag which is passed to the execution context so 
that the job can check it for cancellation request.
+     * @param cancelHandle Cancel handle which is passed to the execution 
context so that the job can observe cancellation.
      * @param marshaller Result marshaller.
      * @param marshalResult Flag indicating whether the marshalling of the 
result will be needed.
      * @param localNode Local cluster node.
      */
     JobExecutionInternal(
             QueueExecution<ComputeJobDataHolder> execution,
-            AtomicBoolean isInterrupted,
+            CancelHandle cancelHandle,
             @Nullable Marshaller<R, byte[]> marshaller,
             boolean marshalResult,
             InternalClusterNode localNode
     ) {
         this.execution = execution;
-        this.isInterrupted = isInterrupted;
+        this.cancelHandle = cancelHandle;
         this.marshaller = marshaller;
         this.marshalResult = marshalResult;
 
@@ -80,10 +80,14 @@ public class JobExecutionInternal<R> implements 
MarshallerProvider<R> {
     /**
      * Cancel job execution.
      *
+     * <p>Initiates cancellation of operations registered on the job's 
cancellation token (e.g., SQL queries)
+     * and interrupts the worker thread as a fallback for jobs that don't use 
the token. Token cancellation
+     * is asynchronous and may complete after this method returns.
+     *
      * @return {@code true} if job was successfully cancelled.
      */
     public boolean cancel() {
-        isInterrupted.set(true);
+        cancelHandle.cancelAsync();
         return execution.cancel();
     }
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 9f49c39155f..7e06bc0e031 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -17,12 +17,12 @@
 
 package org.apache.ignite.internal.compute.queue;
 
+import static 
org.apache.ignite.internal.compute.ComputeUtils.isCancellationException;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCanceledEvent;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCancelingEvent;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCompletedEvent;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobExecutingEvent;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobFailedEvent;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
 
 import java.util.UUID;
@@ -210,7 +210,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
                 if (throwable instanceof QueueEntryCanceledException) {
                     logJobCanceledEvent(eventLog, eventMetadata);
                     result.completeExceptionally(new CancellationException());
-                } else if (unwrapCause(throwable) instanceof 
CancellationException) {
+                } else if (isCancellationException(throwable)) {
                     stateMachine.cancelJob(jobId);
                     logJobCanceledEvent(eventLog, eventMetadata);
                     result.completeExceptionally(throwable);
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index dcd4f536f2f..171a0fc61a9 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.compute.TaskStatus.EXECUTING;
 import static org.apache.ignite.compute.TaskStatus.FAILED;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.getTaskSplitArgumentType;
 import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
+import static 
org.apache.ignite.internal.compute.ComputeUtils.isCancellationException;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNull;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logEvent;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_CANCELED;
@@ -39,7 +40,6 @@ import static 
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.time.Instant;
 import java.util.Arrays;
@@ -48,7 +48,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
@@ -191,7 +190,7 @@ public class TaskExecutionInternal<I, M, T, R> implements 
CancellableTaskExecuti
     }
 
     private void captureReduceSubmitFailure(Throwable throwable) {
-        TaskStatus status = unwrapCause(throwable) instanceof 
CancellationException ? CANCELED : FAILED;
+        TaskStatus status = isCancellationException(throwable) ? CANCELED : 
FAILED;
 
         if (status == CANCELED) {
             LOG.warn("Reduce job for task {} was cancelled.", taskId);
@@ -218,8 +217,7 @@ public class TaskExecutionInternal<I, M, T, R> implements 
CancellableTaskExecuti
             if (throwable == null) {
                 logEvent(eventLog, COMPUTE_TASK_COMPLETED, eventMetadata);
             } else {
-                IgniteEventType type = unwrapCause(throwable) instanceof 
CancellationException
-                        ? COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED;
+                IgniteEventType type = isCancellationException(throwable) ? 
COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED;
                 logEvent(eventLog, type, eventMetadata);
             }
         });
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
index 9151ae6fdbd..8be09c718df 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.compute;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.sameInstance;
 
-import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.CancelHandle;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.mockito.Mock;
@@ -37,21 +38,31 @@ class JobExecutionContextImplTest extends 
BaseIgniteAbstractTest {
 
     @Test
     void returnsIgnite() {
-        JobExecutionContext context = new JobExecutionContextImpl(ignite, new 
AtomicBoolean(), null, null);
+        JobExecutionContext context = new JobExecutionContextImpl(ignite, 
CancelHandle.create(), null, null);
 
         assertThat(context.ignite(), is(sameInstance(ignite)));
     }
 
     @Test
     void returnsInterruptedFlag() {
-        AtomicBoolean isInterrupted = new AtomicBoolean();
+        CancelHandle cancelHandle = CancelHandle.create();
 
-        JobExecutionContext context = new JobExecutionContextImpl(ignite, 
isInterrupted, null, null);
+        JobExecutionContext context = new JobExecutionContextImpl(ignite, 
cancelHandle, null, null);
 
         assertThat(context.isCancelled(), is(false));
 
-        isInterrupted.set(true);
+        cancelHandle.cancel();
 
         assertThat(context.isCancelled(), is(true));
     }
+
+    @Test
+    void returnsCancellationToken() {
+        CancelHandle cancelHandle = CancelHandle.create();
+
+        JobExecutionContext context = new JobExecutionContextImpl(ignite, 
cancelHandle, null, null);
+
+        assertThat(context.cancellationToken(), is(notNullValue()));
+        assertThat(context.cancellationToken(), 
is(sameInstance(cancelHandle.token())));
+    }
 }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 7a64cbbbd30..34165c0b29d 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.CancelHandleHelper;
 import org.apache.ignite.network.NetworkAddress;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -449,6 +450,40 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    void cancelAsyncJobViaCancellationToken() {
+        JobExecutionInternal<?> execution = 
executeJob(CancellationTokenJob.class);
+
+        JobState executingState = await().until(execution::state, 
jobStateWithStatus(EXECUTING));
+
+        // The job registers a cancel action on the cancellation token that 
completes the future.
+        // On cancel() the token fires the action, completing the job with a 
result (possibly after cancel() returns).
+        execution.cancel();
+
+        await().until(
+                execution::state,
+                jobStateWithStatusAndCreateTimeStartTime(COMPLETED, 
executingState.createTime(), executingState.startTime())
+        );
+
+        assertThat(execution.resultAsync().thenApply(h -> 
SharedComputeUtils.unmarshalResult(h, null, null)), willBe(42));
+    }
+
+    /** Async job that uses cancellationToken() to react to cancellation and 
complete the future. */
+    private static class CancellationTokenJob implements ComputeJob<Object[], 
Integer> {
+        @Override
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
+            CompletableFuture<Integer> result = new CompletableFuture<>();
+
+            CancelHandleHelper.addCancelAction(
+                    context.cancellationToken(),
+                    () -> result.complete(42),
+                    result
+            );
+
+            return result;
+        }
+    }
+
     private JobExecutionInternal<?> executeJob(Class<?> jobClass) {
         return executeJob(ExecutionOptions.DEFAULT, jobClass, null);
     }


Reply via email to