This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0685356531e IGNITE-26118 Add compute task events for map reduce tasks 
(#6491)
0685356531e is described below

commit 0685356531e243d8e1a219e13ea6f98e8095501e
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Fri Aug 29 11:30:07 2025 +0300

    IGNITE-26118 Add compute task events for map reduce tasks (#6491)
---
 .../handler/ClientInboundMessageHandler.java       | 10 ++-
 .../ClientComputeExecuteMapReduceRequest.java      | 18 ++++-
 .../apache/ignite/client/fakes/FakeCompute.java    | 12 ++-
 .../ignite/internal/compute/ItMapReduceTest.java   |  6 +-
 .../internal/compute/events/EventMatcher.java      | 32 ++++----
 .../events/ItComputeEventsEmbeddedTest.java        | 62 +++++++++++++++
 .../compute/events/ItComputeEventsTest.java        | 90 ++++++++++++++++++++--
 .../internal/compute/utils/InteractiveTasks.java   | 32 +++++---
 ...MapReduce.java => FailingJobMapReduceTask.java} | 34 +++-----
 ...Reduce.java => FailingReduceMapReduceTask.java} | 33 +++-----
 .../compute/FailingSplitMapReduceTask.java}        | 33 ++++----
 .../apache/ignite/internal/compute/MapReduce.java  |  3 +-
 .../ignite/internal/compute/ComputeComponent.java  |  1 +
 .../internal/compute/ComputeComponentImpl.java     | 14 +++-
 .../ignite/internal/compute/IgniteComputeImpl.java | 16 +++-
 .../internal/compute/IgniteComputeInternal.java    | 21 +++++
 .../compute/events/ComputeEventMetadata.java       | 11 +--
 .../events/ComputeEventMetadataBuilder.java        | 33 ++++++--
 .../compute/events/ComputeEventsFactory.java       | 32 +++++---
 .../internal/compute/executor/ComputeExecutor.java |  1 +
 .../compute/executor/ComputeExecutorImpl.java      | 15 +++-
 .../compute/queue/PriorityQueueExecutor.java       | 20 ++---
 .../internal/compute/queue/QueueExecutionImpl.java |  4 +-
 .../ignite/internal/compute/task/JobSubmitter.java |  8 +-
 .../compute/task/TaskExecutionInternal.java        | 88 +++++++++++++++++----
 .../compute/queue/PriorityQueueExecutorTest.java   |  3 +-
 .../internal/eventlog/api/IgniteEventType.java     |  6 ++
 27 files changed, 470 insertions(+), 168 deletions(-)

diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 75401189935..672ee1333b5 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -930,7 +930,7 @@ public class ClientInboundMessageHandler
                 );
 
             case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
-                return ClientComputeExecuteMapReduceRequest.process(in, 
compute, notificationSender(requestId));
+                return ClientComputeExecuteMapReduceRequest.process(in, 
compute, notificationSender(requestId), clientContext);
 
             case ClientOp.COMPUTE_GET_STATE:
                 return ClientComputeGetStateRequest.process(in, compute);
@@ -1324,8 +1324,12 @@ public class ClientInboundMessageHandler
 
     private class ComputeConnection implements PlatformComputeConnection {
         @Override
-        public CompletableFuture<ComputeJobDataHolder> executeJobAsync(long 
jobId, List<String> deploymentUnitPaths, String jobClassName,
-                ComputeJobDataHolder arg) {
+        public CompletableFuture<ComputeJobDataHolder> executeJobAsync(
+                long jobId,
+                List<String> deploymentUnitPaths,
+                String jobClassName,
+                @Nullable ComputeJobDataHolder arg
+        ) {
             return sendServerToClientRequest(ServerOp.COMPUTE_JOB_EXEC,
                     packer -> {
                         packer.packLong(jobId);
diff --git 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 7b29b10842a..ccecbf120bc 100644
--- 
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++ 
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientContext;
 import org.apache.ignite.client.handler.NotificationSender;
 import org.apache.ignite.client.handler.ResponseWriter;
 import org.apache.ignite.compute.JobState;
@@ -39,6 +40,9 @@ import 
org.apache.ignite.internal.compute.ComputeJobDataHolder;
 import org.apache.ignite.internal.compute.HybridTimestampProvider;
 import org.apache.ignite.internal.compute.IgniteComputeInternal;
 import org.apache.ignite.internal.compute.MarshallerProvider;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.marshalling.Marshaller;
@@ -55,18 +59,26 @@ public class ClientComputeExecuteMapReduceRequest {
      * @param in Unpacker.
      * @param compute Compute.
      * @param notificationSender Notification sender.
+     * @param clientContext Client context.
      * @return Future.
      */
     public static CompletableFuture<ResponseWriter> process(
             ClientMessageUnpacker in,
             IgniteComputeInternal compute,
-            NotificationSender notificationSender) {
+            NotificationSender notificationSender,
+            ClientContext clientContext
+    ) {
         List<DeploymentUnit> deploymentUnits = in.unpackDeploymentUnits();
         String taskClassName = in.unpackString();
         ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in);
 
-        TaskExecution<Object> execution = compute.submitMapReduce(
-                
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build(), arg);
+        TaskDescriptor<Object, Object> taskDescriptor = 
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build();
+
+        ComputeEventMetadataBuilder metadataBuilder = 
ComputeEventMetadata.builder(Type.MAP_REDUCE)
+                .eventUser(clientContext.userDetails())
+                .clientAddress(clientContext.remoteAddress().toString());
+
+        TaskExecution<Object> execution = 
compute.submitMapReduceInternal(taskDescriptor, metadataBuilder, arg, null);
         sendTaskResult(execution, notificationSender);
 
         var idsAsync = execution.idsAsync()
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index f9b090ce4a2..934085e4736 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -180,6 +180,16 @@ public class FakeCompute implements IgniteComputeInternal {
         return nullCompletedFuture();
     }
 
+    @Override
+    public <T, R> TaskExecution<R> submitMapReduceInternal(
+            TaskDescriptor<T, R> taskDescriptor,
+            ComputeEventMetadataBuilder metadataBuilder,
+            @Nullable T arg,
+            @Nullable CancellationToken cancellationToken
+    ) {
+        return taskExecution(future != null ? future : completedFuture((R) 
nodeName));
+    }
+
     @Override
     public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
             JobTarget target,
@@ -272,7 +282,7 @@ public class FakeCompute implements IgniteComputeInternal {
             @Nullable T arg,
             @Nullable CancellationToken cancellationToken
     ) {
-        return taskExecution(future != null ? future : completedFuture((R) 
nodeName));
+        return submitMapReduceInternal(taskDescriptor, null, arg, 
cancellationToken);
     }
 
     @Override
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index dafcdef6771..a1611552ab8 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -78,7 +78,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest {
         TaskExecution<List<String>> taskExecution = 
igniteCompute.submitMapReduce(
                 TaskDescriptor.<Object, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null);
         assertTaskExecuting(taskExecution);
-        InteractiveTasks.GlobalApi.assertAlive();
+        InteractiveTasks.GlobalApi.assertSplitAlive();
 
         // Save state before split.
         TaskState stateBeforeSplit = taskExecution.stateAsync().join();
@@ -249,7 +249,7 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
         InteractiveJobs.all().finishReturnWorkerNames();
 
         // Wait for the reduce job to start.
-        InteractiveTasks.GlobalApi.assertAlive();
+        InteractiveTasks.GlobalApi.assertReduceAlive();
 
         // When cancel the task.
         assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
@@ -274,7 +274,7 @@ class ItMapReduceTest extends 
ClusterPerClassIntegrationTest {
                 TaskDescriptor.<String, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), arg, 
cancellationToken
         );
         assertTaskExecuting(taskExecution);
-        InteractiveTasks.GlobalApi.assertAlive();
+        InteractiveTasks.GlobalApi.assertSplitAlive();
         return taskExecution;
     }
 
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
index 650bf0c115d..f83a3d7460f 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
@@ -87,7 +87,7 @@ public class EventMatcher extends TypeSafeMatcher<String> {
                 .withClassName(jobClassName)
                 .withJobId(jobId)
                 .withTargetNode(targetNode)
-                .withInitiatorNode(initiatorNode)
+                .withInitiatorNode(is(initiatorNode))
                 .withClientAddress(nullValue());
     }
 
@@ -115,57 +115,61 @@ public class EventMatcher extends TypeSafeMatcher<String> 
{
                 .withClientAddress(notNullValue(String.class));
     }
 
-    EventMatcher withTimestamp(Matcher<? super Long> matcher) {
+    public EventMatcher withTimestamp(Matcher<? super Long> matcher) {
         this.timestampMatcher = matcher;
         return this;
     }
 
-    EventMatcher withProductVersion(Matcher<? super String> matcher) {
+    public EventMatcher withProductVersion(Matcher<? super String> matcher) {
         this.productVersionMatcher = matcher;
         return this;
     }
 
-    EventMatcher withUsername(Matcher<? super String> matcher) {
+    public EventMatcher withUsername(Matcher<? super String> matcher) {
         this.usernameMatcher = matcher;
         return this;
     }
 
-    EventMatcher withType(String type) {
+    public EventMatcher withType(String type) {
         this.typeMatcher = is(type);
         return this;
     }
 
-    EventMatcher withClassName(String className) {
+    public EventMatcher withClassName(String className) {
         this.classNameMatcher = is(className);
         return this;
     }
 
-    EventMatcher withTableName(String tableName) {
+    public EventMatcher withTableName(String tableName) {
         this.tableNameMatcher = is(tableName);
         return this;
     }
 
-    EventMatcher withJobId(@Nullable UUID jobId) {
+    public EventMatcher withJobId(@Nullable UUID jobId) {
         this.jobIdMatcher = is(jobId);
         return this;
     }
 
-    EventMatcher withTaskId(@Nullable UUID taskId) {
+    public EventMatcher withTaskId(@Nullable UUID taskId) {
         this.taskIdMatcher = is(taskId);
         return this;
     }
 
-    EventMatcher withTargetNode(String targetNode) {
-        this.targetNodeMatcher = is(targetNode);
+    public EventMatcher withTargetNode(String targetNode) {
+        return withTargetNode(is(targetNode));
+    }
+
+    public EventMatcher withTargetNode(Matcher<? super String> 
targetNodeMatcher) {
+        this.targetNodeMatcher = targetNodeMatcher;
         return this;
     }
 
-    EventMatcher withInitiatorNode(String initiatorNode) {
-        this.initiatorNodeMatcher = is(initiatorNode);
+    public EventMatcher withInitiatorNode(Matcher<? super String> 
initiatorNodeMatcher) {
+        this.initiatorNodeMatcher = initiatorNodeMatcher;
         return this;
     }
 
-    EventMatcher withClientAddress(Matcher<? super String> 
clientAddressMatcher) {
+    public EventMatcher withClientAddress(Matcher<? super String> 
clientAddressMatcher) {
         this.clientAddressMatcher = clientAddressMatcher;
         return this;
     }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
index d9a7dd199b4..d00866e456a 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
@@ -18,12 +18,25 @@
 package org.apache.ignite.internal.compute.events;
 
 import static 
org.apache.ignite.internal.compute.events.EventMatcher.embeddedJobEvent;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_CANCELED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_EXECUTING;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_QUEUED;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInRelativeOrder;
 
+import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.InteractiveTasks;
 import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
 import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
 
 class ItComputeEventsEmbeddedTest extends ItComputeEventsTest {
     @Override
@@ -35,4 +48,53 @@ class ItComputeEventsEmbeddedTest extends 
ItComputeEventsTest {
     protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType, 
@Nullable UUID jobId, String jobClassName, String targetNode) {
         return embeddedJobEvent(eventType, jobType, jobId, jobClassName, 
targetNode, node(0).name());
     }
+
+    // Cancel tests are hard to implement without global signals, so we test 
them in embedded mode only.
+    @Test
+    void taskSplitCanceled() {
+        CancelHandle cancelHandle = CancelHandle.create();
+        TaskExecution<List<String>> execution = 
startTask(cancelHandle.token());
+
+        cancelHandle.cancel();
+
+        UUID taskId = execution.idAsync().join(); // Safe to join since 
execution is complete.
+
+        assertEvents(
+                taskEvent(COMPUTE_TASK_QUEUED, 
InteractiveTasks.GlobalApi.name(), taskId),
+                taskEvent(COMPUTE_TASK_EXECUTING, 
InteractiveTasks.GlobalApi.name(), taskId),
+                taskEvent(COMPUTE_TASK_CANCELED, 
InteractiveTasks.GlobalApi.name(), taskId)
+        );
+    }
+
+    @Test
+    void taskReduceCanceled() throws InterruptedException {
+        CancelHandle cancelHandle = CancelHandle.create();
+        TaskExecution<List<String>> execution = 
startTask(cancelHandle.token());
+        InteractiveTasks.GlobalApi.finishSplit();
+        InteractiveJobs.all().finishReturnWorkerNames();
+        InteractiveTasks.GlobalApi.assertReduceAlive();
+
+        cancelHandle.cancel();
+
+        UUID taskId = execution.idAsync().join(); // Safe to join since 
execution is complete.
+
+        // There are a lot of job events, skip them
+        await().until(logInspector::events, containsInRelativeOrder(
+                taskEvent(COMPUTE_TASK_QUEUED, 
InteractiveTasks.GlobalApi.name(), taskId),
+                taskEvent(COMPUTE_TASK_EXECUTING, 
InteractiveTasks.GlobalApi.name(), taskId),
+                taskEvent(COMPUTE_TASK_CANCELED, 
InteractiveTasks.GlobalApi.name(), taskId)
+        ));
+    }
+
+    private TaskExecution<List<String>> startTask(@Nullable CancellationToken 
cancellationToken) {
+        TaskExecution<List<String>> taskExecution = compute().submitMapReduce(
+                TaskDescriptor.<String, 
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null, 
cancellationToken
+        );
+        try {
+            InteractiveTasks.GlobalApi.assertSplitAlive();
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        return taskExecution;
+    }
 }
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
index c9419d69c74..4fe7341986c 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute.events;
 import static org.apache.ignite.compute.JobStatus.CANCELED;
 import static org.apache.ignite.compute.JobStatus.EXECUTING;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.BROADCAST;
+import static 
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.MAP_REDUCE;
 import static 
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.SINGLE;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELED;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELING;
@@ -27,6 +28,10 @@ import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JO
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_EXECUTING;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_FAILED;
 import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_QUEUED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_COMPLETED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_EXECUTING;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_FAILED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_QUEUED;
 import static org.apache.ignite.internal.eventlog.api.IgniteEventType.values;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -37,15 +42,19 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInRelativeOrder;
 import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.in;
 import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
 import org.apache.ignite.InitParametersBuilder;
 import org.apache.ignite.compute.BroadcastExecution;
 import org.apache.ignite.compute.BroadcastJobTarget;
@@ -54,13 +63,22 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobDescriptor;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.ConfigOverride;
 import org.apache.ignite.internal.compute.FailingJob;
+import org.apache.ignite.internal.compute.FailingJobMapReduceTask;
+import org.apache.ignite.internal.compute.FailingReduceMapReduceTask;
+import org.apache.ignite.internal.compute.FailingSplitMapReduceTask;
 import org.apache.ignite.internal.compute.GetNodeNameJob;
+import org.apache.ignite.internal.compute.MapReduce;
 import org.apache.ignite.internal.compute.SilentSleepJob;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
 import org.apache.ignite.internal.compute.events.EventMatcher.Event;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.InteractiveTasks;
 import org.apache.ignite.internal.eventlog.api.IgniteEventType;
 import org.apache.ignite.internal.testframework.log4j2.EventLogInspector;
 import org.apache.ignite.lang.CancelHandle;
@@ -78,15 +96,19 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 @ConfigOverride(name = "ignite.compute.threadPoolSize", value = "1")
 abstract class ItComputeEventsTest extends ClusterPerClassIntegrationTest {
-    private final EventLogInspector logInspector = new EventLogInspector();
+    final EventLogInspector logInspector = new EventLogInspector();
 
     @BeforeEach
-    void startLogInspector() {
+    void setUp() {
+        InteractiveJobs.clearState();
+        InteractiveTasks.clearState();
+        
InteractiveJobs.initChannels(CLUSTER.runningNodes().map(Ignite::name).collect(Collectors.toList()));
+
         logInspector.start();
     }
 
     @AfterEach
-    void afterEach() {
+    void tearDown() {
         logInspector.stop();
         dropAllTables();
     }
@@ -95,7 +117,7 @@ abstract class ItComputeEventsTest extends 
ClusterPerClassIntegrationTest {
     protected void configureInitParameters(InitParametersBuilder builder) {
         String allEvents = Arrays.stream(values())
                 .map(IgniteEventType::name)
-                .filter(name -> name.startsWith("COMPUTE_JOB"))
+                .filter(name -> name.startsWith("COMPUTE"))
                 .collect(Collectors.joining(", ", "[", "]"));
 
         builder.clusterConfiguration("ignite.eventlog {"
@@ -318,6 +340,53 @@ abstract class ItComputeEventsTest extends 
ClusterPerClassIntegrationTest {
         );
     }
 
+    @Test
+    void taskCompleted() {
+        TaskExecution<Integer> execution = 
compute().submitMapReduce(TaskDescriptor.builder(MapReduce.class).build(), 
null);
+
+        assertThat(execution.resultAsync(), willCompleteSuccessfully());
+
+        UUID taskId = execution.idAsync().join(); // Safe to join since 
execution is complete.
+        String taskClassName = MapReduce.class.getName();
+
+        List<UUID> jobIds = execution.idsAsync().join();
+        UUID firstJobId = jobIds.get(0);
+        List<String> nodeNames = 
CLUSTER.runningNodes().map(Ignite::name).collect(Collectors.toList());
+
+        await().until(logInspector::events, hasSize(3 + 3 * 3)); // 3 task 
events and 3 job events per node
+        assertThat(logInspector.events(), containsInRelativeOrder(
+                taskEvent(COMPUTE_TASK_QUEUED, taskClassName, taskId),
+                taskEvent(COMPUTE_TASK_EXECUTING, taskClassName, taskId),
+                taskJobEvent(COMPUTE_JOB_QUEUED, firstJobId, in(nodeNames)),
+                taskJobEvent(COMPUTE_JOB_EXECUTING, firstJobId, in(nodeNames)),
+                taskJobEvent(COMPUTE_JOB_COMPLETED, firstJobId, in(nodeNames)),
+                taskEvent(COMPUTE_TASK_COMPLETED, taskClassName, taskId)
+        ));
+
+        jobIds.forEach(jobId -> assertThat(logInspector.events(), 
containsInRelativeOrder(
+                taskJobEvent(COMPUTE_JOB_QUEUED, jobId, in(nodeNames)),
+                taskJobEvent(COMPUTE_JOB_EXECUTING, jobId, in(nodeNames)),
+                taskJobEvent(COMPUTE_JOB_COMPLETED, jobId, in(nodeNames))
+        )));
+    }
+
+    @ParameterizedTest
+    @ValueSource(classes = {FailingSplitMapReduceTask.class, 
FailingJobMapReduceTask.class, FailingReduceMapReduceTask.class})
+    void taskFailed(Class<? extends MapReduceTask<Void, Void, Void, Void>> 
mapReduceClass) {
+        TaskExecution<Void> execution = 
compute().submitMapReduce(TaskDescriptor.builder(mapReduceClass).build(), null);
+
+        assertThat(execution.resultAsync(), willThrow(ComputeException.class));
+
+        UUID taskId = execution.idAsync().join(); // Safe to join since 
execution is complete.
+
+        // Skip checking possible job events.
+        await().until(logInspector::events, containsInRelativeOrder(
+                taskEvent(COMPUTE_TASK_QUEUED, mapReduceClass.getName(), 
taskId),
+                taskEvent(COMPUTE_TASK_EXECUTING, mapReduceClass.getName(), 
taskId),
+                taskEvent(COMPUTE_TASK_FAILED, mapReduceClass.getName(), 
taskId)
+        ));
+    }
+
     private <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T, 
R> descriptor, @Nullable T arg) {
         return submit(target, descriptor, null, arg);
     }
@@ -370,8 +439,19 @@ abstract class ItComputeEventsTest extends 
ClusterPerClassIntegrationTest {
             String targetNode
     );
 
+    EventMatcher taskEvent(IgniteEventType eventType, String taskClassName, 
@Nullable UUID taskId) {
+        return jobEvent(eventType, MAP_REDUCE, null, taskClassName, 
node(0).name())
+                .withTaskId(taskId)
+                .withInitiatorNode(nullValue());
+    }
+
+    private EventMatcher taskJobEvent(IgniteEventType eventType, @Nullable 
UUID jobId, Matcher<? super String> targetNodeMatcher) {
+        return jobEvent(eventType, MAP_REDUCE, jobId, 
GetNodeNameJob.class.getName(), "")
+                .withTargetNode(targetNodeMatcher);
+    }
+
     @SafeVarargs
-    private void assertEvents(Matcher<String>... matchers) {
+    final void assertEvents(Matcher<String>... matchers) {
         await().until(logInspector::events, contains(matchers));
     }
 
diff --git 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index 7768d06e637..f690d6e1e0e 100644
--- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++ 
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -45,7 +45,8 @@ import org.apache.ignite.compute.task.TaskExecutionContext;
  */
 public final class InteractiveTasks {
     /**
-     * ACK for {@link Signal#CONTINUE}. Returned by a task that has received 
the signal. Used to check that the task is alive.
+     * ACK for {@link Signal#CONTINUE_SPLIT} and {@link 
Signal#CONTINUE_REDUCE}. Returned by a task that has received the signal. Used 
to
+     * check that the task is alive.
      */
     private static final Object ACK = new Object();
 
@@ -62,8 +63,8 @@ public final class InteractiveTasks {
     private static final BlockingQueue<Object> GLOBAL_CHANNEL = new 
LinkedBlockingQueue<>();
 
     /**
-     * This counter indicated how many {@link 
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, Object...)} 
methods are
-     * running now. This counter increased each time the {@link 
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, Object...)}
+     * This counter indicates how many {@link 
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, String)} 
methods are
+     * running now. This counter is increased each time the {@link 
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, String)}
      * is called and decreased when the method is finished (whatever the 
result is). Checked in {@link #clearState}.
      */
     private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new 
AtomicInteger(0);
@@ -104,9 +105,14 @@ public final class InteractiveTasks {
      */
     private enum Signal {
         /**
-         * Signal to the task to continue running and send ACK as a response.
+         * Signal to the task to continue running split and send ACK as a 
response.
          */
-        CONTINUE,
+        CONTINUE_SPLIT,
+
+        /**
+         * Signal to the task to continue running reduce and send ACK as a 
response.
+         */
+        CONTINUE_REDUCE,
 
         /**
          * Ask task to throw an exception.
@@ -183,7 +189,7 @@ public final class InteractiveTasks {
                     switch (receivedSignal) {
                         case THROW:
                             throw new RuntimeException();
-                        case CONTINUE:
+                        case CONTINUE_SPLIT:
                             GLOBAL_CHANNEL.offer(ACK);
                             break;
                         case SPLIT_RETURN_ALL_NODES:
@@ -216,7 +222,7 @@ public final class InteractiveTasks {
                     switch (receivedSignal) {
                         case THROW:
                             throw new RuntimeException();
-                        case CONTINUE:
+                        case CONTINUE_REDUCE:
                             GLOBAL_CHANNEL.offer(ACK);
                             break;
                         case REDUCE_RETURN:
@@ -240,11 +246,19 @@ public final class InteractiveTasks {
      * API for the interaction with {@link GlobalInteractiveMapReduceTask}.
      */
     public static final class GlobalApi {
+        /**
+         * Checks that {@link GlobalInteractiveMapReduceTask} is alive and in 
the split phase.
+         */
+        public static void assertSplitAlive() throws InterruptedException {
+            GLOBAL_SIGNALS.offer(Signal.CONTINUE_SPLIT);
+            assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), equalTo(ACK));
+        }
+
         /**
          * Checks that {@link GlobalInteractiveMapReduceTask} is alive.
          */
-        public static void assertAlive() throws InterruptedException {
-            GLOBAL_SIGNALS.offer(Signal.CONTINUE);
+        public static void assertReduceAlive() throws InterruptedException {
+            GLOBAL_SIGNALS.offer(Signal.CONTINUE_REDUCE);
             assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS, 
TimeUnit.SECONDS), equalTo(ACK));
         }
 
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJobMapReduceTask.java
similarity index 50%
copy from 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
copy to 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJobMapReduceTask.java
index d33a93efe15..599dd2107fe 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJobMapReduceTask.java
@@ -18,45 +18,31 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobDescriptor;
-import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecutionContext;
-import org.apache.ignite.deployment.DeploymentUnit;
 
-/** Map reduce task which runs a {@link GetNodeNameJob} on each node and 
computes a sum of length of all node names. */
-public class MapReduce implements MapReduceTask<List<DeploymentUnit>, Void, 
String, Integer> {
+/** Map reduce task which submits failing jobs. */
+public class FailingJobMapReduceTask implements MapReduceTask<Void, Void, 
String, Void> {
     @Override
-    public CompletableFuture<List<MapReduceJob<Void, String>>> splitAsync(
-            TaskExecutionContext taskContext, List<DeploymentUnit> 
deploymentUnits) {
-        return taskContext.ignite().cluster().nodesAsync().thenApply(nodes -> 
nodes.stream().map(node ->
+    public CompletableFuture<List<MapReduceJob<Void, String>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
+        return completedFuture(List.of(
                 MapReduceJob.<Void, String>builder()
-                        .jobDescriptor(
-                                JobDescriptor.builder(GetNodeNameJob.class)
-                                .units(deploymentUnits)
-                                .options(JobExecutionOptions.builder()
-                                    .maxRetries(10)
-                                    .priority(Integer.MAX_VALUE)
-                                    .build())
-                                .build())
-                        .nodes(Set.of(node))
+                        
.jobDescriptor(JobDescriptor.builder(FailingJob.class).build())
+                        .nodes(taskContext.ignite().cluster().nodes())
                         .build()
-        ).collect(toList()));
+        ));
     }
 
     @Override
-    public CompletableFuture<Integer> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
-        return completedFuture(results.values().stream()
-                .map(String::length)
-                .reduce(Integer::sum)
-                .orElseThrow());
+    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
+        return nullCompletedFuture();
     }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingReduceMapReduceTask.java
similarity index 50%
copy from 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
copy to 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingReduceMapReduceTask.java
index d33a93efe15..72021281281 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingReduceMapReduceTask.java
@@ -18,45 +18,30 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobDescriptor;
-import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.task.MapReduceJob;
 import org.apache.ignite.compute.task.MapReduceTask;
 import org.apache.ignite.compute.task.TaskExecutionContext;
-import org.apache.ignite.deployment.DeploymentUnit;
 
-/** Map reduce task which runs a {@link GetNodeNameJob} on each node and 
computes a sum of length of all node names. */
-public class MapReduce implements MapReduceTask<List<DeploymentUnit>, Void, 
String, Integer> {
+/** Map reduce task that throws an exception from the reduceAsync. */
+public class FailingReduceMapReduceTask implements MapReduceTask<Void, Void, 
String, Void> {
     @Override
-    public CompletableFuture<List<MapReduceJob<Void, String>>> splitAsync(
-            TaskExecutionContext taskContext, List<DeploymentUnit> 
deploymentUnits) {
-        return taskContext.ignite().cluster().nodesAsync().thenApply(nodes -> 
nodes.stream().map(node ->
+    public CompletableFuture<List<MapReduceJob<Void, String>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
+        return completedFuture(List.of(
                 MapReduceJob.<Void, String>builder()
-                        .jobDescriptor(
-                                JobDescriptor.builder(GetNodeNameJob.class)
-                                .units(deploymentUnits)
-                                .options(JobExecutionOptions.builder()
-                                    .maxRetries(10)
-                                    .priority(Integer.MAX_VALUE)
-                                    .build())
-                                .build())
-                        .nodes(Set.of(node))
+                        
.jobDescriptor(JobDescriptor.builder(GetNodeNameJob.class).build())
+                        .nodes(taskContext.ignite().cluster().nodes())
                         .build()
-        ).collect(toList()));
+        ));
     }
 
     @Override
-    public CompletableFuture<Integer> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
-        return completedFuture(results.values().stream()
-                .map(String::length)
-                .reduce(Integer::sum)
-                .orElseThrow());
+    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, String> results) {
+        throw new RuntimeException();
     }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingSplitMapReduceTask.java
similarity index 53%
copy from 
modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
copy to 
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingSplitMapReduceTask.java
index 8532554ab19..7423337f50b 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingSplitMapReduceTask.java
@@ -15,24 +15,27 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.task;
+package org.apache.ignite.internal.compute;
+
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.task.MapReduceJob;
-import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
 
-/**
- * Compute job submitter.
- */
-@FunctionalInterface
-public interface JobSubmitter<T, R> {
-    /**
-     * Submits compute jobs for an execution.
-     *
-     * @param computeJobRunners List of the compute job start parameters.
-     * @param cancellationToken Cancellation token.
-     */
-    CompletableFuture<List<JobExecution<R>>> submit(List<MapReduceJob<T, R>> 
computeJobRunners, CancellationToken cancellationToken);
+/** Map reduce task that throws an exception from the splitAsync. */
+public class FailingSplitMapReduceTask implements MapReduceTask<Void, Void, 
Void, Void> {
+    @Override
+    public CompletableFuture<List<MapReduceJob<Void, Void>>> 
splitAsync(TaskExecutionContext taskContext, Void input) {
+        throw new RuntimeException();
+    }
+
+    @Override
+    public CompletableFuture<Void> reduceAsync(TaskExecutionContext 
taskContext, Map<UUID, Void> results) {
+        return nullCompletedFuture();
+    }
 }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
index d33a93efe15..8acac4c83df 100644
--- 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++ 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -22,7 +22,6 @@ import static java.util.stream.Collectors.toList;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobDescriptor;
@@ -47,7 +46,7 @@ public class MapReduce implements 
MapReduceTask<List<DeploymentUnit>, Void, Stri
                                     .priority(Integer.MAX_VALUE)
                                     .build())
                                 .build())
-                        .nodes(Set.of(node))
+                        .node(node)
                         .build()
         ).collect(toList()));
     }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 00fd414e33f..5f8091c5bb3 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -116,6 +116,7 @@ public interface ComputeComponent extends IgniteComponent {
             JobSubmitter<M, T> jobSubmitter,
             List<DeploymentUnit> units,
             String taskClassName,
+            ComputeEventMetadataBuilder metadataBuilder,
             I arg
     );
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 056782b9bbe..31b3aab798d 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -180,6 +180,7 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             JobSubmitter<M, T> jobSubmitter,
             List<DeploymentUnit> units,
             String taskClassName,
+            ComputeEventMetadataBuilder metadataBuilder,
             I input
     ) {
         if (!busyLock.enterBusy()) {
@@ -192,7 +193,13 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             CompletableFuture<TaskExecutionInternal<I, M, T, R>> taskFuture =
                     
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units), 
taskClassName)
                             .thenApply(context -> {
-                                TaskExecutionInternal<I, M, T, R> execution = 
execTask(context, jobSubmitter, taskClassName, input);
+                                TaskExecutionInternal<I, M, T, R> execution = 
execTask(
+                                        context,
+                                        jobSubmitter,
+                                        taskClassName,
+                                        metadataBuilder,
+                                        input
+                                );
                                 execution.resultAsync().whenComplete((r, e) -> 
context.close());
                                 
inFlightFutures.registerFuture(execution.resultAsync());
                                 return execution;
@@ -368,10 +375,11 @@ public class ComputeComponentImpl implements 
ComputeComponent, SystemViewProvide
             JobContext context,
             JobSubmitter<M, T> jobSubmitter,
             String taskClassName,
-            I input
+            ComputeEventMetadataBuilder metadataBuilder,
+            I arg
     ) {
         try {
-            return executor.executeTask(jobSubmitter, 
taskClass(context.classLoader().classLoader(), taskClassName), input);
+            return executor.executeTask(jobSubmitter, 
taskClass(context.classLoader().classLoader(), taskClassName), metadataBuilder, 
arg);
         } catch (Throwable e) {
             context.close();
             throw e;
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 278f20fbecd..bcda7162b43 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -601,8 +601,9 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
     }
 
     @Override
-    public <T, R> TaskExecution<R> submitMapReduce(
+    public <T, R> TaskExecution<R> submitMapReduceInternal(
             TaskDescriptor<T, R> taskDescriptor,
+            ComputeEventMetadataBuilder metadataBuilder,
             @Nullable T arg,
             @Nullable CancellationToken cancellationToken
     ) {
@@ -612,6 +613,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                 this::submitJobs,
                 taskDescriptor.units(),
                 taskDescriptor.taskClassName(),
+                metadataBuilder,
                 arg
         );
 
@@ -622,6 +624,15 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
         return new TaskExecutionWrapper<>(taskExecution);
     }
 
+    @Override
+    public <T, R> TaskExecution<R> submitMapReduce(
+            TaskDescriptor<T, R> taskDescriptor,
+            @Nullable T arg,
+            @Nullable CancellationToken cancellationToken
+    ) {
+        return submitMapReduceInternal(taskDescriptor, 
ComputeEventMetadata.builder(Type.MAP_REDUCE), arg, cancellationToken);
+    }
+
     @Override
     public <T, R> R executeMapReduce(
             TaskDescriptor<T, R> taskDescriptor,
@@ -633,13 +644,16 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
 
     private <M, T> CompletableFuture<List<JobExecution<T>>> submitJobs(
             List<MapReduceJob<M, T>> runners,
+            ComputeEventMetadataBuilder metadataBuilder,
             CancellationToken cancellationToken
     ) {
+        //noinspection unchecked
         return allOfToList(
                 runners.stream()
                         .map(runner -> submitAsync(
                                 JobTarget.anyNode(runner.nodes()),
                                 runner.jobDescriptor(),
+                                metadataBuilder.copyOf(), // Make a copy since 
the builder is mutable
                                 runner.arg(),
                                 cancellationToken
                         ))
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index 6060254e1b6..5702442d123 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -27,6 +27,9 @@ import org.apache.ignite.compute.IgniteCompute;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
 import org.apache.ignite.deployment.DeploymentUnit;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
 import org.apache.ignite.internal.table.TableViewInternal;
@@ -111,6 +114,24 @@ public interface IgniteComputeInternal extends 
IgniteCompute {
             @Nullable CancellationToken cancellationToken
     );
 
+    /**
+     * Submits a {@link MapReduceTask} of the given class for an execution.
+     *
+     * @param <T> Task argument (T)ype.
+     * @param <R> Task (R)esult type.
+     * @param taskDescriptor Map reduce task descriptor.
+     * @param metadataBuilder Event metadata builder.
+     * @param arg Task argument.
+     * @param cancellationToken Cancellation token or {@code null}.
+     * @return Task execution interface.
+     */
+    <T, R> TaskExecution<R> submitMapReduceInternal(
+            TaskDescriptor<T, R> taskDescriptor,
+            ComputeEventMetadataBuilder metadataBuilder,
+            @Nullable T arg,
+            @Nullable CancellationToken cancellationToken
+    );
+
     /**
      * Retrieves the current state of all jobs on all nodes in the cluster.
      *
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
index fc463feabff..a88dc92ddb8 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
@@ -35,7 +35,7 @@ public class ComputeEventMetadata {
     private final String jobClassName;
 
     /** Compute job id. */
-    private final UUID jobId;
+    private final @Nullable UUID jobId;
 
     /** Target node name. */
     private final String targetNode;
@@ -44,19 +44,19 @@ public class ComputeEventMetadata {
     private final String initiatorNode;
 
     /** Used in events for broadcast jobs - common id for a group of jobs. */
-    private final UUID taskId;
+    private final @Nullable UUID taskId;
 
     /** For colocated jobs. */
-    private final String tableName;
+    private final @Nullable String tableName;
 
     /** For client API. */
-    private final String clientAddress;
+    private final @Nullable String clientAddress;
 
     ComputeEventMetadata(
             EventUser eventUser,
             Type type,
             String jobClassName,
-            UUID jobId,
+            @Nullable UUID jobId,
             String targetNode,
             String initiatorNode,
             @Nullable UUID taskId,
@@ -105,6 +105,7 @@ public class ComputeEventMetadata {
         return jobClassName;
     }
 
+    @Nullable
     UUID jobId() {
         return jobId;
     }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
index 81afecee646..1ab4dfaf32a 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
@@ -30,7 +30,7 @@ public class ComputeEventMetadataBuilder {
     private EventUser eventUser;
     private Type type;
     private String jobClassName;
-    private UUID jobId;
+    private @Nullable UUID jobId;
     private String targetNode;
     private String initiatorNode;
     private @Nullable UUID taskId;
@@ -52,12 +52,12 @@ public class ComputeEventMetadataBuilder {
         return this;
     }
 
-    public ComputeEventMetadataBuilder jobId(UUID jobId) {
+    public ComputeEventMetadataBuilder jobId(@Nullable UUID jobId) {
         this.jobId = jobId;
         return this;
     }
 
-    public UUID jobId() {
+    public @Nullable UUID jobId() {
         return jobId;
     }
 
@@ -81,7 +81,7 @@ public class ComputeEventMetadataBuilder {
         return this;
     }
 
-    public ComputeEventMetadataBuilder clientAddress(String clientAddress) {
+    public ComputeEventMetadataBuilder clientAddress(@Nullable String 
clientAddress) {
         this.clientAddress = clientAddress;
         return this;
     }
@@ -93,7 +93,7 @@ public class ComputeEventMetadataBuilder {
      */
     public ComputeEventMetadata build() {
         return new ComputeEventMetadata(
-                eventUser,
+                eventUser != null ? eventUser : EventUser.system(),
                 type,
                 jobClassName,
                 jobId,
@@ -104,4 +104,27 @@ public class ComputeEventMetadataBuilder {
                 clientAddress
         );
     }
+
+    /**
+     * Creates a new builder populated with this builder's current values.
+     *
+     * @return Created builder.
+     */
+    public ComputeEventMetadataBuilder copyOf() {
+        ComputeEventMetadataBuilder builder = new ComputeEventMetadataBuilder()
+                .type(type)
+                .jobClassName(jobClassName)
+                .jobId(jobId)
+                .targetNode(targetNode)
+                .initiatorNode(initiatorNode)
+                .taskId(taskId)
+                .tableName(tableName)
+                .clientAddress(clientAddress);
+
+        if (eventUser != null) {
+            builder.eventUser(new UserDetails(eventUser.username(), 
eventUser.authenticationProvider()));
+        }
+
+        return builder;
+    }
 }
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
index 705d8e3e368..d3cc85bc7fb 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
@@ -27,7 +27,6 @@ import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JO
 import java.util.Map;
 import org.apache.ignite.internal.eventlog.api.EventLog;
 import org.apache.ignite.internal.eventlog.api.IgniteEventType;
-import org.apache.ignite.internal.eventlog.event.EventUser;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,7 +40,7 @@ public class ComputeEventsFactory {
      * @param eventLog Event log.
      * @param eventMetadata Event metadata.
      */
-    public static void logJobQueuedEvent(EventLog eventLog, 
ComputeEventMetadata eventMetadata) {
+    public static void logJobQueuedEvent(EventLog eventLog, @Nullable 
ComputeEventMetadata eventMetadata) {
         logEvent(eventLog, COMPUTE_JOB_QUEUED, eventMetadata);
     }
 
@@ -51,7 +50,7 @@ public class ComputeEventsFactory {
      * @param eventLog Event log.
      * @param eventMetadata Event metadata.
      */
-    public static void logJobExecutingEvent(EventLog eventLog, 
ComputeEventMetadata eventMetadata) {
+    public static void logJobExecutingEvent(EventLog eventLog, @Nullable 
ComputeEventMetadata eventMetadata) {
         logEvent(eventLog, COMPUTE_JOB_EXECUTING, eventMetadata);
     }
 
@@ -61,7 +60,7 @@ public class ComputeEventsFactory {
      * @param eventLog Event log.
      * @param eventMetadata Event metadata.
      */
-    public static void logJobFailedEvent(EventLog eventLog, 
ComputeEventMetadata eventMetadata) {
+    public static void logJobFailedEvent(EventLog eventLog, @Nullable 
ComputeEventMetadata eventMetadata) {
         logEvent(eventLog, COMPUTE_JOB_FAILED, eventMetadata);
     }
 
@@ -71,7 +70,7 @@ public class ComputeEventsFactory {
      * @param eventLog Event log.
      * @param eventMetadata Event metadata.
      */
-    public static void logJobCompletedEvent(EventLog eventLog, 
ComputeEventMetadata eventMetadata) {
+    public static void logJobCompletedEvent(EventLog eventLog, @Nullable 
ComputeEventMetadata eventMetadata) {
         logEvent(eventLog, COMPUTE_JOB_COMPLETED, eventMetadata);
     }
 
@@ -81,7 +80,7 @@ public class ComputeEventsFactory {
      * @param eventLog Event log.
      * @param eventMetadata Event metadata.
      */
-    public static void logJobCancelingEvent(EventLog eventLog, 
ComputeEventMetadata eventMetadata) {
+    public static void logJobCancelingEvent(EventLog eventLog, @Nullable 
ComputeEventMetadata eventMetadata) {
         logEvent(eventLog, COMPUTE_JOB_CANCELING, eventMetadata);
     }
 
@@ -91,28 +90,37 @@ public class ComputeEventsFactory {
      * @param eventLog Event log.
      * @param eventMetadata Event metadata.
      */
-    public static void logJobCanceledEvent(EventLog eventLog, 
ComputeEventMetadata eventMetadata) {
+    public static void logJobCanceledEvent(EventLog eventLog, @Nullable 
ComputeEventMetadata eventMetadata) {
         logEvent(eventLog, COMPUTE_JOB_CANCELED, eventMetadata);
     }
 
-    private static void logEvent(EventLog eventLog, IgniteEventType eventType, 
ComputeEventMetadata eventMetadata) {
+    /**
+     * Logs a compute event, or does nothing if the metadata is {@code null}.
+     *
+     * @param eventLog Event log.
+     * @param eventType Event type.
+     * @param eventMetadata Event metadata, or {@code null} to skip logging.
+     */
+    public static void logEvent(EventLog eventLog, IgniteEventType eventType, 
@Nullable ComputeEventMetadata eventMetadata) {
+        if (eventMetadata == null) {
+            return;
+        }
+
         eventLog.log(eventType.name(), () -> {
             Map<String, Object> fields = IgniteUtils.newLinkedHashMap(8);
 
             fields.put(FieldNames.TYPE, eventMetadata.type());
             fields.put(FieldNames.CLASS_NAME, eventMetadata.jobClassName());
-            fields.put(FieldNames.JOB_ID, eventMetadata.jobId());
             fields.put(FieldNames.TARGET_NODE, eventMetadata.targetNode());
             fields.put(FieldNames.INITIATOR_NODE, 
eventMetadata.initiatorNode());
 
+            putIfNotNull(fields, FieldNames.JOB_ID, eventMetadata.jobId()); // 
Could be null for map reduce tasks
             putIfNotNull(fields, FieldNames.TASK_ID, eventMetadata.taskId());
             putIfNotNull(fields, FieldNames.TABLE_NAME, 
eventMetadata.tableName());
             putIfNotNull(fields, FieldNames.CLIENT_ADDRESS, 
eventMetadata.clientAddress());
 
-            EventUser user = eventMetadata.eventUser();
-
             return eventType.builder()
-                    .user(user != null ? user : EventUser.system())
+                    .user(eventMetadata.eventUser())
                     .timestamp(System.currentTimeMillis())
                     .fields(fields)
                     .build();
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
index b8c0e78d33e..a8a94893144 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
@@ -41,6 +41,7 @@ public interface ComputeExecutor {
     <I, M, T, R> TaskExecutionInternal<I, M, T, R> executeTask(
             JobSubmitter<M, T> jobSubmitter,
             Class<? extends MapReduceTask<I, M, T, R>> taskClass,
+            ComputeEventMetadataBuilder metadataBuilder,
             I input
     );
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 53c87b153c9..d20996b5c14 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -126,7 +126,9 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
         AtomicBoolean isInterrupted = new AtomicBoolean();
         JobExecutionContext context = new JobExecutionContextImpl(ignite, 
isInterrupted, classLoader, options.partition());
 
-        metadataBuilder.jobClassName(jobClassName);
+        metadataBuilder
+                .jobClassName(jobClassName)
+                .targetNode(ignite.name());
 
         Callable<CompletableFuture<ComputeJobDataHolder>> jobCallable = 
getJobCallable(
                 options.executorType(), jobClassName, classLoader, input, 
context);
@@ -241,21 +243,26 @@ public class ComputeExecutorImpl implements 
ComputeExecutor {
     public <I, M, T, R> TaskExecutionInternal<I, M, T, R> executeTask(
             JobSubmitter<M, T> jobSubmitter,
             Class<? extends MapReduceTask<I, M, T, R>> taskClass,
-            I input
+            ComputeEventMetadataBuilder metadataBuilder,
+            I arg
     ) {
         assert executorService != null;
 
         AtomicBoolean isCancelled = new AtomicBoolean();
         TaskExecutionContext context = new TaskExecutionContextImpl(ignite, 
isCancelled);
 
-        return new TaskExecutionInternal<>(executorService, jobSubmitter, 
taskClass, context, isCancelled, input);
+        metadataBuilder
+                .jobClassName(taskClass.getName())
+                .targetNode(ignite.name());
+
+        return new TaskExecutionInternal<>(executorService, eventLog, 
jobSubmitter, taskClass, context, isCancelled, metadataBuilder, arg);
     }
 
     @Override
     public void start() {
         stateMachine.start();
         IgniteThreadFactory threadFactory = 
IgniteThreadFactory.create(ignite.name(), "compute", LOG, STORAGE_READ, 
STORAGE_WRITE);
-        executorService = new PriorityQueueExecutor(configuration, 
threadFactory, stateMachine, eventLog, ignite.name());
+        executorService = new PriorityQueueExecutor(configuration, 
threadFactory, stateMachine, eventLog);
     }
 
     @Override
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index 6a0b0dc3a13..c8f82bd4a52 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
 import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
 import org.apache.ignite.internal.compute.state.ComputeStateMachine;
 import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Compute job executor with priority mechanism.
@@ -42,26 +43,21 @@ public class PriorityQueueExecutor {
 
     private final EventLog eventLog;
 
-    private final String nodeName;
-
     /**
      * Constructor.
      *
      * @param configuration Compute configuration.
      * @param threadFactory Thread factory.
      * @param eventLog Event log.
-     * @param nodeName Local node name.
      */
     public PriorityQueueExecutor(
             ComputeConfiguration configuration,
             ThreadFactory threadFactory,
             ComputeStateMachine stateMachine,
-            EventLog eventLog,
-            String nodeName
+            EventLog eventLog
     ) {
         this.stateMachine = stateMachine;
         this.eventLog = eventLog;
-        this.nodeName = nodeName;
 
         BlockingQueue<Runnable> workQueue = new 
BoundedPriorityBlockingQueue<>(() -> configuration.queueMaxSize().value());
         executor = new ComputeThreadPoolExecutor(
@@ -84,7 +80,7 @@ public class PriorityQueueExecutor {
      * @return Completable future which will be finished when compute job finished.
      */
     public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job, 
int priority, int maxRetries) {
-        return submit(job, priority, maxRetries, 
ComputeEventMetadata.builder());
+        return submit(job, priority, maxRetries, null);
     }
 
     /**
@@ -94,27 +90,25 @@ public class PriorityQueueExecutor {
      * @param job Execute job callable.
      * @param priority Job priority.
      * @param maxRetries Number of retries of the execution after failure, {@code 0} means the execution will not be retried.
-     * @param metadataBuilder Event metadata builder.
+     * @param metadataBuilder Event metadata builder. No events will be logged if it's {@code null}.
      * @return Completable future which will be finished when compute job finished.
      */
     public <R> QueueExecution<R> submit(
             Callable<CompletableFuture<R>> job,
             int priority,
             int maxRetries,
-            ComputeEventMetadataBuilder metadataBuilder
+            @Nullable ComputeEventMetadataBuilder metadataBuilder
     ) {
         Objects.requireNonNull(job);
 
         UUID jobId = stateMachine.initJob();
 
         // Job ID could be set previously, if this is a remotely initiated execution, see ComputeJobFailover.
-        if (metadataBuilder.jobId() == null) {
+        if (metadataBuilder != null && metadataBuilder.jobId() == null) {
             metadataBuilder.jobId(jobId);
         }
 
-        ComputeEventMetadata eventMetadata = metadataBuilder
-                .targetNode(nodeName)
-                .build();
+        ComputeEventMetadata eventMetadata = metadataBuilder != null ? 
metadataBuilder.build() : null;
         logJobQueuedEvent(eventLog, eventMetadata);
 
         QueueExecutionImpl<R> execution = new QueueExecutionImpl<>(jobId, job, 
priority, executor, stateMachine, eventLog, eventMetadata);
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 1173b4faf63..1e072a8f60c 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -55,7 +55,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
     private final ComputeThreadPoolExecutor executor;
     private final ComputeStateMachine stateMachine;
     private final EventLog eventLog;
-    private final ComputeEventMetadata eventMetadata;
+    private final @Nullable ComputeEventMetadata eventMetadata;
 
     private final CompletableFuture<R> result = new CompletableFuture<>();
 
@@ -85,7 +85,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
             ComputeThreadPoolExecutor executor,
             ComputeStateMachine stateMachine,
             EventLog eventLog,
-            ComputeEventMetadata eventMetadata
+            @Nullable ComputeEventMetadata eventMetadata
     ) {
         this.jobId = jobId;
         this.job = job;
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
index 8532554ab19..8ee8d5eb9a7 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
 import org.apache.ignite.lang.CancellationToken;
 
 /**
@@ -32,7 +33,12 @@ public interface JobSubmitter<T, R> {
      * Submits compute jobs for an execution.
      *
      * @param computeJobRunners List of the compute job start parameters.
+     * @param metadataBuilder Compute event metadata builder.
      * @param cancellationToken Cancellation token.
      */
-    CompletableFuture<List<JobExecution<R>>> submit(List<MapReduceJob<T, R>> 
computeJobRunners, CancellationToken cancellationToken);
+    CompletableFuture<List<JobExecution<R>>> submit(
+            List<MapReduceJob<T, R>> computeJobRunners,
+            ComputeEventMetadataBuilder metadataBuilder,
+            CancellationToken cancellationToken
+    );
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index f1b4c8558ae..88a0e00f0dd 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -27,6 +27,12 @@ import static org.apache.ignite.compute.TaskStatus.FAILED;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.getTaskSplitArgumentType;
 import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
 import static 
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNull;
+import static 
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logEvent;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_CANCELED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_COMPLETED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_EXECUTING;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_FAILED;
+import static 
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_QUEUED;
 import static 
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
 import static org.apache.ignite.internal.util.ArrayUtils.concat;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
@@ -46,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.compute.JobExecution;
 import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
 import org.apache.ignite.compute.TaskState;
 import org.apache.ignite.compute.TaskStatus;
 import org.apache.ignite.compute.task.MapReduceJob;
@@ -56,8 +63,12 @@ import org.apache.ignite.internal.compute.HybridTimestampProvider;
 import org.apache.ignite.internal.compute.MarshallerProvider;
 import org.apache.ignite.internal.compute.ResultUnmarshallingJobExecution;
 import org.apache.ignite.internal.compute.TaskStateImpl;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
 import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
 import org.apache.ignite.internal.compute.queue.QueueExecution;
+import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -89,32 +100,50 @@ public class TaskExecutionInternal<I, M, T, R> implements CancellableTaskExecuti
 
     private final CancelHandle cancelHandle = CancelHandle.create();
 
+    private final EventLog eventLog;
+
     private final AtomicBoolean isCancelled;
 
+    private final UUID taskId = UUID.randomUUID();
+
+    private final ComputeEventMetadata eventMetadata;
+
     private volatile @Nullable Marshaller<R, byte[]> reduceResultMarshallerRef;
 
     /**
      * Construct an execution object and starts executing.
      *
      * @param executorService Compute jobs executor.
+     * @param eventLog Event log.
      * @param jobSubmitter Compute jobs submitter.
      * @param taskClass Map reduce task class.
      * @param context Task execution context.
      * @param isCancelled Flag which is passed to the execution context so that the task can check it for cancellation request.
+     * @param metadataBuilder Event metadata builder.
      * @param arg Task argument.
      */
     public TaskExecutionInternal(
             PriorityQueueExecutor executorService,
+            EventLog eventLog,
             JobSubmitter<M, T> jobSubmitter,
             Class<? extends MapReduceTask<I, M, T, R>> taskClass,
             TaskExecutionContext context,
             AtomicBoolean isCancelled,
+            ComputeEventMetadataBuilder metadataBuilder,
             I arg
     ) {
+        this.eventLog = eventLog;
         this.isCancelled = isCancelled;
+        eventMetadata = metadataBuilder.taskId(taskId).build();
+
         LOG.debug("Executing task {}", taskClass.getName());
+
+        logEvent(eventLog, COMPUTE_TASK_QUEUED, eventMetadata);
+
         splitExecution = executorService.submit(
                 () -> {
+                    logEvent(eventLog, COMPUTE_TASK_EXECUTING, eventMetadata);
+
                     MapReduceTask<I, M, T, R> task = 
instantiateTask(taskClass);
 
                     reduceResultMarshallerRef = 
task.reduceJobResultMarshaller();
@@ -131,7 +160,7 @@ public class TaskExecutionInternal<I, M, T, R> implements CancellableTaskExecuti
         executionsFuture = 
splitExecution.resultAsync().thenCompose(splitResult -> {
             List<MapReduceJob<M, T>> runners = splitResult.runners();
             LOG.debug("Submitting {} jobs for {}", runners.size(), 
taskClass.getName());
-            return jobSubmitter.submit(runners, cancelHandle.token());
+            return jobSubmitter.submit(runners, metadataBuilder, 
cancelHandle.token());
         });
 
         resultsFuture = 
executionsFuture.thenCompose(TaskExecutionInternal::resultsAsync);
@@ -150,26 +179,50 @@ public class TaskExecutionInternal<I, M, T, R> implements CancellableTaskExecuti
                     Integer.MAX_VALUE,
                     0
             );
-        }).whenComplete(this::captureReduceFailure);
+        }).whenComplete(this::captureReduceExecution);
     }
 
-    private void captureReduceFailure(QueueExecution<R> reduceExecution, 
Throwable throwable) {
+    private void captureReduceExecution(QueueExecution<R> reduceExecution, 
Throwable throwable) {
         if (throwable != null) {
-            // Capture the reduce execution failure reason and time.
-            TaskStatus status = isCancelled.get() ? CANCELED : FAILED;
-
-            JobState state = splitExecution.state();
-            if (state != null) {
-                reduceFailedState.set(
-                        TaskStateImpl.toBuilder(state)
-                                .status(status)
-                                .finishTime(Instant.now())
-                                .build()
-                );
-            }
+            captureReduceSubmitFailure(throwable);
+        } else {
+            handleReduceResult(reduceExecution);
         }
     }
 
+    private void captureReduceSubmitFailure(Throwable throwable) {
+        // Capture the reduce submit failure reason and time.
+        TaskStatus status = isCancelled.get() ? CANCELED : FAILED;
+
+        logEvent(eventLog, status == CANCELED ? COMPUTE_TASK_CANCELED : 
COMPUTE_TASK_FAILED, eventMetadata);
+
+        JobState state = splitExecution.state();
+        if (state != null) {
+            reduceFailedState.set(
+                    TaskStateImpl.toBuilder(state)
+                            .id(taskId)
+                            .status(status)
+                            .finishTime(Instant.now())
+                            .build()
+            );
+        }
+    }
+
+    private void handleReduceResult(QueueExecution<R> reduceExecution) {
+        reduceExecution.resultAsync().whenComplete((result, throwable) -> {
+            if (result != null) {
+                logEvent(eventLog, COMPUTE_TASK_COMPLETED, eventMetadata);
+            } else {
+                JobState reduceState = reduceExecution.state();
+                // The state should never be null since it was just submitted, but check just in case.
+                if (reduceState != null) {
+                    IgniteEventType type = reduceState.status() == 
JobStatus.FAILED ? COMPUTE_TASK_FAILED : COMPUTE_TASK_CANCELED;
+                    logEvent(eventLog, type, eventMetadata);
+                }
+            }
+        });
+    }
+
     @Override
     public CompletableFuture<R> resultAsync() {
         return reduceExecutionFuture.thenCompose(QueueExecution::resultAsync);
@@ -184,7 +237,7 @@ public class TaskExecutionInternal<I, M, T, R> implements CancellableTaskExecuti
         }
 
         if (splitState.status() != COMPLETED) {
-            return 
completedFuture(TaskStateImpl.toBuilder(splitState).build());
+            return 
completedFuture(TaskStateImpl.toBuilder(splitState).id(taskId).build());
         }
 
         // This future is complete when reduce execution job is submitted, return status from it.
@@ -196,7 +249,7 @@ public class TaskExecutionInternal<I, M, T, R> implements CancellableTaskExecuti
                         return null;
                     }
                     return TaskStateImpl.toBuilder(reduceState)
-                            .id(splitState.id())
+                            .id(taskId)
                             .createTime(splitState.createTime())
                             .startTime(splitState.startTime())
                             .build();
@@ -207,6 +260,7 @@ public class TaskExecutionInternal<I, M, T, R> implements CancellableTaskExecuti
 
         // At this point split is complete but reduce job is not submitted yet.
         return completedFuture(TaskStateImpl.toBuilder(splitState)
+                .id(taskId)
                 .status(EXECUTING)
                 .finishTime(null)
                 .build());
diff --git a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index c695c57aeec..095808a6f17 100644
--- a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++ b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -614,8 +614,7 @@ public class PriorityQueueExecutorTest extends BaseIgniteAbstractTest {
                 configuration,
                 IgniteThreadFactory.create(nodeName, "compute", LOG),
                 new InMemoryComputeStateMachine(configuration, nodeName),
-                EventLog.NOOP,
-                nodeName
+                EventLog.NOOP
         );
     }
 
diff --git a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
index 89be27c2d37..fdb115331ba 100644
--- a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
+++ b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
@@ -44,6 +44,12 @@ public enum IgniteEventType {
     COMPUTE_JOB_CANCELING,
     COMPUTE_JOB_CANCELED,
 
+    COMPUTE_TASK_QUEUED,
+    COMPUTE_TASK_EXECUTING,
+    COMPUTE_TASK_FAILED,
+    COMPUTE_TASK_COMPLETED,
+    COMPUTE_TASK_CANCELED,
+
     QUERY_STARTED,
     QUERY_FINISHED;
 

Reply via email to