This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0685356531e IGNITE-26118 Add compute task events for map reduce tasks
(#6491)
0685356531e is described below
commit 0685356531e243d8e1a219e13ea6f98e8095501e
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Fri Aug 29 11:30:07 2025 +0300
IGNITE-26118 Add compute task events for map reduce tasks (#6491)
---
.../handler/ClientInboundMessageHandler.java | 10 ++-
.../ClientComputeExecuteMapReduceRequest.java | 18 ++++-
.../apache/ignite/client/fakes/FakeCompute.java | 12 ++-
.../ignite/internal/compute/ItMapReduceTest.java | 6 +-
.../internal/compute/events/EventMatcher.java | 32 ++++----
.../events/ItComputeEventsEmbeddedTest.java | 62 +++++++++++++++
.../compute/events/ItComputeEventsTest.java | 90 ++++++++++++++++++++--
.../internal/compute/utils/InteractiveTasks.java | 32 +++++---
...MapReduce.java => FailingJobMapReduceTask.java} | 34 +++-----
...Reduce.java => FailingReduceMapReduceTask.java} | 33 +++-----
.../compute/FailingSplitMapReduceTask.java} | 33 ++++----
.../apache/ignite/internal/compute/MapReduce.java | 3 +-
.../ignite/internal/compute/ComputeComponent.java | 1 +
.../internal/compute/ComputeComponentImpl.java | 14 +++-
.../ignite/internal/compute/IgniteComputeImpl.java | 16 +++-
.../internal/compute/IgniteComputeInternal.java | 21 +++++
.../compute/events/ComputeEventMetadata.java | 11 +--
.../events/ComputeEventMetadataBuilder.java | 33 ++++++--
.../compute/events/ComputeEventsFactory.java | 32 +++++---
.../internal/compute/executor/ComputeExecutor.java | 1 +
.../compute/executor/ComputeExecutorImpl.java | 15 +++-
.../compute/queue/PriorityQueueExecutor.java | 20 ++---
.../internal/compute/queue/QueueExecutionImpl.java | 4 +-
.../ignite/internal/compute/task/JobSubmitter.java | 8 +-
.../compute/task/TaskExecutionInternal.java | 88 +++++++++++++++++----
.../compute/queue/PriorityQueueExecutorTest.java | 3 +-
.../internal/eventlog/api/IgniteEventType.java | 6 ++
27 files changed, 470 insertions(+), 168 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 75401189935..672ee1333b5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -930,7 +930,7 @@ public class ClientInboundMessageHandler
);
case ClientOp.COMPUTE_EXECUTE_MAPREDUCE:
- return ClientComputeExecuteMapReduceRequest.process(in,
compute, notificationSender(requestId));
+ return ClientComputeExecuteMapReduceRequest.process(in,
compute, notificationSender(requestId), clientContext);
case ClientOp.COMPUTE_GET_STATE:
return ClientComputeGetStateRequest.process(in, compute);
@@ -1324,8 +1324,12 @@ public class ClientInboundMessageHandler
private class ComputeConnection implements PlatformComputeConnection {
@Override
- public CompletableFuture<ComputeJobDataHolder> executeJobAsync(long
jobId, List<String> deploymentUnitPaths, String jobClassName,
- ComputeJobDataHolder arg) {
+ public CompletableFuture<ComputeJobDataHolder> executeJobAsync(
+ long jobId,
+ List<String> deploymentUnitPaths,
+ String jobClassName,
+ @Nullable ComputeJobDataHolder arg
+ ) {
return sendServerToClientRequest(ServerOp.COMPUTE_JOB_EXEC,
packer -> {
packer.packLong(jobId);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
index 7b29b10842a..ccecbf120bc 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/compute/ClientComputeExecuteMapReduceRequest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.compute.JobState;
@@ -39,6 +40,9 @@ import
org.apache.ignite.internal.compute.ComputeJobDataHolder;
import org.apache.ignite.internal.compute.HybridTimestampProvider;
import org.apache.ignite.internal.compute.IgniteComputeInternal;
import org.apache.ignite.internal.compute.MarshallerProvider;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.marshalling.Marshaller;
@@ -55,18 +59,26 @@ public class ClientComputeExecuteMapReduceRequest {
* @param in Unpacker.
* @param compute Compute.
* @param notificationSender Notification sender.
+ * @param clientContext Client context.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
ClientMessageUnpacker in,
IgniteComputeInternal compute,
- NotificationSender notificationSender) {
+ NotificationSender notificationSender,
+ ClientContext clientContext
+ ) {
List<DeploymentUnit> deploymentUnits = in.unpackDeploymentUnits();
String taskClassName = in.unpackString();
ComputeJobDataHolder arg = unpackJobArgumentWithoutMarshaller(in);
- TaskExecution<Object> execution = compute.submitMapReduce(
-
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build(), arg);
+ TaskDescriptor<Object, Object> taskDescriptor =
TaskDescriptor.builder(taskClassName).units(deploymentUnits).build();
+
+ ComputeEventMetadataBuilder metadataBuilder =
ComputeEventMetadata.builder(Type.MAP_REDUCE)
+ .eventUser(clientContext.userDetails())
+ .clientAddress(clientContext.remoteAddress().toString());
+
+ TaskExecution<Object> execution =
compute.submitMapReduceInternal(taskDescriptor, metadataBuilder, arg, null);
sendTaskResult(execution, notificationSender);
var idsAsync = execution.idsAsync()
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index f9b090ce4a2..934085e4736 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -180,6 +180,16 @@ public class FakeCompute implements IgniteComputeInternal {
return nullCompletedFuture();
}
+ @Override
+ public <T, R> TaskExecution<R> submitMapReduceInternal(
+ TaskDescriptor<T, R> taskDescriptor,
+ ComputeEventMetadataBuilder metadataBuilder,
+ @Nullable T arg,
+ @Nullable CancellationToken cancellationToken
+ ) {
+ return taskExecution(future != null ? future : completedFuture((R)
nodeName));
+ }
+
@Override
public <T, R> CompletableFuture<JobExecution<R>> submitAsync(
JobTarget target,
@@ -272,7 +282,7 @@ public class FakeCompute implements IgniteComputeInternal {
@Nullable T arg,
@Nullable CancellationToken cancellationToken
) {
- return taskExecution(future != null ? future : completedFuture((R)
nodeName));
+ return submitMapReduceInternal(taskDescriptor, null, arg,
cancellationToken);
}
@Override
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
index dafcdef6771..a1611552ab8 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItMapReduceTest.java
@@ -78,7 +78,7 @@ class ItMapReduceTest extends ClusterPerClassIntegrationTest {
TaskExecution<List<String>> taskExecution =
igniteCompute.submitMapReduce(
TaskDescriptor.<Object,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null);
assertTaskExecuting(taskExecution);
- InteractiveTasks.GlobalApi.assertAlive();
+ InteractiveTasks.GlobalApi.assertSplitAlive();
// Save state before split.
TaskState stateBeforeSplit = taskExecution.stateAsync().join();
@@ -249,7 +249,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
InteractiveJobs.all().finishReturnWorkerNames();
// Wait for the reduce job to start.
- InteractiveTasks.GlobalApi.assertAlive();
+ InteractiveTasks.GlobalApi.assertReduceAlive();
// When cancel the task.
assertThat(cancelHandle.cancelAsync(), willCompleteSuccessfully());
@@ -274,7 +274,7 @@ class ItMapReduceTest extends
ClusterPerClassIntegrationTest {
TaskDescriptor.<String,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), arg,
cancellationToken
);
assertTaskExecuting(taskExecution);
- InteractiveTasks.GlobalApi.assertAlive();
+ InteractiveTasks.GlobalApi.assertSplitAlive();
return taskExecution;
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
index 650bf0c115d..f83a3d7460f 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
@@ -87,7 +87,7 @@ public class EventMatcher extends TypeSafeMatcher<String> {
.withClassName(jobClassName)
.withJobId(jobId)
.withTargetNode(targetNode)
- .withInitiatorNode(initiatorNode)
+ .withInitiatorNode(is(initiatorNode))
.withClientAddress(nullValue());
}
@@ -115,57 +115,61 @@ public class EventMatcher extends TypeSafeMatcher<String>
{
.withClientAddress(notNullValue(String.class));
}
- EventMatcher withTimestamp(Matcher<? super Long> matcher) {
+ public EventMatcher withTimestamp(Matcher<? super Long> matcher) {
this.timestampMatcher = matcher;
return this;
}
- EventMatcher withProductVersion(Matcher<? super String> matcher) {
+ public EventMatcher withProductVersion(Matcher<? super String> matcher) {
this.productVersionMatcher = matcher;
return this;
}
- EventMatcher withUsername(Matcher<? super String> matcher) {
+ public EventMatcher withUsername(Matcher<? super String> matcher) {
this.usernameMatcher = matcher;
return this;
}
- EventMatcher withType(String type) {
+ public EventMatcher withType(String type) {
this.typeMatcher = is(type);
return this;
}
- EventMatcher withClassName(String className) {
+ public EventMatcher withClassName(String className) {
this.classNameMatcher = is(className);
return this;
}
- EventMatcher withTableName(String tableName) {
+ public EventMatcher withTableName(String tableName) {
this.tableNameMatcher = is(tableName);
return this;
}
- EventMatcher withJobId(@Nullable UUID jobId) {
+ public EventMatcher withJobId(@Nullable UUID jobId) {
this.jobIdMatcher = is(jobId);
return this;
}
- EventMatcher withTaskId(@Nullable UUID taskId) {
+ public EventMatcher withTaskId(@Nullable UUID taskId) {
this.taskIdMatcher = is(taskId);
return this;
}
- EventMatcher withTargetNode(String targetNode) {
- this.targetNodeMatcher = is(targetNode);
+ public EventMatcher withTargetNode(String targetNode) {
+ return withTargetNode(is(targetNode));
+ }
+
+ public EventMatcher withTargetNode(Matcher<? super String>
targetNodeMatcher) {
+ this.targetNodeMatcher = targetNodeMatcher;
return this;
}
- EventMatcher withInitiatorNode(String initiatorNode) {
- this.initiatorNodeMatcher = is(initiatorNode);
+ public EventMatcher withInitiatorNode(Matcher<? super String>
initiatorNodeMatcher) {
+ this.initiatorNodeMatcher = initiatorNodeMatcher;
return this;
}
- EventMatcher withClientAddress(Matcher<? super String>
clientAddressMatcher) {
+ public EventMatcher withClientAddress(Matcher<? super String>
clientAddressMatcher) {
this.clientAddressMatcher = clientAddressMatcher;
return this;
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
index d9a7dd199b4..d00866e456a 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
@@ -18,12 +18,25 @@
package org.apache.ignite.internal.compute.events;
import static
org.apache.ignite.internal.compute.events.EventMatcher.embeddedJobEvent;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_CANCELED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_EXECUTING;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_QUEUED;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.Matchers.containsInRelativeOrder;
+import java.util.List;
import java.util.UUID;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.InteractiveTasks;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.apache.ignite.lang.CancelHandle;
+import org.apache.ignite.lang.CancellationToken;
import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.Test;
class ItComputeEventsEmbeddedTest extends ItComputeEventsTest {
@Override
@@ -35,4 +48,53 @@ class ItComputeEventsEmbeddedTest extends
ItComputeEventsTest {
protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
@Nullable UUID jobId, String jobClassName, String targetNode) {
return embeddedJobEvent(eventType, jobType, jobId, jobClassName,
targetNode, node(0).name());
}
+
+ // Cancel tests are hard to implement without global signals, let's test
them in the embedded mode only.
+ @Test
+ void taskSplitCanceled() {
+ CancelHandle cancelHandle = CancelHandle.create();
+ TaskExecution<List<String>> execution =
startTask(cancelHandle.token());
+
+ cancelHandle.cancel();
+
+ UUID taskId = execution.idAsync().join(); // Safe to join since
execution is complete.
+
+ assertEvents(
+ taskEvent(COMPUTE_TASK_QUEUED,
InteractiveTasks.GlobalApi.name(), taskId),
+ taskEvent(COMPUTE_TASK_EXECUTING,
InteractiveTasks.GlobalApi.name(), taskId),
+ taskEvent(COMPUTE_TASK_CANCELED,
InteractiveTasks.GlobalApi.name(), taskId)
+ );
+ }
+
+ @Test
+ void taskReduceCanceled() throws InterruptedException {
+ CancelHandle cancelHandle = CancelHandle.create();
+ TaskExecution<List<String>> execution =
startTask(cancelHandle.token());
+ InteractiveTasks.GlobalApi.finishSplit();
+ InteractiveJobs.all().finishReturnWorkerNames();
+ InteractiveTasks.GlobalApi.assertReduceAlive();
+
+ cancelHandle.cancel();
+
+ UUID taskId = execution.idAsync().join(); // Safe to join since
execution is complete.
+
+ // There are a lot of job events, skip them
+ await().until(logInspector::events, containsInRelativeOrder(
+ taskEvent(COMPUTE_TASK_QUEUED,
InteractiveTasks.GlobalApi.name(), taskId),
+ taskEvent(COMPUTE_TASK_EXECUTING,
InteractiveTasks.GlobalApi.name(), taskId),
+ taskEvent(COMPUTE_TASK_CANCELED,
InteractiveTasks.GlobalApi.name(), taskId)
+ ));
+ }
+
+ private TaskExecution<List<String>> startTask(@Nullable CancellationToken
cancellationToken) {
+ TaskExecution<List<String>> taskExecution = compute().submitMapReduce(
+ TaskDescriptor.<String,
List<String>>builder(InteractiveTasks.GlobalApi.name()).build(), null,
cancellationToken
+ );
+ try {
+ InteractiveTasks.GlobalApi.assertSplitAlive();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return taskExecution;
+ }
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
index c9419d69c74..4fe7341986c 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.compute.events;
import static org.apache.ignite.compute.JobStatus.CANCELED;
import static org.apache.ignite.compute.JobStatus.EXECUTING;
import static
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.BROADCAST;
+import static
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.MAP_REDUCE;
import static
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.SINGLE;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELED;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_CANCELING;
@@ -27,6 +28,10 @@ import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JO
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_EXECUTING;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_FAILED;
import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_QUEUED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_COMPLETED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_EXECUTING;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_FAILED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_QUEUED;
import static org.apache.ignite.internal.eventlog.api.IgniteEventType.values;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -37,15 +42,19 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
+import org.apache.ignite.Ignite;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.compute.BroadcastExecution;
import org.apache.ignite.compute.BroadcastJobTarget;
@@ -54,13 +63,22 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobTarget;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.ConfigOverride;
import org.apache.ignite.internal.compute.FailingJob;
+import org.apache.ignite.internal.compute.FailingJobMapReduceTask;
+import org.apache.ignite.internal.compute.FailingReduceMapReduceTask;
+import org.apache.ignite.internal.compute.FailingSplitMapReduceTask;
import org.apache.ignite.internal.compute.GetNodeNameJob;
+import org.apache.ignite.internal.compute.MapReduce;
import org.apache.ignite.internal.compute.SilentSleepJob;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import org.apache.ignite.internal.compute.events.EventMatcher.Event;
+import org.apache.ignite.internal.compute.utils.InteractiveJobs;
+import org.apache.ignite.internal.compute.utils.InteractiveTasks;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
import org.apache.ignite.internal.testframework.log4j2.EventLogInspector;
import org.apache.ignite.lang.CancelHandle;
@@ -78,15 +96,19 @@ import org.junit.jupiter.params.provider.ValueSource;
@ConfigOverride(name = "ignite.compute.threadPoolSize", value = "1")
abstract class ItComputeEventsTest extends ClusterPerClassIntegrationTest {
- private final EventLogInspector logInspector = new EventLogInspector();
+ final EventLogInspector logInspector = new EventLogInspector();
@BeforeEach
- void startLogInspector() {
+ void setUp() {
+ InteractiveJobs.clearState();
+ InteractiveTasks.clearState();
+
InteractiveJobs.initChannels(CLUSTER.runningNodes().map(Ignite::name).collect(Collectors.toList()));
+
logInspector.start();
}
@AfterEach
- void afterEach() {
+ void tearDown() {
logInspector.stop();
dropAllTables();
}
@@ -95,7 +117,7 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
protected void configureInitParameters(InitParametersBuilder builder) {
String allEvents = Arrays.stream(values())
.map(IgniteEventType::name)
- .filter(name -> name.startsWith("COMPUTE_JOB"))
+ .filter(name -> name.startsWith("COMPUTE"))
.collect(Collectors.joining(", ", "[", "]"));
builder.clusterConfiguration("ignite.eventlog {"
@@ -318,6 +340,53 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
);
}
+ @Test
+ void taskCompleted() {
+ TaskExecution<Integer> execution =
compute().submitMapReduce(TaskDescriptor.builder(MapReduce.class).build(),
null);
+
+ assertThat(execution.resultAsync(), willCompleteSuccessfully());
+
+ UUID taskId = execution.idAsync().join(); // Safe to join since
execution is complete.
+ String taskClassName = MapReduce.class.getName();
+
+ List<UUID> jobIds = execution.idsAsync().join();
+ UUID firstJobId = jobIds.get(0);
+ List<String> nodeNames =
CLUSTER.runningNodes().map(Ignite::name).collect(Collectors.toList());
+
+ await().until(logInspector::events, hasSize(3 + 3 * 3)); // 3 task
events and 3 job events per node
+ assertThat(logInspector.events(), containsInRelativeOrder(
+ taskEvent(COMPUTE_TASK_QUEUED, taskClassName, taskId),
+ taskEvent(COMPUTE_TASK_EXECUTING, taskClassName, taskId),
+ taskJobEvent(COMPUTE_JOB_QUEUED, firstJobId, in(nodeNames)),
+ taskJobEvent(COMPUTE_JOB_EXECUTING, firstJobId, in(nodeNames)),
+ taskJobEvent(COMPUTE_JOB_COMPLETED, firstJobId, in(nodeNames)),
+ taskEvent(COMPUTE_TASK_COMPLETED, taskClassName, taskId)
+ ));
+
+ jobIds.forEach(jobId -> assertThat(logInspector.events(),
containsInRelativeOrder(
+ taskJobEvent(COMPUTE_JOB_QUEUED, jobId, in(nodeNames)),
+ taskJobEvent(COMPUTE_JOB_EXECUTING, jobId, in(nodeNames)),
+ taskJobEvent(COMPUTE_JOB_COMPLETED, jobId, in(nodeNames))
+ )));
+ }
+
+ @ParameterizedTest
+ @ValueSource(classes = {FailingSplitMapReduceTask.class,
FailingJobMapReduceTask.class, FailingReduceMapReduceTask.class})
+ void taskFailed(Class<? extends MapReduceTask<Void, Void, Void, Void>>
mapReduceClass) {
+ TaskExecution<Void> execution =
compute().submitMapReduce(TaskDescriptor.builder(mapReduceClass).build(), null);
+
+ assertThat(execution.resultAsync(), willThrow(ComputeException.class));
+
+ UUID taskId = execution.idAsync().join(); // Safe to join since
execution is complete.
+
+ // Skip checking possible job events.
+ await().until(logInspector::events, containsInRelativeOrder(
+ taskEvent(COMPUTE_TASK_QUEUED, mapReduceClass.getName(),
taskId),
+ taskEvent(COMPUTE_TASK_EXECUTING, mapReduceClass.getName(),
taskId),
+ taskEvent(COMPUTE_TASK_FAILED, mapReduceClass.getName(),
taskId)
+ ));
+ }
+
private <T, R> JobExecution<R> submit(JobTarget target, JobDescriptor<T,
R> descriptor, @Nullable T arg) {
return submit(target, descriptor, null, arg);
}
@@ -370,8 +439,19 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
String targetNode
);
+ EventMatcher taskEvent(IgniteEventType eventType, String taskClassName,
@Nullable UUID taskId) {
+ return jobEvent(eventType, MAP_REDUCE, null, taskClassName,
node(0).name())
+ .withTaskId(taskId)
+ .withInitiatorNode(nullValue());
+ }
+
+ private EventMatcher taskJobEvent(IgniteEventType eventType, @Nullable
UUID jobId, Matcher<? super String> targetNodeMatcher) {
+ return jobEvent(eventType, MAP_REDUCE, jobId,
GetNodeNameJob.class.getName(), "")
+ .withTargetNode(targetNodeMatcher);
+ }
+
@SafeVarargs
- private void assertEvents(Matcher<String>... matchers) {
+ final void assertEvents(Matcher<String>... matchers) {
await().until(logInspector::events, contains(matchers));
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
index 7768d06e637..f690d6e1e0e 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/InteractiveTasks.java
@@ -45,7 +45,8 @@ import org.apache.ignite.compute.task.TaskExecutionContext;
*/
public final class InteractiveTasks {
/**
- * ACK for {@link Signal#CONTINUE}. Returned by a task that has received
the signal. Used to check that the task is alive.
+ * ACK for {@link Signal#CONTINUE_SPLIT} and {@link
Signal#CONTINUE_REDUCE}. Returned by a task that has received the signal. Used
to
+ * check that the task is alive.
*/
private static final Object ACK = new Object();
@@ -62,8 +63,8 @@ public final class InteractiveTasks {
private static final BlockingQueue<Object> GLOBAL_CHANNEL = new
LinkedBlockingQueue<>();
/**
- * This counter indicated how many {@link
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, Object...)}
methods are
- * running now. This counter increased each time the {@link
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, Object...)}
+ * This counter indicates how many {@link
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, String)}
methods are
+ * running now. This counter is increased each time the {@link
GlobalInteractiveMapReduceTask#splitAsync(TaskExecutionContext, String)}
* is called and decreased when the method is finished (whatever the
result is). Checked in {@link #clearState}.
*/
private static final AtomicInteger RUNNING_GLOBAL_SPLIT_CNT = new
AtomicInteger(0);
@@ -104,9 +105,14 @@ public final class InteractiveTasks {
*/
private enum Signal {
/**
- * Signal to the task to continue running and send ACK as a response.
+ * Signal to the task to continue running split and send ACK as a
response.
*/
- CONTINUE,
+ CONTINUE_SPLIT,
+
+ /**
+ * Signal to the task to continue running reduce and send ACK as a
response.
+ */
+ CONTINUE_REDUCE,
/**
* Ask task to throw an exception.
@@ -183,7 +189,7 @@ public final class InteractiveTasks {
switch (receivedSignal) {
case THROW:
throw new RuntimeException();
- case CONTINUE:
+ case CONTINUE_SPLIT:
GLOBAL_CHANNEL.offer(ACK);
break;
case SPLIT_RETURN_ALL_NODES:
@@ -216,7 +222,7 @@ public final class InteractiveTasks {
switch (receivedSignal) {
case THROW:
throw new RuntimeException();
- case CONTINUE:
+ case CONTINUE_REDUCE:
GLOBAL_CHANNEL.offer(ACK);
break;
case REDUCE_RETURN:
@@ -240,11 +246,19 @@ public final class InteractiveTasks {
* API for the interaction with {@link GlobalInteractiveMapReduceTask}.
*/
public static final class GlobalApi {
+ /**
+ * Checks that {@link GlobalInteractiveMapReduceTask} is alive and in
the split phase.
+ */
+ public static void assertSplitAlive() throws InterruptedException {
+ GLOBAL_SIGNALS.offer(Signal.CONTINUE_SPLIT);
+ assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS,
TimeUnit.SECONDS), equalTo(ACK));
+ }
+
/**
* Checks that {@link GlobalInteractiveMapReduceTask} is alive and in the reduce phase.
*/
- public static void assertAlive() throws InterruptedException {
- GLOBAL_SIGNALS.offer(Signal.CONTINUE);
+ public static void assertReduceAlive() throws InterruptedException {
+ GLOBAL_SIGNALS.offer(Signal.CONTINUE_REDUCE);
assertThat(GLOBAL_CHANNEL.poll(WAIT_TIMEOUT_SECONDS,
TimeUnit.SECONDS), equalTo(ACK));
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJobMapReduceTask.java
similarity index 50%
copy from
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
copy to
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJobMapReduceTask.java
index d33a93efe15..599dd2107fe 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingJobMapReduceTask.java
@@ -18,45 +18,31 @@
package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobDescriptor;
-import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
-import org.apache.ignite.deployment.DeploymentUnit;
-/** Map reduce task which runs a {@link GetNodeNameJob} on each node and
computes a sum of length of all node names. */
-public class MapReduce implements MapReduceTask<List<DeploymentUnit>, Void,
String, Integer> {
+/** Map reduce task which submits failing jobs. */
+public class FailingJobMapReduceTask implements MapReduceTask<Void, Void,
String, Void> {
@Override
- public CompletableFuture<List<MapReduceJob<Void, String>>> splitAsync(
- TaskExecutionContext taskContext, List<DeploymentUnit>
deploymentUnits) {
- return taskContext.ignite().cluster().nodesAsync().thenApply(nodes ->
nodes.stream().map(node ->
+ public CompletableFuture<List<MapReduceJob<Void, String>>>
splitAsync(TaskExecutionContext taskContext, Void input) {
+ return completedFuture(List.of(
MapReduceJob.<Void, String>builder()
- .jobDescriptor(
- JobDescriptor.builder(GetNodeNameJob.class)
- .units(deploymentUnits)
- .options(JobExecutionOptions.builder()
- .maxRetries(10)
- .priority(Integer.MAX_VALUE)
- .build())
- .build())
- .nodes(Set.of(node))
+
.jobDescriptor(JobDescriptor.builder(FailingJob.class).build())
+ .nodes(taskContext.ignite().cluster().nodes())
.build()
- ).collect(toList()));
+ ));
}
@Override
- public CompletableFuture<Integer> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
- return completedFuture(results.values().stream()
- .map(String::length)
- .reduce(Integer::sum)
- .orElseThrow());
+ public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
+ return nullCompletedFuture();
}
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingReduceMapReduceTask.java
similarity index 50%
copy from
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
copy to
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingReduceMapReduceTask.java
index d33a93efe15..72021281281 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingReduceMapReduceTask.java
@@ -18,45 +18,30 @@
package org.apache.ignite.internal.compute;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.stream.Collectors.toList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobDescriptor;
-import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.task.MapReduceJob;
import org.apache.ignite.compute.task.MapReduceTask;
import org.apache.ignite.compute.task.TaskExecutionContext;
-import org.apache.ignite.deployment.DeploymentUnit;
-/** Map reduce task which runs a {@link GetNodeNameJob} on each node and
computes a sum of length of all node names. */
-public class MapReduce implements MapReduceTask<List<DeploymentUnit>, Void,
String, Integer> {
+/** Map reduce task that throws an exception from {@code reduceAsync}. */
+public class FailingReduceMapReduceTask implements MapReduceTask<Void, Void,
String, Void> {
@Override
- public CompletableFuture<List<MapReduceJob<Void, String>>> splitAsync(
- TaskExecutionContext taskContext, List<DeploymentUnit>
deploymentUnits) {
- return taskContext.ignite().cluster().nodesAsync().thenApply(nodes ->
nodes.stream().map(node ->
+ public CompletableFuture<List<MapReduceJob<Void, String>>>
splitAsync(TaskExecutionContext taskContext, Void input) {
+ return completedFuture(List.of(
MapReduceJob.<Void, String>builder()
- .jobDescriptor(
- JobDescriptor.builder(GetNodeNameJob.class)
- .units(deploymentUnits)
- .options(JobExecutionOptions.builder()
- .maxRetries(10)
- .priority(Integer.MAX_VALUE)
- .build())
- .build())
- .nodes(Set.of(node))
+
.jobDescriptor(JobDescriptor.builder(GetNodeNameJob.class).build())
+ .nodes(taskContext.ignite().cluster().nodes())
.build()
- ).collect(toList()));
+ ));
}
@Override
- public CompletableFuture<Integer> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
- return completedFuture(results.values().stream()
- .map(String::length)
- .reduce(Integer::sum)
- .orElseThrow());
+ public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, String> results) {
+ throw new RuntimeException();
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingSplitMapReduceTask.java
similarity index 53%
copy from
modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
copy to
modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingSplitMapReduceTask.java
index 8532554ab19..7423337f50b 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/FailingSplitMapReduceTask.java
@@ -15,24 +15,27 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.task;
+package org.apache.ignite.internal.compute;
+
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.task.MapReduceJob;
-import org.apache.ignite.lang.CancellationToken;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecutionContext;
-/**
- * Compute job submitter.
- */
-@FunctionalInterface
-public interface JobSubmitter<T, R> {
- /**
- * Submits compute jobs for an execution.
- *
- * @param computeJobRunners List of the compute job start parameters.
- * @param cancellationToken Cancellation token.
- */
- CompletableFuture<List<JobExecution<R>>> submit(List<MapReduceJob<T, R>>
computeJobRunners, CancellationToken cancellationToken);
+/** Map reduce task that throws an exception from {@code splitAsync}. */
+public class FailingSplitMapReduceTask implements MapReduceTask<Void, Void,
Void, Void> {
+ @Override
+ public CompletableFuture<List<MapReduceJob<Void, Void>>>
splitAsync(TaskExecutionContext taskContext, Void input) {
+ throw new RuntimeException();
+ }
+
+ @Override
+ public CompletableFuture<Void> reduceAsync(TaskExecutionContext
taskContext, Map<UUID, Void> results) {
+ return nullCompletedFuture();
+ }
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
index d33a93efe15..8acac4c83df 100644
---
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
+++
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/MapReduce.java
@@ -22,7 +22,6 @@ import static java.util.stream.Collectors.toList;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobDescriptor;
@@ -47,7 +46,7 @@ public class MapReduce implements
MapReduceTask<List<DeploymentUnit>, Void, Stri
.priority(Integer.MAX_VALUE)
.build())
.build())
- .nodes(Set.of(node))
+ .node(node)
.build()
).collect(toList()));
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
index 00fd414e33f..5f8091c5bb3 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponent.java
@@ -116,6 +116,7 @@ public interface ComputeComponent extends IgniteComponent {
JobSubmitter<M, T> jobSubmitter,
List<DeploymentUnit> units,
String taskClassName,
+ ComputeEventMetadataBuilder metadataBuilder,
I arg
);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index 056782b9bbe..31b3aab798d 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -180,6 +180,7 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
JobSubmitter<M, T> jobSubmitter,
List<DeploymentUnit> units,
String taskClassName,
+ ComputeEventMetadataBuilder metadataBuilder,
I input
) {
if (!busyLock.enterBusy()) {
@@ -192,7 +193,13 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
CompletableFuture<TaskExecutionInternal<I, M, T, R>> taskFuture =
mapClassLoaderExceptions(jobContextManager.acquireClassLoader(units),
taskClassName)
.thenApply(context -> {
- TaskExecutionInternal<I, M, T, R> execution =
execTask(context, jobSubmitter, taskClassName, input);
+ TaskExecutionInternal<I, M, T, R> execution =
execTask(
+ context,
+ jobSubmitter,
+ taskClassName,
+ metadataBuilder,
+ input
+ );
execution.resultAsync().whenComplete((r, e) ->
context.close());
inFlightFutures.registerFuture(execution.resultAsync());
return execution;
@@ -368,10 +375,11 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
JobContext context,
JobSubmitter<M, T> jobSubmitter,
String taskClassName,
- I input
+ ComputeEventMetadataBuilder metadataBuilder,
+ I arg
) {
try {
- return executor.executeTask(jobSubmitter,
taskClass(context.classLoader().classLoader(), taskClassName), input);
+ return executor.executeTask(jobSubmitter,
taskClass(context.classLoader().classLoader(), taskClassName), metadataBuilder,
arg);
} catch (Throwable e) {
context.close();
throw e;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 278f20fbecd..bcda7162b43 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -601,8 +601,9 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
}
@Override
- public <T, R> TaskExecution<R> submitMapReduce(
+ public <T, R> TaskExecution<R> submitMapReduceInternal(
TaskDescriptor<T, R> taskDescriptor,
+ ComputeEventMetadataBuilder metadataBuilder,
@Nullable T arg,
@Nullable CancellationToken cancellationToken
) {
@@ -612,6 +613,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
this::submitJobs,
taskDescriptor.units(),
taskDescriptor.taskClassName(),
+ metadataBuilder,
arg
);
@@ -622,6 +624,15 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
return new TaskExecutionWrapper<>(taskExecution);
}
+ @Override
+ public <T, R> TaskExecution<R> submitMapReduce(
+ TaskDescriptor<T, R> taskDescriptor,
+ @Nullable T arg,
+ @Nullable CancellationToken cancellationToken
+ ) {
+ return submitMapReduceInternal(taskDescriptor,
ComputeEventMetadata.builder(Type.MAP_REDUCE), arg, cancellationToken);
+ }
+
@Override
public <T, R> R executeMapReduce(
TaskDescriptor<T, R> taskDescriptor,
@@ -633,13 +644,16 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
private <M, T> CompletableFuture<List<JobExecution<T>>> submitJobs(
List<MapReduceJob<M, T>> runners,
+ ComputeEventMetadataBuilder metadataBuilder,
CancellationToken cancellationToken
) {
+ //noinspection unchecked
return allOfToList(
runners.stream()
.map(runner -> submitAsync(
JobTarget.anyNode(runner.nodes()),
runner.jobDescriptor(),
+ metadataBuilder.copyOf(), // Make a copy since
the builder is mutable
runner.arg(),
cancellationToken
))
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
index 6060254e1b6..5702442d123 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeInternal.java
@@ -27,6 +27,9 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobExecutionOptions;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.TaskDescriptor;
+import org.apache.ignite.compute.task.MapReduceTask;
+import org.apache.ignite.compute.task.TaskExecution;
import org.apache.ignite.deployment.DeploymentUnit;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.table.TableViewInternal;
@@ -111,6 +114,24 @@ public interface IgniteComputeInternal extends
IgniteCompute {
@Nullable CancellationToken cancellationToken
);
+ /**
+ * Submits a {@link MapReduceTask} of the given class for an execution.
+ *
+ * @param <T> Task argument (T)ype.
+ * @param <R> Task (R)esult type.
+ * @param taskDescriptor Map reduce task descriptor.
+ * @param metadataBuilder Event metadata builder.
+ * @param arg Task argument.
+ * @param cancellationToken Cancellation token or {@code null}.
+ * @return Task execution interface.
+ */
+ <T, R> TaskExecution<R> submitMapReduceInternal(
+ TaskDescriptor<T, R> taskDescriptor,
+ ComputeEventMetadataBuilder metadataBuilder,
+ @Nullable T arg,
+ @Nullable CancellationToken cancellationToken
+ );
+
/**
* Retrieves the current state of all jobs on all nodes in the cluster.
*
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
index fc463feabff..a88dc92ddb8 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadata.java
@@ -35,7 +35,7 @@ public class ComputeEventMetadata {
private final String jobClassName;
/** Compute job id. */
- private final UUID jobId;
+ private final @Nullable UUID jobId;
/** Target node name. */
private final String targetNode;
@@ -44,19 +44,19 @@ public class ComputeEventMetadata {
private final String initiatorNode;
/** Used in events for broadcast jobs - common id for a group of jobs. */
- private final UUID taskId;
+ private final @Nullable UUID taskId;
/** For colocated jobs. */
- private final String tableName;
+ private final @Nullable String tableName;
/** For client API. */
- private final String clientAddress;
+ private final @Nullable String clientAddress;
ComputeEventMetadata(
EventUser eventUser,
Type type,
String jobClassName,
- UUID jobId,
+ @Nullable UUID jobId,
String targetNode,
String initiatorNode,
@Nullable UUID taskId,
@@ -105,6 +105,7 @@ public class ComputeEventMetadata {
return jobClassName;
}
+ @Nullable
UUID jobId() {
return jobId;
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
index 81afecee646..1ab4dfaf32a 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventMetadataBuilder.java
@@ -30,7 +30,7 @@ public class ComputeEventMetadataBuilder {
private EventUser eventUser;
private Type type;
private String jobClassName;
- private UUID jobId;
+ private @Nullable UUID jobId;
private String targetNode;
private String initiatorNode;
private @Nullable UUID taskId;
@@ -52,12 +52,12 @@ public class ComputeEventMetadataBuilder {
return this;
}
- public ComputeEventMetadataBuilder jobId(UUID jobId) {
+ public ComputeEventMetadataBuilder jobId(@Nullable UUID jobId) {
this.jobId = jobId;
return this;
}
- public UUID jobId() {
+ public @Nullable UUID jobId() {
return jobId;
}
@@ -81,7 +81,7 @@ public class ComputeEventMetadataBuilder {
return this;
}
- public ComputeEventMetadataBuilder clientAddress(String clientAddress) {
+ public ComputeEventMetadataBuilder clientAddress(@Nullable String
clientAddress) {
this.clientAddress = clientAddress;
return this;
}
@@ -93,7 +93,7 @@ public class ComputeEventMetadataBuilder {
*/
public ComputeEventMetadata build() {
return new ComputeEventMetadata(
- eventUser,
+ eventUser != null ? eventUser : EventUser.system(),
type,
jobClassName,
jobId,
@@ -104,4 +104,27 @@ public class ComputeEventMetadataBuilder {
clientAddress
);
}
+
+ /**
+ * Creates a new builder populated with the same field values as this builder.
+ *
+ * @return Created builder.
+ */
+ public ComputeEventMetadataBuilder copyOf() {
+ ComputeEventMetadataBuilder builder = new ComputeEventMetadataBuilder()
+ .type(type)
+ .jobClassName(jobClassName)
+ .jobId(jobId)
+ .targetNode(targetNode)
+ .initiatorNode(initiatorNode)
+ .taskId(taskId)
+ .tableName(tableName)
+ .clientAddress(clientAddress);
+
+ if (eventUser != null) {
+ builder.eventUser(new UserDetails(eventUser.username(),
eventUser.authenticationProvider()));
+ }
+
+ return builder;
+ }
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
index 705d8e3e368..d3cc85bc7fb 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/events/ComputeEventsFactory.java
@@ -27,7 +27,6 @@ import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JO
import java.util.Map;
import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
-import org.apache.ignite.internal.eventlog.event.EventUser;
import org.apache.ignite.internal.util.IgniteUtils;
import org.jetbrains.annotations.Nullable;
@@ -41,7 +40,7 @@ public class ComputeEventsFactory {
* @param eventLog Event log.
* @param eventMetadata Event metadata.
*/
- public static void logJobQueuedEvent(EventLog eventLog,
ComputeEventMetadata eventMetadata) {
+ public static void logJobQueuedEvent(EventLog eventLog, @Nullable
ComputeEventMetadata eventMetadata) {
logEvent(eventLog, COMPUTE_JOB_QUEUED, eventMetadata);
}
@@ -51,7 +50,7 @@ public class ComputeEventsFactory {
* @param eventLog Event log.
* @param eventMetadata Event metadata.
*/
- public static void logJobExecutingEvent(EventLog eventLog,
ComputeEventMetadata eventMetadata) {
+ public static void logJobExecutingEvent(EventLog eventLog, @Nullable
ComputeEventMetadata eventMetadata) {
logEvent(eventLog, COMPUTE_JOB_EXECUTING, eventMetadata);
}
@@ -61,7 +60,7 @@ public class ComputeEventsFactory {
* @param eventLog Event log.
* @param eventMetadata Event metadata.
*/
- public static void logJobFailedEvent(EventLog eventLog,
ComputeEventMetadata eventMetadata) {
+ public static void logJobFailedEvent(EventLog eventLog, @Nullable
ComputeEventMetadata eventMetadata) {
logEvent(eventLog, COMPUTE_JOB_FAILED, eventMetadata);
}
@@ -71,7 +70,7 @@ public class ComputeEventsFactory {
* @param eventLog Event log.
* @param eventMetadata Event metadata.
*/
- public static void logJobCompletedEvent(EventLog eventLog,
ComputeEventMetadata eventMetadata) {
+ public static void logJobCompletedEvent(EventLog eventLog, @Nullable
ComputeEventMetadata eventMetadata) {
logEvent(eventLog, COMPUTE_JOB_COMPLETED, eventMetadata);
}
@@ -81,7 +80,7 @@ public class ComputeEventsFactory {
* @param eventLog Event log.
* @param eventMetadata Event metadata.
*/
- public static void logJobCancelingEvent(EventLog eventLog,
ComputeEventMetadata eventMetadata) {
+ public static void logJobCancelingEvent(EventLog eventLog, @Nullable
ComputeEventMetadata eventMetadata) {
logEvent(eventLog, COMPUTE_JOB_CANCELING, eventMetadata);
}
@@ -91,28 +90,37 @@ public class ComputeEventsFactory {
* @param eventLog Event log.
* @param eventMetadata Event metadata.
*/
- public static void logJobCanceledEvent(EventLog eventLog,
ComputeEventMetadata eventMetadata) {
+ public static void logJobCanceledEvent(EventLog eventLog, @Nullable
ComputeEventMetadata eventMetadata) {
logEvent(eventLog, COMPUTE_JOB_CANCELED, eventMetadata);
}
- private static void logEvent(EventLog eventLog, IgniteEventType eventType,
ComputeEventMetadata eventMetadata) {
+ /**
+ * Logs a compute job or task event; does nothing if {@code eventMetadata} is {@code null}.
+ *
+ * @param eventLog Event log.
+ * @param eventType Event type.
+ * @param eventMetadata Event metadata.
+ */
+ public static void logEvent(EventLog eventLog, IgniteEventType eventType,
@Nullable ComputeEventMetadata eventMetadata) {
+ if (eventMetadata == null) {
+ return;
+ }
+
eventLog.log(eventType.name(), () -> {
Map<String, Object> fields = IgniteUtils.newLinkedHashMap(8);
fields.put(FieldNames.TYPE, eventMetadata.type());
fields.put(FieldNames.CLASS_NAME, eventMetadata.jobClassName());
- fields.put(FieldNames.JOB_ID, eventMetadata.jobId());
fields.put(FieldNames.TARGET_NODE, eventMetadata.targetNode());
fields.put(FieldNames.INITIATOR_NODE,
eventMetadata.initiatorNode());
+ putIfNotNull(fields, FieldNames.JOB_ID, eventMetadata.jobId()); //
Could be null for map reduce tasks
putIfNotNull(fields, FieldNames.TASK_ID, eventMetadata.taskId());
putIfNotNull(fields, FieldNames.TABLE_NAME,
eventMetadata.tableName());
putIfNotNull(fields, FieldNames.CLIENT_ADDRESS,
eventMetadata.clientAddress());
- EventUser user = eventMetadata.eventUser();
-
return eventType.builder()
- .user(user != null ? user : EventUser.system())
+ .user(eventMetadata.eventUser())
.timestamp(System.currentTimeMillis())
.fields(fields)
.build();
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
index b8c0e78d33e..a8a94893144 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutor.java
@@ -41,6 +41,7 @@ public interface ComputeExecutor {
<I, M, T, R> TaskExecutionInternal<I, M, T, R> executeTask(
JobSubmitter<M, T> jobSubmitter,
Class<? extends MapReduceTask<I, M, T, R>> taskClass,
+ ComputeEventMetadataBuilder metadataBuilder,
I input
);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 53c87b153c9..d20996b5c14 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -126,7 +126,9 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
AtomicBoolean isInterrupted = new AtomicBoolean();
JobExecutionContext context = new JobExecutionContextImpl(ignite,
isInterrupted, classLoader, options.partition());
- metadataBuilder.jobClassName(jobClassName);
+ metadataBuilder
+ .jobClassName(jobClassName)
+ .targetNode(ignite.name());
Callable<CompletableFuture<ComputeJobDataHolder>> jobCallable =
getJobCallable(
options.executorType(), jobClassName, classLoader, input,
context);
@@ -241,21 +243,26 @@ public class ComputeExecutorImpl implements
ComputeExecutor {
public <I, M, T, R> TaskExecutionInternal<I, M, T, R> executeTask(
JobSubmitter<M, T> jobSubmitter,
Class<? extends MapReduceTask<I, M, T, R>> taskClass,
- I input
+ ComputeEventMetadataBuilder metadataBuilder,
+ I arg
) {
assert executorService != null;
AtomicBoolean isCancelled = new AtomicBoolean();
TaskExecutionContext context = new TaskExecutionContextImpl(ignite,
isCancelled);
- return new TaskExecutionInternal<>(executorService, jobSubmitter,
taskClass, context, isCancelled, input);
+ metadataBuilder
+ .jobClassName(taskClass.getName())
+ .targetNode(ignite.name());
+
+ return new TaskExecutionInternal<>(executorService, eventLog,
jobSubmitter, taskClass, context, isCancelled, metadataBuilder, arg);
}
@Override
public void start() {
stateMachine.start();
IgniteThreadFactory threadFactory =
IgniteThreadFactory.create(ignite.name(), "compute", LOG, STORAGE_READ,
STORAGE_WRITE);
- executorService = new PriorityQueueExecutor(configuration,
threadFactory, stateMachine, eventLog, ignite.name());
+ executorService = new PriorityQueueExecutor(configuration,
threadFactory, stateMachine, eventLog);
}
@Override
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
index 6a0b0dc3a13..c8f82bd4a52 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutor.java
@@ -31,6 +31,7 @@ import
org.apache.ignite.internal.compute.events.ComputeEventMetadata;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.compute.state.ComputeStateMachine;
import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.jetbrains.annotations.Nullable;
/**
* Compute job executor with priority mechanism.
@@ -42,26 +43,21 @@ public class PriorityQueueExecutor {
private final EventLog eventLog;
- private final String nodeName;
-
/**
* Constructor.
*
* @param configuration Compute configuration.
* @param threadFactory Thread factory.
* @param eventLog Event log.
- * @param nodeName Local node name.
*/
public PriorityQueueExecutor(
ComputeConfiguration configuration,
ThreadFactory threadFactory,
ComputeStateMachine stateMachine,
- EventLog eventLog,
- String nodeName
+ EventLog eventLog
) {
this.stateMachine = stateMachine;
this.eventLog = eventLog;
- this.nodeName = nodeName;
BlockingQueue<Runnable> workQueue = new
BoundedPriorityBlockingQueue<>(() -> configuration.queueMaxSize().value());
executor = new ComputeThreadPoolExecutor(
@@ -84,7 +80,7 @@ public class PriorityQueueExecutor {
* @return Completable future which will be finished when compute job
finished.
*/
public <R> QueueExecution<R> submit(Callable<CompletableFuture<R>> job,
int priority, int maxRetries) {
- return submit(job, priority, maxRetries,
ComputeEventMetadata.builder());
+ return submit(job, priority, maxRetries, null);
}
/**
@@ -94,27 +90,25 @@ public class PriorityQueueExecutor {
* @param job Execute job callable.
* @param priority Job priority.
* @param maxRetries Number of retries of the execution after failure,
{@code 0} means the execution will not be retried.
- * @param metadataBuilder Event metadata builder.
+ * @param metadataBuilder Event metadata builder. No events will be logged
if it's {@code null}.
* @return Completable future which will be finished when compute job
finished.
*/
public <R> QueueExecution<R> submit(
Callable<CompletableFuture<R>> job,
int priority,
int maxRetries,
- ComputeEventMetadataBuilder metadataBuilder
+ @Nullable ComputeEventMetadataBuilder metadataBuilder
) {
Objects.requireNonNull(job);
UUID jobId = stateMachine.initJob();
// Job ID could be set previously, if this is a remotely initiated
execution, see ComputeJobFailover.
- if (metadataBuilder.jobId() == null) {
+ if (metadataBuilder != null && metadataBuilder.jobId() == null) {
metadataBuilder.jobId(jobId);
}
- ComputeEventMetadata eventMetadata = metadataBuilder
- .targetNode(nodeName)
- .build();
+ ComputeEventMetadata eventMetadata = metadataBuilder != null ?
metadataBuilder.build() : null;
logJobQueuedEvent(eventLog, eventMetadata);
QueueExecutionImpl<R> execution = new QueueExecutionImpl<>(jobId, job,
priority, executor, stateMachine, eventLog, eventMetadata);
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
index 1173b4faf63..1e072a8f60c 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/queue/QueueExecutionImpl.java
@@ -55,7 +55,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
private final ComputeThreadPoolExecutor executor;
private final ComputeStateMachine stateMachine;
private final EventLog eventLog;
- private final ComputeEventMetadata eventMetadata;
+ private final @Nullable ComputeEventMetadata eventMetadata;
private final CompletableFuture<R> result = new CompletableFuture<>();
@@ -85,7 +85,7 @@ class QueueExecutionImpl<R> implements QueueExecution<R> {
ComputeThreadPoolExecutor executor,
ComputeStateMachine stateMachine,
EventLog eventLog,
- ComputeEventMetadata eventMetadata
+ @Nullable ComputeEventMetadata eventMetadata
) {
this.jobId = jobId;
this.job = job;
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
index 8532554ab19..8ee8d5eb9a7 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/JobSubmitter.java
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.task.MapReduceJob;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.lang.CancellationToken;
/**
@@ -32,7 +33,12 @@ public interface JobSubmitter<T, R> {
* Submits compute jobs for an execution.
*
* @param computeJobRunners List of the compute job start parameters.
+ * @param metadataBuilder Compute event metadata builder.
* @param cancellationToken Cancellation token.
*/
- CompletableFuture<List<JobExecution<R>>> submit(List<MapReduceJob<T, R>>
computeJobRunners, CancellationToken cancellationToken);
+ CompletableFuture<List<JobExecution<R>>> submit(
+ List<MapReduceJob<T, R>> computeJobRunners,
+ ComputeEventMetadataBuilder metadataBuilder,
+ CancellationToken cancellationToken
+ );
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index f1b4c8558ae..88a0e00f0dd 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -27,6 +27,12 @@ import static org.apache.ignite.compute.TaskStatus.FAILED;
import static
org.apache.ignite.internal.compute.ComputeUtils.getTaskSplitArgumentType;
import static org.apache.ignite.internal.compute.ComputeUtils.instantiateTask;
import static
org.apache.ignite.internal.compute.ComputeUtils.unmarshalOrNotIfNull;
+import static
org.apache.ignite.internal.compute.events.ComputeEventsFactory.logEvent;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_CANCELED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_COMPLETED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_EXECUTING;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_FAILED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_TASK_QUEUED;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static org.apache.ignite.internal.util.ArrayUtils.concat;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
@@ -46,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.compute.JobExecution;
import org.apache.ignite.compute.JobState;
+import org.apache.ignite.compute.JobStatus;
import org.apache.ignite.compute.TaskState;
import org.apache.ignite.compute.TaskStatus;
import org.apache.ignite.compute.task.MapReduceJob;
@@ -56,8 +63,12 @@ import
org.apache.ignite.internal.compute.HybridTimestampProvider;
import org.apache.ignite.internal.compute.MarshallerProvider;
import org.apache.ignite.internal.compute.ResultUnmarshallingJobExecution;
import org.apache.ignite.internal.compute.TaskStateImpl;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
import org.apache.ignite.internal.compute.queue.PriorityQueueExecutor;
import org.apache.ignite.internal.compute.queue.QueueExecution;
+import org.apache.ignite.internal.eventlog.api.EventLog;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -89,32 +100,50 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
private final CancelHandle cancelHandle = CancelHandle.create();
+ private final EventLog eventLog;
+
private final AtomicBoolean isCancelled;
+ private final UUID taskId = UUID.randomUUID();
+
+ private final ComputeEventMetadata eventMetadata;
+
private volatile @Nullable Marshaller<R, byte[]> reduceResultMarshallerRef;
/**
* Construct an execution object and starts executing.
*
* @param executorService Compute jobs executor.
+ * @param eventLog Event log.
* @param jobSubmitter Compute jobs submitter.
* @param taskClass Map reduce task class.
* @param context Task execution context.
* @param isCancelled Flag which is passed to the execution context so
that the task can check it for cancellation request.
+ * @param metadataBuilder Event metadata builder.
* @param arg Task argument.
*/
public TaskExecutionInternal(
PriorityQueueExecutor executorService,
+ EventLog eventLog,
JobSubmitter<M, T> jobSubmitter,
Class<? extends MapReduceTask<I, M, T, R>> taskClass,
TaskExecutionContext context,
AtomicBoolean isCancelled,
+ ComputeEventMetadataBuilder metadataBuilder,
I arg
) {
+ this.eventLog = eventLog;
this.isCancelled = isCancelled;
+ eventMetadata = metadataBuilder.taskId(taskId).build();
+
LOG.debug("Executing task {}", taskClass.getName());
+
+ logEvent(eventLog, COMPUTE_TASK_QUEUED, eventMetadata);
+
splitExecution = executorService.submit(
() -> {
+ logEvent(eventLog, COMPUTE_TASK_EXECUTING, eventMetadata);
+
MapReduceTask<I, M, T, R> task =
instantiateTask(taskClass);
reduceResultMarshallerRef =
task.reduceJobResultMarshaller();
@@ -131,7 +160,7 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
executionsFuture =
splitExecution.resultAsync().thenCompose(splitResult -> {
List<MapReduceJob<M, T>> runners = splitResult.runners();
LOG.debug("Submitting {} jobs for {}", runners.size(),
taskClass.getName());
- return jobSubmitter.submit(runners, cancelHandle.token());
+ return jobSubmitter.submit(runners, metadataBuilder,
cancelHandle.token());
});
resultsFuture =
executionsFuture.thenCompose(TaskExecutionInternal::resultsAsync);
@@ -150,26 +179,50 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
Integer.MAX_VALUE,
0
);
- }).whenComplete(this::captureReduceFailure);
+ }).whenComplete(this::captureReduceExecution);
}
- private void captureReduceFailure(QueueExecution<R> reduceExecution,
Throwable throwable) {
+ private void captureReduceExecution(QueueExecution<R> reduceExecution,
Throwable throwable) {
if (throwable != null) {
- // Capture the reduce execution failure reason and time.
- TaskStatus status = isCancelled.get() ? CANCELED : FAILED;
-
- JobState state = splitExecution.state();
- if (state != null) {
- reduceFailedState.set(
- TaskStateImpl.toBuilder(state)
- .status(status)
- .finishTime(Instant.now())
- .build()
- );
- }
+ captureReduceSubmitFailure(throwable);
+ } else {
+ handleReduceResult(reduceExecution);
}
}
+    /**
+     * Handles a failure reported before a reduce execution became available: logs the terminal task event and stores the failed state.
+     *
+     * @param throwable Failure cause; it is not inspected, the outcome is derived from the cancellation flag only.
+     */
+    private void captureReduceSubmitFailure(Throwable throwable) {
+        // Read the flag once so that the logged event and the stored status always agree.
+        boolean canceled = isCancelled.get();
+        TaskStatus status = canceled ? CANCELED : FAILED;
+
+        logEvent(eventLog, canceled ? COMPUTE_TASK_CANCELED : COMPUTE_TASK_FAILED, eventMetadata);
+
+        JobState splitState = splitExecution.state();
+        if (splitState == null) {
+            // Nothing to derive the task state from.
+            return;
+        }
+
+        // Reuse the split job state, but expose it under the task ID with the final status and the current time as the finish time.
+        TaskState failedState = TaskStateImpl.toBuilder(splitState)
+                .id(taskId)
+                .status(status)
+                .finishTime(Instant.now())
+                .build();
+        reduceFailedState.set(failedState);
+    }
+
+    /**
+     * Logs the terminal task event once the submitted reduce job finishes.
+     *
+     * @param reduceExecution Execution of the submitted reduce job.
+     */
+    private void handleReduceResult(QueueExecution<R> reduceExecution) {
+        reduceExecution.resultAsync().whenComplete((result, throwable) -> {
+            // Success is detected by the absence of an error rather than by a non-null result:
+            // a normally completed reduce job with a null result must not be reported as failed or canceled.
+            if (throwable == null) {
+                logEvent(eventLog, COMPUTE_TASK_COMPLETED, eventMetadata);
+            } else {
+                JobState reduceState = reduceExecution.state();
+                // The state should never be null since it was just submitted, but check just in case.
+                if (reduceState != null) {
+                    IgniteEventType type = reduceState.status() == JobStatus.FAILED ? COMPUTE_TASK_FAILED : COMPUTE_TASK_CANCELED;
+                    logEvent(eventLog, type, eventMetadata);
+                }
+            }
+        });
+    }
+
@Override
public CompletableFuture<R> resultAsync() {
return reduceExecutionFuture.thenCompose(QueueExecution::resultAsync);
@@ -184,7 +237,7 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
}
if (splitState.status() != COMPLETED) {
- return
completedFuture(TaskStateImpl.toBuilder(splitState).build());
+ return
completedFuture(TaskStateImpl.toBuilder(splitState).id(taskId).build());
}
// This future is complete when reduce execution job is submitted,
return status from it.
@@ -196,7 +249,7 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
return null;
}
return TaskStateImpl.toBuilder(reduceState)
- .id(splitState.id())
+ .id(taskId)
.createTime(splitState.createTime())
.startTime(splitState.startTime())
.build();
@@ -207,6 +260,7 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
// At this point split is complete but reduce job is not submitted yet.
return completedFuture(TaskStateImpl.toBuilder(splitState)
+ .id(taskId)
.status(EXECUTING)
.finishTime(null)
.build());
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
index c695c57aeec..095808a6f17 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/queue/PriorityQueueExecutorTest.java
@@ -614,8 +614,7 @@ public class PriorityQueueExecutorTest extends
BaseIgniteAbstractTest {
configuration,
IgniteThreadFactory.create(nodeName, "compute", LOG),
new InMemoryComputeStateMachine(configuration, nodeName),
- EventLog.NOOP,
- nodeName
+ EventLog.NOOP
);
}
diff --git
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
index 89be27c2d37..fdb115331ba 100644
---
a/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
+++
b/modules/eventlog/src/main/java/org/apache/ignite/internal/eventlog/api/IgniteEventType.java
@@ -44,6 +44,12 @@ public enum IgniteEventType {
COMPUTE_JOB_CANCELING,
COMPUTE_JOB_CANCELED,
+ COMPUTE_TASK_QUEUED,
+ COMPUTE_TASK_EXECUTING,
+ COMPUTE_TASK_FAILED,
+ COMPUTE_TASK_COMPLETED,
+ COMPUTE_TASK_CANCELED,
+
QUERY_STARTED,
QUERY_FINISHED;