This is an automated email from the ASF dual-hosted git repository.

ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c56f077040 IGNITE-22124 Java thin: Implement Compute MapReduce API 
(#3744)
c56f077040 is described below

commit c56f077040e736b087ad239c688f43897cc096f7
Author: Vadim Kolodin <vkolo...@gridgain.com>
AuthorDate: Mon May 20 16:09:56 2024 +0300

    IGNITE-22124 Java thin: Implement Compute MapReduce API (#3744)
---
 .../internal/client/proto/ClientMessagePacker.java |  13 ++
 .../client/proto/ClientMessageUnpacker.java        |  10 ++
 .../ignite/internal/client/proto/ClientOp.java     |   5 +
 .../handler/ClientInboundMessageHandler.java       |   4 +
 .../ClientComputeExecuteMapReduceRequest.java      | 100 +++++++++++++++
 .../apache/ignite/client/ClientOperationType.java  |   5 +
 .../org/apache/ignite/client/RetryReadPolicy.java  |   1 +
 .../apache/ignite/internal/client/ClientUtils.java |   3 +
 .../internal/client/compute/ClientCompute.java     |  65 ++++++++--
 .../client/compute/ClientJobExecution.java         |  14 +--
 .../client/compute/ClientTaskExecution.java        | 140 +++++++++++++++++++++
 .../internal/client/compute/SubmitTaskResult.java  |  41 ++++++
 .../client/compute/task/ClientTaskExecution.java   |  57 ---------
 .../apache/ignite/client/ClientComputeTest.java    |  47 +++++++
 .../apache/ignite/client/fakes/FakeCompute.java    |  58 ++++++++-
 .../runner/app/client/ItThinClientComputeTest.java | 122 ++++++++++++++++++
 16 files changed, 612 insertions(+), 73 deletions(-)

diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
index 1fc36b24ce..1f07669394 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessagePacker.java
@@ -520,6 +520,19 @@ public class ClientMessagePacker implements AutoCloseable {
         buf.writeLongLE(val.getLeastSignificantBits());
     }
 
+    /**
+     * Writes a nullable UUID.
+     *
+     * @param val UUID value, or {@code null} to write nil.
+     */
+    public void packUuidNullable(@Nullable UUID val) {
+        if (val == null) {
+            packNil();
+        } else {
+            packUuid(val);
+        }
+    }
+
     /**
      * Writes a bit set.
      *
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
index cba6481c9c..ec8086fb11 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientMessageUnpacker.java
@@ -672,6 +672,16 @@ public class ClientMessageUnpacker implements 
AutoCloseable {
         return new UUID(buf.readLongLE(), buf.readLongLE());
     }
 
+    /**
+     * Reads a nullable UUID.
+     *
+     * @return UUID or null.
+     * @throws MessageTypeException when type is not UUID.
+     */
+    public @Nullable UUID unpackUuidNullable() {
+        return tryUnpackNil() ? null : unpackUuid();
+    }
+
     /**
      * Reads a bit set.
      *
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index 0b83a4869f..bb371faaed 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -170,4 +170,9 @@ public class ClientOp {
 
     /** Execute SQL query with the parameters batch. */
     public static final int SQL_EXEC_BATCH = 63;
+
+    /**
+     * Execute MapReduce task.
+     */
+    public static final int COMPUTE_EXECUTE_MAPREDUCE = 64;
 }
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 5669f4d285..a9a75b8405 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -45,6 +45,7 @@ import 
org.apache.ignite.client.handler.requests.cluster.ClientClusterGetNodesRe
 import 
org.apache.ignite.client.handler.requests.compute.ClientComputeCancelRequest;
 import 
org.apache.ignite.client.handler.requests.compute.ClientComputeChangePriorityRequest;
 import 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteColocatedRequest;
+import 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteMapReduceRequest;
 import 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest;
 import 
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest;
 import org.apache.ignite.client.handler.requests.jdbc.ClientJdbcCloseRequest;
@@ -735,6 +736,9 @@ public class ClientInboundMessageHandler extends 
ChannelInboundHandlerAdapter im
                         notificationSender(requestId)
                 );
 
+            case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
+                return ClientComputeExecuteMapReduceRequest.process(in, out, 
compute, notificationSender(requestId));
+
             case ClientOp.COMPUTE_GET_STATUS:
                 return ClientComputeGetStatusRequest.process(in, out, compute);
 
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
new file mode 100644
index 0000000000..15ea62f430
--- /dev/null
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.compute;
+
+import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackArgs;
+import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeExecuteRequest.unpackDeploymentUnits;
+import static 
org.apache.ignite.client.handler.requests.compute.ClientComputeGetStatusRequest.packJobStatus;
+import static org.apache.ignite.internal.util.IgniteUtils.firstNotNull;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.NotificationSender;
+import org.apache.ignite.compute.DeploymentUnit;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.client.proto.ClientMessagePacker;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.compute.IgniteComputeInternal;
+
+/**
+ * Compute MapReduce request.
+ */
+public class ClientComputeExecuteMapReduceRequest {
+    /**
+     * Processes the request.
+     *
+     * @param in Unpacker.
+     * @param out Packer.
+     * @param compute Compute.
+     * @param notificationSender Notification sender.
+     * @return Future.
+     */
+    public static CompletableFuture<Void> process(
+            ClientMessageUnpacker in,
+            ClientMessagePacker out,
+            IgniteComputeInternal compute,
+            NotificationSender notificationSender) {
+        List<DeploymentUnit> deploymentUnits = unpackDeploymentUnits(in);
+        String taskClassName = in.unpackString();
+        Object[] args = unpackArgs(in);
+
+        TaskExecution<Object> execution = 
compute.submitMapReduce(deploymentUnits, taskClassName, args);
+        sendTaskResult(execution, notificationSender);
+
+        var idsAsync = execution.idsAsync()
+                .handle((ids, ex) -> {
+                    // Fall back to empty ids if the split failed, so that we still 
respond with the task id and a failed status.
+                    return ex == null ? ids : Collections.<UUID>emptyList();
+                });
+
+        return execution.idAsync()
+                .thenAcceptBoth(idsAsync, (id, ids) -> {
+                    out.packUuid(id);
+                    packJobIds(out, ids);
+                });
+    }
+
+    static void packJobIds(ClientMessagePacker out, List<UUID> ids) {
+        out.packInt(ids.size());
+        for (var uuid : ids) {
+            out.packUuid(uuid);
+        }
+    }
+
+    static CompletableFuture<Object> sendTaskResult(TaskExecution<Object> 
execution, NotificationSender notificationSender) {
+        return execution.resultAsync().whenComplete((val, err) ->
+                execution.statusAsync().whenComplete((status, errStatus) ->
+                        execution.statusesAsync().whenComplete((statuses, 
errStatuses) ->
+                                notificationSender.sendNotification(w -> {
+                                    w.packObjectAsBinaryTuple(val);
+                                    packJobStatus(w, status);
+                                    packJobStatuses(w, statuses);
+                                }, firstNotNull(err, errStatus, errStatuses)))
+                ));
+    }
+
+    static void packJobStatuses(ClientMessagePacker w, List<JobStatus> 
statuses) {
+        w.packInt(statuses.size());
+        for (JobStatus status : statuses) {
+            packJobStatus(w, status);
+        }
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
index f62cecfc90..820e111de4 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/client/ClientOperationType.java
@@ -132,6 +132,11 @@ public enum ClientOperationType {
      */
     COMPUTE_EXECUTE,
 
+    /**
+     * Compute Execute MapReduce ({@link 
org.apache.ignite.compute.IgniteCompute#submitMapReduce(List, String, 
Object...)}).
+     */
+    COMPUTE_EXECUTE_MAPREDUCE,
+
     /**
      * Get compute job status ({@link 
org.apache.ignite.compute.JobExecution#statusAsync()}).
      */
diff --git 
a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java 
b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
index 482bc8a0e9..4912f2a2db 100644
--- a/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
+++ b/modules/client/src/main/java/org/apache/ignite/client/RetryReadPolicy.java
@@ -39,6 +39,7 @@ public class RetryReadPolicy extends RetryLimitPolicy {
 
             case TUPLE_UPSERT:
             case COMPUTE_EXECUTE:
+            case COMPUTE_EXECUTE_MAPREDUCE:
             case COMPUTE_GET_STATUS:
             case COMPUTE_CANCEL:
             case COMPUTE_CHANGE_PRIORITY:
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index dde688c5ce..bc76849b24 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -215,6 +215,9 @@ public class ClientUtils {
             case ClientOp.COMPUTE_EXECUTE_COLOCATED:
                 return ClientOperationType.COMPUTE_EXECUTE;
 
+            case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
+                return ClientOperationType.COMPUTE_EXECUTE_MAPREDUCE;
+
             case ClientOp.COMPUTE_GET_STATUS:
                 return ClientOperationType.COMPUTE_GET_STATUS;
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
index 63507b113a..efac50488b 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientCompute.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.client.compute;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.lang.ErrorGroups.Client.TABLE_ID_NOT_FOUND_ERR;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -251,8 +253,10 @@ public class ClientCompute implements IgniteCompute {
 
     @Override
     public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, 
String taskClassName, Object... args) {
-        // TODO https://issues.apache.org/jira/browse/IGNITE-22124
-        throw new UnsupportedOperationException("Not implemented yet.");
+        Objects.requireNonNull(units);
+        Objects.requireNonNull(taskClassName);
+
+        return new ClientTaskExecution<>(ch, doExecuteMapReduceAsync(units, 
taskClassName, args));
     }
 
     @Override
@@ -260,6 +264,20 @@ public class ClientCompute implements IgniteCompute {
         return sync(executeMapReduceAsync(units, taskClassName, args));
     }
 
+    private CompletableFuture<SubmitTaskResult> doExecuteMapReduceAsync(
+            List<DeploymentUnit> units,
+            String taskClassName,
+            Object... args) {
+        return ch.serviceAsync(
+                ClientOp.COMPUTE_EXECUTE_MAPREDUCE,
+                w -> packTask(w.out(), units, taskClassName, args),
+                ClientCompute::unpackSubmitTaskResult,
+                null,
+                null,
+                true
+        );
+    }
+
     private CompletableFuture<SubmitResult> executeOnNodesAsync(
             Set<ClusterNode> nodes,
             List<DeploymentUnit> units,
@@ -418,11 +436,7 @@ public class ClientCompute implements IgniteCompute {
             String jobClassName,
             JobExecutionOptions options,
             Object[] args) {
-        w.packInt(units.size());
-        for (DeploymentUnit unit : units) {
-            w.packString(unit.name());
-            w.packString(unit.version().render());
-        }
+        packDeploymentUnits(w, units);
 
         w.packString(jobClassName);
         w.packInt(options.priority());
@@ -430,6 +444,23 @@ public class ClientCompute implements IgniteCompute {
         w.packObjectArrayAsBinaryTuple(args);
     }
 
+    private static void packTask(ClientMessagePacker w,
+            List<DeploymentUnit> units,
+            String taskClassName,
+            Object[] args) {
+        packDeploymentUnits(w, units);
+        w.packString(taskClassName);
+        w.packObjectArrayAsBinaryTuple(args);
+    }
+
+    private static void packDeploymentUnits(ClientMessagePacker w, 
List<DeploymentUnit> units) {
+        w.packInt(units.size());
+        for (DeploymentUnit unit : units) {
+            w.packString(unit.name());
+            w.packString(unit.version().render());
+        }
+    }
+
     /**
      * Unpacks job id from channel and gets notification future. This is 
needed because we need to unpack message response in the payload
      * reader because the unpacker will be closed after the response is 
processed.
@@ -441,6 +472,26 @@ public class ClientCompute implements IgniteCompute {
         return new SubmitResult(ch.in().unpackUuid(), ch.notificationFuture());
     }
 
+    /**
+     * Unpacks the coordination job id and the ids of the jobs executing under 
this task from the channel and gets the notification future.
+     * This is needed because the message response must be unpacked in the payload
+     * reader: the unpacker will be closed after the response is 
processed.
+     *
+     * @param ch Payload channel.
+     * @return Result of the task submission.
+     */
+    private static SubmitTaskResult unpackSubmitTaskResult(PayloadInputChannel 
ch) {
+        var jobId = ch.in().unpackUuid();
+
+        var size = ch.in().unpackInt();
+        List<UUID> jobIds = new ArrayList<>(size);
+        for (int i = 0; i < size; i++) {
+            jobIds.add(ch.in().unpackUuid());
+        }
+
+        return new SubmitTaskResult(jobId, jobIds, ch.notificationFuture());
+    }
+
     private static <R> R sync(CompletableFuture<R> future) {
         try {
             return future.join();
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
index c95bd5e8ec..f4535aec1b 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientJobExecution.java
@@ -75,7 +75,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
         if (statusFuture.isDone()) {
             return statusFuture;
         }
-        return jobIdFuture.thenCompose(this::getJobStatus);
+        return jobIdFuture.thenCompose(jobId -> getJobStatus(ch, jobId));
     }
 
     @Override
@@ -83,7 +83,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
         if (statusFuture.isDone()) {
             return falseCompletedFuture();
         }
-        return jobIdFuture.thenCompose(this::cancelJob);
+        return jobIdFuture.thenCompose(jobId -> cancelJob(ch, jobId));
     }
 
     @Override
@@ -91,10 +91,10 @@ class ClientJobExecution<R> implements JobExecution<R> {
         if (statusFuture.isDone()) {
             return falseCompletedFuture();
         }
-        return jobIdFuture.thenCompose(jobId -> changePriority(jobId, 
newPriority));
+        return jobIdFuture.thenCompose(jobId -> changePriority(ch, jobId, 
newPriority));
     }
 
-    private CompletableFuture<@Nullable JobStatus> getJobStatus(UUID jobId) {
+    static CompletableFuture<@Nullable JobStatus> getJobStatus(ReliableChannel 
ch, UUID jobId) {
         // Send the request to any node, the request will be broadcast since 
client doesn't know which particular node is running the job
         // especially in case of colocated execution.
         return ch.serviceAsync(
@@ -107,7 +107,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
         );
     }
 
-    private CompletableFuture<@Nullable Boolean> cancelJob(UUID jobId) {
+    static CompletableFuture<@Nullable Boolean> cancelJob(ReliableChannel ch, 
UUID jobId) {
         // Send the request to any node, the request will be broadcast since 
client doesn't know which particular node is running the job
         // especially in case of colocated execution.
         return ch.serviceAsync(
@@ -120,7 +120,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
         );
     }
 
-    private CompletableFuture<@Nullable Boolean> changePriority(UUID jobId, 
int newPriority) {
+    static CompletableFuture<@Nullable Boolean> changePriority(ReliableChannel 
ch, UUID jobId, int newPriority) {
         // Send the request to any node, the request will be broadcast since 
client doesn't know which particular node is running the job
         // especially in case of colocated execution.
         return ch.serviceAsync(
@@ -136,7 +136,7 @@ class ClientJobExecution<R> implements JobExecution<R> {
         );
     }
 
-    private static @Nullable JobStatus unpackJobStatus(PayloadInputChannel 
payloadInputChannel) {
+    static @Nullable JobStatus unpackJobStatus(PayloadInputChannel 
payloadInputChannel) {
         ClientMessageUnpacker unpacker = payloadInputChannel.in();
         if (unpacker.tryUnpackNil()) {
             return null;
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
new file mode 100644
index 0000000000..9df7d73b6a
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/ClientTaskExecution.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute;
+
+import static 
org.apache.ignite.internal.client.compute.ClientJobExecution.cancelJob;
+import static 
org.apache.ignite.internal.client.compute.ClientJobExecution.changePriority;
+import static 
org.apache.ignite.internal.client.compute.ClientJobExecution.getJobStatus;
+import static 
org.apache.ignite.internal.client.compute.ClientJobExecution.unpackJobStatus;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import org.apache.ignite.compute.JobStatus;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.internal.client.PayloadInputChannel;
+import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Client compute task implementation.
+ *
+ * @param <R> Task result type.
+ */
+class ClientTaskExecution<R> implements TaskExecution<R> {
+    private final ReliableChannel ch;
+
+    private final CompletableFuture<UUID> jobIdFuture;
+
+    private final CompletableFuture<List<@Nullable UUID>> jobIdsFuture;
+
+    private final CompletableFuture<R> resultAsync;
+
+    // Local status cache
+    private final CompletableFuture<@Nullable JobStatus> statusFuture = new 
CompletableFuture<>();
+
+    // Local statuses cache
+    private final CompletableFuture<List<@Nullable JobStatus>> statusesFutures 
= new CompletableFuture<>();
+
+    ClientTaskExecution(ReliableChannel ch, 
CompletableFuture<SubmitTaskResult> reqFuture) {
+        this.ch = ch;
+
+        jobIdFuture = reqFuture.thenApply(SubmitTaskResult::jobId);
+        jobIdsFuture = reqFuture.thenApply(SubmitTaskResult::jobIds);
+
+        resultAsync = reqFuture
+                .thenCompose(SubmitTaskResult::notificationFuture)
+                .thenApply(payloadInputChannel -> {
+                    // Notifications require explicit input close.
+                    try (payloadInputChannel) {
+                        R result = (R) 
payloadInputChannel.in().unpackObjectFromBinaryTuple();
+                        
statusFuture.complete(unpackJobStatus(payloadInputChannel));
+                        
statusesFutures.complete(unpackJobStatuses(payloadInputChannel));
+                        return result;
+                    }
+                });
+    }
+
+    @Override
+    public CompletableFuture<R> resultAsync() {
+        return resultAsync;
+    }
+
+    @Override
+    public CompletableFuture<@Nullable JobStatus> statusAsync() {
+        if (statusFuture.isDone()) {
+            return statusFuture;
+        }
+        return jobIdFuture.thenCompose(jobId -> getJobStatus(ch, jobId));
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> cancelAsync() {
+        if (statusFuture.isDone()) {
+            return falseCompletedFuture();
+        }
+        return jobIdFuture.thenCompose(jobId -> cancelJob(ch, jobId));
+    }
+
+    @Override
+    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
+        if (statusFuture.isDone()) {
+            return falseCompletedFuture();
+        }
+        return jobIdFuture.thenCompose(jobId -> changePriority(ch, jobId, 
newPriority));
+    }
+
+    @Override
+    public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+        if (statusesFutures.isDone()) {
+            return statusesFutures;
+        }
+
+        return jobIdsFuture.thenCompose(ids -> {
+            @SuppressWarnings("unchecked")
+            CompletableFuture<@Nullable JobStatus>[] futures = ids.stream()
+                    .map(jobId -> getJobStatus(ch, jobId))
+                    .toArray(CompletableFuture[]::new);
+
+            return CompletableFutures.allOf(futures)
+                    .thenApply(Function.identity());
+        });
+    }
+
+    private static List<@Nullable JobStatus> 
unpackJobStatuses(PayloadInputChannel payloadInputChannel) {
+        var unpacker = payloadInputChannel.in();
+        var size = unpacker.unpackInt();
+
+        if (size == 0) {
+            return Collections.emptyList();
+        }
+
+        var statuses = new ArrayList<@Nullable JobStatus>(size);
+        for (int i = 0; i < size; i++) {
+            var status = unpackJobStatus(payloadInputChannel);
+            statuses.add(status);
+        }
+
+        return Collections.unmodifiableList(statuses);
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/SubmitTaskResult.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/SubmitTaskResult.java
new file mode 100644
index 0000000000..71ef494201
--- /dev/null
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/SubmitTaskResult.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.client.compute;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.client.PayloadInputChannel;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Result of the task submission.
+ * Contains the unpacked job id, the ids of the jobs executing under 
this task, and the notification future.
+ */
+class SubmitTaskResult extends SubmitResult {
+    private final List<UUID> jobIds;
+
+    SubmitTaskResult(UUID jobId, List<UUID> jobIds, 
CompletableFuture<PayloadInputChannel> notificationFuture) {
+        super(jobId, notificationFuture);
+        this.jobIds = jobIds;
+    }
+
+    List<@Nullable UUID> jobIds() {
+        return jobIds;
+    }
+}
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
deleted file mode 100644
index dd1b766cef..0000000000
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/compute/task/ClientTaskExecution.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.client.compute.task;
-
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.JobStatus;
-import org.apache.ignite.compute.TaskExecution;
-import org.jetbrains.annotations.Nullable;
-
-// TODO https://issues.apache.org/jira/browse/IGNITE-22124
-/**
- * Client compute task implementation.
- *
- * @param <R> Task result type.
- */
-public class ClientTaskExecution<R> implements TaskExecution<R> {
-    @Override
-    public CompletableFuture<R> resultAsync() {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    @Override
-    public CompletableFuture<@Nullable JobStatus> statusAsync() {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    @Override
-    public CompletableFuture<@Nullable Boolean> cancelAsync() {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    @Override
-    public CompletableFuture<@Nullable Boolean> changePriorityAsync(int 
newPriority) {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-
-    @Override
-    public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
-        throw new UnsupportedOperationException("Not implemented yet.");
-    }
-}
diff --git a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
index 6a58de7d9f..6fb03e47da 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/ClientComputeTest.java
@@ -28,7 +28,10 @@ import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
 import static org.apache.ignite.lang.ErrorGroups.Table.TABLE_NOT_FOUND_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -45,6 +48,7 @@ import org.apache.ignite.client.fakes.FakeIgnite;
 import org.apache.ignite.client.fakes.FakeIgniteTables;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
+import org.apache.ignite.compute.TaskExecution;
 import org.apache.ignite.compute.version.Version;
 import org.apache.ignite.internal.client.table.ClientTable;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -216,6 +220,49 @@ public class ClientComputeTest extends BaseIgniteAbstractTest {
         }
     }
 
+    @Test
+    void testMapReduceExecute() throws Exception {
+        initServers(reqId -> false);
+
+        try (var client = getClient(server1)) {
+            Object[] args = {"arg1", 2};
+            String res1 = client.compute().executeMapReduce(List.of(), "job", args);
+            assertEquals("s1", res1);
+        }
+    }
+
+    @Test
+    void testMapReduceSubmit() throws Exception {
+        initServers(reqId -> false);
+
+        try (var client = getClient(server1)) {
+            TaskExecution<Object> task = client.compute().submitMapReduce(List.of(), "job");
+
+            assertThat(task.resultAsync(), willBe("s1"));
+
+            assertThat(task.statusAsync(), willBe(jobStatusWithState(COMPLETED)));
+            assertThat(task.statusesAsync(), willBe(everyItem(jobStatusWithState(COMPLETED))));
+
+            assertThat("compute task and sub tasks ids must be different",
+                    task.idsAsync(), willBe(not(hasItem(task.idAsync().get()))));
+        }
+    }
+
+    @Test
+    void testMapReduceException() throws Exception {
+        initServers(reqId -> false);
+
+        try (var client = getClient(server1)) {
+            FakeCompute.future = CompletableFuture.failedFuture(new RuntimeException("job failed"));
+
+            TaskExecution<Object> execution = client.compute().submitMapReduce(List.of(), "job");
+
+            assertThat(execution.resultAsync(), willThrowFast(IgniteException.class));
+            assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED)));
+            assertThat(execution.statusesAsync(), willBe(everyItem(jobStatusWithState(FAILED))));
+        }
+    }
+
     @Test
     void testUnitsPropagation() throws Exception {
         initServers(reqId -> false);
diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index 3d5c9d79d0..6dc1c72a07 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -34,6 +34,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
+import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.IgniteCompute;
@@ -190,7 +191,7 @@ public class FakeCompute implements IgniteComputeInternal {
 
     @Override
     public <R> TaskExecution<R> submitMapReduce(List<DeploymentUnit> units, String taskClassName, Object... args) {
-        return null;
+        return taskExecution(future != null ? future : completedFuture((R) nodeName));
     }
 
     @Override
@@ -215,7 +216,7 @@ public class FakeCompute implements IgniteComputeInternal {
 
         result.whenComplete((r, throwable) -> {
             JobState state = throwable != null ? FAILED : COMPLETED;
-            JobStatus newStatus = status.toBuilder().state(state).finishTime(Instant.now()).build();
+            JobStatus newStatus = status.toBuilder().id(jobId).state(state).finishTime(Instant.now()).build();
             statuses.put(jobId, newStatus);
         });
         return new JobExecution<>() {
@@ -241,6 +242,59 @@ public class FakeCompute implements IgniteComputeInternal {
         };
     }
 
+    private <R> TaskExecution<R> taskExecution(CompletableFuture<R> result) {
+        BiFunction<UUID, JobState, JobStatus> toStatus = (id, jobState) ->
+                JobStatus.builder()
+                        .id(id)
+                        .state(jobState)
+                        .createTime(Instant.now())
+                        .startTime(Instant.now())
+                        .build();
+
+        UUID jobId = UUID.randomUUID();
+        UUID subJobId1 = UUID.randomUUID();
+        UUID subJobId2 = UUID.randomUUID();
+
+        statuses.put(jobId, toStatus.apply(jobId, EXECUTING));
+        statuses.put(subJobId1, toStatus.apply(subJobId1, EXECUTING));
+        statuses.put(subJobId2, toStatus.apply(subJobId2, EXECUTING));
+
+        result.whenComplete((r, throwable) -> {
+            JobState state = throwable != null ? FAILED : COMPLETED;
+
+            statuses.put(jobId, toStatus.apply(jobId, state));
+            statuses.put(subJobId1, toStatus.apply(subJobId1, state));
+            statuses.put(subJobId2, toStatus.apply(subJobId2, state));
+        });
+
+        return new TaskExecution<>() {
+            @Override
+            public CompletableFuture<R> resultAsync() {
+                return result;
+            }
+
+            @Override
+            public CompletableFuture<@Nullable JobStatus> statusAsync() {
+                return completedFuture(statuses.get(jobId));
+            }
+
+            @Override
+            public CompletableFuture<List<@Nullable JobStatus>> statusesAsync() {
+                return completedFuture(List.of(statuses.get(subJobId1), statuses.get(subJobId2)));
+            }
+
+            @Override
+            public CompletableFuture<@Nullable Boolean> cancelAsync() {
+                return trueCompletedFuture();
+            }
+
+            @Override
+            public CompletableFuture<@Nullable Boolean> changePriorityAsync(int newPriority) {
+                return trueCompletedFuture();
+            }
+        };
+    }
+
     @Override
     public CompletableFuture<Collection<JobStatus>> statusesAsync() {
         return completedFuture(statuses.values());
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 1e7bb01e2e..ca2e22cf2d 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -32,8 +32,12 @@ import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
 import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 import static org.hamcrest.Matchers.oneOf;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -70,13 +74,20 @@ import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionContext;
+import org.apache.ignite.compute.TaskExecution;
+import org.apache.ignite.compute.task.ComputeJobRunner;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.table.Tuple;
 import org.apache.ignite.table.mapper.Mapper;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Thin client compute integration test.
@@ -668,6 +679,46 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
         assertEquals(expected, res);
     }
 
+    @Test
+    void testExecuteMapReduce() throws Exception {
+        TaskExecution<String> execution = client().compute().submitMapReduce(List.of(), MapReduceNodeNameTask.class.getName());
+
+        List<Matcher<? super String>> nodeNames = sortedNodes().stream()
+                .map(ClusterNode::name)
+                .map(Matchers::containsString)
+                .collect(Collectors.toList());
+        assertThat(execution.resultAsync(), willBe(allOf(nodeNames)));
+
+        assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED)));
+        assertThat(execution.statusesAsync(), willBe(everyItem(jobStatusWithState(COMPLETED))));
+
+        assertThat("compute task and sub tasks ids must be different",
+                execution.idsAsync(), willBe(not(hasItem(execution.idAsync().get()))));
+    }
+
+    @Test
+    void testExecuteMapReduceWithArgs() {
+        TaskExecution<String> execution = client().compute()
+                .submitMapReduce(List.of(), MapReduceArgsTask.class.getName(), 1, "2", 3.3);
+
+        assertThat(execution.resultAsync(), willBe(containsString("1_2_3.3")));
+        assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED)));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {MapReduceExceptionOnSplitTask.class, MapReduceExceptionOnReduceTask.class})
+    void testExecuteMapReduceExceptionPropagation(Class<?> taskClass) {
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().submitMapReduce(List.of(), taskClass.getName())
+        );
+
+        assertThat(cause.getMessage(), containsString("Custom job error"));
+        assertEquals(TRACE_ID, cause.traceId());
+        assertEquals(COLUMN_ALREADY_EXISTS_ERR, cause.code());
+        assertInstanceOf(CustomException.class, cause);
+        assertNull(cause.getCause()); // No stack trace by default.
+    }
+
     private void testEchoArg(Object arg) {
         Object res = client().compute().execute(Set.of(node(0)), List.of(), EchoJob.class.getName(), arg, arg.toString());
 
@@ -755,6 +806,77 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest {
         }
     }
 
+    private static class MapReduceNodeNameTask implements MapReduceTask<String> {
+        @Override
+        public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
+            return context.ignite().clusterNodes().stream()
+                    .map(node -> ComputeJobRunner.builder()
+                            .jobClassName(NodeNameJob.class.getName())
+                            .nodes(Set.of(node))
+                            .args(args)
+                            .build())
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public String reduce(Map<UUID, ?> results) {
+            return results.values().stream()
+                    .map(String.class::cast)
+                    .collect(Collectors.joining(","));
+        }
+    }
+
+    private static class MapReduceArgsTask implements MapReduceTask<String> {
+        @Override
+        public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
+            return context.ignite().clusterNodes().stream()
+                    .map(node -> ComputeJobRunner.builder()
+                            .jobClassName(ConcatJob.class.getName())
+                            .nodes(Set.of(node))
+                            .args(args)
+                            .build())
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public String reduce(Map<UUID, ?> results) {
+            return results.values().stream()
+                    .map(String.class::cast)
+                    .collect(Collectors.joining(","));
+        }
+    }
+
+    private static class MapReduceExceptionOnSplitTask implements MapReduceTask<String> {
+        @Override
+        public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
+            throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR, "Custom job error", null);
+        }
+
+        @Override
+        public String reduce(Map<UUID, ?> results) {
+            return "expected split exception";
+        }
+    }
+
+    private static class MapReduceExceptionOnReduceTask implements MapReduceTask<String> {
+
+        @Override
+        public List<ComputeJobRunner> split(TaskExecutionContext context, Object... args) {
+            return context.ignite().clusterNodes().stream()
+                    .map(node -> ComputeJobRunner.builder()
+                            .jobClassName(NodeNameJob.class.getName())
+                            .nodes(Set.of(node))
+                            .args(args)
+                            .build())
+                    .collect(Collectors.toList());
+        }
+
+        @Override
+        public String reduce(Map<UUID, ?> results) {
+            throw new CustomException(TRACE_ID, COLUMN_ALREADY_EXISTS_ERR, "Custom job error", null);
+        }
+    }
+
     /**
      * Custom public exception class.
      */


Reply via email to