This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8373582f12 IGNITE-23696 Fix return result for remote execution (#4736)
8373582f12 is described below
commit 8373582f12f822484214d853c3ed658f7afd207f
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Nov 20 15:50:27 2024 +0300
IGNITE-23696 Fix return result for remote execution (#4736)
---
.../ignite/internal/compute/ItComputeBaseTest.java | 97 ++++++++++++++--------
.../compute/{SleepJob.java => SilentSleepJob.java} | 8 +-
.../apache/ignite/internal/compute/SleepJob.java | 2 +-
.../internal/compute/FailSafeJobExecution.java | 1 -
.../internal/compute/queue/QueueExecutionImpl.java | 4 +-
.../compute/queue/PriorityQueueExecutorTest.java | 2 +-
6 files changed, 71 insertions(+), 43 deletions(-)
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index 222b41c347..30e4a31922 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -37,12 +37,14 @@ import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
@@ -69,6 +71,7 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Base integration tests for Compute functionality. To add new compute job
for testing both in embedded and standalone mode, add the
@@ -469,24 +472,72 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
assertThat(result, is(sumOfNodeNamesLengths));
}
- @Test
- void cancelsJobLocally() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void cancelsJob(boolean local) {
Ignite entryNode = node(0);
+ Ignite executeNode = local ? node(0) : node(1);
+ // This job catches the interruption and throws a RuntimeException
JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
- JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job,
Long.MAX_VALUE);
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job,
Long.MAX_VALUE);
await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
assertThat(execution.cancelAsync(), willBe(true));
+ CompletionException completionException =
assertThrows(CompletionException.class, () -> execution.resultAsync().join());
+
+ // Unwrap CompletionException, ComputeException should be the cause
thrown from the API
+ assertThat(completionException.getCause(),
instanceOf(ComputeException.class));
+ ComputeException computeException = (ComputeException)
completionException.getCause();
+
+ // ComputeException should be caused by the RuntimeException thrown
from the SleepJob
+ assertThat(computeException.getCause(),
instanceOf(RuntimeException.class));
+ RuntimeException runtimeException = (RuntimeException)
computeException.getCause();
+
+ // RuntimeException is thrown when SleepJob catches the
InterruptedException
+ assertThat(runtimeException.getCause(),
instanceOf(InterruptedException.class));
+ assertThat(runtimeException.getCause().getCause(), is(nullValue()));
+
await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
}
- @Test
- void cancelsQueuedJobLocally() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void cancelsNotCancellableJob(boolean local) {
Ignite entryNode = node(0);
- var nodes = JobTarget.node(clusterNode(entryNode));
+ Ignite executeNode = local ? node(0) : node(1);
+
+ // This job catches the interruption and returns normally
+ JobDescriptor<Long, Void> job =
JobDescriptor.builder(SilentSleepJob.class).units(units()).build();
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job,
Long.MAX_VALUE);
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
+
+ assertThat(execution.cancelAsync(), willBe(true));
+
+ CompletionException completionException =
assertThrows(CompletionException.class, () -> execution.resultAsync().join());
+
+ // Unwrap CompletionException, ComputeException should be the cause
thrown from the API
+ assertThat(completionException.getCause(),
instanceOf(ComputeException.class));
+ ComputeException computeException = (ComputeException)
completionException.getCause();
+
+ // ComputeException should be caused by the CancellationException
thrown from the executor which detects that the job completes,
+ // but was previously cancelled
+ assertThat(computeException.getCause(),
instanceOf(CancellationException.class));
+ CancellationException cancellationException = (CancellationException)
computeException.getCause();
+ assertThat(cancellationException.getCause(), is(nullValue()));
+
+ await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void cancelsQueuedJob(boolean local) {
+ Ignite entryNode = node(0);
+ Ignite executeNode = local ? node(0) : node(1);
+ var nodes = JobTarget.node(clusterNode(executeNode));
JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
@@ -510,38 +561,14 @@ public abstract class ItComputeBaseTest extends
ClusterPerClassIntegrationTest {
await().until(execution1::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
}
- @Test
- void cancelsJobRemotely() {
- Ignite entryNode = node(0);
-
- JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
- JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job,
Long.MAX_VALUE);
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- assertThat(execution.cancelAsync(), willBe(true));
-
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(CANCELED)));
- }
-
- @Test
- void changeExecutingJobPriorityLocally() {
- Ignite entryNode = node(0);
-
- JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
- JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(entryNode)), job,
Long.MAX_VALUE);
- await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
-
- assertThat(execution.changePriorityAsync(2), willBe(false));
- assertThat(execution.cancelAsync(), willBe(true));
- }
-
- @Test
- void changeExecutingJobPriorityRemotely() {
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void changeExecutingJobPriority(boolean local) {
Ignite entryNode = node(0);
+ Ignite executeNode = local ? node(0) : node(1);
JobDescriptor<Long, Void> job =
JobDescriptor.builder(SleepJob.class).units(units()).build();
- JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(node(1))), job,
Long.MAX_VALUE);
+ JobExecution<Void> execution =
entryNode.compute().submit(JobTarget.node(clusterNode(executeNode)), job,
Long.MAX_VALUE);
await().until(execution::stateAsync,
willBe(jobStateWithStatus(EXECUTING)));
assertThat(execution.changePriorityAsync(2), willBe(false));
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java
similarity index 88%
copy from
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
copy to
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java
index 605baa7929..b942c644c5 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SilentSleepJob.java
@@ -22,15 +22,15 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
-/** Compute job that sleeps for a number of milliseconds passed in the
argument. */
-public class SleepJob implements ComputeJob<Long, Void> {
+/** Compute job that sleeps for a number of milliseconds passed in the
argument and completes successfully in any case. */
+public class SilentSleepJob implements ComputeJob<Long, Void> {
@Override
public CompletableFuture<Void> executeAsync(JobExecutionContext
jobExecutionContext, Long timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
- return null;
} catch (InterruptedException e) {
- throw new RuntimeException(e);
+ // no op.
}
+ return null;
}
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
index 605baa7929..d291469183 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobExecutionContext;
-/** Compute job that sleeps for a number of milliseconds passed in the
argument. */
+/** Compute job that sleeps for a number of milliseconds passed in the
argument and throws a {@link RuntimeException} if interrupted. */
public class SleepJob implements ComputeJob<Long, Void> {
@Override
public CompletableFuture<Void> executeAsync(JobExecutionContext
jobExecutionContext, Long timeout) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
index 26cc364721..ffa7db4f54 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -158,7 +158,6 @@ class FailSafeJobExecution<T> implements JobExecution<T>,
MarshallerProvider<T>
@Override
public CompletableFuture<@Nullable Boolean> cancelAsync() {
- resultFuture.cancel(false);
return runningJobExecution.get().cancelAsync();
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 21a9329ad3..ea28dd9e9b 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -21,6 +21,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Compute.QUEUE_OVERFLOW_ERR;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -186,10 +187,11 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
} else {
if (queueEntry.isInterrupted()) {
stateMachine.cancelJob(jobId);
+ result.completeExceptionally(new CancellationException());
} else {
stateMachine.completeJob(jobId);
+ result.complete(r);
}
- result.complete(r);
}
});
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index e623649e0d..a2ac45186c 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -245,7 +245,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
execution::state,
jobStateWithStatusAndCreateTimeStartTime(CANCELED,
executingState.createTime(), executingState.startTime())
);
- assertThat(execution.resultAsync(), willBe(0));
+ assertThat(execution.resultAsync(),
willThrow(CancellationException.class));
}
@Test