This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 13aacdd24f IGNITE-21250 Client compute should pass all candidate nodes 
(#3104)
13aacdd24f is described below

commit 13aacdd24fb8597fdf03655e1c3d765ed0d827ef
Author: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
AuthorDate: Thu Feb 1 21:44:43 2024 +0300

    IGNITE-21250 Client compute should pass all candidate nodes (#3104)
---
 .../compute/ClientComputeExecuteRequest.java       | 39 ++++++++-----
 .../internal/client/compute/ClientCompute.java     | 28 ++++++----
 .../apache/ignite/client/fakes/FakeCompute.java    | 10 ++++
 modules/compute/build.gradle                       |  1 +
 .../compute/ItEmbeddedWorkerShutdownTest.java      | 28 ++++++++++
 .../compute/ItThinClientWorkerShutdownTest.java    | 45 +++++++++++++++
 .../internal/compute/ItWorkerShutdownTest.java     | 11 ++--
 .../ignite/internal/compute/ComputeComponent.java  | 22 ++++++++
 .../internal/compute/ComputeComponentImpl.java     | 43 +++++++++++++++
 .../internal/compute/ComputeJobFailover.java       |  1 +
 .../ignite/internal/compute/ExecutionManager.java  |  2 +-
 .../ignite/internal/compute/IgniteComputeImpl.java | 49 +++++++----------
 .../internal/compute/IgniteComputeInternal.java    | 27 +++++++++
 .../internal/compute/ComputeComponentImplTest.java |  8 +++
 .../internal/compute/IgniteComputeImplTest.java    | 19 +++++--
 .../cpp/ignite/client/compute/compute.cpp          | 22 +-------
 .../ignite/client/detail/compute/compute_impl.cpp  | 10 +++-
 .../ignite/client/detail/compute/compute_impl.h    |  7 ++-
 .../dotnet/Apache.Ignite.Tests/FakeServer.cs       | 15 ++++-
 .../Apache.Ignite/Internal/Compute/Compute.cs      | 64 ++++++++++++++--------
 .../org/apache/ignite/internal/app/IgniteImpl.java |  3 +-
 21 files changed, 339 insertions(+), 115 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
index 71f0d71c29..5b54080139 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteRequest.java
@@ -20,17 +20,19 @@ package org.apache.ignite.client.handler.requests.compute;
 import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest.packJobStatus;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.compute.DeploymentUnit;
-import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.internal.client.proto.ClientMessagePacker;
 import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterService;
 
 /**
@@ -50,29 +52,40 @@ public class ClientComputeExecuteRequest {
     public static CompletableFuture<Void> process(
             ClientMessageUnpacker in,
             ClientMessagePacker out,
-            IgniteCompute compute,
+            IgniteComputeInternal compute,
             ClusterService cluster,
-            NotificationSender notificationSender) {
-        var nodeName = in.tryUnpackNil() ? null : in.unpackString();
-
-        var node = nodeName == null
-                ? cluster.topologyService().localMember()
-                : cluster.topologyService().getByConsistentId(nodeName);
-
-        if (node == null) {
-            throw new IgniteException("Specified node is not present in the 
cluster: " + nodeName);
-        }
+            NotificationSender notificationSender
+    ) {
+        Set<ClusterNode> candidates = unpackCandidateNodes(in, cluster);
 
         List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
         String jobClassName = in.unpackString();
         JobExecutionOptions options = 
JobExecutionOptions.builder().priority(in.unpackInt()).maxRetries(in.unpackInt()).build();
         Object[] args = unpackArgs(in);
 
-        JobExecution<Object> execution = compute.executeAsync(Set.of(node), 
deploymentUnits, jobClassName, options, args);
+        JobExecution<Object> execution = 
compute.executeAsyncWithFailover(candidates, deploymentUnits, jobClassName, 
options, args);
         sendResultAndStatus(execution, notificationSender);
         return execution.idAsync().thenAccept(out::packUuid);
     }
 
+    private static Set<ClusterNode> unpackCandidateNodes(ClientMessageUnpacker 
in, ClusterService cluster) {
+        int size = in.unpackInt();
+        Set<ClusterNode> nodes = new HashSet<>(size);
+
+        for (int i = 0; i < size; i++) {
+            String nodeName = in.unpackString();
+            ClusterNode node = 
cluster.topologyService().getByConsistentId(nodeName);
+
+            if (node == null) {
+                throw new IgniteException("Specified node is not present in 
the cluster: " + nodeName);
+            }
+
+            nodes.add(node);
+        }
+
+        return nodes;
+    }
+
     static void sendResultAndStatus(JobExecution<Object> execution, 
NotificationSender notificationSender) {
         execution.resultAsync().whenComplete((val, err) ->
                 execution.statusAsync().whenComplete((status, errStatus) ->
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 0b0341f298..bf621dbab0 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -100,9 +100,7 @@ public class ClientCompute implements IgniteCompute {
             throw new IllegalArgumentException("nodes must not be empty.");
         }
 
-        ClusterNode node = randomNode(nodes);
-
-        return new ClientJobExecution<>(ch, executeOnOneNode(node, units, 
jobClassName, options, args));
+        return new ClientJobExecution<>(ch, executeOnNodesAsync(nodes, units, 
jobClassName, options, args));
     }
 
     /** {@inheritDoc} */
@@ -252,7 +250,9 @@ public class ClientCompute implements IgniteCompute {
         Map<ClusterNode, JobExecution<R>> map = new HashMap<>(nodes.size());
 
         for (ClusterNode node : nodes) {
-            ClientJobExecution<R> execution = new ClientJobExecution<>(ch, 
executeOnOneNode(node, units, jobClassName, options, args));
+            JobExecution<R> execution = new ClientJobExecution<>(ch, 
executeOnNodesAsync(
+                    Set.of(node), units, jobClassName, options, args
+            ));
             if (map.put(node, execution) != null) {
                 throw new IllegalStateException("Node can't be specified more 
than once: " + node);
             }
@@ -261,22 +261,19 @@ public class ClientCompute implements IgniteCompute {
         return map;
     }
 
-    private CompletableFuture<PayloadInputChannel> executeOnOneNode(
-            ClusterNode node,
+    private CompletableFuture<PayloadInputChannel> executeOnNodesAsync(
+            Set<ClusterNode> nodes,
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
             Object[] args
     ) {
+        ClusterNode node = randomNode(nodes);
+
         return ch.serviceAsync(
                 ClientOp.COMPUTE_EXECUTE,
                 w -> {
-                    if 
(w.clientChannel().protocolContext().clusterNode().name().equals(node.name())) {
-                        w.out().packNil();
-                    } else {
-                        w.out().packString(node.name());
-                    }
-
+                    packNodeNames(w.out(), nodes);
                     packJob(w.out(), units, jobClassName, options, args);
                 },
                 ch -> ch,
@@ -408,6 +405,13 @@ public class ClientCompute implements IgniteCompute {
         return completedFuture(res);
     }
 
+    private static void packNodeNames(ClientMessagePacker w, Set<ClusterNode> 
nodes) {
+        w.packInt(nodes.size());
+        for (ClusterNode node : nodes) {
+            w.packString(node.name());
+        }
+    }
+
     private static void packJob(ClientMessagePacker w,
             List<DeploymentUnit> units,
             String jobClassName,
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index c3e69d688c..067d9dbc62 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -75,6 +75,16 @@ public class FakeCompute implements IgniteComputeInternal {
             String jobClassName,
             JobExecutionOptions options,
             Object... args) {
+        return executeAsyncWithFailover(nodes, units, jobClassName, options, 
args);
+    }
+
+    @Override
+    public <R> JobExecution<R> executeAsyncWithFailover(
+            Set<ClusterNode> nodes,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            JobExecutionOptions options,
+            Object... args) {
         if (Objects.equals(jobClassName, GET_UNITS)) {
             String unitString = 
units.stream().map(DeploymentUnit::render).collect(Collectors.joining(","));
             return completedExecution((R) unitString);
diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle
index b7a26d3821..5e3fc9041c 100644
--- a/modules/compute/build.gradle
+++ b/modules/compute/build.gradle
@@ -54,6 +54,7 @@ dependencies {
     integrationTestImplementation project(':ignite-cluster-management')
     integrationTestImplementation project(':ignite-table')
     integrationTestImplementation project(':ignite-placement-driver-api')
+    integrationTestImplementation project(':ignite-client')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java
new file mode 100644
index 0000000000..6d44bb4e3c
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.app.IgniteImpl;
+
+class ItEmbeddedWorkerShutdownTest extends ItWorkerShutdownTest {
+    @Override
+    IgniteCompute compute(IgniteImpl entryNode) {
+        return entryNode.compute();
+    }
+}
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java
new file mode 100644
index 0000000000..401a298220
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.junit.jupiter.api.AfterEach;
+
+class ItThinClientWorkerShutdownTest extends ItWorkerShutdownTest {
+    private final Map<String, IgniteClient> clients = new HashMap<>();
+
+    @AfterEach
+    void cleanup() throws Exception {
+        for (IgniteClient igniteClient : clients.values()) {
+            igniteClient.close();
+        }
+        clients.clear();
+    }
+
+    @Override
+    IgniteCompute compute(IgniteImpl entryNode) {
+        String address = "127.0.0.1:" + entryNode.clientAddress().port();
+        IgniteClient client = 
IgniteClient.builder().addresses(address).build();
+        clients.put(address, client);
+        return client.compute();
+    }
+}
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
index c76b6c0870..9befd4dbb7 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
@@ -34,6 +34,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
@@ -55,7 +56,7 @@ import org.junit.jupiter.api.Test;
  * another node. This is not true for broadcast and local jobs. They should 
not be restarted.
  */
 @SuppressWarnings("resource")
-public class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest {
+public abstract class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
     /**
      * Map from node name to node index in {@link super#cluster}.
      */
@@ -208,7 +209,7 @@ public class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
         InteractiveJobs.initChannels(allNodeNames());
 
         // When start broadcast job.
-        Map<ClusterNode, JobExecution<Object>> executions = 
entryNode.compute().broadcastAsync(
+        Map<ClusterNode, JobExecution<Object>> executions = 
compute(entryNode).broadcastAsync(
                 clusterNodesByNames(workerCandidates(node(0), node(1), 
node(2))),
                 List.of(),
                 InteractiveJobs.interactiveJobName()
@@ -286,7 +287,7 @@ public class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
 
         // When start colocated job on node that is not primary replica.
         IgniteImpl entryNode = anyNodeExcept(primaryReplica);
-        TestingJobExecution<Object> execution = new 
TestingJobExecution<>(entryNode.compute().executeColocatedAsync(
+        TestingJobExecution<Object> execution = new 
TestingJobExecution<>(compute(entryNode).executeColocatedAsync(
                 TABLE_NAME,
                 Tuple.create(1).set("K", 1),
                 List.of(),
@@ -360,10 +361,12 @@ public class ItWorkerShutdownTest extends 
ClusterPerTestIntegrationTest {
 
     private TestingJobExecution<String> executeGlobalInteractiveJob(IgniteImpl 
entryNode, Set<String> nodes) {
         return new TestingJobExecution<>(
-                entryNode.compute().executeAsync(clusterNodesByNames(nodes), 
List.of(), InteractiveJobs.globalJob().name())
+                compute(entryNode).executeAsync(clusterNodesByNames(nodes), 
List.of(), InteractiveJobs.globalJob().name())
         );
     }
 
+    abstract IgniteCompute compute(IgniteImpl entryNode);
+
     private void createReplicatedTestTableWithOneRow() {
         // Number of replicas == number of nodes and number of partitions == 
1. This gives us the majority on primary replica stop.
         // After the primary replica is stopped we still be able to select new 
primary replica selected.
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 679000389e..3f88353d1f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -104,6 +104,28 @@ public interface ComputeComponent extends IgniteComponent {
         return executeRemotely(ExecutionOptions.DEFAULT, remoteNode, units, 
jobClassName, args);
     }
 
+    /**
+     * Executes a job of the given class on a remote node. If the node leaves 
the cluster, the job will be restarted on the node given by the
+     * {@code nextWorkerSelector}.
+     *
+     * @param remoteNode Remote node to execute the job on.
+     * @param nextWorkerSelector The selector that returns the next worker to 
execute job on.
+     * @param options Job execution options.
+     * @param units Deployment units which will be loaded for execution.
+     * @param jobClassName Name of the job class.
+     * @param args Job args.
+     * @param <R> Job result type.
+     * @return Future execution result.
+     */
+    <R> JobExecution<R> executeRemotelyWithFailover(
+            ClusterNode remoteNode,
+            NextWorkerSelector nextWorkerSelector,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            ExecutionOptions options,
+            Object... args
+    );
+
     /**
      * Retrieves the current status of all jobs on all nodes in the cluster.
      *
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 19762aa0fc..df5ff79840 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -26,10 +26,14 @@ import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobStatus;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.executor.ComputeExecutor;
 import org.apache.ignite.internal.compute.executor.JobExecutionInternal;
@@ -40,7 +44,11 @@ import 
org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
 import org.apache.ignite.internal.future.InFlightFutures;
 import org.apache.ignite.internal.lang.IgniteInternalException;
 import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.MessagingService;
 import org.apache.ignite.network.TopologyService;
@@ -50,6 +58,8 @@ import org.jetbrains.annotations.Nullable;
  * Implementation of {@link ComputeComponent}.
  */
 public class ComputeComponentImpl implements ComputeComponent {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ComputeComponentImpl.class);
+
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
 
@@ -58,6 +68,10 @@ public class ComputeComponentImpl implements 
ComputeComponent {
 
     private final InFlightFutures inFlightFutures = new InFlightFutures();
 
+    private final TopologyService topologyService;
+
+    private final LogicalTopologyService logicalTopologyService;
+
     private final JobContextManager jobContextManager;
 
     private final ComputeExecutor executor;
@@ -66,20 +80,29 @@ public class ComputeComponentImpl implements 
ComputeComponent {
 
     private final ExecutionManager executionManager;
 
+    private final ExecutorService failoverExecutor;
+
     /**
      * Creates a new instance.
      */
     public ComputeComponentImpl(
+            String nodeName,
             MessagingService messagingService,
             TopologyService topologyService,
+            LogicalTopologyService logicalTopologyService,
             JobContextManager jobContextManager,
             ComputeExecutor executor,
             ComputeConfiguration computeConfiguration
     ) {
+        this.topologyService = topologyService;
+        this.logicalTopologyService = logicalTopologyService;
         this.jobContextManager = jobContextManager;
         this.executor = executor;
         executionManager = new ExecutionManager(computeConfiguration, 
topologyService);
         messaging = new ComputeMessaging(executionManager, messagingService, 
topologyService);
+        failoverExecutor = Executors.newSingleThreadExecutor(
+                NamedThreadFactory.create(nodeName, "compute-job-failover", 
LOG)
+        );
     }
 
     /** {@inheritDoc} */
@@ -146,6 +169,25 @@ public class ComputeComponentImpl implements 
ComputeComponent {
         }
     }
 
+    @Override
+    public <R> JobExecution<R> executeRemotelyWithFailover(
+            ClusterNode remoteNode,
+            NextWorkerSelector nextWorkerSelector,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            ExecutionOptions options,
+            Object... args
+    ) {
+        JobExecution<R> result = new ComputeJobFailover<R>(
+                this, logicalTopologyService, topologyService,
+                remoteNode, nextWorkerSelector, failoverExecutor, units,
+                jobClassName, options, args
+        ).failSafeExecute();
+
+        result.idAsync().thenAccept(jobId -> 
executionManager.addExecution(jobId, result));
+        return result;
+    }
+
     @Override
     public CompletableFuture<Collection<JobStatus>> statusesAsync() {
         return messaging.broadcastStatusesAsync();
@@ -189,6 +231,7 @@ public class ComputeComponentImpl implements 
ComputeComponent {
         executionManager.stop();
         messaging.stop();
         executor.stop();
+        IgniteUtils.shutdownAndAwaitTermination(failoverExecutor, 10, 
TimeUnit.SECONDS);
     }
 
     private <R> JobExecutionInternal<R> exec(JobContext context, 
ExecutionOptions options, String jobClassName, Object[] args) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index 5a3f96d3ba..ad01f19dd4 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -91,6 +91,7 @@ class ComputeJobFailover<T> {
      * @param logicalTopologyService logical topology service.
      * @param topologyService physical topology service.
      * @param workerNode the node to execute the job on.
+     * @param nextWorkerSelector the selector that returns the next worker to 
execute job on.
      * @param executor the thread pool where the failover should run on.
      * @param units deployment units.
      * @param jobClassName the name of the job class.
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
index 064f9fa3ac..8d361aaddc 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ExecutionManager.java
@@ -102,7 +102,7 @@ public class ExecutionManager {
      */
     public CompletableFuture<Set<JobStatus>> localStatusesAsync() {
         CompletableFuture<JobStatus>[] statuses = executions.values().stream()
-                .filter(it -> !(it instanceof RemoteJobExecution))
+                .filter(it -> !(it instanceof RemoteJobExecution) && !(it 
instanceof FailSafeJobExecution))
                 .map(JobExecution::statusAsync)
                 .toArray(CompletableFuture[]::new);
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 5fdeb55292..cd6ff37165 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -33,8 +33,6 @@ import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.compute.ComputeException;
@@ -43,15 +41,11 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobStatus;
-import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.hlc.HybridClock;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.table.IgniteTablesInternal;
 import org.apache.ignite.internal.table.TableViewInternal;
-import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.util.CompletableFutures;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.ErrorGroups.Compute;
@@ -67,8 +61,6 @@ import org.jetbrains.annotations.Nullable;
  * Implementation of {@link IgniteCompute}.
  */
 public class IgniteComputeImpl implements IgniteComputeInternal {
-    private static final IgniteLogger LOG = 
Loggers.forClass(IgniteComputeImpl.class);
-
     private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
 
     private final TopologyService topologyService;
@@ -79,39 +71,32 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal {
 
     private final ThreadLocalRandom random = ThreadLocalRandom.current();
 
-    private final LogicalTopologyService logicalTopologyService;
-
     private final PlacementDriver placementDriver;
 
     private final HybridClock clock;
 
-    private final Executor failoverExecutor;
-
     /**
      * Create new instance.
      */
     public IgniteComputeImpl(PlacementDriver placementDriver, TopologyService 
topologyService,
-            LogicalTopologyService logicalTopologyService, 
IgniteTablesInternal tables, ComputeComponent computeComponent,
+            IgniteTablesInternal tables, ComputeComponent computeComponent,
             HybridClock clock) {
         this.placementDriver = placementDriver;
         this.topologyService = topologyService;
         this.tables = tables;
         this.computeComponent = computeComponent;
-        this.logicalTopologyService = logicalTopologyService;
         this.clock = clock;
-        this.failoverExecutor = Executors.newFixedThreadPool(
-                1,
-                new NamedThreadFactory("compute-job-failover", LOG)
-        );
     }
 
     /** {@inheritDoc} */
     @Override
-    public <R> JobExecution<R> executeAsync(Set<ClusterNode> nodes,
+    public <R> JobExecution<R> executeAsync(
+            Set<ClusterNode> nodes,
             List<DeploymentUnit> units,
             String jobClassName,
             JobExecutionOptions options,
-            Object... args) {
+            Object... args
+    ) {
         Objects.requireNonNull(nodes);
         Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
@@ -121,9 +106,21 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal {
             throw new IllegalArgumentException("nodes must not be empty.");
         }
 
+        return executeAsyncWithFailover(nodes, units, jobClassName, options, 
args);
+    }
+
+    @Override
+    public <R> JobExecution<R> executeAsyncWithFailover(
+            Set<ClusterNode> nodes,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            JobExecutionOptions options,
+            Object... args
+    ) {
         Set<ClusterNode> candidates = new HashSet<>(nodes);
         ClusterNode targetNode = randomNode(candidates);
         candidates.remove(targetNode);
+
         NextWorkerSelector selector = new DeqNextWorkerSelector(new 
ConcurrentLinkedDeque<>(candidates));
 
         return new JobExecutionWrapper<>(
@@ -169,18 +166,14 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal {
             NextWorkerSelector nextWorkerSelector,
             List<DeploymentUnit> units,
             String jobClassName,
-            JobExecutionOptions options,
+            JobExecutionOptions jobExecutionOptions,
             Object[] args
     ) {
-        ExecutionOptions executionOptions = ExecutionOptions.from(options);
+        ExecutionOptions options = ExecutionOptions.from(jobExecutionOptions);
         if (isLocal(targetNode)) {
-            return computeComponent.executeLocally(executionOptions, units, 
jobClassName, args);
+            return computeComponent.executeLocally(options, units, 
jobClassName, args);
         } else {
-            return new ComputeJobFailover<R>(
-                    computeComponent, logicalTopologyService, topologyService,
-                    targetNode, nextWorkerSelector, failoverExecutor, units,
-                    jobClassName, executionOptions, args
-            ).failSafeExecute();
+            return computeComponent.executeRemotelyWithFailover(targetNode, 
nextWorkerSelector, units, jobClassName, options, args);
         }
     }
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index 417e07c236..32c7517afc 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -17,16 +17,43 @@
 
 package org.apache.ignite.internal.compute;
 
+import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.network.ClusterNode;
 import org.jetbrains.annotations.Nullable;
 
 /**
  * Internal compute facade.
  */
 public interface IgniteComputeInternal extends IgniteCompute {
+    /**
+     * Executes a {@link ComputeJob} of the given class on a single node. If 
the node leaves the cluster, the job will be restarted on one of the
+     * candidate nodes.
+     *
+     * @param <R> Job result type.
+     * @param nodes Candidate nodes; In case target node left the cluster, the 
job will be restarted on one of them.
+     * @param units Deployment units. Can be empty.
+     * @param jobClassName Name of the job class to execute.
+     * @param options Job execution options.
+     * @param args Arguments of the job.
+     * @return Job execution object.
+     */
+    <R> JobExecution<R> executeAsyncWithFailover(
+            Set<ClusterNode> nodes,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            JobExecutionOptions options,
+            Object... args
+    );
+
     /**
      * Gets job status by id.
      *
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index efe77210d8..08414159e2 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -73,6 +73,7 @@ import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.compute.version.Version;
+import 
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
 import org.apache.ignite.internal.compute.configuration.ComputeConfiguration;
 import org.apache.ignite.internal.compute.executor.ComputeExecutor;
 import org.apache.ignite.internal.compute.executor.ComputeExecutorImpl;
@@ -131,6 +132,9 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     @Mock(answer = RETURNS_DEEP_STUBS)
     private TopologyService topologyService;
 
+    @Mock
+    private LogicalTopologyService logicalTopologyService;
+
     @InjectConfiguration
     private ComputeConfiguration computeConfiguration;
 
@@ -201,8 +205,10 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         computeExecutor = new ComputeExecutorImpl(ignite, stateMachine, 
computeConfiguration);
 
         computeComponent = new ComputeComponentImpl(
+                INSTANCE_NAME,
                 messagingService,
                 topologyService,
+                logicalTopologyService,
                 jobContextManager,
                 computeExecutor,
                 computeConfiguration
@@ -703,8 +709,10 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
                 willCompleteSuccessfully());
 
         computeComponent = new ComputeComponentImpl(
+                INSTANCE_NAME,
                 messagingService,
                 topologyService,
+                logicalTopologyService,
                 jobContextManager,
                 computeExecutor,
                 computeConfiguration
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
index eea0394208..67673b91b9 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/IgniteComputeImplTest.java
@@ -23,8 +23,10 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.AdditionalMatchers.aryEq;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.mock;
@@ -119,7 +121,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("remoteResponse")
         );
 
-        verify(computeComponent).executeRemotely(ExecutionOptions.DEFAULT, 
remoteNode, testDeploymentUnits, JOB_CLASS_NAME, "a",  42);
+        verifyExecuteRemotelyWithFailover(ExecutionOptions.DEFAULT);
     }
 
     @Test
@@ -148,7 +150,7 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest {
                 willBe("remoteResponse")
         );
 
-        verify(computeComponent).executeRemotely(expectedOptions, remoteNode, 
testDeploymentUnits, JOB_CLASS_NAME, "a",  42);
+        verifyExecuteRemotelyWithFailover(expectedOptions);
     }
 
     @Test
@@ -200,9 +202,16 @@ class IgniteComputeImplTest extends BaseIgniteAbstractTest 
{
                 .thenReturn(completedExecution("jobResponse"));
     }
 
-    private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions 
executionOptions) {
-        when(computeComponent.executeRemotely(executionOptions, remoteNode, 
testDeploymentUnits, JOB_CLASS_NAME, "a", 42))
-                .thenReturn(completedExecution("remoteResponse"));
+    private void respondWhenExecutingSimpleJobRemotely(ExecutionOptions 
options) {
+        when(computeComponent.executeRemotelyWithFailover(
+                eq(remoteNode), any(), eq(testDeploymentUnits), 
eq(JOB_CLASS_NAME), eq(options), aryEq(new Object[]{"a", 42})
+        )).thenReturn(completedExecution("remoteResponse"));
+    }
+
+    private void verifyExecuteRemotelyWithFailover(ExecutionOptions options) {
+        verify(computeComponent).executeRemotelyWithFailover(
+                eq(remoteNode), any(), eq(testDeploymentUnits), 
eq(JOB_CLASS_NAME), eq(options), aryEq(new Object[]{"a", 42})
+        );
     }
 
     private static <R> JobExecution<R> completedExecution(R result) {
diff --git a/modules/platforms/cpp/ignite/client/compute/compute.cpp 
b/modules/platforms/cpp/ignite/client/compute/compute.cpp
index 2887dffa91..e7c0a18900 100644
--- a/modules/platforms/cpp/ignite/client/compute/compute.cpp
+++ b/modules/platforms/cpp/ignite/client/compute/compute.cpp
@@ -19,32 +19,15 @@
 #include "ignite/client/detail/argument_check_utils.h"
 #include "ignite/client/detail/compute/compute_impl.h"
 
-#include <random>
-
 namespace ignite {
 
-template<typename T>
-typename T::value_type get_random_element(const T &cont) {
-    static std::mutex randomMutex;
-    static std::random_device rd;
-    static std::mt19937 gen(rd());
-
-    assert(!cont.empty());
-
-    std::uniform_int_distribution<size_t> distrib(0, cont.size() - 1);
-
-    std::lock_guard<std::mutex> lock(randomMutex);
-
-    return cont[distrib(gen)];
-}
-
 void compute::execute_async(const std::vector<cluster_node> &nodes, const 
std::vector<deployment_unit> &units,
     std::string_view job_class_name, const std::vector<primitive> &args,
     ignite_callback<std::optional<primitive>> callback) {
     detail::arg_check::container_non_empty(nodes, "Nodes container");
     detail::arg_check::container_non_empty(job_class_name, "Job class name");
 
-    m_impl->execute_on_one_node(get_random_element(nodes), units, 
job_class_name, args, std::move(callback));
+    m_impl->execute_on_nodes(nodes, units, job_class_name, args, 
std::move(callback));
 }
 
 void compute::broadcast_async(const std::set<cluster_node> &nodes, const 
std::vector<deployment_unit> &units,
@@ -69,7 +52,8 @@ void compute::broadcast_async(const std::set<cluster_node> 
&nodes, const std::ve
     auto shared_res = 
std::make_shared<result_group>(std::int32_t(nodes.size()), std::move(callback));
 
     for (const auto &node : nodes) {
-        m_impl->execute_on_one_node(node, units, job_class_name, args, [node, 
shared_res](auto &&res) {
+        std::vector<cluster_node> candidates = { node };
+        m_impl->execute_on_nodes(candidates, units, job_class_name, args, 
[node, shared_res](auto &&res) {
             auto &val = *shared_res;
 
             std::lock_guard<std::mutex> lock(val.m_mutex);
diff --git 
a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp 
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
index f30a623a78..e2233849e5 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.cpp
@@ -83,12 +83,16 @@ void write_units(protocol::writer &writer, const 
std::vector<deployment_unit> &u
     }
 }
 
-void compute_impl::execute_on_one_node(cluster_node node, const 
std::vector<deployment_unit> &units,
+void compute_impl::execute_on_nodes(const std::vector<cluster_node> &nodes, 
const std::vector<deployment_unit> &units,
     std::string_view job_class_name, const std::vector<primitive> &args,
     ignite_callback<std::optional<primitive>> callback) {
 
-    auto writer_func = [&node, job_class_name, &units, args](protocol::writer 
&writer) {
-        writer.write(node.get_name());
+    auto writer_func = [&nodes, job_class_name, &units, args](protocol::writer 
&writer) {
+        auto nodes_num = std::int32_t(nodes.size());
+        writer.write(nodes_num);
+        for (const auto &node : nodes) {
+            writer.write(node.get_name());
+        }
         write_units(writer, units);
         writer.write(job_class_name);
 
diff --git a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h 
b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
index e2f701e73d..65c4b8a75b 100644
--- a/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
+++ b/modules/platforms/cpp/ignite/client/detail/compute/compute_impl.h
@@ -47,15 +47,16 @@ public:
         , m_tables(std::move(tables)) {}
 
     /**
-     * Executes a compute job represented by the given class on the specified 
node asynchronously.
+     * Executes a compute job represented by the given class on one of the 
specified nodes asynchronously. If the node leaves the cluster,
+     * the job will be restarted on one of the candidate nodes.
      *
-     * @param node Node to use for the job execution.
+     * @param nodes Candidate nodes to use for the job execution.
      * @param units Deployment units. Can be empty.
      * @param job_class_name Java class name of the job to execute.
      * @param args Job arguments.
      * @param callback A callback called on operation completion with job 
execution result.
      */
-    void execute_on_one_node(cluster_node node, const 
std::vector<deployment_unit> &units,
+    void execute_on_nodes(const std::vector<cluster_node> &nodes, const 
std::vector<deployment_unit> &units,
         std::string_view job_class_name, const std::vector<primitive> &args,
         ignite_callback<std::optional<primitive>> callback);
 
diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
index 7a2ee27cf4..493a6ab897 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/FakeServer.cs
@@ -616,8 +616,19 @@ namespace Apache.Ignite.Tests
         private PooledArrayBuffer ComputeExecute(MsgPackReader reader, bool 
colocated = false)
         {
             // Colocated: table id, schema version, key.
-            // Else: node name.
-            reader.Skip(colocated ? 4 : 1);
+            // Else: node names.
+            if (colocated)
+            {
+                reader.Skip(4);
+            }
+            else
+            {
+                var namesCount = reader.ReadInt32();
+                for (int i = 0; i < namesCount; i++)
+                {
+                    reader.ReadString();
+                }
+            }
 
             var unitsCount = reader.TryReadNil() ? 0 : reader.ReadInt32();
             var units = new List<DeploymentUnit>(unitsCount);
diff --git a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs 
b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
index 6292e78f69..5d938590d0 100644
--- a/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/Internal/Compute/Compute.cs
@@ -69,7 +69,10 @@ namespace Apache.Ignite.Internal.Compute
             IgniteArgumentCheck.NotNull(nodes);
             IgniteArgumentCheck.NotNull(jobClassName);
 
-            return await ExecuteOnOneNode<T>(GetRandomNode(nodes), units, 
jobClassName, args).ConfigureAwait(false);
+            var nodesCol = GetNodesCollection(nodes);
+            IgniteArgumentCheck.Ensure(nodesCol.Count > 0, nameof(nodes), 
"Nodes can't be empty.");
+
+            return await ExecuteOnNodes<T>(nodesCol, units, jobClassName, 
args).ConfigureAwait(false);
         }
 
         /// <inheritdoc/>
@@ -121,7 +124,7 @@ namespace Apache.Ignite.Internal.Compute
 
             foreach (var node in nodes)
             {
-                var task = ExecuteOnOneNode<T>(node, units0, jobClassName, 
args);
+                var task = ExecuteOnNodes<T>(new[] { node }, units0, 
jobClassName, args);
 
                 res[node] = task;
             }
@@ -133,34 +136,26 @@ namespace Apache.Ignite.Internal.Compute
         public override string ToString() => 
IgniteToStringBuilder.Build(GetType());
 
         [SuppressMessage("Security", "CA5394:Do not use insecure randomness", 
Justification = "Secure random is not required here.")]
-        private static IClusterNode GetRandomNode(IEnumerable<IClusterNode> 
nodes)
+        private static IClusterNode GetRandomNode(ICollection<IClusterNode> 
nodes)
         {
-            var nodesCol = GetNodesCollection(nodes);
-
-            IgniteArgumentCheck.Ensure(nodesCol.Count > 0, nameof(nodes), 
"Nodes can't be empty.");
-
-            var idx = Random.Shared.Next(0, nodesCol.Count);
+            var idx = Random.Shared.Next(0, nodes.Count);
 
-            return nodesCol.ElementAt(idx);
+            return nodes.ElementAt(idx);
         }
 
         private static ICollection<IClusterNode> 
GetNodesCollection(IEnumerable<IClusterNode> nodes) =>
             nodes as ICollection<IClusterNode> ?? nodes.ToList();
 
-        private static void WriteUnits(IEnumerable<DeploymentUnit> units, 
PooledArrayBuffer buf)
+        private static void WriteEnumerable<T>(IEnumerable<T> items, 
PooledArrayBuffer buf, Action<T> writerFunc)
         {
             var w = buf.MessageWriter;
 
-            if (units.TryGetNonEnumeratedCount(out var count))
+            if (items.TryGetNonEnumeratedCount(out var count))
             {
                 w.Write(count);
-                foreach (var unit in units)
+                foreach (var item in items)
                 {
-                    IgniteArgumentCheck.NotNullOrEmpty(unit.Name);
-                    IgniteArgumentCheck.NotNullOrEmpty(unit.Version);
-
-                    w.Write(unit.Name);
-                    w.Write(unit.Version);
+                    writerFunc(item);
                 }
 
                 return;
@@ -171,24 +166,45 @@ namespace Apache.Ignite.Internal.Compute
             var countSpan = buf.GetSpan(5);
             buf.Advance(5);
 
-            foreach (var unit in units)
+            foreach (var item in items)
             {
                 count++;
-                w.Write(unit.Name);
-                w.Write(unit.Version);
+                writerFunc(item);
             }
 
             countSpan[0] = MsgPackCode.Array32;
             BinaryPrimitives.WriteInt32BigEndian(countSpan[1..], count);
         }
 
-        private async Task<T> ExecuteOnOneNode<T>(
-            IClusterNode node,
+        private static void WriteUnits(IEnumerable<DeploymentUnit> units, 
PooledArrayBuffer buf)
+        {
+            WriteEnumerable(units, buf, writerFunc: unit =>
+            {
+                IgniteArgumentCheck.NotNullOrEmpty(unit.Name);
+                IgniteArgumentCheck.NotNullOrEmpty(unit.Version);
+
+                var w = buf.MessageWriter;
+                w.Write(unit.Name);
+                w.Write(unit.Version);
+            });
+        }
+
+        private static void WriteNodeNames(IEnumerable<IClusterNode> nodes, 
PooledArrayBuffer buf)
+        {
+            WriteEnumerable(nodes, buf, writerFunc: node =>
+            {
+                var w = buf.MessageWriter;
+                w.Write(node.Name);
+            });
+        }
+
+        private async Task<T> ExecuteOnNodes<T>(
+            ICollection<IClusterNode> nodes,
             IEnumerable<DeploymentUnit> units,
             string jobClassName,
             object?[]? args)
         {
-            IgniteArgumentCheck.NotNull(node);
+            IClusterNode node = GetRandomNode(nodes);
 
             using var writer = ProtoCommon.GetMessageWriter();
             Write();
@@ -205,7 +221,7 @@ namespace Apache.Ignite.Internal.Compute
             {
                 var w = writer.MessageWriter;
 
-                w.Write(node.Name);
+                WriteNodeNames(nodes, writer);
                 WriteUnits(units, writer);
                 w.Write(jobClassName);
 
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index d23bc0dce7..ed2f4efe7c 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -703,8 +703,10 @@ public class IgniteImpl implements Ignite {
         ComputeConfiguration computeCfg = 
nodeConfigRegistry.getConfiguration(ComputeConfiguration.KEY);
         InMemoryComputeStateMachine stateMachine = new 
InMemoryComputeStateMachine(computeCfg, name);
         computeComponent = new ComputeComponentImpl(
+                name,
                 clusterSvc.messagingService(),
                 clusterSvc.topologyService(),
+                logicalTopologyService,
                 new JobContextManager(deploymentManagerImpl, 
deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()),
                 new ComputeExecutorImpl(this, stateMachine, computeCfg),
                 computeCfg
@@ -713,7 +715,6 @@ public class IgniteImpl implements Ignite {
         compute = new IgniteComputeImpl(
                 placementDriverMgr.placementDriver(),
                 clusterSvc.topologyService(),
-                logicalTopologyService,
                 distributedTblMgr,
                 computeComponent,
                 clock

Reply via email to