This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 9b59ef6cc5 IGNITE-20645 Make ComputeJob.execute asynchronous (#3920)
9b59ef6cc5 is described below

commit 9b59ef6cc5974757628f3e2b83185c8430f81a01
Author: Pavel Tupitsyn <ptupit...@apache.org>
AuthorDate: Mon Jun 17 17:59:52 2024 +0300

    IGNITE-20645 Make ComputeJob.execute asynchronous (#3920)
    
    * Change `R ComputeJob.execute(...)` to `CompletableFuture<R> 
ComputeJob.executeAsync(...)`
    * Update cancellation logic to deal with the resulting future
---
 .../java/org/apache/ignite/compute/ComputeJob.java |  8 ++-
 .../org/apache/ignite/compute/IgniteCompute.java   |  2 +-
 ...ClientStreamerWithReceiverBatchSendRequest.java |  6 +-
 .../apache/ignite/client/fakes/FakeCompute.java    | 18 +++--
 .../internal/compute/ItComputeTestEmbedded.java    | 10 +--
 .../threading/ItComputeApiThreadingTest.java       |  5 +-
 .../internal/compute/utils/InteractiveJobs.java    | 14 ++--
 .../apache/ignite/internal/compute/ConcatJob.java  |  9 ++-
 .../apache/ignite/internal/compute/FailingJob.java |  3 +-
 .../ignite/internal/compute/GetNodeNameJob.java    |  7 +-
 .../internal/compute/NonEmptyConstructorJob.java   |  7 +-
 .../apache/ignite/internal/compute/SleepJob.java   |  3 +-
 .../compute/executor/ComputeExecutorImpl.java      |  2 +-
 .../compute/queue/PriorityQueueExecutor.java       |  5 +-
 .../ignite/internal/compute/queue/QueueEntry.java  | 25 +++++--
 .../internal/compute/queue/QueueExecutionImpl.java |  7 +-
 .../compute/task/TaskExecutionInternal.java        |  5 +-
 .../internal/compute/ComputeComponentImplTest.java | 12 ++--
 .../compute/executor/ComputeExecutorTest.java      | 24 +++----
 .../compute/loader/JobClassLoaderFactoryTest.java  | 15 +++--
 .../compute/queue/PriorityQueueExecutorTest.java   | 23 +++----
 .../apache/ignite/internal/compute/UnitJob.java    |  7 +-
 .../apache/ignite/internal/compute/UnitJob.java    |  7 +-
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |  2 +-
 .../rest/compute/ItComputeControllerTest.java      |  3 +-
 .../runner/app/PlatformTestNodeRunner.java         | 23 +++----
 .../runner/app/client/ItThinClientComputeTest.java | 76 +++++++++++++++-------
 .../client/ItThinClientPartitionAwarenessTest.java |  7 +-
 28 files changed, 211 insertions(+), 124 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java 
b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
index 515e902bd3..97db8b05ea 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java
@@ -17,18 +17,22 @@
 
 package org.apache.ignite.compute;
 
+import java.util.concurrent.CompletableFuture;
+import org.jetbrains.annotations.Nullable;
+
 /**
  * A Compute job that may be executed on a single Ignite node, on several 
nodes, or on the entire cluster.
  *
  * @param <R> Job result type.
  */
+@SuppressWarnings("InterfaceMayBeAnnotatedFunctional")
 public interface ComputeJob<R> {
     /**
      * Executes the job on an Ignite node.
      *
      * @param context The execution context.
      * @param args Job arguments.
-     * @return Job result.
+     * @return Job future. Can be null if the job is synchronous and does not 
return any result.
      */
-    R execute(JobExecutionContext context, Object... args);
+    @Nullable CompletableFuture<R> executeAsync(JobExecutionContext context, 
Object... args);
 }
diff --git 
a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java 
b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
index 54b23d478c..c0fb02869d 100644
--- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
+++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java
@@ -37,7 +37,7 @@ import org.apache.ignite.table.mapper.Mapper;
  * Provides the ability to execute Compute jobs.
  *
  * @see ComputeJob
- * @see ComputeJob#execute(JobExecutionContext, Object...)
+ * @see ComputeJob#executeAsync(JobExecutionContext, Object...)
  */
 public interface IgniteCompute {
     /**
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
index 48078c7c75..46967bd119 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java
@@ -102,7 +102,7 @@ public class ClientStreamerWithReceiverBatchSendRequest {
 
     private static class ReceiverRunnerJob implements ComputeJob<List<Object>> 
{
         @Override
-        public @Nullable List<Object> execute(JobExecutionContext context, 
Object... args) {
+        public @Nullable CompletableFuture<List<Object>> 
executeAsync(JobExecutionContext context, Object... args) {
             int payloadElementCount = (int) args[0];
             byte[] payload = (byte[]) args[1];
 
@@ -113,9 +113,7 @@ public class ClientStreamerWithReceiverBatchSendRequest {
             DataStreamerReceiver<Object, Object> receiver = 
ComputeUtils.instantiateReceiver(receiverClass);
             DataStreamerReceiverContext receiverContext = context::ignite;
 
-            CompletableFuture<List<Object>> receiveFut = 
receiver.receive(receiverInfo.items(), receiverContext, receiverInfo.args());
-
-            return receiveFut == null ? null : receiveFut.join();
+            return receiver.receive(receiverInfo.items(), receiverContext, 
receiverInfo.args());
         }
     }
 }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index cc3e9bd366..0fa44c7007 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
 import static org.apache.ignite.compute.JobState.FAILED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 
 import java.time.Instant;
@@ -100,19 +101,22 @@ public class FakeCompute implements IgniteComputeInternal 
{
             throw new RuntimeException(e);
         }
 
-        if (err != null) {
-            throw err;
+        var err0 = err;
+        if (err0 != null) {
+            throw err0;
         }
 
         if (jobClassName.startsWith("org.apache.ignite")) {
-            Class<ComputeJob<Object>> jobClass = 
ComputeUtils.jobClass(this.getClass().getClassLoader(), jobClassName);
-            ComputeJob<Object> job = ComputeUtils.instantiateJob(jobClass);
-            Object jobRes = job.execute(new JobExecutionContextImpl(ignite, 
new AtomicBoolean(), this.getClass().getClassLoader()), args);
+            Class<ComputeJob<R>> jobClass = 
ComputeUtils.jobClass(this.getClass().getClassLoader(), jobClassName);
+            ComputeJob<R> job = ComputeUtils.instantiateJob(jobClass);
+            CompletableFuture<R> jobFut = job.executeAsync(
+                    new JobExecutionContextImpl(ignite, new AtomicBoolean(), 
this.getClass().getClassLoader()), args);
 
-            return jobExecution(completedFuture((R) jobRes));
+            return jobExecution(jobFut != null ? jobFut : 
nullCompletedFuture());
         }
 
-        return jobExecution(future != null ? future : completedFuture((R) 
nodeName));
+        var future0 = future;
+        return jobExecution(future0 != null ? future0 : completedFuture((R) 
nodeName));
     }
 
     /** {@inheritDoc} */
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 7ba202c790..4aed1cf525 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -341,14 +341,14 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     private static class CustomFailingJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             throw ExceptionUtils.sneakyThrow((Throwable) args[0]);
         }
     }
 
     private static class WaitLatchJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             try {
                 ((CountDownLatch) args[0]).await();
             } catch (InterruptedException e) {
@@ -362,7 +362,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         static final AtomicInteger counter = new AtomicInteger(0);
 
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             try {
                 ((CountDownLatch) args[0]).await();
                 if (counter.incrementAndGet() == 1) {
@@ -377,7 +377,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     private static class PerformSyncKvGetPutJob implements ComputeJob<Void> {
         @Override
-        public Void execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Object... args) {
             Table table = context.ignite().tables().table("test");
             KeyValueView<Integer, Integer> view = 
table.keyValueView(Integer.class, Integer.class);
 
@@ -390,7 +390,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     private static class NullReturningJob implements ComputeJob<Void> {
         @Override
-        public Void execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Object... args) {
             return null;
         }
     }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
index aabc6db098..d19788588a 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.compute.threading;
 
 import static java.lang.Thread.currentThread;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread;
 import static 
org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -141,8 +142,8 @@ class ItComputeApiThreadingTest extends 
ClusterPerClassIntegrationTest {
 
     private static class NoOpJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
-            return "ok";
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
+            return completedFuture("ok");
         }
     }
 
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
index 1d5301504f..25d5022ea5 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute.utils;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
@@ -25,6 +26,7 @@ import static org.hamcrest.Matchers.notNullValue;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -163,7 +165,7 @@ public final class InteractiveJobs {
         }
 
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();
 
             offerArgsAsSignals(args);
@@ -178,9 +180,9 @@ public final class InteractiveJobs {
                             GLOBAL_CHANNEL.offer(ACK);
                             break;
                         case RETURN:
-                            return "Done";
+                            return completedFuture("Done");
                         case RETURN_WORKER_NAME:
-                            return context.ignite().name();
+                            return completedFuture(context.ignite().name());
                         case GET_WORKER_NAME:
                             GLOBAL_CHANNEL.add(context.ignite().name());
                             break;
@@ -226,7 +228,7 @@ public final class InteractiveJobs {
         }
 
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet();
 
             try {
@@ -244,9 +246,9 @@ public final class InteractiveJobs {
                             NODE_CHANNELS.get(workerNodeName).offer(ACK);
                             break;
                         case RETURN:
-                            return "Done";
+                            return completedFuture("Done");
                         case RETURN_WORKER_NAME:
-                            return workerNodeName;
+                            return completedFuture(workerNodeName);
                         case GET_WORKER_NAME:
                             
NODE_CHANNELS.get(workerNodeName).add(workerNodeName);
                             break;
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
index 81decb7f4a..b235ad0fea 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java
@@ -17,7 +17,10 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
 import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
@@ -25,9 +28,9 @@ import org.apache.ignite.compute.JobExecutionContext;
 /** Compute job that concatenates the string representation of its arguments. 
*/
 public class ConcatJob implements ComputeJob<String> {
     @Override
-    public String execute(JobExecutionContext context, Object... args) {
-        return Arrays.stream(args)
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Object... args) {
+        return completedFuture(Arrays.stream(args)
                 .map(Object::toString)
-                .collect(Collectors.joining());
+                .collect(Collectors.joining()));
     }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
index 4286c56bb7..20497cf23d 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java
@@ -17,13 +17,14 @@
 
 package org.apache.ignite.internal.compute;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
 /** Compute job that always fails with the {@link JobException}. */
 public class FailingJob implements ComputeJob<String> {
     @Override
-    public String execute(JobExecutionContext context, Object... args) {
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Object... args) {
         throw new JobException("Oops", new Exception());
     }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
index 558077adf0..226fbf83b3 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
 /** Compute job that returns the node name. */
 public class GetNodeNameJob implements ComputeJob<String> {
     @Override
-    public String execute(JobExecutionContext context, Object... args) {
-        return context.ignite().name();
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Object... args) {
+        return completedFuture(context.ignite().name());
     }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
index 7aaa6ecf6b..01721fd2fa 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
@@ -27,7 +30,7 @@ public class NonEmptyConstructorJob implements 
ComputeJob<String> {
 
     /** {@inheritDoc} */
     @Override
-    public String execute(JobExecutionContext context, Object... args) {
-        return "";
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Object... args) {
+        return completedFuture("");
     }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
index 0dd32099c1..9c8c2592bb 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
@@ -24,7 +25,7 @@ import org.apache.ignite.compute.JobExecutionContext;
 /** Compute job that sleeps for a number of milliseconds passed in the 
argument. */
 public class SleepJob implements ComputeJob<Void> {
     @Override
-    public Void execute(JobExecutionContext jobExecutionContext, Object... 
args) {
+    public CompletableFuture<Void> executeAsync(JobExecutionContext 
jobExecutionContext, Object... args) {
         try {
             TimeUnit.SECONDS.sleep((Long) args[0]);
             return null;
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index efb9baa5e1..acb0c231ab 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -83,7 +83,7 @@ public class ComputeExecutorImpl implements ComputeExecutor {
         JobExecutionContext context = new JobExecutionContextImpl(ignite, 
isInterrupted, classLoader);
 
         QueueExecution<R> execution = executorService.submit(
-                () -> ComputeUtils.instantiateJob(jobClass).execute(context, 
args),
+                () -> 
ComputeUtils.instantiateJob(jobClass).executeAsync(context, args),
                 options.priority(),
                 options.maxRetries()
         );
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index 7823cf0524..3eb2fa65f9 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -21,6 +21,7 @@ import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
@@ -71,7 +72,7 @@ public class PriorityQueueExecutor {
      * @param maxRetries Number of retries of the execution after failure, 
{@code 0} means the execution will not be retried.
      * @return Completable future which will be finished when compute job 
finished.
      */
-    public <R> QueueExecution<R> submit(Callable<R> job, int priority, int 
maxRetries) {
+    public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job, 
int priority, int maxRetries) {
         Objects.requireNonNull(job);
 
         UUID jobId = stateMachine.initJob();
@@ -88,7 +89,7 @@ public class PriorityQueueExecutor {
      * @param <R> Job result type.
      * @return Completable future which will be finished when compute job 
finished.
      */
-    public <R> QueueExecution<R> submit(Callable<R> job) {
+    public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job) {
         return submit(job, 0, 0);
     }
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
index c0654f804a..c5842a98b6 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.compute.queue;
 
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
+
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicLong;
@@ -38,7 +40,7 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
 
     private final CompletableFuture<R> future = new CompletableFuture<>();
 
-    private final Callable<R> jobAction;
+    private final Callable<CompletableFuture<R>> jobAction;
 
     private final int priority;
 
@@ -47,6 +49,9 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
     /** Thread used to run the job, initialized once the job starts executing. 
*/
     private @Nullable Thread workerThread;
 
+    /** Future returned from jobAction.call(). */
+    private @Nullable CompletableFuture<R> jobFuture;
+
     private final Lock lock = new ReentrantLock();
 
     private volatile boolean isInterrupted;
@@ -57,7 +62,7 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
      * @param jobAction Compute job callable.
      * @param priority Job priority.
      */
-    QueueEntry(Callable<R> jobAction, int priority) {
+    QueueEntry(Callable<CompletableFuture<R>> jobAction, int priority) {
         this.jobAction = jobAction;
         this.priority = priority;
         seqNum = seq.getAndIncrement();
@@ -73,7 +78,14 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
         }
 
         try {
-            future.complete(jobAction.call());
+            jobFuture = jobAction.call();
+
+            if (jobFuture == null) {
+                // Allow null futures for synchronous jobs.
+                future.complete(null);
+            } else {
+                jobFuture.whenComplete(copyStateTo(future));
+            }
         } catch (Throwable e) {
             future.completeExceptionally(e);
         } finally {
@@ -108,6 +120,11 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
                 isInterrupted = true;
                 workerThread.interrupt();
             }
+
+            if (jobFuture != null) {
+                isInterrupted = true;
+                jobFuture.cancel(true);
+            }
         } finally {
             lock.unlock();
         }
@@ -147,6 +164,6 @@ class QueueEntry<R> implements Runnable, 
Comparable<QueueEntry<R>> {
 
     @Override
     public int hashCode() {
-        return (int) (seqNum ^ (seqNum >>> 32));
+        return Long.hashCode(seqNum);
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 3b29d421d8..16612b2da5 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -42,7 +42,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
     private static final IgniteLogger LOG = 
Loggers.forClass(QueueExecutionImpl.class);
 
     private final UUID jobId;
-    private final Callable<R> job;
+    private final Callable<CompletableFuture<R>> job;
     private final ComputeThreadPoolExecutor executor;
     private final ComputeStateMachine stateMachine;
 
@@ -67,7 +67,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
      */
     QueueExecutionImpl(
             UUID jobId,
-            Callable<R> job,
+            Callable<CompletableFuture<R>> job,
             int priority,
             ComputeThreadPoolExecutor executor,
             ComputeStateMachine stateMachine) {
@@ -126,7 +126,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
         try {
             QueueEntry<R> queueEntry = this.queueEntry;
 
-            if (executor.removeFromQueue(queueEntry)) {
+            if (queueEntry != null && executor.removeFromQueue(queueEntry)) {
                 this.priority = newPriority;
                 this.queueEntry = null;
                 run();
@@ -152,6 +152,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
     private void run() {
         QueueEntry<R> queueEntry = new QueueEntry<>(() -> {
             stateMachine.executeJob(jobId);
+
             return job.call();
         }, priority);
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 32f1883226..c36aae3865 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -94,7 +94,8 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
         splitExecution = executorService.submit(
                 () -> {
                     MapReduceTask<R> task = instantiateTask(taskClass);
-                    return new SplitResult<>(task, task.split(context, args));
+
+                    return completedFuture(new SplitResult<>(task, 
task.split(context, args)));
                 },
                 Integer.MAX_VALUE,
                 0
@@ -115,7 +116,7 @@ public class TaskExecutionInternal<R> implements 
JobExecution<R> {
             MapReduceTask<R> task = 
splitExecution.resultAsync().thenApply(SplitResult::task).join();
 
             return executorService.submit(
-                    () -> task.reduce(results),
+                    () -> completedFuture(task.reduce(results)),
                     Integer.MAX_VALUE,
                     0
             );
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index afd538acda..ff58fe92cd 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -688,15 +688,15 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     private static class SimpleJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
-            return "jobResponse";
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
+            return completedFuture("jobResponse");
         }
     }
 
     private static class FailingJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             throw new JobException("Oops", new Exception());
         }
     }
@@ -710,15 +710,15 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     private static class GetThreadNameJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
-            return Thread.currentThread().getName();
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
+            return completedFuture(Thread.currentThread().getName());
         }
     }
 
     private static class LongJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             try {
                 Thread.sleep(1_000_000);
             } catch (InterruptedException e) {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index 523f99a5f6..059c5aee24 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute.executor;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.compute.JobState.CANCELED;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
@@ -28,6 +29,7 @@ import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.ComputeJob;
@@ -86,13 +88,13 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     private static class InterruptingJob implements ComputeJob<Integer> {
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             while (true) {
                 try {
                     Thread.sleep(100);
                 } catch (InterruptedException e) {
                     Thread.currentThread().interrupt();
-                    return 0;
+                    return completedFuture(0);
                 }
             }
         }
@@ -116,11 +118,11 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     private static class CancellingJob implements ComputeJob<Integer> {
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             while (true) {
                 try {
                     if (context.isCancelled()) {
-                        return 0;
+                        return completedFuture(0);
                     }
                     Thread.sleep(100);
                 } catch (InterruptedException e) {
@@ -151,7 +153,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
     private static class RetryJobFail implements ComputeJob<Integer> {
 
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             AtomicInteger runTimes = (AtomicInteger) args[0];
             runTimes.incrementAndGet();
             throw new RuntimeException();
@@ -179,13 +181,13 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
     private static class RetryJobSuccess implements ComputeJob<Integer> {
 
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             AtomicInteger runTimes = (AtomicInteger) args[0];
             int maxRetries = (int) args[1];
             if (runTimes.incrementAndGet() <= maxRetries) {
                 throw new RuntimeException();
             }
-            return 0;
+            return completedFuture(0);
         }
 
     }
@@ -212,9 +214,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
     private static class JobSuccess implements ComputeJob<Integer> {
 
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             AtomicInteger runTimes = (AtomicInteger) args[0];
-            return runTimes.incrementAndGet();
+            return completedFuture(runTimes.incrementAndGet());
         }
 
     }
@@ -235,8 +237,8 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
     private static class SimpleJob implements ComputeJob<Integer> {
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
-            return 0;
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
+            return completedFuture(0);
         }
     }
 }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
index 4887d275ce..d6eeaa81c6 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java
@@ -18,6 +18,8 @@
 package org.apache.ignite.internal.compute.loader;
 
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.getPath;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertSame;
@@ -29,6 +31,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.DeploymentUnit;
@@ -64,14 +67,14 @@ class JobClassLoaderFactoryTest extends 
BaseIgniteAbstractTest {
             // then classes from the first unit are loaded from the first 
class loader
             Class<?> clazz1 = classLoader1.loadClass(UNIT_JOB_CLASS_NAME);
             ComputeJob<Integer> job1 = (ComputeJob<Integer>) 
clazz1.getDeclaredConstructor().newInstance();
-            Integer result1 = job1.execute(null);
-            assertEquals(1, result1);
+            CompletableFuture<Integer> result1 = job1.executeAsync(null);
+            assertThat(result1, willBe(1));
 
             // and classes from the second unit are loaded from the second 
class loader
             Class<?> clazz2 = classLoader2.loadClass(UNIT_JOB_CLASS_NAME);
             ComputeJob<String> job2 = (ComputeJob<String>) 
clazz2.getDeclaredConstructor().newInstance();
-            String result2 = job2.execute(null);
-            assertEquals("Hello World!", result2);
+            CompletableFuture<String> result2 = job2.executeAsync(null);
+            assertThat(result2, willBe("Hello World!"));
         }
     }
 
@@ -90,8 +93,8 @@ class JobClassLoaderFactoryTest extends 
BaseIgniteAbstractTest {
 
             // and classes are loaded in the aplhabetical order
             ComputeJob<Integer> job1 = (ComputeJob<Integer>) 
unitJobClass.getDeclaredConstructor().newInstance();
-            Integer result1 = job1.execute(null);
-            assertEquals(1, result1);
+            CompletableFuture<Integer> result1 = job1.executeAsync(null);
+            assertThat(result1, willBe(1));
 
             Class<?> job1UtilityClass = 
classLoader.loadClass(JOB1_UTILITY_CLASS_NAME);
             assertNotNull(job1UtilityClass);
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index adc87ffdc2..eb68ff790d 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute.queue;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.compute.JobState.CANCELED;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
@@ -231,7 +232,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
                 try {
                     latch.await();
                 } catch (InterruptedException e) {
-                    return 0;
+                    return completedFuture(0);
                 }
             }
         });
@@ -254,7 +255,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         CountDownLatch latch = new CountDownLatch(1);
         QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> {
             latch.await();
-            return 0;
+            return completedFuture(0);
         });
 
         JobStatus executingStatus = await().until(execution::status, 
jobStatusWithState(EXECUTING));
@@ -272,7 +273,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
     void completedTaskCancel() {
         initExecutor(1);
 
-        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> 
0);
+        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> 
completedFuture(0));
 
         await().until(execution::status, jobStatusWithState(COMPLETED));
 
@@ -290,13 +291,13 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         CountDownLatch latch = new CountDownLatch(1);
         QueueExecution<Object> runningExecution = 
priorityQueueExecutor.submit(() -> {
             latch.await();
-            return 0;
+            return completedFuture(0);
         });
 
         await().until(runningExecution::status, jobStatusWithState(EXECUTING));
 
         // Put the task in the queue
-        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> 
0);
+        QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> 
completedFuture(0));
         await().until(execution::status, jobStatusWithState(QUEUED));
 
         // Cancel the task
@@ -340,7 +341,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
             if (runTimes.incrementAndGet() <= maxRetries) {
                 throw new RuntimeException();
             }
-            return 0;
+            return completedFuture(0);
         }, 0, maxRetries);
 
         await().until(execution::status, jobStatusWithState(COMPLETED));
@@ -385,7 +386,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
 
         QueueExecution<Integer> runningExecution3 =  
priorityQueueExecutor.submit(() -> {
             latch3.await();
-            return 2;
+            return completedFuture(2);
         }, 1, 0);
 
         CompletableFuture<Integer> task3 = runningExecution3.resultAsync();
@@ -436,7 +437,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
 
         QueueExecution<Integer> runningExecution =  
priorityQueueExecutor.submit(() -> {
             latch3.await();
-            return 2;
+            return completedFuture(2);
         }, 1, 0);
 
         CompletableFuture<Integer> task3 = runningExecution.resultAsync();
@@ -491,7 +492,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         // Start two tasks
         QueueExecution<Integer> runningExecution1 =  
priorityQueueExecutor.submit(() -> {
             latch1.await();
-            return 2;
+            return completedFuture(2);
         }, 1, 0);
 
         CompletableFuture<Integer> task1 = runningExecution1.resultAsync();
@@ -528,7 +529,7 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
         // Start three tasks
         QueueExecution<Integer> runningExecution =  
priorityQueueExecutor.submit(() -> {
             latch1.await();
-            return 2;
+            return completedFuture(2);
         }, 1, 0);
 
         CompletableFuture<Integer> task1 = runningExecution.resultAsync();
@@ -582,6 +583,6 @@ public class PriorityQueueExecutorTest extends 
BaseIgniteAbstractTest {
     }
 
     private <R> CompletableFuture<R> submit(Callable<R> job, int priority, int 
maxRetries) {
-        return priorityQueueExecutor.submit(job, priority, 
maxRetries).resultAsync();
+        return priorityQueueExecutor.submit(() -> completedFuture(job.call()), 
priority, maxRetries).resultAsync();
     }
 }
diff --git 
a/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
 
b/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
index d1161122de..2b5fe0d9ca 100644
--- 
a/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
+++ 
b/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
 /** Compute job. */
 public class UnitJob implements ComputeJob<Integer> {
     @Override
-    public Integer execute(JobExecutionContext context, Object... args) {
-        return 1;
+    public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
+        return completedFuture(1);
     }
 }
diff --git 
a/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
 
b/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
index 51163e21ae..631e225821 100644
--- 
a/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
+++ 
b/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java
@@ -17,13 +17,16 @@
 
 package org.apache.ignite.internal.compute;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobExecutionContext;
 
 /** Compute job. */
 public class UnitJob implements ComputeJob<String> {
     @Override
-    public String execute(JobExecutionContext context, Object... args) {
-        return "Hello World!";
+    public CompletableFuture<String> executeAsync(JobExecutionContext context, 
Object... args) {
+        return completedFuture("Hello World!");
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index a3ee3a9c08..812e760dd1 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -416,7 +416,7 @@ namespace Apache.Ignite.Tests.Compute
             var str = ex.ToString();
 
             StringAssert.Contains(
-                "at 
org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.execute(PlatformTestNodeRunner.java:",
+                "at 
org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.executeAsync(PlatformTestNodeRunner.java:",
                 str);
         }
 
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
index 79b3435e9e..767fbc4a15 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobDescriptor;
@@ -396,7 +397,7 @@ public class ItComputeControllerTest extends 
ClusterPerClassIntegrationTest {
     private static class BlockingJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             synchronized (LOCK) {
                 try {
                     LOCK.wait();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
index 2539d24935..e8c6f98390 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.runner.app;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.MAX_TIME_PRECISION;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone;
@@ -550,12 +551,12 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class CreateTableJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             String tableName = (String) args[0];
 
             context.ignite().sql().execute(null, "CREATE TABLE " + tableName + 
"(key BIGINT PRIMARY KEY, val INT)");
 
-            return tableName;
+            return completedFuture(tableName);
         }
     }
 
@@ -565,11 +566,11 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class DropTableJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             String tableName = (String) args[0];
             context.ignite().sql().execute(null, "DROP TABLE " + tableName + 
"");
 
-            return tableName;
+            return completedFuture(tableName);
         }
     }
 
@@ -579,7 +580,7 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class ExceptionJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             throw new RuntimeException("Test exception: " + args[0]);
         }
     }
@@ -590,7 +591,7 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class CheckedExceptionJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             throw new CompletionException(new 
IgniteCheckedException(Common.NODE_LEFT_ERR, "TestCheckedEx: " + args[0]));
         }
     }
@@ -601,7 +602,7 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class ColocationHashJob implements ComputeJob<Integer> {
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             var columnCount = (int) args[0];
             var buf = (byte[]) args[1];
             var timePrecision = (int) args[2];
@@ -712,7 +713,7 @@ public class PlatformTestNodeRunner {
             try {
                 Row row = marsh.marshal(tuple);
 
-                return row.colocationHash();
+                return completedFuture(row.colocationHash());
             } catch (TupleMarshallerException e) {
                 throw new RuntimeException(e);
             }
@@ -725,7 +726,7 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class TableRowColocationHashJob implements 
ComputeJob<Integer> {
         @Override
-        public Integer execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Integer> executeAsync(JobExecutionContext 
context, Object... args) {
             String tableName = (String) args[0];
             int i = (int) args[1];
             Tuple key = Tuple.create().set("id", 1 + i).set("id0", 2L + 
i).set("id1", "3" + i);
@@ -736,7 +737,7 @@ public class PlatformTestNodeRunner {
             TupleMarshaller marsh = view.marshaller(1);
 
             try {
-                return marsh.marshal(key).colocationHash();
+                return completedFuture(marsh.marshal(key).colocationHash());
             } catch (TupleMarshallerException e) {
                 throw new RuntimeException(e);
             }
@@ -749,7 +750,7 @@ public class PlatformTestNodeRunner {
     @SuppressWarnings("unused") // Used by platform tests.
     private static class EnableAuthenticationJob implements ComputeJob<Void> {
         @Override
-        public Void execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Void> executeAsync(JobExecutionContext 
context, Object... args) {
             boolean enable = ((Integer) args[0]) != 0;
             @SuppressWarnings("resource") IgniteImpl ignite = (IgniteImpl) 
context.ignite();
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index f6d310ae9a..4bf18bc6df 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.internal.runner.app.client;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.compute.JobState.CANCELED;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
@@ -27,6 +29,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR;
@@ -86,6 +89,7 @@ import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
+import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
@@ -157,10 +161,13 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertThat(execution.changePriorityAsync(0), willBe(false));
     }
 
-    @Test
-    void testCancelOnSpecificNodeAsync() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testCancelOnSpecificNodeAsync(boolean asyncJob) {
         int sleepMs = 1_000_000;
-        JobDescriptor sleepJob = JobDescriptor.builder(SleepJob.class).build();
+        JobDescriptor sleepJob = JobDescriptor
+                .builder(asyncJob ? AsyncSleepJob.class : SleepJob.class)
+                .build();
 
         JobExecution<String> execution1 = 
client().compute().submit(Set.of(node(0)), sleepJob, sleepMs);
         JobExecution<String> execution2 = 
client().compute().submit(Set.of(node(1)), sleepJob, sleepMs);
@@ -326,19 +333,21 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertNull(cause.getCause()); // No stack trace by default.
     }
 
-    @Test
-    void testExceptionInJobPropagatesToClientWithClassAndMessageAsync() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testExceptionInJobPropagatesToClientWithClassAndMessageAsync(boolean 
asyncJob) {
         IgniteException cause = getExceptionInJobExecutionAsync(
-                client().compute().submit(Set.of(node(0)), 
JobDescriptor.builder(ExceptionJob.class).build())
+                client().compute().submit(Set.of(node(0)), 
JobDescriptor.builder(ExceptionJob.class).build(), asyncJob)
         );
 
         assertComputeExceptionWithClassAndMessage(cause);
     }
 
-    @Test
-    void testExceptionInJobPropagatesToClientWithClassAndMessageSync() {
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    void testExceptionInJobPropagatesToClientWithClassAndMessageSync(boolean 
asyncJob) {
         IgniteException cause = getExceptionInJobExecutionSync(
-                () -> client().compute().execute(Set.of(node(0)), 
JobDescriptor.builder(ExceptionJob.class).build())
+                () -> client().compute().execute(Set.of(node(0)), 
JobDescriptor.builder(ExceptionJob.class).build(), asyncJob)
         );
 
         assertComputeExceptionWithClassAndMessage(cause);
@@ -509,7 +518,7 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertThat(cause.getCause().getMessage(), containsString(
                 "Caused by: java.lang.ArithmeticException: math err" + 
System.lineSeparator()
                         + "\tat 
org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$"
-                        + "ExceptionJob.execute(ItThinClientComputeTest.java:")
+                        + 
"ExceptionJob.executeAsync(ItThinClientComputeTest.java:")
         );
     }
 
@@ -737,39 +746,47 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
 
     private static class NodeNameJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
-            return context.ignite().name() + 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
+            return completedFuture(
+                    context.ignite().name() + 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")));
         }
     }
 
     private static class ConcatJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             if (args == null) {
-                return null;
+                return nullCompletedFuture();
             }
 
-            return Arrays.stream(args).map(o -> o == null ? "null" : 
o.toString()).collect(Collectors.joining("_"));
+            return completedFuture(
+                    Arrays.stream(args).map(o -> o == null ? "null" : 
o.toString()).collect(Collectors.joining("_")));
         }
     }
 
     private static class IgniteExceptionJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR, 
"Custom job error", null);
         }
     }
 
     private static class ExceptionJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
-            throw new ArithmeticException("math err");
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
+            boolean asyncJob = args.length > 0 && (Boolean) args[0];
+
+            if (asyncJob) {
+                return failedFuture(new ArithmeticException("math err"));
+            } else {
+                throw new ArithmeticException("math err");
+            }
         }
     }
 
     private static class EchoJob implements ComputeJob<Object> {
         @Override
-        public Object execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<Object> executeAsync(JobExecutionContext 
context, Object... args) {
             var value = args[0];
 
             if (!(value instanceof byte[])) {
@@ -778,13 +795,13 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
                 assertEquals(expectedString, valueString, "Unexpected string 
representation of value");
             }
 
-            return args[0];
+            return completedFuture(args[0]);
         }
     }
 
     private static class SleepJob implements ComputeJob<Void> {
         @Override
-        public Void execute(JobExecutionContext context, Object... args) {
+        public @Nullable CompletableFuture<Void> 
executeAsync(JobExecutionContext context, Object... args) {
             try {
                 Thread.sleep((Integer) args[0]);
             } catch (InterruptedException e) {
@@ -795,10 +812,23 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         }
     }
 
+    private static class AsyncSleepJob implements ComputeJob<Void> {
+        @Override
+        public @Nullable CompletableFuture<Void> 
executeAsync(JobExecutionContext context, Object... args) {
+            return CompletableFuture.runAsync(() -> {
+                try {
+                    Thread.sleep((Integer) args[0]);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+    }
+
     private static class DecimalJob implements ComputeJob<BigDecimal> {
         @Override
-        public BigDecimal execute(JobExecutionContext context, Object... args) 
{
-            return new BigDecimal((String) args[0]).setScale((Integer) 
args[1], RoundingMode.HALF_UP);
+        public CompletableFuture<BigDecimal> executeAsync(JobExecutionContext 
context, Object... args) {
+            return completedFuture(new BigDecimal((String) 
args[0]).setScale((Integer) args[1], RoundingMode.HALF_UP));
         }
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
index 8d7c85b5d3..ded59c2b37 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java
@@ -17,11 +17,13 @@
 
 package org.apache.ignite.internal.runner.app.client;
 
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.compute.ComputeJob;
@@ -121,9 +123,10 @@ public class ItThinClientPartitionAwarenessTest extends 
ItAbstractThinClientTest
 
     private static class NodeNameJob implements ComputeJob<String> {
         @Override
-        public String execute(JobExecutionContext context, Object... args) {
+        public CompletableFuture<String> executeAsync(JobExecutionContext 
context, Object... args) {
             //noinspection resource
-            return context.ignite().name() + 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"));
+            return completedFuture(
+                    context.ignite().name() + 
Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")));
         }
     }
 }

Reply via email to