This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8373582f12 IGNITE-23696 Fix return result for remote execution (#4736)
8373582f12 is described below

commit 8373582f12f822484214d853c3ed658f7afd207f
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Nov 20 15:50:27 2024 +0300

    IGNITE-23696 Fix return result for remote execution (#4736)
---
 .../ignite/internal/compute/ItComputeBaseTest.java | 97 ++++++++++++++--------
 .../compute/{SleepJob.java => SilentSleepJob.java} |  8 +-
 .../apache/ignite/internal/compute/SleepJob.java   |  2 +-
 .../internal/compute/FailSafeJobExecution.java     |  1 -
 .../internal/compute/queue/QueueExecutionImpl.java |  4 +-
 .../compute/queue/PriorityQueueExecutorTest.java   |  2 +-
 6 files changed, 71 insertions(+), 43 deletions(-)

diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 222b41c347..30e4a31922 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -37,12 +37,14 @@ import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
@@ -69,6 +71,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Base integration tests for Compute functionality. To add new compute job 
for testing both in embedded and standalone mode, add the
@@ -469,24 +472,72 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         assertThat(result, is(sumOfNodeNamesLengths));
     }
 
-    @Test
-    void cancelsJobLocally() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void cancelsJob(boolean local) {
         Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
 
+        // This job catches the interruption and throws a RuntimeException
         JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
-        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, 
Long.MAX_VALUE);
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, 
Long.MAX_VALUE);
 
         await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
 
         assertThat(execution.cancelAsync(), willBe(true));
 
+        CompletionException completionException = 
assertThrows(CompletionException.class, () -> execution.resultAsync().join());
+
+        // Unwrap CompletionException, ComputeException should be the cause 
thrown from the API
+        assertThat(completionException.getCause(), 
instanceOf(ComputeException.class));
+        ComputeException computeException = (ComputeException) 
completionException.getCause();
+
+        // ComputeException should be caused by the RuntimeException thrown 
from the SleepJob
+        assertThat(computeException.getCause(), 
instanceOf(RuntimeException.class));
+        RuntimeException runtimeException = (RuntimeException) 
computeException.getCause();
+
+        // RuntimeException is thrown when SleepJob catches the 
InterruptedException
+        assertThat(runtimeException.getCause(), 
instanceOf(InterruptedException.class));
+        assertThat(runtimeException.getCause().getCause(), is(nullValue()));
+
         await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
     }
 
-    @Test
-    void cancelsQueuedJobLocally() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void cancelsNotCancellableJob(boolean local) {
         Ignite entryNode = node(0);
-        var nodes = JobTarget.node(clusterNode(entryNode));
+        Ignite executeNode = local ? node(0) : node(1);
+
+        // This job catches the interruption and returns normally
+        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SilentSleepJob.class).units(units()).build();
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, 
Long.MAX_VALUE);
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
+
+        assertThat(execution.cancelAsync(), willBe(true));
+
+        CompletionException completionException = 
assertThrows(CompletionException.class, () -> execution.resultAsync().join());
+
+        // Unwrap CompletionException, ComputeException should be the cause 
thrown from the API
+        assertThat(completionException.getCause(), 
instanceOf(ComputeException.class));
+        ComputeException computeException = (ComputeException) 
completionException.getCause();
+
+        // ComputeException should be caused by the CancellationException 
thrown from the executor which detects that the job completes,
+        // but was previously cancelled
+        assertThat(computeException.getCause(), 
instanceOf(CancellationException.class));
+        CancellationException cancellationException = (CancellationException) 
computeException.getCause();
+        assertThat(cancellationException.getCause(), is(nullValue()));
+
+        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
+    }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void cancelsQueuedJob(boolean local) {
+        Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
+        var nodes = JobTarget.node(clusterNode(executeNode));
 
         JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
 
@@ -510,38 +561,14 @@ public abstract class ItComputeBaseTest extends 
ClusterPerClassIntegrationTest {
         await().until(execution1::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
     }
 
-    @Test
-    void cancelsJobRemotely() {
-        Ignite entryNode = node(0);
-
-        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
-        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, 
Long.MAX_VALUE);
-
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        assertThat(execution.cancelAsync(), willBe(true));
-
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(CANCELED)));
-    }
-
-    @Test
-    void changeExecutingJobPriorityLocally() {
-        Ignite entryNode = node(0);
-
-        JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
-        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job, 
Long.MAX_VALUE);
-        await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
-
-        assertThat(execution.changePriorityAsync(2), willBe(false));
-        assertThat(execution.cancelAsync(), willBe(true));
-    }
-
-    @Test
-    void changeExecutingJobPriorityRemotely() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void changeExecutingJobPriority(boolean local) {
         Ignite entryNode = node(0);
+        Ignite executeNode = local ? node(0) : node(1);
 
         JobDescriptor<Long, Void> job = 
JobDescriptor.builder(SleepJob.class).units(units()).build();
-        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job, 
Long.MAX_VALUE);
+        JobExecution<Void> execution = 
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job, 
Long.MAX_VALUE);
         await().until(execution::stateAsync, 
willBe(jobStateWithStatus(EXECUTING)));
 
         assertThat(execution.changePriorityAsync(2), willBe(false));
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java
similarity index 88%
copy from 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
copy to 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java
index 605baa7929..b942c644c5 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java
@@ -22,15 +22,15 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
-/** Compute job that sleeps for a number of milliseconds passed in the 
argument. */
-public class SleepJob implements ComputeJob<Long, Void> {
+/** Compute job that sleeps for a number of seconds passed in the 
argument and completes successfully in any case. */
+public class SilentSleepJob implements ComputeJob<Long, Void> {
     @Override
     public CompletableFuture<Void> executeAsync(JobExecutionContext 
jobExecutionContext, Long timeout) {
         try {
             TimeUnit.SECONDS.sleep(timeout);
-            return null;
         } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+            // no op.
         }
+        return null;
     }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
index 605baa7929..d291469183 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
-/** Compute job that sleeps for a number of milliseconds passed in the 
argument. */
+/** Compute job that sleeps for a number of seconds passed in the 
argument and throws a {@link RuntimeException} if interrupted. */
 public class SleepJob implements ComputeJob<Long, Void> {
     @Override
     public CompletableFuture<Void> executeAsync(JobExecutionContext 
jobExecutionContext, Long timeout) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index 26cc364721..ffa7db4f54 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -158,7 +158,6 @@ class FailSafeJobExecution<T> implements JobExecution<T>, 
MarshallerProvider<T>
 
     @Override
     public CompletableFuture<@Nullable Boolean> cancelAsync() {
-        resultFuture.cancel(false);
         return runningJobExecution.get().cancelAsync();
     }
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 21a9329ad3..ea28dd9e9b 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -21,6 +21,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
 
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
@@ -186,10 +187,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
             } else {
                 if (queueEntry.isInterrupted()) {
                     stateMachine.cancelJob(jobId);
+                    result.completeExceptionally(new CancellationException());
                 } else {
                     stateMachine.completeJob(jobId);
+                    result.complete(r);
                 }
-                result.complete(r);
             }
         });
     }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index e623649e0d..a2ac45186c 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -245,7 +245,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
                 execution::state,
                 jobStateWithStatusAndCreateTimeStartTime(CANCELED, 
executingState.createTime(), executingState.startTime())
         );
-        assertThat(execution.resultAsync(), willBe(0));
+        assertThat(execution.resultAsync(), 
willThrow(CancellationException.class));
     }
 
     @Test

Reply via email to