This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a6769a76a80 IGNITE-27855 Add JobExecutionContext#cancellationToken (#7922)
a6769a76a80 is described below
commit a6769a76a8042f3727f83a186df3c02661162a1b
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Fri Apr 3 16:50:01 2026 +0300
IGNITE-27855 Add JobExecutionContext#cancellationToken (#7922)
---
.../apache/ignite/compute/JobExecutionContext.java | 9 +++
.../apache/ignite/client/fakes/FakeCompute.java | 4 +-
.../ignite/internal/compute/ItComputeBaseTest.java | 67 ++++++++++++++++++++++
.../org/example/jobs/embedded/NestedSleepJob.java | 40 +++++++++++++
.../embedded/SqlQueryWithCancellationTokenJob.java | 46 +++++++++++++++
.../example/jobs/standalone/NestedSleepJob.java | 47 +++++++++++++++
.../SqlQueryWithCancellationTokenJob.java | 46 +++++++++++++++
.../ignite/internal/compute/ComputeUtils.java | 21 +++++++
.../internal/compute/JobExecutionContextImpl.java | 18 ++++--
.../compute/executor/ComputeExecutorImpl.java | 7 ++-
.../compute/executor/JobExecutionInternal.java | 16 ++++--
.../internal/compute/queue/QueueExecutionImpl.java | 4 +-
.../compute/task/TaskExecutionInternal.java | 8 +--
.../compute/JobExecutionContextImplTest.java | 21 +++++--
.../compute/executor/ComputeExecutorTest.java | 35 +++++++++++
15 files changed, 360 insertions(+), 29 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
index 5f275ea15ac..d645ab1ca04 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/JobExecutionContext.java
@@ -20,6 +20,7 @@ package org.apache.ignite.compute;
import java.util.Collection;
import org.apache.ignite.Ignite;
import org.apache.ignite.deployment.DeploymentUnitInfo;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
@@ -49,6 +50,14 @@ public interface JobExecutionContext {
*/
@Nullable Partition partition();
+ /**
+ * Cancellation token that is canceled when the job is canceled. Can be passed to SQL queries and other operations
+ * that accept {@link CancellationToken} to propagate cancellation from the job to those operations.
+ *
+ * @return Cancellation token associated with this job.
+ */
+ CancellationToken cancellationToken();
+
/**
* Collection of deployment units associated with this job execution.
*
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index cddd07268cc..e21975e3c0e 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -36,7 +36,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.ignite.Ignite;
@@ -75,6 +74,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.PublicClusterNodeImpl;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
@@ -136,7 +136,7 @@ public class FakeCompute implements IgniteComputeInternal {
Class<ComputeJob<Object, Object>> jobClass =
ComputeUtils.jobClass(jobClassLoader, jobClassName);
ComputeJob<Object, Object> job =
ComputeUtils.instantiateJob(jobClass);
CompletableFuture<Object> jobFut = job.executeAsync(
- new JobExecutionContextImpl(ignite, new AtomicBoolean(),
jobClassLoader, null),
+ new JobExecutionContextImpl(ignite, CancelHandle.create(),
jobClassLoader, null),
SharedComputeUtils.unmarshalArg(executionContext.arg(),
null, null));
return jobExecution(jobFut != null ? jobFut :
nullCompletedFuture());
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index a554ea93c7a..02043f31c41 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -37,6 +37,7 @@ import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.
import static
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_CANCELLED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.both;
@@ -90,6 +91,7 @@ import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.lang.TableNotFoundException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.table.QualifiedName;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.table.mapper.Mapper;
@@ -943,6 +945,57 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
return unwrapIgniteImpl(node(0)).observableTimeTracker().get();
}
+ @ParameterizedTest(name = "local: {0}")
+ @ValueSource(booleans = {true, false})
+ void cancellationTokenPropagatesSqlCancellation(boolean local) {
+ Ignite executeNode = local ? node(0) : node(1);
+
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ // Submit a job that runs a long-running SQL query with context.cancellationToken().
+ // The SQL query registers on the token, so when the job is cancelled the query is canceled too.
+ JobExecution<Void> execution = submit(
+ JobTarget.node(clusterNode(executeNode)),
+ sqlQueryWithCancellationTokenJob(),
+ cancelHandle.token(),
+ null
+ );
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ // Cancel the job — the cancellation token propagates to the SQL query.
+ cancelHandle.cancel();
+
+ // The SQL query throws SqlException with EXECUTION_CANCELLED_ERR, which is treated as cancellation.
+ assertThat(execution.resultAsync(),
willThrow(sqlCancelledException()));
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ }
+
+ @ParameterizedTest(name = "local: {0}")
+ @ValueSource(booleans = {true, false})
+ void nestedJobCancellationPropagatesToOuterJob(boolean local) {
+ Ignite executeNode = local ? node(0) : node(1);
+
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ // The outer job submits a nested CancelAwareSleepJob with context.cancellationToken().
+ // When the outer job is canceled, the token propagates to the inner job.
+ JobExecution<Void> execution = submit(
+ JobTarget.node(clusterNode(executeNode)),
+ nestedSleepJob(),
+ cancelHandle.token(),
+ Long.MAX_VALUE
+ );
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ cancelHandle.cancel();
+
+ // The inner job throws CancellationException, which is wrapped as ComputeException(COMPUTE_JOB_CANCELLED_ERR).
+ // The outer job propagates this — isCancellationException recognizes the error code → CANCELED.
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ }
+
JobDescriptor.Builder<Object, String> toStringJobBuilder() {
return JobDescriptor.builder(jobClassName("ToStringJob"));
}
@@ -967,6 +1020,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
return JobDescriptor.<Long,
Void>builder(jobClassName("SilentSleepJob")).units(units()).build();
}
+ private JobDescriptor<Long, Void> nestedSleepJob() {
+ return JobDescriptor.<Long,
Void>builder(jobClassName("NestedSleepJob")).units(units()).build();
+ }
+
private JobDescriptor<Long, Void> cancelAwareSleepJob() {
return JobDescriptor.<Long,
Void>builder(jobClassName("CancelAwareSleepJob")).units(units()).build();
}
@@ -991,6 +1048,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
return JobDescriptor.<Void,
String>builder(jobClassName("AsyncDelayedCompleteJob")).units(units()).build();
}
+ private JobDescriptor<Void, Void> sqlQueryWithCancellationTokenJob() {
+ return JobDescriptor.<Void,
Void>builder(jobClassName("SqlQueryWithCancellationTokenJob")).units(units()).build();
+ }
+
private JobDescriptor<Tuple, Integer> tupleJob() {
return JobDescriptor.<Tuple,
Integer>builder(jobClassName("TupleJob")).units(units()).build();
}
@@ -1019,4 +1080,10 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
.or(instanceOf(CancellationException.class))
);
}
+
+ private static Matcher<Exception> sqlCancelledException() {
+ return traceableException(SqlException.class)
+ .withCode(is(EXECUTION_CANCELLED_ERR))
+ .withMessage(is("The query was cancelled while executing."));
+ }
}
diff --git a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/NestedSleepJob.java b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/NestedSleepJob.java
new file mode 100644
index 00000000000..64455d3cab2
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/NestedSleepJob.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobTarget;
+
+/**
+ * Compute job that submits a nested {@link CancelAwareSleepJob} using the context's cancellation token.
+ * When this job is cancelled, the token propagates cancellation to the inner job.
+ */
+public class NestedSleepJob implements ComputeJob<Long, Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext context,
Long timeout) {
+ return context.ignite().compute().executeAsync(
+ JobTarget.anyNode(context.ignite().clusterNodes()),
+ JobDescriptor.builder(CancelAwareSleepJob.class).build(),
+ timeout,
+ context.cancellationToken()
+ );
+ }
+}
diff --git a/modules/compute/src/integrationTest/java/org/example/jobs/embedded/SqlQueryWithCancellationTokenJob.java b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/SqlQueryWithCancellationTokenJob.java
new file mode 100644
index 00000000000..29a5105918f
--- /dev/null
+++ b/modules/compute/src/integrationTest/java/org/example/jobs/embedded/SqlQueryWithCancellationTokenJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.embedded;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.Transaction;
+
+/** Compute job that executes a long-running SQL query with the context's cancellation token. */
+public class SqlQueryWithCancellationTokenJob implements ComputeJob<Void,
Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext context,
Void arg) {
+ // Execute a long-running SQL query using the job's cancellation token.
+ // When the job is cancelled, the cancellation token propagates to the SQL query.
+ return context.ignite().sql()
+ .executeAsync((Transaction) null, context.cancellationToken(),
+ "SELECT * FROM system_range(0, 10000000000)")
+ .thenCompose(SqlQueryWithCancellationTokenJob::drainPages)
+ .thenApply(v -> null);
+ }
+
+ private static CompletableFuture<Void> drainPages(AsyncResultSet<?> rs) {
+ if (!rs.hasMorePages()) {
+ return rs.closeAsync();
+ }
+
+ return
rs.fetchNextPage().thenCompose(SqlQueryWithCancellationTokenJob::drainPages);
+ }
+}
diff --git a/modules/compute/src/jobs/java/org/example/jobs/standalone/NestedSleepJob.java b/modules/compute/src/jobs/java/org/example/jobs/standalone/NestedSleepJob.java
new file mode 100644
index 00000000000..6ef22627048
--- /dev/null
+++ b/modules/compute/src/jobs/java/org/example/jobs/standalone/NestedSleepJob.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobDescriptor;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.deployment.DeploymentUnit;
+
+/**
+ * Compute job that submits a nested {@link CancelAwareSleepJob} using the context's cancellation token.
+ * When this job is canceled, the token propagates cancellation to the inner job.
+ */
+public class NestedSleepJob implements ComputeJob<Long, Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext context,
Long timeout) {
+ List<DeploymentUnit> units = context.deploymentUnits().stream()
+ .map(info -> new DeploymentUnit(info.name(), info.version()))
+ .collect(Collectors.toList());
+
+ return context.ignite().compute().executeAsync(
+ JobTarget.anyNode(context.ignite().clusterNodes()),
+
JobDescriptor.builder(CancelAwareSleepJob.class).units(units).build(),
+ timeout,
+ context.cancellationToken()
+ );
+ }
+}
diff --git a/modules/compute/src/jobs/java/org/example/jobs/standalone/SqlQueryWithCancellationTokenJob.java b/modules/compute/src/jobs/java/org/example/jobs/standalone/SqlQueryWithCancellationTokenJob.java
new file mode 100644
index 00000000000..ebfd008f515
--- /dev/null
+++ b/modules/compute/src/jobs/java/org/example/jobs/standalone/SqlQueryWithCancellationTokenJob.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.example.jobs.standalone;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.sql.async.AsyncResultSet;
+import org.apache.ignite.tx.Transaction;
+
+/** Compute job that executes a long-running SQL query with the context's cancellation token. */
+public class SqlQueryWithCancellationTokenJob implements ComputeJob<Void,
Void> {
+ @Override
+ public CompletableFuture<Void> executeAsync(JobExecutionContext context,
Void arg) {
+ // Execute a long-running SQL query using the job's cancellation token.
+ // When the job is cancelled, the cancellation token propagates to the SQL query.
+ return context.ignite().sql()
+ .executeAsync((Transaction) null, context.cancellationToken(),
+ "SELECT * FROM system_range(0, 10000000000)")
+ .thenCompose(SqlQueryWithCancellationTokenJob::drainPages)
+ .thenApply(v -> null);
+ }
+
+ private static CompletableFuture<Void> drainPages(AsyncResultSet<?> rs) {
+ if (!rs.hasMorePages()) {
+ return rs.closeAsync();
+ }
+
+ return
rs.fetchNextPage().thenCompose(SqlQueryWithCancellationTokenJob::drainPages);
+ }
+}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 3b16c59dd9f..3e24c708a14 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ER
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_CANCELLED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Compute.MARSHALLING_TYPE_MISMATCH_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Sql.EXECUTION_CANCELLED_ERR;
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
@@ -52,6 +53,7 @@ import
org.apache.ignite.internal.compute.message.JobStatesResponse;
import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
import org.apache.ignite.lang.IgniteCheckedException;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.lang.TraceableException;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.table.DataStreamerReceiver;
import org.apache.ignite.table.Tuple;
@@ -339,6 +341,25 @@ public class ComputeUtils {
});
}
+ /**
+ * Checks if the throwable indicates a cancellation. Recognizes {@link CancellationException}
+ * and domain-specific exceptions with cancellation error codes (SQL EXECUTION_CANCELLED_ERR, Compute COMPUTE_JOB_CANCELLED_ERR).
+ */
+ public static boolean isCancellationException(Throwable throwable) {
+ Throwable cause = unwrapCause(throwable);
+
+ if (cause instanceof CancellationException) {
+ return true;
+ }
+
+ if (cause instanceof TraceableException) {
+ int code = ((TraceableException) cause).code();
+ return code == EXECUTION_CANCELLED_ERR || code ==
COMPUTE_JOB_CANCELLED_ERR;
+ }
+
+ return false;
+ }
+
private static Throwable mapToComputeException(Throwable origin) {
if (origin instanceof IgniteException || origin instanceof
IgniteCheckedException) {
return origin;
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
index 0e0f59da1e0..db41ab28026 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionContextImpl.java
@@ -20,13 +20,14 @@ package org.apache.ignite.internal.compute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.deployment.DeploymentUnitInfo;
import org.apache.ignite.internal.deployunit.DisposableDeploymentUnit;
import org.apache.ignite.internal.deployunit.loader.UnitsClassLoader;
import org.apache.ignite.internal.util.Lazy;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.table.partition.Partition;
import org.jetbrains.annotations.Nullable;
@@ -36,7 +37,7 @@ import org.jetbrains.annotations.Nullable;
public class JobExecutionContextImpl implements JobExecutionContext {
private final Ignite ignite;
- private final AtomicBoolean isInterrupted;
+ private final CancelHandle cancelHandle;
private final UnitsClassLoader classLoader;
@@ -48,18 +49,18 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
* Constructor.
*
* @param ignite Ignite instance.
- * @param isInterrupted Interrupted flag.
+ * @param cancelHandle Cancel handle for this job.
* @param classLoader Job class loader.
* @param partition Partition associated with this job.
*/
public JobExecutionContextImpl(
Ignite ignite,
- AtomicBoolean isInterrupted,
+ CancelHandle cancelHandle,
UnitsClassLoader classLoader,
@Nullable Partition partition
) {
this.ignite = ignite;
- this.isInterrupted = isInterrupted;
+ this.cancelHandle = cancelHandle;
this.classLoader = classLoader;
this.partition = partition;
this.deploymentUnits = new Lazy<>(this::initDeploymentUnits);
@@ -72,7 +73,12 @@ public class JobExecutionContextImpl implements
JobExecutionContext {
@Override
public boolean isCancelled() {
- return isInterrupted.get();
+ return cancelHandle.isCancelled();
+ }
+
+ @Override
+ public CancellationToken cancellationToken() {
+ return cancelHandle.token();
}
@Override
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 0620f170081..2beecd72d76 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -62,6 +62,7 @@ import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
+import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.marshalling.Marshaller;
import org.jetbrains.annotations.Nullable;
@@ -132,8 +133,8 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
assert executorService != null;
Ignite scopedIgnite = createIgniteForJob(arg);
- AtomicBoolean isInterrupted = new AtomicBoolean();
- JobExecutionContext context = new
JobExecutionContextImpl(scopedIgnite, isInterrupted, classLoader,
options.partition());
+ CancelHandle cancelHandle = CancelHandle.create();
+ JobExecutionContext context = new
JobExecutionContextImpl(scopedIgnite, cancelHandle, classLoader,
options.partition());
metadataBuilder
.jobClassName(jobClassName)
@@ -151,7 +152,7 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
metadataBuilder
);
- return new JobExecutionInternal<>(execution, isInterrupted, null,
false, localNode);
+ return new JobExecutionInternal<>(execution, cancelHandle, null,
false, localNode);
}
/**
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
index cfb9f6c7d45..04ffbdafa01 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/JobExecutionInternal.java
@@ -18,12 +18,12 @@
package org.apache.ignite.internal.compute.executor;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.compute.JobState;
import org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.queue.QueueExecution;
import org.apache.ignite.internal.network.InternalClusterNode;
+import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.network.ClusterNode;
import org.jetbrains.annotations.Nullable;
@@ -36,7 +36,7 @@ import org.jetbrains.annotations.Nullable;
public class JobExecutionInternal<R> implements MarshallerProvider<R> {
private final QueueExecution<ComputeJobDataHolder> execution;
- private final AtomicBoolean isInterrupted;
+ private final CancelHandle cancelHandle;
private final Marshaller<R, byte[]> marshaller;
@@ -48,20 +48,20 @@ public class JobExecutionInternal<R> implements
MarshallerProvider<R> {
* Constructor.
*
* @param execution Internal execution state.
- * @param isInterrupted Flag which is passed to the execution context so
that the job can check it for cancellation request.
+ * @param cancelHandle Cancel handle which is passed to the execution
context so that the job can observe cancellation.
* @param marshaller Result marshaller.
* @param marshalResult Flag indicating whether the marshalling of the
result will be needed.
* @param localNode Local cluster node.
*/
JobExecutionInternal(
QueueExecution<ComputeJobDataHolder> execution,
- AtomicBoolean isInterrupted,
+ CancelHandle cancelHandle,
@Nullable Marshaller<R, byte[]> marshaller,
boolean marshalResult,
InternalClusterNode localNode
) {
this.execution = execution;
- this.isInterrupted = isInterrupted;
+ this.cancelHandle = cancelHandle;
this.marshaller = marshaller;
this.marshalResult = marshalResult;
@@ -80,10 +80,14 @@ public class JobExecutionInternal<R> implements
MarshallerProvider<R> {
/**
* Cancel job execution.
*
+ * <p>Initiates cancellation of operations registered on the job's cancellation token (e.g., SQL queries)
+ * and interrupts the worker thread as a fallback for jobs that don't use the token. Token cancellation
+ * is asynchronous and may complete after this method returns.
+ *
* @return {@code true} if job was successfully cancelled.
*/
public boolean cancel() {
- isInterrupted.set(true);
+ cancelHandle.cancelAsync();
return execution.cancel();
}
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 9f49c39155f..7e06bc0e031 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -17,12 +17,12 @@
package org.apache.ignite.internal.compute.queue;
+import static
org.apache.ignite.internal.compute.ComputeUtils.isCancellationException;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCanceledEvent;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCancelingEvent;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobCompletedEvent;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobExecutingEvent;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logJobFailedEvent;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
import java.util.UUID;
@@ -210,7 +210,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
if (throwable instanceof QueueEntryCanceledException) {
logJobCanceledEvent(eventLog, eventMetadata);
result.completeExceptionally(new CancellationException());
- } else if (unwrapCause(throwable) instanceof
CancellationException) {
+ } else if (isCancellationException(throwable)) {
stateMachine.cancelJob(jobId);
logJobCanceledEvent(eventLog, eventMetadata);
result.completeExceptionally(throwable);
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index dcd4f536f2f..171a0fc61a9 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -26,6 +26,7 @@ import static org.apache.ignite.compute.TaskStatus.EXECUTING;
import static org.apache.ignite.compute.TaskStatus.FAILED;
import static
org.apache.ignite.internal.compute.ComputeUtils.getTaskSplitArgumentType;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
+import static
org.apache.ignite.internal.compute.ComputeUtils.isCancellationException;
import static
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNull;
import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logEvent;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_CANCELED;
@@ -39,7 +40,6 @@ import static
org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
-import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import java.time.Instant;
import java.util.Arrays;
@@ -48,7 +48,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
-import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
@@ -191,7 +190,7 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
}
private void captureReduceSubmitFailure(Throwable throwable) {
- TaskStatus status = unwrapCause(throwable) instanceof
CancellationException ? CANCELED : FAILED;
+ TaskStatus status = isCancellationException(throwable) ? CANCELED :
FAILED;
if (status == CANCELED) {
LOG.warn("Reduce job for task {} was cancelled.", taskId);
@@ -218,8 +217,7 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
if (throwable == null) {
logEvent(eventLog, COMPUTE_TASK_COMPLETED, eventMetadata);
} else {
- IgniteEventType type = unwrapCause(throwable) instanceof
CancellationException
- ? COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED;
+ IgniteEventType type = isCancellationException(throwable) ?
COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED;
logEvent(eventLog, type, eventMetadata);
}
});
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
index 9151ae6fdbd..8be09c718df 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/JobExecutionContextImplTest.java
@@ -19,12 +19,13 @@ package org.apache.ignite.internal.compute;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.sameInstance;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.CancelHandle;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
@@ -37,21 +38,31 @@ class JobExecutionContextImplTest extends
BaseIgniteAbstractTest {
@Test
void returnsIgnite() {
- JobExecutionContext context = new JobExecutionContextImpl(ignite, new
AtomicBoolean(), null, null);
+ JobExecutionContext context = new JobExecutionContextImpl(ignite,
CancelHandle.create(), null, null);
assertThat(context.ignite(), is(sameInstance(ignite)));
}
@Test
void returnsInterruptedFlag() {
- AtomicBoolean isInterrupted = new AtomicBoolean();
+ CancelHandle cancelHandle = CancelHandle.create();
- JobExecutionContext context = new JobExecutionContextImpl(ignite,
isInterrupted, null, null);
+ JobExecutionContext context = new JobExecutionContextImpl(ignite,
cancelHandle, null, null);
assertThat(context.isCancelled(), is(false));
- isInterrupted.set(true);
+ cancelHandle.cancel();
assertThat(context.isCancelled(), is(true));
}
+
+ /**
+ * Checks that {@code cancellationToken()} of the context is non-null and is
+ * the same instance as {@code token()} of the handle given to the constructor.
+ */
+ @Test
+ void returnsCancellationToken() {
+ CancelHandle cancelHandle = CancelHandle.create();
+
+ JobExecutionContext context = new JobExecutionContextImpl(ignite,
cancelHandle, null, null);
+
+ assertThat(context.cancellationToken(), is(notNullValue()));
+ // Identity, not equality: the token is compared with sameInstance.
+ assertThat(context.cancellationToken(),
is(sameInstance(cancelHandle.token())));
+ }
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 7a64cbbbd30..34165c0b29d 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -60,6 +60,7 @@ import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.CancelHandleHelper;
import org.apache.ignite.network.NetworkAddress;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
@@ -449,6 +450,40 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
}
}
+ /**
+ * Cancels a running {@link CancellationTokenJob} and expects the execution
+ * to reach status COMPLETED and to produce the result 42.
+ */
+ @Test
+ void cancelAsyncJobViaCancellationToken() {
+ JobExecutionInternal<?> execution =
executeJob(CancellationTokenJob.class);
+
+ // Wait for EXECUTING; this state is reused in the final state check.
+ JobState executingState = await().until(execution::state,
jobStateWithStatus(EXECUTING));
+
+ // The job registers a cancel action on the cancellation token that
completes the future.
+ // When cancel() is called, the token fires the action synchronously,
completing the job with a result.
+ execution.cancel();
+
+ // NOTE(review): the matcher presumably also requires the create/start
+ // times to equal those seen while EXECUTING - confirm in its definition.
+ await().until(
+ execution::state,
+ jobStateWithStatusAndCreateTimeStartTime(COMPLETED,
executingState.createTime(), executingState.startTime())
+ );
+
+ // 42 is the value the job's cancel action completes its future with.
+ assertThat(execution.resultAsync().thenApply(h ->
SharedComputeUtils.unmarshalResult(h, null, null)), willBe(42));
+ }
+
+ /**
+ * Async job that uses cancellationToken() to react to cancellation and
+ * complete the future.
+ *
+ * <p>The returned future is completed only by the cancel action registered
+ * here, which sets the result to 42; nothing else in this class completes it.
+ * NOTE(review): the same future is also passed as the third argument of
+ * addCancelAction, presumably as the operation's completion future - confirm
+ * against CancelHandleHelper.
+ */
+ private static class CancellationTokenJob implements ComputeJob<Object[],
Integer> {
+ @Override
+ public CompletableFuture<Integer> executeAsync(JobExecutionContext
context, Object... args) {
+ CompletableFuture<Integer> result = new CompletableFuture<>();
+
+ CancelHandleHelper.addCancelAction(
+ context.cancellationToken(),
+ () -> result.complete(42),
+ result
+ );
+
+ return result;
+ }
+ }
+
private JobExecutionInternal<?> executeJob(Class<?> jobClass) {
return executeJob(ExecutionOptions.DEFAULT, jobClass, null);
}