This is an automated email from the ASF dual-hosted git repository. ptupitsyn pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 9b59ef6cc5 IGNITE-20645 Make ComputeJob.execute asynchronous (#3920) 9b59ef6cc5 is described below commit 9b59ef6cc5974757628f3e2b83185c8430f81a01 Author: Pavel Tupitsyn <ptupit...@apache.org> AuthorDate: Mon Jun 17 17:59:52 2024 +0300 IGNITE-20645 Make ComputeJob.execute asynchronous (#3920) * Change `R ComputeJob.execute(...)` to `CompletableFuture<R> ComputeJob.executeAsync(...)` * Update cancellation logic to deal with the resulting future --- .../java/org/apache/ignite/compute/ComputeJob.java | 8 ++- .../org/apache/ignite/compute/IgniteCompute.java | 2 +- ...ClientStreamerWithReceiverBatchSendRequest.java | 6 +- .../apache/ignite/client/fakes/FakeCompute.java | 18 +++-- .../internal/compute/ItComputeTestEmbedded.java | 10 +-- .../threading/ItComputeApiThreadingTest.java | 5 +- .../internal/compute/utils/InteractiveJobs.java | 14 ++-- .../apache/ignite/internal/compute/ConcatJob.java | 9 ++- .../apache/ignite/internal/compute/FailingJob.java | 3 +- .../ignite/internal/compute/GetNodeNameJob.java | 7 +- .../internal/compute/NonEmptyConstructorJob.java | 7 +- .../apache/ignite/internal/compute/SleepJob.java | 3 +- .../compute/executor/ComputeExecutorImpl.java | 2 +- .../compute/queue/PriorityQueueExecutor.java | 5 +- .../ignite/internal/compute/queue/QueueEntry.java | 25 +++++-- .../internal/compute/queue/QueueExecutionImpl.java | 7 +- .../compute/task/TaskExecutionInternal.java | 5 +- .../internal/compute/ComputeComponentImplTest.java | 12 ++-- .../compute/executor/ComputeExecutorTest.java | 24 +++---- .../compute/loader/JobClassLoaderFactoryTest.java | 15 +++-- .../compute/queue/PriorityQueueExecutorTest.java | 23 +++---- .../apache/ignite/internal/compute/UnitJob.java | 7 +- .../apache/ignite/internal/compute/UnitJob.java | 7 +- .../Apache.Ignite.Tests/Compute/ComputeTests.cs | 2 +- .../rest/compute/ItComputeControllerTest.java | 3 +- .../runner/app/PlatformTestNodeRunner.java | 23 +++---- .../runner/app/client/ItThinClientComputeTest.java | 76 +++++++++++++++------- .../client/ItThinClientPartitionAwarenessTest.java | 7 +- 28 files changed, 211 insertions(+), 124 deletions(-) diff --git a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java index 515e902bd3..97db8b05ea 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/ComputeJob.java @@ -17,18 +17,22 @@ package org.apache.ignite.compute; +import java.util.concurrent.CompletableFuture; +import org.jetbrains.annotations.Nullable; + /** * A Compute job that may be executed on a single Ignite node, on several nodes, or on the entire cluster. * * @param <R> Job result type. */ +@SuppressWarnings("InterfaceMayBeAnnotatedFunctional") public interface ComputeJob<R> { /** * Executes the job on an Ignite node. * * @param context The execution context. * @param args Job arguments. - * @return Job result. + * @return Job future. Can be null if the job is synchronous and does not return any result. */ - R execute(JobExecutionContext context, Object... args); + @Nullable CompletableFuture<R> executeAsync(JobExecutionContext context, Object... args); } diff --git a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java index 54b23d478c..c0fb02869d 100644 --- a/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java +++ b/modules/api/src/main/java/org/apache/ignite/compute/IgniteCompute.java @@ -37,7 +37,7 @@ import org.apache.ignite.table.mapper.Mapper; * Provides the ability to execute Compute jobs. * * @see ComputeJob - * @see ComputeJob#execute(JobExecutionContext, Object...) + * @see ComputeJob#executeAsync(JobExecutionContext, Object...) */ public interface IgniteCompute { /** diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java index 48078c7c75..46967bd119 100644 --- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java +++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientStreamerWithReceiverBatchSendRequest.java @@ -102,7 +102,7 @@ public class ClientStreamerWithReceiverBatchSendRequest { private static class ReceiverRunnerJob implements ComputeJob<List<Object>> { @Override - public @Nullable List<Object> execute(JobExecutionContext context, Object... args) { + public @Nullable CompletableFuture<List<Object>> executeAsync(JobExecutionContext context, Object... args) { int payloadElementCount = (int) args[0]; byte[] payload = (byte[]) args[1]; @@ -113,9 +113,7 @@ public class ClientStreamerWithReceiverBatchSendRequest { DataStreamerReceiver<Object, Object> receiver = ComputeUtils.instantiateReceiver(receiverClass); DataStreamerReceiverContext receiverContext = context::ignite; - CompletableFuture<List<Object>> receiveFut = receiver.receive(receiverInfo.items(), receiverContext, receiverInfo.args()); - - return receiveFut == null ? null : receiveFut.join(); + return receiver.receive(receiverInfo.items(), receiverContext, receiverInfo.args()); } } } diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java index cc3e9bd366..0fa44c7007 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java @@ -21,6 +21,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; import static org.apache.ignite.compute.JobState.FAILED; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture; import java.time.Instant; @@ -100,19 +101,22 @@ public class FakeCompute implements IgniteComputeInternal { throw new RuntimeException(e); } - if (err != null) { - throw err; + var err0 = err; + if (err0 != null) { + throw err0; } if (jobClassName.startsWith("org.apache.ignite")) { - Class<ComputeJob<Object>> jobClass = ComputeUtils.jobClass(this.getClass().getClassLoader(), jobClassName); - ComputeJob<Object> job = ComputeUtils.instantiateJob(jobClass); - Object jobRes = job.execute(new JobExecutionContextImpl(ignite, new AtomicBoolean(), this.getClass().getClassLoader()), args); + Class<ComputeJob<R>> jobClass = ComputeUtils.jobClass(this.getClass().getClassLoader(), jobClassName); + ComputeJob<R> job = ComputeUtils.instantiateJob(jobClass); + CompletableFuture<R> jobFut = job.executeAsync( + new JobExecutionContextImpl(ignite, new AtomicBoolean(), this.getClass().getClassLoader()), args); - return jobExecution(completedFuture((R) jobRes)); + return jobExecution(jobFut != null ? jobFut : nullCompletedFuture()); } - return jobExecution(future != null ? future : completedFuture((R) nodeName)); + var future0 = future; + return jobExecution(future0 != null ? future0 : completedFuture((R) nodeName)); } /** {@inheritDoc} */ diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java index 7ba202c790..4aed1cf525 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java @@ -341,14 +341,14 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { private static class CustomFailingJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { throw ExceptionUtils.sneakyThrow((Throwable) args[0]); } } private static class WaitLatchJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { try { ((CountDownLatch) args[0]).await(); } catch (InterruptedException e) { @@ -362,7 +362,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { static final AtomicInteger counter = new AtomicInteger(0); @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { try { ((CountDownLatch) args[0]).await(); if (counter.incrementAndGet() == 1) { @@ -377,7 +377,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { private static class PerformSyncKvGetPutJob implements ComputeJob<Void> { @Override - public Void execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Void> executeAsync(JobExecutionContext context, Object... args) { Table table = context.ignite().tables().table("test"); KeyValueView<Integer, Integer> view = table.keyValueView(Integer.class, Integer.class); @@ -390,7 +390,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { private static class NullReturningJob implements ComputeJob<Void> { @Override - public Void execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Void> executeAsync(JobExecutionContext context, Object... args) { return null; } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java index aabc6db098..d19788588a 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/threading/ItComputeApiThreadingTest.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.compute.threading; import static java.lang.Thread.currentThread; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.internal.PublicApiThreadingTests.anIgniteThread; import static org.apache.ignite.internal.PublicApiThreadingTests.asyncContinuationPool; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; @@ -141,8 +142,8 @@ class ItComputeApiThreadingTest extends ClusterPerClassIntegrationTest { private static class NoOpJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { - return "ok"; + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture("ok"); } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java index 1d5301504f..25d5022ea5 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveJobs.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute.utils; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; @@ -25,6 +26,7 @@ import static org.hamcrest.Matchers.notNullValue; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @@ -163,7 +165,7 @@ public final class InteractiveJobs { } @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet(); offerArgsAsSignals(args); @@ -178,9 +180,9 @@ public final class InteractiveJobs { GLOBAL_CHANNEL.offer(ACK); break; case RETURN: - return "Done"; + return completedFuture("Done"); case RETURN_WORKER_NAME: - return context.ignite().name(); + return completedFuture(context.ignite().name()); case GET_WORKER_NAME: GLOBAL_CHANNEL.add(context.ignite().name()); break; @@ -226,7 +228,7 @@ public final class InteractiveJobs { } @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { RUNNING_INTERACTIVE_JOBS_CNT.incrementAndGet(); try { @@ -244,9 +246,9 @@ public final class InteractiveJobs { NODE_CHANNELS.get(workerNodeName).offer(ACK); break; case RETURN: - return "Done"; + return completedFuture("Done"); case RETURN_WORKER_NAME: - return workerNodeName; + return completedFuture(workerNodeName); case GET_WORKER_NAME: NODE_CHANNELS.get(workerNodeName).add(workerNodeName); break; diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java index 81decb7f4a..b235ad0fea 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/ConcatJob.java @@ -17,7 +17,10 @@ package org.apache.ignite.internal.compute; +import static java.util.concurrent.CompletableFuture.completedFuture; + import java.util.Arrays; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; @@ -25,9 +28,9 @@ import org.apache.ignite.compute.JobExecutionContext; /** Compute job that concatenates the string representation of its arguments. */ public class ConcatJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { - return Arrays.stream(args) + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(Arrays.stream(args) .map(Object::toString) - .collect(Collectors.joining()); + .collect(Collectors.joining())); } } diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java index 4286c56bb7..20497cf23d 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJob.java @@ -17,13 +17,14 @@ package org.apache.ignite.internal.compute; +import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; /** Compute job that always fails with the {@link JobException}. */ public class FailingJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { throw new JobException("Oops", new Exception()); } } diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java index 558077adf0..226fbf83b3 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/GetNodeNameJob.java @@ -17,13 +17,16 @@ package org.apache.ignite.internal.compute; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; /** Compute job that returns the node name. */ public class GetNodeNameJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { - return context.ignite().name(); + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(context.ignite().name()); } } diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java index 7aaa6ecf6b..01721fd2fa 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/NonEmptyConstructorJob.java @@ -17,6 +17,9 @@ package org.apache.ignite.internal.compute; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; @@ -27,7 +30,7 @@ public class NonEmptyConstructorJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override - public String execute(JobExecutionContext context, Object... args) { - return ""; + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(""); } } diff --git a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java index 0dd32099c1..9c8c2592bb 100644 --- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java +++ b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/SleepJob.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; @@ -24,7 +25,7 @@ import org.apache.ignite.compute.JobExecutionContext; /** Compute job that sleeps for a number of milliseconds passed in the argument. */ public class SleepJob implements ComputeJob<Void> { @Override - public Void execute(JobExecutionContext jobExecutionContext, Object... args) { + public CompletableFuture<Void> executeAsync(JobExecutionContext jobExecutionContext, Object... args) { try { TimeUnit.SECONDS.sleep((Long) args[0]); return null; diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java index efb9baa5e1..acb0c231ab 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java @@ -83,7 +83,7 @@ public class ComputeExecutorImpl implements ComputeExecutor { JobExecutionContext context = new JobExecutionContextImpl(ignite, isInterrupted, classLoader); QueueExecution<R> execution = executorService.submit( - () -> ComputeUtils.instantiateJob(jobClass).execute(context, args), + () -> ComputeUtils.instantiateJob(jobClass).executeAsync(context, args), options.priority(), options.maxRetries() ); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java index 7823cf0524..3eb2fa65f9 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java @@ -21,6 +21,7 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.compute.configuration.ComputeConfiguration; @@ -71,7 +72,7 @@ public class PriorityQueueExecutor { * @param maxRetries Number of retries of the execution after failure, {@code 0} means the execution will not be retried. * @return Completable future which will be finished when compute job finished. */ - public <R> QueueExecution<R> submit(Callable<R> job, int priority, int maxRetries) { + public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job, int priority, int maxRetries) { Objects.requireNonNull(job); UUID jobId = stateMachine.initJob(); @@ -88,7 +89,7 @@ public class PriorityQueueExecutor { * @param <R> Job result type. * @return Completable future which will be finished when compute job finished. */ - public <R> QueueExecution<R> submit(Callable<R> job) { + public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job) { return submit(job, 0, 0); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java index c0654f804a..c5842a98b6 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueEntry.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.compute.queue; +import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo; + import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicLong; @@ -38,7 +40,7 @@ class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { private final CompletableFuture<R> future = new CompletableFuture<>(); - private final Callable<R> jobAction; + private final Callable<CompletableFuture<R>> jobAction; private final int priority; @@ -47,6 +49,9 @@ class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { /** Thread used to run the job, initialized once the job starts executing. */ private @Nullable Thread workerThread; + /** Future returned from jobAction.call(). */ + private @Nullable CompletableFuture<R> jobFuture; + private final Lock lock = new ReentrantLock(); private volatile boolean isInterrupted; @@ -57,7 +62,7 @@ class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { * @param jobAction Compute job callable. * @param priority Job priority. */ - QueueEntry(Callable<R> jobAction, int priority) { + QueueEntry(Callable<CompletableFuture<R>> jobAction, int priority) { this.jobAction = jobAction; this.priority = priority; seqNum = seq.getAndIncrement(); @@ -73,7 +78,14 @@ class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { } try { - future.complete(jobAction.call()); + jobFuture = jobAction.call(); + + if (jobFuture == null) { + // Allow null futures for synchronous jobs. + future.complete(null); + } else { + jobFuture.whenComplete(copyStateTo(future)); + } } catch (Throwable e) { future.completeExceptionally(e); } finally { @@ -108,6 +120,11 @@ class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { isInterrupted = true; workerThread.interrupt(); } + + if (jobFuture != null) { + isInterrupted = true; + jobFuture.cancel(true); + } } finally { lock.unlock(); } @@ -147,6 +164,6 @@ class QueueEntry<R> implements Runnable, Comparable<QueueEntry<R>> { @Override public int hashCode() { - return (int) (seqNum ^ (seqNum >>> 32)); + return Long.hashCode(seqNum); } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java index 3b29d421d8..16612b2da5 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java @@ -42,7 +42,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> { private static final IgniteLogger LOG = Loggers.forClass(QueueExecutionImpl.class); private final UUID jobId; - private final Callable<R> job; + private final Callable<CompletableFuture<R>> job; private final ComputeThreadPoolExecutor executor; private final ComputeStateMachine stateMachine; @@ -67,7 +67,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> { */ QueueExecutionImpl( UUID jobId, - Callable<R> job, + Callable<CompletableFuture<R>> job, int priority, ComputeThreadPoolExecutor executor, ComputeStateMachine stateMachine) { @@ -126,7 +126,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> { try { QueueEntry<R> queueEntry = this.queueEntry; - if (executor.removeFromQueue(queueEntry)) { + if (queueEntry != null && executor.removeFromQueue(queueEntry)) { this.priority = newPriority; this.queueEntry = null; run(); @@ -152,6 +152,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> { private void run() { QueueEntry<R> queueEntry = new QueueEntry<>(() -> { stateMachine.executeJob(jobId); + return job.call(); }, priority); diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java index 32f1883226..c36aae3865 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java @@ -94,7 +94,8 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { splitExecution = executorService.submit( () -> { MapReduceTask<R> task = instantiateTask(taskClass); - return new SplitResult<>(task, task.split(context, args)); + + return completedFuture(new SplitResult<>(task, task.split(context, args))); }, Integer.MAX_VALUE, 0 @@ -115,7 +116,7 @@ public class TaskExecutionInternal<R> implements JobExecution<R> { MapReduceTask<R> task = splitExecution.resultAsync().thenApply(SplitResult::task).join(); return executorService.submit( - () -> task.reduce(results), + () -> completedFuture(task.reduce(results)), Integer.MAX_VALUE, 0 ); diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java index afd538acda..ff58fe92cd 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java @@ -688,15 +688,15 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { private static class SimpleJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override - public String execute(JobExecutionContext context, Object... args) { - return "jobResponse"; + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture("jobResponse"); } } private static class FailingJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { throw new JobException("Oops", new Exception()); } } @@ -710,15 +710,15 @@ class ComputeComponentImplTest extends BaseIgniteAbstractTest { private static class GetThreadNameJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override - public String execute(JobExecutionContext context, Object... args) { - return Thread.currentThread().getName(); + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(Thread.currentThread().getName()); } } private static class LongJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { try { Thread.sleep(1_000_000); } catch (InterruptedException e) { diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java index 523f99a5f6..059c5aee24 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute.executor; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.compute.JobState.CANCELED; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; @@ -28,6 +29,7 @@ import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.Ignite; import org.apache.ignite.compute.ComputeJob; @@ -86,13 +88,13 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { private static class InterruptingJob implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { while (true) { try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); - return 0; + return completedFuture(0); } } } @@ -116,11 +118,11 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { private static class CancellingJob implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { while (true) { try { if (context.isCancelled()) { - return 0; + return completedFuture(0); } Thread.sleep(100); } catch (InterruptedException e) { @@ -151,7 +153,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { private static class RetryJobFail implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { AtomicInteger runTimes = (AtomicInteger) args[0]; runTimes.incrementAndGet(); throw new RuntimeException(); @@ -179,13 +181,13 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { private static class RetryJobSuccess implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { AtomicInteger runTimes = (AtomicInteger) args[0]; int maxRetries = (int) args[1]; if (runTimes.incrementAndGet() <= maxRetries) { throw new RuntimeException(); } - return 0; + return completedFuture(0); } } @@ -212,9 +214,9 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { private static class JobSuccess implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { AtomicInteger runTimes = (AtomicInteger) args[0]; - return runTimes.incrementAndGet(); + return completedFuture(runTimes.incrementAndGet()); } } @@ -235,8 +237,8 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest { private static class SimpleJob implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { - return 0; + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(0); } } } diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java index 4887d275ce..d6eeaa81c6 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/loader/JobClassLoaderFactoryTest.java @@ -18,6 +18,8 @@ package org.apache.ignite.internal.compute.loader; import static org.apache.ignite.internal.testframework.IgniteTestUtils.getPath; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertSame; @@ -29,6 +31,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.DeploymentUnit; @@ -64,14 +67,14 @@ class JobClassLoaderFactoryTest extends BaseIgniteAbstractTest { // then classes from the first unit are loaded from the first class loader Class<?> clazz1 = classLoader1.loadClass(UNIT_JOB_CLASS_NAME); ComputeJob<Integer> job1 = (ComputeJob<Integer>) clazz1.getDeclaredConstructor().newInstance(); - Integer result1 = job1.execute(null); - assertEquals(1, result1); + CompletableFuture<Integer> result1 = job1.executeAsync(null); + assertThat(result1, willBe(1)); // and classes from the second unit are loaded from the second class loader Class<?> clazz2 = classLoader2.loadClass(UNIT_JOB_CLASS_NAME); ComputeJob<String> job2 = (ComputeJob<String>) clazz2.getDeclaredConstructor().newInstance(); - String result2 = job2.execute(null); - assertEquals("Hello World!", result2); + CompletableFuture<String> result2 = job2.executeAsync(null); + assertThat(result2, willBe("Hello World!")); } } @@ -90,8 +93,8 @@ class JobClassLoaderFactoryTest extends BaseIgniteAbstractTest { // and classes are loaded in the aplhabetical order ComputeJob<Integer> job1 = (ComputeJob<Integer>) unitJobClass.getDeclaredConstructor().newInstance(); - Integer result1 = job1.execute(null); - assertEquals(1, result1); + CompletableFuture<Integer> result1 = job1.executeAsync(null); + assertThat(result1, willBe(1)); Class<?> job1UtilityClass = classLoader.loadClass(JOB1_UTILITY_CLASS_NAME); assertNotNull(job1UtilityClass); diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java index adc87ffdc2..eb68ff790d 100644 --- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java +++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute.queue; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.apache.ignite.compute.JobState.CANCELED; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; @@ -231,7 +232,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { try { latch.await(); } catch (InterruptedException e) { - return 0; + return completedFuture(0); } } }); @@ -254,7 +255,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { CountDownLatch latch = new CountDownLatch(1); QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> { latch.await(); - return 0; + return completedFuture(0); }); JobStatus executingStatus = await().until(execution::status, jobStatusWithState(EXECUTING)); @@ -272,7 +273,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { void completedTaskCancel() { initExecutor(1); - QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> 0); + QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> completedFuture(0)); await().until(execution::status, jobStatusWithState(COMPLETED)); @@ -290,13 +291,13 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { CountDownLatch latch = new CountDownLatch(1); QueueExecution<Object> runningExecution = priorityQueueExecutor.submit(() -> { latch.await(); - return 0; + return completedFuture(0); }); await().until(runningExecution::status, jobStatusWithState(EXECUTING)); // Put the task in the queue - QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> 0); + QueueExecution<Object> execution = priorityQueueExecutor.submit(() -> completedFuture(0)); await().until(execution::status, jobStatusWithState(QUEUED)); // Cancel the task @@ -340,7 +341,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { if (runTimes.incrementAndGet() <= maxRetries) { throw new RuntimeException(); } - return 0; + return completedFuture(0); }, 0, maxRetries); await().until(execution::status, jobStatusWithState(COMPLETED)); @@ -385,7 +386,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { QueueExecution<Integer> runningExecution3 = priorityQueueExecutor.submit(() -> { latch3.await(); - return 2; + return completedFuture(2); }, 1, 0); CompletableFuture<Integer> task3 = runningExecution3.resultAsync(); @@ -436,7 +437,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { QueueExecution<Integer> runningExecution = priorityQueueExecutor.submit(() -> { latch3.await(); - return 2; + return completedFuture(2); }, 1, 0); CompletableFuture<Integer> task3 = runningExecution.resultAsync(); @@ -491,7 +492,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { // Start two tasks QueueExecution<Integer> runningExecution1 = priorityQueueExecutor.submit(() -> { latch1.await(); - return 2; + return completedFuture(2); }, 1, 0); CompletableFuture<Integer> task1 = runningExecution1.resultAsync(); @@ -528,7 +529,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { // Start three tasks QueueExecution<Integer> runningExecution = priorityQueueExecutor.submit(() -> { latch1.await(); - return 2; + return completedFuture(2); }, 1, 0); CompletableFuture<Integer> task1 = runningExecution.resultAsync(); @@ -582,6 +583,6 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest { } private <R> CompletableFuture<R> submit(Callable<R> job, int priority, int maxRetries) { - return priorityQueueExecutor.submit(job, priority, maxRetries).resultAsync(); + return priorityQueueExecutor.submit(() -> completedFuture(job.call()), priority, maxRetries).resultAsync(); } } diff --git a/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java b/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java index d1161122de..2b5fe0d9ca 100644 --- a/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java +++ b/modules/compute/src/unit1/java/org/apache/ignite/internal/compute/UnitJob.java @@ -17,13 +17,16 @@ package org.apache.ignite.internal.compute; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; /** Compute job. */ public class UnitJob implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { - return 1; + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(1); } } diff --git a/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java b/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java index 51163e21ae..631e225821 100644 --- a/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java +++ b/modules/compute/src/unit2/java/org/apache/ignite/internal/compute/UnitJob.java @@ -17,13 +17,16 @@ package org.apache.ignite.internal.compute; +import static java.util.concurrent.CompletableFuture.completedFuture; + +import java.util.concurrent.CompletableFuture; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobExecutionContext; /** Compute job. */ public class UnitJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { - return "Hello World!"; + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture("Hello World!"); } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs index a3ee3a9c08..812e760dd1 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs @@ -416,7 +416,7 @@ namespace Apache.Ignite.Tests.Compute var str = ex.ToString(); StringAssert.Contains( - "at org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.execute(PlatformTestNodeRunner.java:", + "at org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.executeAsync(PlatformTestNodeRunner.java:", str); } diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java index 79b3435e9e..767fbc4a15 100644 --- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java +++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/compute/ItComputeControllerTest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.JobDescriptor; @@ -396,7 +397,7 @@ public class ItComputeControllerTest extends ClusterPerClassIntegrationTest { private static class BlockingJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { synchronized (LOCK) { try { LOCK.wait(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java index 2539d24935..e8c6f98390 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/PlatformTestNodeRunner.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.runner.app; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.stream.Collectors.toList; import static org.apache.ignite.internal.catalog.commands.CatalogUtils.MAX_TIME_PRECISION; import static org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.createZone; @@ -550,12 +551,12 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class CreateTableJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { String tableName = (String) args[0]; context.ignite().sql().execute(null, "CREATE TABLE " + tableName + "(key BIGINT PRIMARY KEY, val INT)"); - return tableName; + return completedFuture(tableName); } } @@ -565,11 +566,11 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class DropTableJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { String tableName = (String) args[0]; context.ignite().sql().execute(null, "DROP TABLE " + tableName + ""); - return tableName; + return completedFuture(tableName); } } @@ -579,7 +580,7 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class ExceptionJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { throw new RuntimeException("Test exception: " + args[0]); } } @@ -590,7 +591,7 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class CheckedExceptionJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { throw new CompletionException(new IgniteCheckedException(Common.NODE_LEFT_ERR, "TestCheckedEx: " + args[0])); } } @@ -601,7 +602,7 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class ColocationHashJob implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { var columnCount = (int) args[0]; var buf = (byte[]) args[1]; var timePrecision = (int) args[2]; @@ -712,7 +713,7 @@ public class PlatformTestNodeRunner { try { Row row = marsh.marshal(tuple); - return row.colocationHash(); + return completedFuture(row.colocationHash()); } catch (TupleMarshallerException e) { throw new RuntimeException(e); } @@ -725,7 +726,7 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class TableRowColocationHashJob implements ComputeJob<Integer> { @Override - public Integer execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Integer> executeAsync(JobExecutionContext context, Object... args) { String tableName = (String) args[0]; int i = (int) args[1]; Tuple key = Tuple.create().set("id", 1 + i).set("id0", 2L + i).set("id1", "3" + i); @@ -736,7 +737,7 @@ public class PlatformTestNodeRunner { TupleMarshaller marsh = view.marshaller(1); try { - return marsh.marshal(key).colocationHash(); + return completedFuture(marsh.marshal(key).colocationHash()); } catch (TupleMarshallerException e) { throw new RuntimeException(e); } @@ -749,7 +750,7 @@ public class PlatformTestNodeRunner { @SuppressWarnings("unused") // Used by platform tests. private static class EnableAuthenticationJob implements ComputeJob<Void> { @Override - public Void execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Void> executeAsync(JobExecutionContext context, Object... args) { boolean enable = ((Integer) args[0]) != 0; @SuppressWarnings("resource") IgniteImpl ignite = (IgniteImpl) context.ignite(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java index f6d310ae9a..4bf18bc6df 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.runner.app.client; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; import static org.apache.ignite.compute.JobState.CANCELED; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; @@ -27,6 +29,7 @@ import static org.apache.ignite.internal.testframework.matchers.CompletableFutur import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR; import static org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR; @@ -86,6 +89,7 @@ import org.apache.ignite.table.Tuple; import org.apache.ignite.table.mapper.Mapper; import org.hamcrest.Matcher; import org.hamcrest.Matchers; +import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; @@ -157,10 +161,13 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { assertThat(execution.changePriorityAsync(0), willBe(false)); } - @Test - void testCancelOnSpecificNodeAsync() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testCancelOnSpecificNodeAsync(boolean asyncJob) { int sleepMs = 1_000_000; - JobDescriptor sleepJob = JobDescriptor.builder(SleepJob.class).build(); + JobDescriptor sleepJob = JobDescriptor + .builder(asyncJob ? AsyncSleepJob.class : SleepJob.class) + .build(); JobExecution<String> execution1 = client().compute().submit(Set.of(node(0)), sleepJob, sleepMs); JobExecution<String> execution2 = client().compute().submit(Set.of(node(1)), sleepJob, sleepMs); @@ -326,19 +333,21 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { assertNull(cause.getCause()); // No stack trace by default. } - @Test - void testExceptionInJobPropagatesToClientWithClassAndMessageAsync() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExceptionInJobPropagatesToClientWithClassAndMessageAsync(boolean asyncJob) { IgniteException cause = getExceptionInJobExecutionAsync( - client().compute().submit(Set.of(node(0)), JobDescriptor.builder(ExceptionJob.class).build()) + client().compute().submit(Set.of(node(0)), JobDescriptor.builder(ExceptionJob.class).build(), asyncJob) ); assertComputeExceptionWithClassAndMessage(cause); } - @Test - void testExceptionInJobPropagatesToClientWithClassAndMessageSync() { + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void testExceptionInJobPropagatesToClientWithClassAndMessageSync(boolean asyncJob) { IgniteException cause = getExceptionInJobExecutionSync( - () -> client().compute().execute(Set.of(node(0)), JobDescriptor.builder(ExceptionJob.class).build()) + () -> client().compute().execute(Set.of(node(0)), JobDescriptor.builder(ExceptionJob.class).build(), asyncJob) ); assertComputeExceptionWithClassAndMessage(cause); @@ -509,7 +518,7 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { assertThat(cause.getCause().getMessage(), containsString( "Caused by: java.lang.ArithmeticException: math err" + System.lineSeparator() + "\tat org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$" - + "ExceptionJob.execute(ItThinClientComputeTest.java:") + + "ExceptionJob.executeAsync(ItThinClientComputeTest.java:") ); } @@ -737,39 +746,47 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { private static class NodeNameJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { - return context.ignite().name() + Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")); + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture( + context.ignite().name() + Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"))); } } private static class ConcatJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { if (args == null) { - return null; + return nullCompletedFuture(); } - return Arrays.stream(args).map(o -> o == null ? "null" : o.toString()).collect(Collectors.joining("_")); + return completedFuture( + Arrays.stream(args).map(o -> o == null ? "null" : o.toString()).collect(Collectors.joining("_"))); } } private static class IgniteExceptionJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR, "Custom job error", null); } } private static class ExceptionJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { - throw new ArithmeticException("math err"); + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { + boolean asyncJob = args.length > 0 && (Boolean) args[0]; + + if (asyncJob) { + return failedFuture(new ArithmeticException("math err")); + } else { + throw new ArithmeticException("math err"); + } } } private static class EchoJob implements ComputeJob<Object> { @Override - public Object execute(JobExecutionContext context, Object... args) { + public CompletableFuture<Object> executeAsync(JobExecutionContext context, Object... args) { var value = args[0]; if (!(value instanceof byte[])) { @@ -778,13 +795,13 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { assertEquals(expectedString, valueString, "Unexpected string representation of value"); } - return args[0]; + return completedFuture(args[0]); } } private static class SleepJob implements ComputeJob<Void> { @Override - public Void execute(JobExecutionContext context, Object... args) { + public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, Object... args) { try { Thread.sleep((Integer) args[0]); } catch (InterruptedException e) { @@ -795,10 +812,23 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { } } + private static class AsyncSleepJob implements ComputeJob<Void> { + @Override + public @Nullable CompletableFuture<Void> executeAsync(JobExecutionContext context, Object... args) { + return CompletableFuture.runAsync(() -> { + try { + Thread.sleep((Integer) args[0]); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }); + } + } + private static class DecimalJob implements ComputeJob<BigDecimal> { @Override - public BigDecimal execute(JobExecutionContext context, Object... args) { - return new BigDecimal((String) args[0]).setScale((Integer) args[1], RoundingMode.HALF_UP); + public CompletableFuture<BigDecimal> executeAsync(JobExecutionContext context, Object... args) { + return completedFuture(new BigDecimal((String) args[0]).setScale((Integer) args[1], RoundingMode.HALF_UP)); } } diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java index 8d7c85b5d3..ded59c2b37 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientPartitionAwarenessTest.java @@ -17,11 +17,13 @@ package org.apache.ignite.internal.runner.app.client; +import static java.util.concurrent.CompletableFuture.completedFuture; import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.apache.ignite.client.IgniteClient; import org.apache.ignite.compute.ComputeJob; @@ -121,9 +123,10 @@ public class ItThinClientPartitionAwarenessTest extends ItAbstractThinClientTest private static class NodeNameJob implements ComputeJob<String> { @Override - public String execute(JobExecutionContext context, Object... args) { + public CompletableFuture<String> executeAsync(JobExecutionContext context, Object... args) { //noinspection resource - return context.ignite().name() + Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_")); + return completedFuture( + context.ignite().name() + Arrays.stream(args).map(Object::toString).collect(Collectors.joining("_"))); } } }