This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 7a3fb06ad8 IGNITE-21422 Cancelling queued job should cancel result (#3141) 7a3fb06ad8 is described below commit 7a3fb06ad8dca34ff63499bbef53b5343b9764b9 Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com> AuthorDate: Fri Feb 2 14:50:04 2024 +0300 IGNITE-21422 Cancelling queued job should cancel result (#3141) --- .../apache/ignite/internal/compute/queue/QueueExecutionImpl.java | 7 +++++-- .../ignite/internal/compute/queue/PriorityQueueExecutorTest.java | 9 +++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java index 02f195fd08..087fa95fb3 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java @@ -91,8 +91,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> { QueueEntry<R> queueEntry = this.queueEntry.get(); if (queueEntry != null) { - executor.remove(queueEntry); - queueEntry.interrupt(); + if (executor.remove(queueEntry)) { + result.cancel(true); + } else { + queueEntry.interrupt(); + } return true; } } catch (IllegalJobStateTransition e) { diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java index a062e0055a..438af26d17 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java @@ -24,6 +24,7 @@ import static org.apache.ignite.compute.JobState.FAILED; import static org.apache.ignite.compute.JobState.QUEUED; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTime; @@ -32,6 +33,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -242,6 +244,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { execution::status, jobStatusWithStateAndCreateTimeStartTime(CANCELED, executingStatus.createTime(), executingStatus.startTime()) ); + assertThat(execution.resultAsync(), willBe(0)); } @Test @@ -262,6 +265,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { execution::status, jobStatusWithStateAndCreateTimeStartTime(CANCELED, executingStatus.createTime(), executingStatus.startTime()) ); + assertThat(execution.resultAsync(), willThrow(InterruptedException.class)); } @Test @@ -274,7 +278,8 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { assertThat(execution.cancel(), is(false)); - assertThat(execution.status().state(), is(COMPLETED)); + assertThat(execution.status(), is(jobStatusWithState(COMPLETED))); + assertThat(execution.resultAsync(), willBe(0)); } @Test @@ -302,7 +307,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { latch.countDown(); // And check that the canceled task was removed from the queue and never executed - assertThat(execution.resultAsync(), willTimeoutIn(100, TimeUnit.MILLISECONDS)); + assertThat(execution.resultAsync(), willThrow(CancellationException.class)); } @Test