This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 7a3fb06ad8 IGNITE-21422 Cancelling queued job should cancel result 
(#3141)
7a3fb06ad8 is described below

commit 7a3fb06ad8dca34ff63499bbef53b5343b9764b9
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Fri Feb 2 14:50:04 2024 +0300

    IGNITE-21422 Cancelling queued job should cancel result (#3141)
---
 .../apache/ignite/internal/compute/queue/QueueExecutionImpl.java | 7 +++++--
 .../ignite/internal/compute/queue/PriorityQueueExecutorTest.java | 9 +++++++--
 2 files changed, 12 insertions(+), 4 deletions(-)

diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 02f195fd08..087fa95fb3 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -91,8 +91,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
 
             QueueEntry<R> queueEntry = this.queueEntry.get();
             if (queueEntry != null) {
-                executor.remove(queueEntry);
-                queueEntry.interrupt();
+                if (executor.remove(queueEntry)) {
+                    result.cancel(true);
+                } else {
+                    queueEntry.interrupt();
+                }
                 return true;
             }
         } catch (IllegalJobStateTransition e) {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index a062e0055a..438af26d17 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -24,6 +24,7 @@ import static org.apache.ignite.compute.JobState.FAILED;
 import static org.apache.ignite.compute.JobState.QUEUED;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutIn;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithStateAndCreateTimeStartTime;
@@ -32,6 +33,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -242,6 +244,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
                 execution::status,
                 jobStatusWithStateAndCreateTimeStartTime(CANCELED, 
executingStatus.createTime(), executingStatus.startTime())
         );
+        assertThat(execution.resultAsync(), willBe(0));
     }
 
     @Test
@@ -262,6 +265,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
                 execution::status,
                 jobStatusWithStateAndCreateTimeStartTime(CANCELED, 
executingStatus.createTime(), executingStatus.startTime())
         );
+        assertThat(execution.resultAsync(), 
willThrow(InterruptedException.class));
     }
 
     @Test
@@ -274,7 +278,8 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
 
         assertThat(execution.cancel(), is(false));
 
-        assertThat(execution.status().state(), is(COMPLETED));
+        assertThat(execution.status(), is(jobStatusWithState(COMPLETED)));
+        assertThat(execution.resultAsync(), willBe(0));
     }
 
     @Test
@@ -302,7 +307,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         latch.countDown();
 
         // And check that the canceled task was removed from the queue and 
never executed
-        assertThat(execution.resultAsync(), willTimeoutIn(100, 
TimeUnit.MILLISECONDS));
+        assertThat(execution.resultAsync(), 
willThrow(CancellationException.class));
     }
 
     @Test

Reply via email to