This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2142bcdcb4 IGNITE-20850 Add compute job failover  (#2990)
2142bcdcb4 is described below

commit 2142bcdcb4a9af857942ddc16f12f8a83d6ebe55
Author: Aleksandr Pakhomov <apk...@gmail.com>
AuthorDate: Wed Jan 10 17:57:01 2024 +0300

    IGNITE-20850 Add compute job failover  (#2990)
    
    * ComputeJobFailover wraps the Future returned by
    IgniteComputeImpl. Failover captures the context of the job:
    workerNode, deployment units, jobClassName, etc.
    
    * ComputeJobFailover listens to topology events and if the
    worker node has left the topology, the job will be restarted
    on one of the failover nodes. The future returned by
    IgniteComputeImpl will be completed/cancelled when the
    restarted job is completed/cancelled.
    
    ---------
    
    Co-authored-by: Vadim Pakhnushev <8614891+valep...@users.noreply.github.com>
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   6 +
 modules/compute/build.gradle                       |   1 +
 ...aseEmbedded.java => ItComputeTestEmbedded.java} |   2 +-
 .../internal/compute/ItWorkerShutdownTest.java     | 562 +++++++++++++++++++++
 .../internal/compute/ComputeJobFailover.java       | 159 ++++++
 .../internal/compute/FailSafeJobExecution.java     | 175 +++++++
 .../ignite/internal/compute/IgniteComputeImpl.java |  42 +-
 .../internal/compute/NodeLeftEventsSource.java     |  54 ++
 .../internal/compute/RemoteExecutionContext.java   |  94 ++++
 9 files changed, 1089 insertions(+), 6 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index dfa3d3eca2..108ccb25b1 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -530,6 +530,12 @@ public class ErrorGroups {
 
         /** Compute job result not found error. */
         public static final int RESULT_NOT_FOUND_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 7);
+
+        /** Compute job status can not be retrieved. */
+        public static final int FAIL_TO_GET_JOB_STATUS_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 8);
+
+        /** Compute job failed. */
+        public static final int COMPUTE_JOB_FAILED_ERR = 
COMPUTE_ERR_GROUP.registerErrorCode((short) 9);
     }
 
     /** Catalog error group. */
diff --git a/modules/compute/build.gradle b/modules/compute/build.gradle
index 2802b64a07..f2c14fec74 100644
--- a/modules/compute/build.gradle
+++ b/modules/compute/build.gradle
@@ -50,6 +50,7 @@ dependencies {
     integrationTestImplementation project(':ignite-code-deployment')
     integrationTestImplementation project(':ignite-network')
     integrationTestImplementation project(':ignite-cluster-management')
+    integrationTestImplementation project(':ignite-table')
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation testFixtures(project(':ignite-runner'))
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestBaseEmbedded.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
similarity index 99%
rename from 
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestBaseEmbedded.java
rename to 
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 4123d9e16f..ee81c077bd 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestBaseEmbedded.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -52,7 +52,7 @@ import org.junit.jupiter.params.provider.MethodSource;
  * Integration tests for Compute functionality in embedded Ignite mode.
  */
 @SuppressWarnings("resource")
-class ItComputeTestBaseEmbedded extends ItComputeBaseTest {
+class ItComputeTestEmbedded extends ItComputeBaseTest {
     @Override
     protected List<DeploymentUnit> units() {
         return List.of();
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
new file mode 100644
index 0000000000..2dcac4163c
--- /dev/null
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.compute.JobState.CANCELED;
+import static org.apache.ignite.compute.JobState.COMPLETED;
+import static org.apache.ignite.compute.JobState.EXECUTING;
+import static org.apache.ignite.compute.JobState.FAILED;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.in;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.table.Tuple;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration tests for worker node shutdown failover.
+ *
+ * <p>The logic is that if we run a job on a remote node and this node 
leaves the topology, then we should restart the job on
+ * another node. This is not true for broadcast and local jobs: they should 
not be restarted.
+ */
+@SuppressWarnings("resource")
+class ItWorkerShutdownTest extends ClusterPerTestIntegrationTest {
+    /**
+     * ACK for {@link Signal#CONTINUE}. Returned by a job that has received 
the signal. Used to check that the job is alive.
+     */
+    private static final Object ack = new Object();
+
+    /**
+     * Map from node name to node index in {@link ClusterPerTestIntegrationTest#cluster}.
+     */
+    private static final Map<String, Integer> NODES_NAMES_TO_INDEXES = new 
HashMap<>();
+
+    /**
+     * Class-wide queue that is used as a communication channel between {@link 
GlobalInteractiveJob} and test code. You can send a signal to
+     * the job via this channel and get a response from the job via {@link 
#GLOBAL_CHANNEL}.
+     */
+    private static BlockingQueue<Signal> GLOBAL_SIGNALS = new 
LinkedBlockingQueue<>();
+
+    /**
+     * Class-wide queue that is used as a communication channel between {@link 
GlobalInteractiveJob} and test code. You can send a signal to
+     * the job via {@link #GLOBAL_SIGNALS} and get a response from the job via 
this channel.
+     */
+    private static BlockingQueue<Object> GLOBAL_CHANNEL = new 
LinkedBlockingQueue<>();
+
+    /**
+     * Node-specific queues that are used as a communication channel between 
{@link InteractiveJob} and test code. The semantics are the
+     * same as for {@link #GLOBAL_SIGNALS} except that each node has its own 
queue. So, test code can communicate with a
+     * {@link InteractiveJob} that is running on specific node.
+     */
+    private static Map<String, BlockingQueue<Signal>> NODE_SIGNALS = new 
ConcurrentHashMap<>();
+
+    /**
+     * Node-specific queues that are used as a communication channel between 
{@link InteractiveJob} and test code. The semantics are the
+     * same as for {@link #GLOBAL_CHANNEL} except that each node has its own 
queue. So, test code can communicate with a
+     * {@link InteractiveJob} that is running on specific node.
+     */
+    private static Map<String, BlockingQueue<Object>> NODE_CHANNELS = new 
ConcurrentHashMap<>();
+
+    /**
+     * Node-specific counters that are used to count how many times {@link 
InteractiveJob} has been run on specific node.
+     */
+    private static Map<String, Integer> INTERACTIVE_JOB_RUN_TIMES = new 
ConcurrentHashMap<>();
+
+    private static void checkAllInteractiveJobsCalledOnce() {
+        INTERACTIVE_JOB_RUN_TIMES.forEach((nodeName, runTimes) -> 
assertThat(runTimes, equalTo(1)));
+    }
+
+    private static void finishAllInteractiveJobs() {
+        NODE_SIGNALS.forEach((nodeName, channel) -> {
+            try {
+                channel.offer(Signal.RETURN, 10, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    private static void initChannels(List<String> nodes) {
+        for (String nodeName : nodes) {
+            NODE_CHANNELS.put(nodeName, new LinkedBlockingQueue<>());
+            NODE_SIGNALS.put(nodeName, new LinkedBlockingQueue<>());
+            INTERACTIVE_JOB_RUN_TIMES.put(nodeName, 0);
+        }
+    }
+
+    private static Set<String> workerCandidates(IgniteImpl... nodes) {
+        return Arrays.stream(nodes)
+                .map(IgniteImpl::node)
+                .map(ClusterNode::name)
+                .collect(Collectors.toSet());
+    }
+
+    private static void finishGlobalJob() {
+        GLOBAL_SIGNALS.offer(Signal.RETURN);
+    }
+
+    private static void checkGlobalInteractiveJobAlive(JobExecution<?> 
execution)
+            throws InterruptedException, ExecutionException, TimeoutException {
+        GLOBAL_SIGNALS.offer(Signal.CONTINUE);
+        assertThat(GLOBAL_CHANNEL.poll(10, TimeUnit.SECONDS), equalTo(ack));
+
+        assertThat(execution.resultAsync().isDone(), equalTo(false));
+        assertThat(idSync(execution), notNullValue());
+
+        // During the job failover we might get a job that is restarted, the 
state will not be EXECUTING for a short time.
+        await().until(() -> execution.statusAsync().get(10, 
TimeUnit.SECONDS).state() == EXECUTING);
+    }
+
+    private static void checkInteractiveJobAlive(ClusterNode clusterNode, 
JobExecution<?> execution) {
+        NODE_SIGNALS.get(clusterNode.name()).offer(Signal.CONTINUE);
+        try {
+            assertThat(NODE_CHANNELS.get(clusterNode.name()).poll(10, 
TimeUnit.SECONDS), equalTo(ack));
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+
+        assertThat(execution.resultAsync().isDone(), equalTo(false));
+    }
+
+    private static JobStatus statusSync(JobExecution<String> execution) throws 
InterruptedException, ExecutionException, TimeoutException {
+        return execution.statusAsync().get(10, TimeUnit.SECONDS);
+    }
+
+    private Set<ClusterNode> clusterNodesByNames(Set<String> nodes) {
+        return nodes.stream()
+                .map(NODES_NAMES_TO_INDEXES::get)
+                .map(this::node)
+                .map(IgniteImpl::node)
+                .collect(Collectors.toSet());
+    }
+
+    private static UUID idSync(JobExecution<?> execution) throws 
InterruptedException, ExecutionException, TimeoutException {
+        return execution.idAsync().get(10, TimeUnit.SECONDS);
+    }
+
+    /**
+     * Initializes channels. Assumption: there are no running jobs on the 
cluster.
+     */
+    @BeforeEach
+    void setUp() {
+        GLOBAL_SIGNALS.clear();
+        GLOBAL_CHANNEL.clear();
+        NODE_SIGNALS.clear();
+        NODE_CHANNELS.clear();
+        INTERACTIVE_JOB_RUN_TIMES.clear();
+        NODES_NAMES_TO_INDEXES.clear();
+
+        for (int i = 0; i < 3; i++) {
+            NODES_NAMES_TO_INDEXES.put(node(i).name(), i);
+        }
+
+        executeSql("DROP TABLE IF EXISTS PUBLIC.TEST");
+    }
+
+    @Test
+    void remoteExecutionWorkerShutdown() throws Exception {
+        // Given entry node.
+        IgniteImpl entryNode = node(0);
+        // And remote candidates to execute a job.
+        Set<String> remoteWorkerCandidates = workerCandidates(node(1), 
node(2));
+
+        // When execute job.
+        JobExecution<String> execution = 
executeGlobalInteractiveJob(entryNode, remoteWorkerCandidates);
+
+        // Then one of candidates became a worker and run the job.
+        String workerNodeName = getWorkerNodeNameFromGlobalInteractiveJob();
+        // And job is running.
+        checkGlobalInteractiveJobAlive(execution);
+
+        // And save state BEFORE worker has failed.
+        long createTimeBeforeFail = 
statusSync(execution).createTime().toEpochMilli();
+        long startTimeBeforeFail = 
statusSync(execution).startTime().toEpochMilli();
+        UUID jobIdBeforeFail = idSync(execution);
+
+        // When stop worker node.
+        stopNode(workerNodeName);
+        // And remove it from candidates.
+        remoteWorkerCandidates.remove(workerNodeName);
+
+        // Then the job is alive: it has been restarted on another candidate.
+        checkGlobalInteractiveJobAlive(execution);
+        // And remaining candidate was chosen as a failover worker.
+        String failoverWorker = getWorkerNodeNameFromGlobalInteractiveJob();
+        assertThat(remoteWorkerCandidates, hasItem(failoverWorker));
+
+        // And check create time was not changed but start time changed.
+        long createTimeAfterFail = 
statusSync(execution).createTime().toEpochMilli();
+        long startTimeAfterFail = 
statusSync(execution).startTime().toEpochMilli();
+        assertThat(createTimeAfterFail, equalTo(createTimeBeforeFail));
+        assertThat(startTimeAfterFail, greaterThan(startTimeBeforeFail));
+        // And id was not changed
+        assertThat(idSync(execution), equalTo(jobIdBeforeFail));
+
+        // When finish job.
+        finishGlobalJob();
+
+        // Then it is successfully finished.
+        assertThat(execution.resultAsync().get(10, TimeUnit.SECONDS), 
equalTo("Done"));
+        // And.
+        assertThat(execution.statusAsync().get().state(), is(COMPLETED));
+        // And finish time is greater than create time and start time.
+        long finishTime = 
execution.statusAsync().get().finishTime().toEpochMilli();
+        assertThat(finishTime, greaterThan(createTimeAfterFail));
+        assertThat(finishTime, greaterThan(startTimeAfterFail));
+        // And job id the same
+        assertThat(idSync(execution), equalTo(jobIdBeforeFail));
+    }
+
+    @Test
+    void remoteExecutionSingleWorkerShutdown() throws Exception {
+        // Given.
+        IgniteImpl entryNode = node(0);
+        // And only one remote candidate to execute a job.
+        Set<String> remoteWorkerCandidates = workerCandidates(node(1));
+
+        // When execute job.
+        JobExecution<String> execution = 
executeGlobalInteractiveJob(entryNode, remoteWorkerCandidates);
+
+        // Then the job is running on worker node.
+        String workerNodeName = getWorkerNodeNameFromGlobalInteractiveJob();
+        assertThat(remoteWorkerCandidates, hasItem(workerNodeName));
+        // And.
+        checkGlobalInteractiveJobAlive(execution);
+
+        // When stop worker node.
+        stopNode(workerNodeName);
+
+        // Then the job is failed, because there is no failover worker.
+        assertThat(execution.resultAsync(), willThrow(IgniteException.class));
+        assertThat(execution.statusAsync().isCompletedExceptionally(), 
equalTo(true));
+    }
+
+    @Test
+    void localExecutionWorkerShutdown() throws Exception {
+        // Given entry node.
+        IgniteImpl entryNode = node(0);
+
+        // When execute job locally.
+        JobExecution<String> execution = 
executeGlobalInteractiveJob(entryNode, Set.of(entryNode.name()));
+
+        // Then the job is running.
+        checkGlobalInteractiveJobAlive(execution);
+
+        // And it is running on entry node.
+        assertThat(getWorkerNodeNameFromGlobalInteractiveJob(), 
equalTo(entryNode.name()));
+
+        // When stop entry node.
+        stopNode(entryNode.name());
+
+        // Then the job is failed, because there is no failover worker.
+        assertThat(execution.resultAsync().isCompletedExceptionally(), 
equalTo(true));
+        assertThat(execution.statusAsync().get().state(), is(FAILED));
+    }
+
+    @Test
+    void broadcastExecutionWorkerShutdown() {
+        // Given entry node.
+        IgniteImpl entryNode = node(0);
+        // And prepare communication channels.
+        initChannels(allNodeNames());
+
+        // When start broadcast job.
+        Map<ClusterNode, JobExecution<Object>> executions = 
entryNode.compute().broadcastAsync(
+                clusterNodesByNames(workerCandidates(node(0), node(1), 
node(2))),
+                List.of(),
+                InteractiveJob.class.getName()
+        );
+
+        // Then all three jobs are alive.
+        assertThat(executions.size(), is(3));
+        executions.forEach(ItWorkerShutdownTest::checkInteractiveJobAlive);
+
+        // When stop one of workers.
+        stopNode(node(1).name());
+
+        // Then two jobs are alive.
+        executions.forEach((node, execution) -> {
+            if (node.name().equals(node(1).name())) {
+                assertThat(execution.resultAsync(), 
willThrow(IgniteException.class));
+            } else {
+                checkInteractiveJobAlive(node, execution);
+            }
+        });
+
+        // When.
+        finishAllInteractiveJobs();
+
+        // Then every job ran once because broadcast execution does not 
require failover.
+        checkAllInteractiveJobsCalledOnce();
+    }
+
+    @Test
+    void cancelRemoteExecutionOnRestartedJob() throws Exception {
+        // Given entry node.
+        IgniteImpl entryNode = node(0);
+        // And remote candidates to execute a job.
+        Set<String> remoteWorkerCandidates = workerCandidates(node(1), 
node(2));
+
+        // When execute job.
+        JobExecution<String> execution = 
executeGlobalInteractiveJob(entryNode, remoteWorkerCandidates);
+
+        // Then one of candidates became a worker and run the job.
+        String workerNodeName = getWorkerNodeNameFromGlobalInteractiveJob();
+        // And job is running.
+        checkGlobalInteractiveJobAlive(execution);
+
+        // When stop worker node.
+        stopNode(workerNodeName);
+        // And remove it from candidates.
+        remoteWorkerCandidates.remove(workerNodeName);
+
+        // Then the job is alive: it has been restarted on another candidate.
+        checkGlobalInteractiveJobAlive(execution);
+        // And remaining candidate was chosen as a failover worker.
+        String failoverWorker = getWorkerNodeNameFromGlobalInteractiveJob();
+        assertThat(remoteWorkerCandidates, hasItem(failoverWorker));
+
+        // When cancel job.
+        execution.cancelAsync().get(10, TimeUnit.SECONDS);
+
+        // Then it is cancelled.
+        assertThat(execution.resultAsync(), willThrow(IgniteException.class));
+        // And.
+        assertThat(statusSync(execution).state(), is(CANCELED));
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-20864")
+    void colocatedExecutionWorkerShutdown() throws Exception {
+        // Given table.
+        initChannels(allNodeNames());
+        createTestTableWithOneRow();
+        TableImpl table = (TableImpl) node(0).tables().table("test");
+        // And partition leader for K=1.
+        ClusterNode leader = 
table.leaderAssignment(table.partition(Tuple.create(1).set("K", 1)));
+
+        // When start colocated job on node that is not leader.
+        IgniteImpl entryNode = anyNodeExcept(leader);
+        JobExecution<Object> execution = 
entryNode.compute().executeColocatedAsync(
+                "test",
+                Tuple.create(1).set("K", 1),
+                List.of(),
+                GlobalInteractiveJob.class.getName()
+        );
+
+        // Then the job is alive.
+        checkGlobalInteractiveJobAlive(execution);
+
+        // And it is running on leader node.
+        String firstWorkerNodeName = 
getWorkerNodeNameFromGlobalInteractiveJob();
+        assertThat(firstWorkerNodeName, equalTo(leader.name()));
+        // And leader node is NOT an entry node, it is remote.
+        assertThat(entryNode.name(), not(equalTo(firstWorkerNodeName)));
+
+        // When stop worker node.
+        stopNode(nodeByName(firstWorkerNodeName));
+
+        // Then the job is restarted on another node.
+        checkGlobalInteractiveJobAlive(execution);
+
+        // And it is running on another node.
+        String failoverNodeName = getWorkerNodeNameFromGlobalInteractiveJob();
+        assertThat(failoverNodeName, in(allNodeNames()));
+        // But.
+        assertThat(failoverNodeName, not(equalTo(firstWorkerNodeName)));
+
+    }
+
+    private void stopNode(IgniteImpl ignite) {
+        stopNode(ignite.name());
+    }
+
+    private void stopNode(String name) {
+        int ind = NODES_NAMES_TO_INDEXES.get(name);
+        node(ind).stop();
+    }
+
+    private IgniteImpl anyNodeExcept(ClusterNode except) {
+        String candidateName = allNodeNames()
+                .stream()
+                .filter(name -> !name.equals(except.name()))
+                .findFirst()
+                .orElseThrow();
+
+        return nodeByName(candidateName);
+    }
+
+    private IgniteImpl nodeByName(String candidateName) {
+        return cluster.runningNodes().filter(node -> 
node.name().equals(candidateName)).findFirst().orElseThrow();
+    }
+
+    private String getWorkerNodeNameFromGlobalInteractiveJob() throws 
InterruptedException {
+        GLOBAL_SIGNALS.offer(Signal.GET_WORKER_NAME);
+        return (String) GLOBAL_CHANNEL.poll(10, TimeUnit.SECONDS);
+    }
+
+    private JobExecution<String> executeGlobalInteractiveJob(IgniteImpl 
entryNode, Set<String> nodes) {
+        return entryNode.compute().executeAsync(clusterNodesByNames(nodes), 
List.of(), GlobalInteractiveJob.class.getName());
+    }
+
+    private void createTestTableWithOneRow() {
+        executeSql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY 
(k))");
+        executeSql("INSERT INTO test(k, v) VALUES (1, 101)");
+    }
+
+    private List<String> allNodeNames() {
+        return IntStream.range(0, initialNodes())
+                .mapToObj(this::node)
+                .map(Ignite::name)
+                .collect(toList());
+    }
+
+    /**
+     * Signals that are sent by test code to the jobs.
+     */
+    enum Signal {
+        /**
+         * Signal to the job to continue running and send ACK as a response.
+         */
+        CONTINUE,
+        /**
+         * Ask job to throw an exception.
+         */
+        THROW,
+        /**
+         * Ask job to return result.
+         */
+        RETURN,
+        /**
+         * Signal to the job to continue running and send current worker name 
to the response channel.
+         */
+        GET_WORKER_NAME
+    }
+
+    /**
+     * Interactive job that communicates via {@link #GLOBAL_CHANNEL} and 
{@link #GLOBAL_SIGNALS}.
+     */
+    static class GlobalInteractiveJob implements ComputeJob<String> {
+        private static Signal listenSignal() {
+            Signal recievedSignal;
+            try {
+                recievedSignal = GLOBAL_SIGNALS.take();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return recievedSignal;
+        }
+
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            while (true) {
+                Signal recievedSignal = listenSignal();
+                switch (recievedSignal) {
+                    case THROW:
+                        throw new RuntimeException();
+                    case CONTINUE:
+                        GLOBAL_CHANNEL.offer(ack);
+                        break;
+                    case RETURN:
+                        return "Done";
+                    case GET_WORKER_NAME:
+                        GLOBAL_CHANNEL.add(context.ignite().name());
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected value: " + 
recievedSignal);
+                }
+            }
+        }
+    }
+
+    /**
+     * Interactive job that communicates via {@link #NODE_CHANNELS} and {@link 
#NODE_SIGNALS}. Also, keeps track of how many times it was
+     * executed via {@link #INTERACTIVE_JOB_RUN_TIMES}.
+     */
+    static class InteractiveJob implements ComputeJob<String> {
+        private static Signal listenSignal(BlockingQueue<Signal> channel) {
+            Signal recievedSignal = null;
+            try {
+                recievedSignal = channel.take();
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            return recievedSignal;
+        }
+
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            String workerNodeName = context.ignite().name();
+
+            INTERACTIVE_JOB_RUN_TIMES.put(workerNodeName, 
INTERACTIVE_JOB_RUN_TIMES.get(workerNodeName) + 1);
+            BlockingQueue<Signal> channel = NODE_SIGNALS.get(workerNodeName);
+
+            while (true) {
+                Signal recievedSignal = listenSignal(channel);
+                switch (recievedSignal) {
+                    case THROW:
+                        throw new RuntimeException();
+                    case CONTINUE:
+                        NODE_CHANNELS.get(workerNodeName).offer(ack);
+                        break;
+                    case RETURN:
+                        return "Done";
+                    case GET_WORKER_NAME:
+                        
NODE_CHANNELS.get(workerNodeName).add(context.ignite().name());
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected value: " + 
recievedSignal);
+                }
+            }
+        }
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
new file mode 100644
index 0000000000..f6cbea769c
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.lang.ErrorGroups.Compute;
+import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * This is a helper class for {@link ComputeComponent} to handle job failures. 
You can think about this class as a "retryable compute job
+ * with captured context". Retry logic is applied ONLY if the worker node 
leaves the cluster. If the job itself is failing, then the
+ * exception is propagated to the caller and this class does not handle it.
+ *
+ * <p>If you want to execute a job on node1 and use node2 and node3 as 
failover candidates,
+ * then you should create an instance of this class with workerNode = node1, 
failoverCandidates = [node2, node3] as arguments and
+ * call {@link #failSafeExecute()}.
+ *
+ * @param <T> the type of the result of the job.
+ */
+class ComputeJobFailover<T> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(ComputeJobFailover.class);
+
+    /**
+     * Compute component that is called when the {@link #runningWorkerNode} 
has left the cluster.
+     */
+    private final ComputeComponent computeComponent;
+
+    /**
+     * The failover listens to this event source to know when the {@link 
#runningWorkerNode} has left the cluster.
+     */
+    private final NodeLeftEventsSource nodeLeftEventsSource;
+
+    /**
+     * Set of worker candidates in case {@link #runningWorkerNode} has left 
the cluster.
+     */
+    private final ConcurrentLinkedDeque<ClusterNode> failoverCandidates;
+
+    /**
+     * The node where the job is being executed at a given moment. If node 
leaves the cluster, then the job is restarted on one of the
+     * {@link #failoverCandidates} and the reference is CASed to the new node.
+     */
+    private final AtomicReference<ClusterNode> runningWorkerNode;
+
+    /**
+     * Context of the called job. Captures deployment units, jobClassName and 
arguments.
+     */
+    private final RemoteExecutionContext<T> jobContext;
+
+    /**
+     * Creates a per-job instance.
+     *
+     * @param computeComponent compute component.
+     * @param nodeLeftEventsSource node left events source, used as dynamic 
topology service (allows to remove handlers).
+     * @param workerNode the node to execute the job on.
+     * @param failoverCandidates the set of nodes where the job can be 
restarted if the worker node leaves the cluster.
+     * @param units deployment units.
+     * @param jobClassName the name of the job class.
+     * @param args the arguments of the job.
+     */
+    ComputeJobFailover(
+            ComputeComponent computeComponent,
+            NodeLeftEventsSource nodeLeftEventsSource,
+            ClusterNode workerNode,
+            Set<ClusterNode> failoverCandidates,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            Object... args
+    ) {
+        this.computeComponent = computeComponent;
+        this.nodeLeftEventsSource = nodeLeftEventsSource;
+        this.runningWorkerNode = new AtomicReference<>(workerNode);
+        this.failoverCandidates = new 
ConcurrentLinkedDeque<>(failoverCandidates);
+        this.jobContext = new RemoteExecutionContext<>(units, jobClassName, 
args);
+    }
+
+    /**
+     * Executes a job on the worker node and restarts the job on one of the 
failover candidates if the worker node leaves the cluster.
+     *
+     * @return JobExecution with the result of the job and the status of the 
job.
+     */
+    JobExecution<T> failSafeExecute() {
+        JobExecution<T> remoteJobExecution = launchJobOn(runningWorkerNode);
+        jobContext.initJobExecution(new 
FailSafeJobExecution<>(remoteJobExecution));
+
+        Consumer<ClusterNode> handler = new OnNodeLeft();
+        nodeLeftEventsSource.addEventHandler(handler);
+        remoteJobExecution.resultAsync().whenComplete((r, e) -> 
nodeLeftEventsSource.removeEventHandler(handler));
+
+        return jobContext.failSafeJobExecution();
+    }
+
+    private JobExecution<T> launchJobOn(AtomicReference<ClusterNode> 
runningWorkerNode) {
+        return computeComponent.executeRemotely(runningWorkerNode.get(), 
jobContext.units(), jobContext.jobClassName(), jobContext.args());
+    }
+
+    class OnNodeLeft implements Consumer<ClusterNode> {
+        @Override
+        public void accept(ClusterNode leftNode) {
+            if (!runningWorkerNode.get().equals(leftNode)) {
+                return;
+            }
+
+            ClusterNode nextWorkerCandidate = takeCandidate();
+            if (nextWorkerCandidate == null) {
+                LOG.warn("No more worker nodes to restart the job. Failing the 
job {}.", jobContext.jobClassName());
+
+                FailSafeJobExecution<?> failSafeJobExecution = 
jobContext.failSafeJobExecution();
+                failSafeJobExecution.completeExceptionally(
+                        new 
IgniteInternalException(Compute.COMPUTE_JOB_FAILED_ERR)
+                );
+                return;
+            }
+
+            LOG.warn(
+                    "Worker node {} has left the cluster. Restarting the job 
{} on node {}.",
+                    leftNode, jobContext.jobClassName(), nextWorkerCandidate
+            );
+
+            runningWorkerNode.set(nextWorkerCandidate);
+            JobExecution<T> jobExecution = launchJobOn(runningWorkerNode);
+            jobContext.updateJobExecution(jobExecution);
+        }
+
+        @Nullable
+        private ClusterNode takeCandidate() {
+            try {
+                return failoverCandidates.pop();
+            } catch (NoSuchElementException ex) {
+                return null;
+            }
+        }
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
new file mode 100644
index 0000000000..a02b59460f
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/FailSafeJobExecution.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.time.Instant;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+
+/**
+ * Fail-safe wrapper for the {@link JobExecution} that should be returned to 
the client. This wrapper holds the original
+ * job execution object. This object can be updated during the lifetime of 
{@link FailSafeJobExecution}.
+ *
+ * <p>The problem that is solved by this wrapper is the following: client can 
join the {@link JobExecution#resultAsync()}
+ * future but this original future will never be completed in case the remote 
worker node has left the topology. By returning
+ * {@link FailSafeJobExecution} to the client we can update the original job 
execution object when it is restarted on another
+ * node but the client will still be able to join the original future.
+ *
+ * @param <T> the type of the job result.
+ */
+class FailSafeJobExecution<T> implements JobExecution<T> {
+    private static final IgniteLogger LOG = 
Loggers.forClass(FailSafeJobExecution.class);
+
+    /**
+     * Exception that was thrown during the job execution. It can be set only 
once.
+     */
+    private final AtomicReference<Throwable> exception = new 
AtomicReference<>(null);
+
+    /**
+     * The future that is returned as {@link JobExecution#resultAsync()} and 
will be resolved when the job is completed.
+     */
+    private final CompletableFuture<T> resultFuture;
+
+    /**
+     * The status of the first job execution attempt. It is used to preserve 
the original job creation time.
+     */
+    private final AtomicReference<JobStatus> capturedStatus;
+
+    /**
+     * Link to the current job execution object. It can be updated when the 
job is restarted on another node.
+     */
+    private final AtomicReference<JobExecution<T>> runningJobExecution;
+
+    FailSafeJobExecution(JobExecution<T> runningJobExecution) throws 
RuntimeException {
+        this.resultFuture = new CompletableFuture<>();
+        this.runningJobExecution = new AtomicReference<>(runningJobExecution);
+
+        this.capturedStatus = new AtomicReference<>(null);
+        captureStatus(runningJobExecution);
+
+        registerCompleteHook();
+    }
+
+    private void captureStatus(JobExecution<T> runningJobExecution) {
+        try {
+            runningJobExecution.statusAsync().whenComplete((status, e) -> {
+                if (status != null) {
+                    this.capturedStatus.set(status);
+                } else {
+                    this.capturedStatus.set(
+                            failedStatus()
+                    );
+                }
+            }).get(10, TimeUnit.SECONDS);
+        } catch (TimeoutException | InterruptedException | ExecutionException 
e) {
+            this.capturedStatus.set(
+                    failedStatus()
+            );
+        }
+    }
+
+    private static JobStatus failedStatus() {
+        return 
JobStatus.builder().id(UUID.randomUUID()).createTime(Instant.now()).state(JobState.FAILED).build();
+    }
+
+    /**
+     * Registers a hook for the future that is returned to the user. This 
future will be completed when the job is completed.
+     */
+    private void registerCompleteHook() {
+        runningJobExecution.get().resultAsync().whenComplete((res, err) -> {
+            if (err == null) {
+                resultFuture.complete(res);
+            } else {
+                resultFuture.completeExceptionally(err);
+            }
+        });
+    }
+
+    void updateJobExecution(JobExecution<T> jobExecution) {
+        LOG.debug("Updating job execution: {}", jobExecution);
+
+        runningJobExecution.set(jobExecution);
+        registerCompleteHook();
+    }
+
+    /**
+     * Transforms the status by modifying the fields that should be always the 
same regardless of the job execution attempt.
+     * For example, the job creation time should be the same for all attempts.
+     *
+     * @param jobStatus current job status.
+     *
+     * @return transformed job status.
+     */
+    private JobStatus transformStatus(JobStatus jobStatus) {
+        return jobStatus.toBuilder()
+                .createTime(capturedStatus.get().createTime())
+                .id(capturedStatus.get().id())
+                .build();
+    }
+
+    @Override
+    public CompletableFuture<T> resultAsync() {
+        return resultFuture;
+    }
+
+    /**
+     * Returns the transformed status of the running job execution. The 
transformation is needed because we do not want to change
+     * some fields of the status (e.g. creation time) when the job is 
restarted.
+     *
+     * @return the transformed status.
+     */
+    @Override
+    public CompletableFuture<JobStatus> statusAsync() {
+        if (exception.get() != null) {
+            return CompletableFuture.failedFuture(exception.get());
+        }
+
+        return runningJobExecution.get()
+                .statusAsync()
+                .thenApply(this::transformStatus);
+    }
+
+    @Override
+    public CompletableFuture<Void> cancelAsync() {
+        resultFuture.cancel(false);
+        return runningJobExecution.get().cancelAsync();
+    }
+
+    /**
+     * Completes the future with the exception. This method can be called only 
once.
+     *
+     * @param ex the exception that should be set to the future.
+     */
+    void completeExceptionally(Exception ex) {
+        if (exception.compareAndSet(null, ex)) {
+            runningJobExecution.get().resultAsync().completeExceptionally(ex);
+            resultFuture.completeExceptionally(ex);
+        } else {
+            throw new IllegalStateException("Job is already completed 
exceptionally.");
+        }
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 8df4ce08b7..5ebb6991ee 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toUnmodifiableMap;
 
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -50,11 +51,15 @@ public class IgniteComputeImpl implements IgniteCompute {
     private static final String DEFAULT_SCHEMA_NAME = "PUBLIC";
 
     private final TopologyService topologyService;
+
     private final IgniteTablesInternal tables;
+
     private final ComputeComponent computeComponent;
 
     private final ThreadLocalRandom random = ThreadLocalRandom.current();
 
+    private final NodeLeftEventsSource nodeLeftEventsSource;
+
     /**
      * Create new instance.
      */
@@ -62,6 +67,7 @@ public class IgniteComputeImpl implements IgniteCompute {
         this.topologyService = topologyService;
         this.tables = tables;
         this.computeComponent = computeComponent;
+        this.nodeLeftEventsSource = new NodeLeftEventsSource(topologyService);
     }
 
     /** {@inheritDoc} */
@@ -75,7 +81,11 @@ public class IgniteComputeImpl implements IgniteCompute {
             throw new IllegalArgumentException("nodes must not be empty.");
         }
 
-        return new JobExecutionWrapper<>(executeOnOneNode(randomNode(nodes), 
units, jobClassName, args));
+        Set<ClusterNode> candidates = new HashSet<>(nodes);
+        ClusterNode targetNode = randomNode(candidates);
+        candidates.remove(targetNode);
+
+        return new 
JobExecutionWrapper<>(executeOnOneNodeWithFailover(targetNode, candidates, 
units, jobClassName, args));
     }
 
     /** {@inheritDoc} */
@@ -104,6 +114,24 @@ public class IgniteComputeImpl implements IgniteCompute {
         return iterator.next();
     }
 
+    /**
+     * Runs the job on the target node. A local target is executed directly through the compute component; a remote target is
+     * executed through {@link ComputeJobFailover} with the given failover candidates.
+     *
+     * @param targetNode node to run the job on.
+     * @param failoverCandidates nodes passed to the failover as restart candidates.
+     * @param units deployment units.
+     * @param jobClassName name of the job class.
+     * @param args job arguments.
+     * @param <R> job result type.
+     * @return job execution object.
+     */
+    private <R> JobExecution<R> executeOnOneNodeWithFailover(
+            ClusterNode targetNode,
+            Set<ClusterNode> failoverCandidates,
+            List<DeploymentUnit> units,
+            String jobClassName,
+            Object[] args
+    ) {
+        if (!isLocal(targetNode)) {
+            ComputeJobFailover<R> failover = new ComputeJobFailover<R>(
+                    computeComponent, nodeLeftEventsSource,
+                    targetNode, failoverCandidates, units,
+                    jobClassName, args
+            );
+
+            return failover.failSafeExecute();
+        }
+
+        return computeComponent.executeLocally(units, jobClassName, args);
+    }
+
     private <R> JobExecution<R> executeOnOneNode(
             ClusterNode targetNode,
             List<DeploymentUnit> units,
@@ -135,9 +163,11 @@ public class IgniteComputeImpl implements IgniteCompute {
         Objects.requireNonNull(units);
         Objects.requireNonNull(jobClassName);
 
-        return new JobExecutionFutureWrapper<>(requiredTable(tableName)
-                .thenApply(table -> leaderOfTablePartitionByTupleKey(table, 
key))
-                .thenApply(primaryNode -> executeOnOneNode(primaryNode, units, 
jobClassName, args)));
+        return new JobExecutionFutureWrapper<>(
+                requiredTable(tableName)
+                        .thenApply(table -> 
leaderOfTablePartitionByTupleKey(table, key))
+                        .thenApply(primaryNode -> 
executeOnOneNode(primaryNode, units, jobClassName, args))
+        );
     }
 
     /** {@inheritDoc} */
@@ -237,6 +267,8 @@ public class IgniteComputeImpl implements IgniteCompute {
 
         return nodes.stream()
                 .collect(toUnmodifiableMap(identity(),
-                        node -> new 
JobExecutionWrapper<>(executeOnOneNode(node, units, jobClassName, args))));
+                        // No failover nodes for broadcast. We use failover 
here in order to complete futures with exceptions
+                        // if worker node has left the cluster.
+                        node -> new 
JobExecutionWrapper<>(executeOnOneNodeWithFailover(node, Set.of(), units, 
jobClassName, args))));
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/NodeLeftEventsSource.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/NodeLeftEventsSource.java
new file mode 100644
index 0000000000..9a3d9e680c
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/NodeLeftEventsSource.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.function.Consumer;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.TopologyEventHandler;
+import org.apache.ignite.network.TopologyService;
+
+/**
+ * Workaround that makes it possible to both add and remove handlers of node left events.
+ * todo: remove and use TopologyService after <a href="https://issues.apache.org/jira/browse/IGNITE-14519">IGNITE-14519</a>.
+ */
+class NodeLeftEventsSource {
+    /** Currently subscribed node left handlers. */
+    private final List<Consumer<ClusterNode>> handlers = new CopyOnWriteArrayList<>();
+
+    /**
+     * Creates the source and registers a single topology event handler on the given service.
+     *
+     * @param delegate topology service to listen to.
+     */
+    NodeLeftEventsSource(TopologyService delegate) {
+        delegate.addEventHandler(new NodeLeftTopologyEventHandler());
+    }
+
+    /**
+     * Subscribes the handler to node left events.
+     *
+     * @param onNodeLeftHandler handler to add.
+     */
+    void addEventHandler(Consumer<ClusterNode> onNodeLeftHandler) {
+        handlers.add(onNodeLeftHandler);
+    }
+
+    /**
+     * Unsubscribes the handler from node left events.
+     *
+     * @param onNodeLeftHandler handler to remove.
+     */
+    void removeEventHandler(Consumer<ClusterNode> onNodeLeftHandler) {
+        handlers.remove(onNodeLeftHandler);
+    }
+
+    /** Forwards every disappeared node to all subscribed handlers. */
+    private class NodeLeftTopologyEventHandler implements TopologyEventHandler {
+        @Override
+        public void onDisappeared(ClusterNode member) {
+            for (Consumer<ClusterNode> subscriber : handlers) {
+                subscriber.accept(member);
+            }
+        }
+    }
+}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
new file mode 100644
index 0000000000..abee4f8ddc
--- /dev/null
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/RemoteExecutionContext.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.compute;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobExecution;
+
+/**
+ * Captures the context of a remote job execution. Also provides methods to 
access the job execution object
+ * that is returned to the user. The access is thread safe.
+ *
+ * @param <T> type of the result of the job.
+ */
+class RemoteExecutionContext<T> {
+    private final List<DeploymentUnit> units;
+
+    private final String jobClassName;
+
+    private final Object[] args;
+
+    private final AtomicReference<FailSafeJobExecution<T>> jobExecution;
+
+    RemoteExecutionContext(List<DeploymentUnit> units, String jobClassName, 
Object[] args) {
+        this.units = units;
+        this.jobClassName = jobClassName;
+        this.args = args;
+        this.jobExecution = new AtomicReference<>(null);
+    }
+
+    /**
+     * Initializes the job execution object that is supposed to be returned to 
the client. This method can be called only once.
+     *
+     * @param jobExecution the instance of job execution that should be 
returned to the client.
+     */
+    void initJobExecution(FailSafeJobExecution<T> jobExecution) {
+        if (!this.jobExecution.compareAndSet(null, jobExecution)) {
+            throw new IllegalStateException("Job execution is already 
initialized.");
+        }
+    }
+
+    /**
+     * Getter to the job execution object that is supposed to be returned to 
the client.
+     *
+     * @return fail-safe job execution object.
+     */
+    FailSafeJobExecution<T> failSafeJobExecution() {
+        FailSafeJobExecution<T> jobExecution = this.jobExecution.get();
+        if (jobExecution == null) {
+            throw new IllegalStateException("Job execution is not initialized. 
Call initJobExecution() first.");
+        }
+
+        return jobExecution;
+    }
+
+
+    /**
+     * Updates the state of the job execution object but does not change the 
link to the object.
+     * The context holds exactly one link that is returned to the user and 
mutates its internal state only.
+     *
+     * @param jobExecution the new job execution object (supposed to be a 
restarted job but in another worker node).
+     */
+    void updateJobExecution(JobExecution<T> jobExecution) {
+        failSafeJobExecution().updateJobExecution(jobExecution);
+    }
+
+    List<DeploymentUnit> units() {
+        return units;
+    }
+
+    String jobClassName() {
+        return jobClassName;
+    }
+
+    Object[] args() {
+        return args;
+    }
+}

Reply via email to