This is an automated email from the ASF dual-hosted git repository.
apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 73cba3e2d28 IGNITE-26172 Log job failed event when failover fails
(#6461)
73cba3e2d28 is described below
commit 73cba3e2d28516652bfe586b230e6ddc4d10ec40
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Tue Aug 26 11:35:11 2025 +0300
IGNITE-26172 Log job failed event when failover fails (#6461)
---
...> ItFailoverCandidateNotFoundEmbeddedTest.java} | 28 ++--
.../compute/ItFailoverCandidateNotFoundTest.java | 156 ++++++++++++++++++---
...ItFailoverCandidateNotFoundThinClientTest.java} | 19 ++-
...Test.java => ItWorkerShutdownEmbeddedTest.java} | 2 +-
.../internal/compute/ItWorkerShutdownTest.java | 22 ++-
...st.java => ItWorkerShutdownThinClientTest.java} | 2 +-
.../internal/compute/events/EventMatcher.java | 63 ++++++++-
...sTest.java => ItComputeEventsEmbeddedTest.java} | 13 +-
.../compute/events/ItComputeEventsTest.java | 52 ++-----
...est.java => ItComputeEventsThinClientTest.java} | 12 +-
.../internal/compute/ComputeComponentImpl.java | 9 +-
.../internal/compute/ComputeJobFailover.java | 21 +++
.../internal/compute/ComputeComponentImplTest.java | 3 +-
.../testframework/log4j2/EventLogInspector.java | 47 +++++++
.../org/apache/ignite/internal/app/IgniteImpl.java | 3 +-
15 files changed, 332 insertions(+), 120 deletions(-)
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundEmbeddedTest.java
similarity index 59%
copy from
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
copy to
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundEmbeddedTest.java
index 4adb0aa8578..4f4264b2cbc 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundEmbeddedTest.java
@@ -15,35 +15,25 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.events;
+package org.apache.ignite.internal.compute;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
+import static
org.apache.ignite.internal.compute.events.EventMatcher.embeddedJobEvent;
import java.util.UUID;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
-import org.apache.ignite.internal.compute.utils.Clients;
+import org.apache.ignite.internal.compute.events.EventMatcher;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
-import org.junit.jupiter.api.AfterAll;
-
-class ItThinClientComputeEventsTest extends ItComputeEventsTest {
- private final Clients clients = new Clients();
-
- @AfterAll
- void cleanup() {
- clients.cleanup();
- }
+import org.jetbrains.annotations.Nullable;
+class ItFailoverCandidateNotFoundEmbeddedTest extends
ItFailoverCandidateNotFoundTest {
@Override
- protected IgniteCompute compute() {
- return clients.compute(node(0));
+ IgniteCompute compute() {
+ return node(0).compute();
}
@Override
- protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
UUID jobId, String jobClassName, String targetNode) {
- return super.jobEvent(eventType, jobType, jobId, jobClassName,
targetNode)
- .withUsername(is("unknown"))
- .withClientAddress(notNullValue(String.class));
+ protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
@Nullable UUID jobId, String jobClassName, String targetNode) {
+ return embeddedJobEvent(eventType, jobType, jobId, jobClassName,
targetNode, node(0).name());
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
index a9c61370fb9..ce69893c880 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundTest.java
@@ -17,27 +17,73 @@
package org.apache.ignite.internal.compute;
-import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type.SINGLE;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_EXECUTING;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_FAILED;
+import static
org.apache.ignite.internal.eventlog.api.IgniteEventType.COMPUTE_JOB_QUEUED;
+import static org.apache.ignite.internal.eventlog.api.IgniteEventType.values;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInRelativeOrder;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.not;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
+import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.compute.JobDescriptor;
import org.apache.ignite.compute.JobTarget;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.events.EventMatcher;
import org.apache.ignite.internal.compute.utils.InteractiveJobs;
import org.apache.ignite.internal.compute.utils.TestingJobExecution;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.apache.ignite.internal.testframework.log4j2.EventLogInspector;
import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Tests that the failover fails when candidate node is not in the cluster
when it is selected for the failover.
*/
-public class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegrationTest {
+abstract class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegrationTest {
+ private final EventLogInspector logInspector = new EventLogInspector();
+
+ @BeforeEach
+ void setUp() {
+ InteractiveJobs.clearState();
+
+ logInspector.start();
+ }
+
+ @AfterEach
+ void afterEach() {
+ logInspector.stop();
+ }
+
+ @Override
+ protected void customizeInitParameters(InitParametersBuilder builder) {
+ String allComputeEvents = Arrays.stream(values())
+ .map(IgniteEventType::name)
+ .filter(name -> name.startsWith("COMPUTE_JOB"))
+ .collect(Collectors.joining(", ", "[", "]"));
+
+ builder.clusterConfiguration("ignite.eventlog {"
+ + " sinks.logSink.channel: testChannel,"
+ + " channels.testChannel.events: " + allComputeEvents
+ + "}");
+ }
+
@Override
protected int initialNodes() {
return 5;
@@ -51,28 +97,18 @@ public class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegrationTe
return new int[]{0, 3, 4};
}
- @Test
- void thinClientFailoverCandidateLeavesCluster() throws Exception {
- failoverCandidateLeavesCluster(node(0).compute());
- }
+ abstract IgniteCompute compute();
@Test
- void embeddedFailoverCandidateLeavesCluster() throws Exception {
- String address = "127.0.0.1:" +
unwrapIgniteImpl(node(0)).clientAddress().port();
- try (IgniteClient client =
IgniteClient.builder().addresses(address).build()) {
- failoverCandidateLeavesCluster(client.compute());
- }
- }
-
- private void failoverCandidateLeavesCluster(IgniteCompute compute) throws
Exception {
+ void failoverCandidateLeavesCluster() throws Exception {
// Given remote candidates to execute a job.
- Set<ClusterNode> remoteWorkerCandidates =
Set.of(unwrapIgniteImpl(node(1)).node(), unwrapIgniteImpl(node(2)).node());
+ Set<ClusterNode> remoteWorkerCandidates = Set.of(clusterNode(1),
clusterNode(2));
Set<String> remoteWorkerCandidateNames =
remoteWorkerCandidates.stream()
.map(ClusterNode::name)
.collect(Collectors.toCollection(HashSet::new));
// When execute job.
- TestingJobExecution<String> execution =
executeGlobalInteractiveJob(compute, remoteWorkerCandidates);
+ TestingJobExecution<String> execution =
executeGlobalInteractiveJob(remoteWorkerCandidates);
// Then one of candidates became a worker and run the job.
String workerNodeName =
InteractiveJobs.globalJob().currentWorkerName();
@@ -80,10 +116,11 @@ public class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegrationTe
InteractiveJobs.globalJob().assertAlive();
// And.
execution.assertExecuting();
+ UUID jobId = execution.idSync();
// Remove worker node from candidates, leaving other node.
remoteWorkerCandidateNames.remove(workerNodeName);
- assertThat(remoteWorkerCandidateNames.size(), is(1));
+ assertThat(remoteWorkerCandidateNames, hasSize(1));
// Stop non-worker candidate node.
String failoverCandidateNodeName =
remoteWorkerCandidateNames.stream().findFirst().orElseThrow();
@@ -94,10 +131,87 @@ public class ItFailoverCandidateNotFoundTest extends
ClusterPerTestIntegrationTe
// Then the job is failed, because there are no more failover workers.
execution.assertFailed();
+
+ String jobClassName = InteractiveJobs.globalJob().name();
+
+ // When node is shut down gracefully, the job execution is interrupted
and event could be logged anyway
+ // So there would be 2 events from a worker node, 1 failed events from
a worker node and 1 failed event from the coordinator
+ await().until(logInspector::events, contains(
+ jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
workerNodeName),
+ jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
workerNodeName),
+ jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
workerNodeName),
+ jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
workerNodeName)
+ ));
}
- private static TestingJobExecution<String>
executeGlobalInteractiveJob(IgniteCompute compute, Set<ClusterNode> nodes) {
- return new TestingJobExecution<>(compute.submitAsync(
+ @Test
+ void failoverAndThenStopWorker() throws Exception {
+ // Given remote candidates to execute a job.
+ Set<ClusterNode> remoteWorkerCandidates = Set.of(clusterNode(1),
clusterNode(2));
+ Set<String> remoteWorkerCandidateNames =
remoteWorkerCandidates.stream()
+ .map(ClusterNode::name)
+ .collect(Collectors.toCollection(HashSet::new));
+
+ // When execute job.
+ TestingJobExecution<String> execution =
executeGlobalInteractiveJob(remoteWorkerCandidates);
+
+ // Then one of candidates became a worker and run the job.
+ String workerNodeName =
InteractiveJobs.globalJob().currentWorkerName();
+ UUID jobId = execution.idSync();
+
+ // When stop worker node.
+ stopNode(workerNodeName);
+
+ // Remove worker node from candidates, leaving other node.
+ remoteWorkerCandidateNames.remove(workerNodeName);
+ assertThat(remoteWorkerCandidateNames, hasSize(1));
+
+ // Wait for execution on another node
+ String failoverWorker =
InteractiveJobs.globalJob().currentWorkerName();
+ assertThat(failoverWorker, not(equalTo(workerNodeName)));
+ assertThat(remoteWorkerCandidateNames, contains(failoverWorker));
+
+ // Stop failover worker node.
+ stopNode(failoverWorker);
+
+ // Then the job is failed, because there are no more failover workers.
+ execution.assertFailed();
+
+ // When node is shut down gracefully, the job execution is interrupted
and event could be logged anyway
+ // So there would be 4 events from worker nodes, 2 failed events from
both worker nodes and 1 failed event from the coordinator
+ // The order of failed events is not determined
+ await().until(logInspector::events, hasSize(7));
+
+ String jobClassName = InteractiveJobs.globalJob().name();
+
+ assertThat(logInspector.events(), containsInRelativeOrder(
+ jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
workerNodeName),
+ jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
workerNodeName),
+ jobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
failoverWorker),
+ jobEvent(COMPUTE_JOB_EXECUTING, jobId, jobClassName,
failoverWorker),
+ jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
failoverWorker)
+ ));
+
+ // Failed event from second worker node
+ assertThat(logInspector.events(), hasItem(
+ jobEvent(COMPUTE_JOB_FAILED, jobId, jobClassName,
workerNodeName)
+ ));
+ }
+
+ private EventMatcher jobEvent(IgniteEventType eventType, @Nullable UUID
jobId, String jobClassName, String targetNode) {
+ return jobEvent(eventType, SINGLE, jobId, jobClassName, targetNode);
+ }
+
+ protected abstract EventMatcher jobEvent(
+ IgniteEventType eventType,
+ Type jobType,
+ @Nullable UUID jobId,
+ String jobClassName,
+ String targetNode
+ );
+
+ private TestingJobExecution<String>
executeGlobalInteractiveJob(Set<ClusterNode> nodes) {
+ return new TestingJobExecution<>(compute().submitAsync(
JobTarget.anyNode(nodes),
JobDescriptor.builder(InteractiveJobs.globalJob().jobClass()).build(),
null
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundThinClientTest.java
similarity index 70%
copy from
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
copy to
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundThinClientTest.java
index 4adb0aa8578..23815afed7e 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItFailoverCandidateNotFoundThinClientTest.java
@@ -15,22 +15,23 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.compute.events;
+package org.apache.ignite.internal.compute;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
+import static
org.apache.ignite.internal.compute.events.EventMatcher.thinClientJobEvent;
import java.util.UUID;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.compute.events.EventMatcher;
import org.apache.ignite.internal.compute.utils.Clients;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
-import org.junit.jupiter.api.AfterAll;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.AfterEach;
-class ItThinClientComputeEventsTest extends ItComputeEventsTest {
+class ItFailoverCandidateNotFoundThinClientTest extends
ItFailoverCandidateNotFoundTest {
private final Clients clients = new Clients();
- @AfterAll
+ @AfterEach
void cleanup() {
clients.cleanup();
}
@@ -41,9 +42,7 @@ class ItThinClientComputeEventsTest extends
ItComputeEventsTest {
}
@Override
- protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
UUID jobId, String jobClassName, String targetNode) {
- return super.jobEvent(eventType, jobType, jobId, jobClassName,
targetNode)
- .withUsername(is("unknown"))
- .withClientAddress(notNullValue(String.class));
+ protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
@Nullable UUID jobId, String jobClassName, String targetNode) {
+ return thinClientJobEvent(eventType, jobType, jobId, jobClassName,
targetNode, node(0).name());
}
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownEmbeddedTest.java
similarity index 94%
rename from
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java
rename to
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownEmbeddedTest.java
index b9e2b57ddda..8d4a7525437 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItEmbeddedWorkerShutdownTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownEmbeddedTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.compute;
import org.apache.ignite.Ignite;
import org.apache.ignite.compute.IgniteCompute;
-class ItEmbeddedWorkerShutdownTest extends ItWorkerShutdownTest {
+class ItWorkerShutdownEmbeddedTest extends ItWorkerShutdownTest {
@Override
IgniteCompute compute(Ignite entryNode) {
return entryNode.compute();
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
index 3765fc3e01f..43281d1ead5 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownTest.java
@@ -122,13 +122,11 @@ public abstract class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest
@Test
void remoteExecutionWorkerShutdown() throws Exception {
- // Given entry node.
- Ignite entryNode = node(0);
// And remote candidates to execute a job.
Set<String> remoteWorkerCandidates = workerCandidates(node(1),
node(2));
// When execute job.
- TestingJobExecution<String> execution =
executeGlobalInteractiveJob(entryNode, remoteWorkerCandidates);
+ TestingJobExecution<String> execution =
executeGlobalInteractiveJob(remoteWorkerCandidates);
// Then one of candidates became a worker and run the job.
String workerNodeName =
InteractiveJobs.globalJob().currentWorkerName();
@@ -181,13 +179,11 @@ public abstract class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest
@Test
void remoteExecutionSingleWorkerShutdown() throws Exception {
- // Given.
- Ignite entryNode = node(0);
// And only one remote candidate to execute a job.
Set<String> remoteWorkerCandidates = workerCandidates(node(1));
// When execute job.
- TestingJobExecution<String> execution =
executeGlobalInteractiveJob(entryNode, remoteWorkerCandidates);
+ TestingJobExecution<String> execution =
executeGlobalInteractiveJob(remoteWorkerCandidates);
// Then the job is running on worker node.
String workerNodeName =
InteractiveJobs.globalJob().currentWorkerName();
@@ -209,7 +205,7 @@ public abstract class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest
Ignite entryNode = node(0);
// When execute job locally.
- TestingJobExecution<String> execution =
executeGlobalInteractiveJob(entryNode, Set.of(entryNode.name()));
+ TestingJobExecution<String> execution =
executeGlobalInteractiveJob(Set.of(entryNode.name()));
// Then the job is running.
InteractiveJobs.globalJob().assertAlive();
@@ -331,14 +327,12 @@ public abstract class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest
@Test
void cancelRemoteExecutionOnRestartedJob() throws Exception {
- // Given entry node.
- Ignite entryNode = node(0);
// And remote candidates to execute a job.
Set<String> remoteWorkerCandidates = workerCandidates(node(1),
node(2));
// When execute job.
CancelHandle cancelHandle = CancelHandle.create();
- TestingJobExecution<String> execution =
executeGlobalInteractiveJob(entryNode, remoteWorkerCandidates,
cancelHandle.token());
+ TestingJobExecution<String> execution =
executeGlobalInteractiveJob(remoteWorkerCandidates, cancelHandle.token());
// Then one of candidates became a worker and run the job.
String workerNodeName =
InteractiveJobs.globalJob().currentWorkerName();
@@ -445,12 +439,12 @@ public abstract class ItWorkerShutdownTest extends
ClusterPerTestIntegrationTest
return cluster.runningNodes().filter(node ->
node.name().equals(candidateName)).findFirst().orElseThrow();
}
- private TestingJobExecution<String> executeGlobalInteractiveJob(Ignite
entryNode, Set<String> nodes) {
- return executeGlobalInteractiveJob(entryNode, nodes, null);
+ private TestingJobExecution<String>
executeGlobalInteractiveJob(Set<String> nodes) {
+ return executeGlobalInteractiveJob(nodes, null);
}
- private TestingJobExecution<String> executeGlobalInteractiveJob(Ignite
entryNode, Set<String> nodes, CancellationToken token) {
- return new TestingJobExecution<>(compute(entryNode).submitAsync(
+ private TestingJobExecution<String>
executeGlobalInteractiveJob(Set<String> nodes, CancellationToken token) {
+ return new TestingJobExecution<>(compute(node(0)).submitAsync(
JobTarget.anyNode(clusterNodesByNames(nodes)),
JobDescriptor.builder(InteractiveJobs.globalJob().jobClass()).build(),
null,
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownThinClientTest.java
similarity index 95%
rename from
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java
rename to
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownThinClientTest.java
index e33ee0e1a32..999bd2e2bb8 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItThinClientWorkerShutdownTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItWorkerShutdownThinClientTest.java
@@ -22,7 +22,7 @@ import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.compute.utils.Clients;
import org.junit.jupiter.api.AfterEach;
-class ItThinClientWorkerShutdownTest extends ItWorkerShutdownTest {
+class ItWorkerShutdownThinClientTest extends ItWorkerShutdownTest {
private final Clients clients = new Clients();
@AfterEach
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
index 8865d133e22..650bf0c115d 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/EventMatcher.java
@@ -18,14 +18,18 @@
package org.apache.ignite.internal.compute.events;
import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.UUID;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import
org.apache.ignite.internal.compute.events.ComputeEventsFactory.FieldNames;
import org.apache.ignite.internal.compute.utils.MismatchesDescriptor;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.apache.ignite.internal.properties.IgniteProductVersion;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
@@ -56,6 +60,61 @@ public class EventMatcher extends TypeSafeMatcher<String> {
return new EventMatcher(eventType.name());
}
+ /**
+ * Generates a matcher for compute job event originated from the embedded
API.
+ *
+ * @param eventType Event type.
+ * @param jobType Job type.
+ * @param jobId Job ID.
+ * @param jobClassName Job class name.
+ * @param targetNode Target node name.
+ * @param initiatorNode Initiator node name.
+ * @return Constructed event matcher.
+ */
+ public static EventMatcher embeddedJobEvent(
+ IgniteEventType eventType,
+ Type jobType,
+ @Nullable UUID jobId,
+ String jobClassName,
+ String targetNode,
+ String initiatorNode
+ ) {
+ return computeJobEvent(eventType)
+ .withTimestamp(notNullValue(Long.class))
+
.withProductVersion(is(IgniteProductVersion.CURRENT_VERSION.toString()))
+ .withUsername(is("SYSTEM"))
+ .withType(jobType.name())
+ .withClassName(jobClassName)
+ .withJobId(jobId)
+ .withTargetNode(targetNode)
+ .withInitiatorNode(initiatorNode)
+ .withClientAddress(nullValue());
+ }
+
+ /**
+ * Generates a matcher for compute job event originated from the thin
client API.
+ *
+ * @param eventType Event type.
+ * @param jobType Job type.
+ * @param jobId Job ID.
+ * @param jobClassName Job class name.
+ * @param targetNode Target node name.
+ * @param initiatorNode Initiator node name.
+ * @return Constructed event matcher.
+ */
+ public static EventMatcher thinClientJobEvent(
+ IgniteEventType eventType,
+ Type jobType,
+ @Nullable UUID jobId,
+ String jobClassName,
+ String targetNode,
+ String initiatorNode
+ ) {
+ return embeddedJobEvent(eventType, jobType, jobId, jobClassName,
targetNode, initiatorNode)
+ .withUsername(is("unknown"))
+ .withClientAddress(notNullValue(String.class));
+ }
+
EventMatcher withTimestamp(Matcher<? super Long> matcher) {
this.timestampMatcher = matcher;
return this;
@@ -101,8 +160,8 @@ public class EventMatcher extends TypeSafeMatcher<String> {
return this;
}
- EventMatcher withInitiatorNode(Matcher<? super String>
initiatorNodeMatcher) {
- this.initiatorNodeMatcher = initiatorNodeMatcher;
+ EventMatcher withInitiatorNode(String initiatorNode) {
+ this.initiatorNodeMatcher = is(initiatorNode);
return this;
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItEmbeddedComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
similarity index 61%
rename from
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItEmbeddedComputeEventsTest.java
rename to
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
index 1c42a4190ba..d9a7dd199b4 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItEmbeddedComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsEmbeddedTest.java
@@ -17,11 +17,22 @@
package org.apache.ignite.internal.compute.events;
+import static
org.apache.ignite.internal.compute.events.EventMatcher.embeddedJobEvent;
+
+import java.util.UUID;
import org.apache.ignite.compute.IgniteCompute;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
+import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.jetbrains.annotations.Nullable;
-class ItEmbeddedComputeEventsTest extends ItComputeEventsTest {
+class ItComputeEventsEmbeddedTest extends ItComputeEventsTest {
@Override
protected IgniteCompute compute() {
return node(0).compute();
}
+
+ @Override
+ protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
@Nullable UUID jobId, String jobClassName, String targetNode) {
+ return embeddedJobEvent(eventType, jobType, jobId, jobClassName,
targetNode, node(0).name());
+ }
}
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
index 42ad55221f4..c9419d69c74 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsTest.java
@@ -37,18 +37,14 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInRelativeOrder;
import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
-import static org.hamcrest.Matchers.nullValue;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
import org.apache.ignite.InitParametersBuilder;
import org.apache.ignite.compute.BroadcastExecution;
@@ -66,9 +62,7 @@ import org.apache.ignite.internal.compute.SilentSleepJob;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import org.apache.ignite.internal.compute.events.EventMatcher.Event;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
-import org.apache.ignite.internal.properties.IgniteProductVersion;
-import org.apache.ignite.internal.testframework.log4j2.LogInspector;
-import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler;
+import org.apache.ignite.internal.testframework.log4j2.EventLogInspector;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.table.QualifiedName;
@@ -84,16 +78,10 @@ import org.junit.jupiter.params.provider.ValueSource;
@ConfigOverride(name = "ignite.compute.threadPoolSize", value = "1")
abstract class ItComputeEventsTest extends ClusterPerClassIntegrationTest {
- private final List<String> events = new CopyOnWriteArrayList<>();
-
- private final LogInspector logInspector = new LogInspector(
- "EventLog", // From LogSinkConfigurationSchema.criteria default
value
- new Handler(event -> true, event ->
events.add(event.getMessage().getFormattedMessage()))
- );
+ private final EventLogInspector logInspector = new EventLogInspector();
@BeforeEach
void startLogInspector() {
- events.clear();
logInspector.start();
}
@@ -145,11 +133,11 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
assertThat(broadcastExecution.resultsAsync(),
willCompleteSuccessfully());
- await().until(() -> events, hasSize(9));
+ await().until(logInspector::events, hasSize(9));
// Now get the task ID from the first event and assert that all events
have the same task ID.
ObjectMapper mapper = new ObjectMapper();
- Event firstEvent = mapper.readValue(events.get(0), Event.class);
+ Event firstEvent = mapper.readValue(logInspector.events().get(0),
Event.class);
UUID taskId = firstEvent.fields.taskId;
assertThat(taskId, notNullValue());
@@ -157,7 +145,7 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
broadcastExecution.executions().forEach(execution -> {
UUID jobId = execution.idAsync().join(); // Safe to join since
execution is complete.
String targetNode = execution.node().name();
- assertThat(events, containsInRelativeOrder(
+ assertThat(logInspector.events(), containsInRelativeOrder(
broadcastJobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
targetNode, taskId),
broadcastJobEvent(COMPUTE_JOB_EXECUTING, jobId,
jobClassName, targetNode, taskId),
broadcastJobEvent(COMPUTE_JOB_COMPLETED, jobId,
jobClassName, targetNode, taskId)
@@ -176,11 +164,11 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
int defaultPartitionCount = 25;
assertThat(broadcastExecution.executions(),
hasSize(defaultPartitionCount));
- await().until(() -> events, hasSize(defaultPartitionCount * 3));
+ await().until(logInspector::events, hasSize(defaultPartitionCount *
3));
// Now get the task ID from the first event and assert that all events
have the same task ID.
ObjectMapper mapper = new ObjectMapper();
- Event firstEvent = mapper.readValue(events.get(0), Event.class);
+ Event firstEvent = mapper.readValue(logInspector.events().get(0),
Event.class);
UUID taskId = firstEvent.fields.taskId;
assertThat(taskId, notNullValue());
@@ -189,7 +177,7 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
broadcastExecution.executions().forEach(execution -> {
UUID jobId = execution.idAsync().join(); // Safe to join since
execution is complete.
String targetNode = execution.node().name();
- assertThat(events, containsInRelativeOrder(
+ assertThat(logInspector.events(), containsInRelativeOrder(
broadcastJobEvent(COMPUTE_JOB_QUEUED, jobId, jobClassName,
targetNode, taskId).withTableName(tableName),
broadcastJobEvent(COMPUTE_JOB_EXECUTING, jobId,
jobClassName, targetNode, taskId).withTableName(tableName),
broadcastJobEvent(COMPUTE_JOB_COMPLETED, jobId,
jobClassName, targetNode, taskId).withTableName(tableName)
@@ -370,37 +358,21 @@ abstract class ItComputeEventsTest extends
ClusterPerClassIntegrationTest {
return jobEvent(eventType, BROADCAST, jobId, jobClassName,
targetNode).withTaskId(taskId);
}
- private EventMatcher jobEvent(
- IgniteEventType eventType,
- @Nullable UUID jobId,
- String jobClassName,
- String targetNode
- ) {
+ private EventMatcher jobEvent(IgniteEventType eventType, @Nullable UUID
jobId, String jobClassName, String targetNode) {
return jobEvent(eventType, SINGLE, jobId, jobClassName, targetNode);
}
- protected EventMatcher jobEvent(
+ protected abstract EventMatcher jobEvent(
IgniteEventType eventType,
Type jobType,
@Nullable UUID jobId,
String jobClassName,
String targetNode
- ) {
- return EventMatcher.computeJobEvent(eventType)
- .withTimestamp(notNullValue(Long.class))
-
.withProductVersion(is(IgniteProductVersion.CURRENT_VERSION.toString()))
- .withUsername(is("SYSTEM"))
- .withType(jobType.name())
- .withClassName(jobClassName)
- .withJobId(jobId)
- .withTargetNode(targetNode)
- .withInitiatorNode(is(node(0).name()))
- .withClientAddress(nullValue());
- }
+ );
@SafeVarargs
private void assertEvents(Matcher<String>... matchers) {
- await().until(() -> events, contains(matchers));
+ await().until(logInspector::events, contains(matchers));
}
private static void createTestTableWithOneRow() {
diff --git
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsThinClientTest.java
similarity index 78%
rename from
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
rename to
modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsThinClientTest.java
index 4adb0aa8578..5bc7960eb82 100644
---
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItThinClientComputeEventsTest.java
+++
b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/events/ItComputeEventsThinClientTest.java
@@ -17,17 +17,17 @@
package org.apache.ignite.internal.compute.events;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
+import static
org.apache.ignite.internal.compute.events.EventMatcher.thinClientJobEvent;
import java.util.UUID;
import org.apache.ignite.compute.IgniteCompute;
import org.apache.ignite.internal.compute.events.ComputeEventMetadata.Type;
import org.apache.ignite.internal.compute.utils.Clients;
import org.apache.ignite.internal.eventlog.api.IgniteEventType;
+import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterAll;
-class ItThinClientComputeEventsTest extends ItComputeEventsTest {
+class ItComputeEventsThinClientTest extends ItComputeEventsTest {
private final Clients clients = new Clients();
@AfterAll
@@ -41,9 +41,7 @@ class ItThinClientComputeEventsTest extends
ItComputeEventsTest {
}
@Override
- protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
UUID jobId, String jobClassName, String targetNode) {
- return super.jobEvent(eventType, jobType, jobId, jobClassName,
targetNode)
- .withUsername(is("unknown"))
- .withClientAddress(notNullValue(String.class));
+ protected EventMatcher jobEvent(IgniteEventType eventType, Type jobType,
@Nullable UUID jobId, String jobClassName, String targetNode) {
+ return thinClientJobEvent(eventType, jobType, jobId, jobClassName,
targetNode, node(0).name());
}
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
index e1ce3e87083..056782b9bbe 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeComponentImpl.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.compute.messaging.RemoteJobExecution;
import org.apache.ignite.internal.compute.task.DelegatingTaskExecution;
import org.apache.ignite.internal.compute.task.JobSubmitter;
import org.apache.ignite.internal.compute.task.TaskExecutionInternal;
+import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.future.InFlightFutures;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -88,6 +89,8 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
private final ComputeExecutor executor;
+ private final EventLog eventLog;
+
private final ComputeMessaging messaging;
private final ExecutionManager executionManager;
@@ -106,12 +109,14 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
LogicalTopologyService logicalTopologyService,
JobContextManager jobContextManager,
ComputeExecutor executor,
- ComputeConfiguration computeConfiguration
+ ComputeConfiguration computeConfiguration,
+ EventLog eventLog
) {
this.topologyService = topologyService;
this.logicalTopologyService = logicalTopologyService;
this.jobContextManager = jobContextManager;
this.executor = executor;
+ this.eventLog = eventLog;
executionManager = new ExecutionManager(computeConfiguration,
topologyService);
messaging = new ComputeMessaging(executionManager, messagingService,
topologyService);
failoverExecutor = Executors.newSingleThreadExecutor(
@@ -263,7 +268,7 @@ public class ComputeComponentImpl implements
ComputeComponent, SystemViewProvide
@Nullable CancellationToken cancellationToken
) {
return ComputeJobFailover.failSafeExecute(
- this, logicalTopologyService, topologyService,
failoverExecutor,
+ this, logicalTopologyService, topologyService,
failoverExecutor, eventLog,
remoteNode, nextWorkerSelector, options, units,
jobClassName, metadataBuilder, arg
)
.thenApply(execution -> {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
index e2f0714418e..2646411ea8c 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeJobFailover.java
@@ -27,7 +27,10 @@ import
org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyEventListener;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologySnapshot;
+import org.apache.ignite.internal.compute.events.ComputeEventMetadata;
import org.apache.ignite.internal.compute.events.ComputeEventMetadataBuilder;
+import org.apache.ignite.internal.compute.events.ComputeEventsFactory;
+import org.apache.ignite.internal.eventlog.api.EventLog;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -69,6 +72,7 @@ class ComputeJobFailover {
*/
private final Executor executor;
+ private final EventLog eventLog;
/**
* The node where the job is being executed at a given moment. If node
leaves the cluster, then the job is restarted on one of the
* worker node returned by {@link #nextWorkerSelector} and the reference
is CASed to the new node.
@@ -99,6 +103,7 @@ class ComputeJobFailover {
* @param logicalTopologyService logical topology service.
* @param topologyService physical topology service.
* @param executor the thread pool where the failover should run on.
+ * @param eventLog Event log.
* @param workerNode the node to execute the job on.
* @param nextWorkerSelector the selector that returns the next worker to
execute job on.
* @param executionOptions execution options like priority or max retries.
@@ -112,6 +117,7 @@ class ComputeJobFailover {
LogicalTopologyService logicalTopologyService,
TopologyService topologyService,
Executor executor,
+ EventLog eventLog,
ClusterNode workerNode,
NextWorkerSelector nextWorkerSelector,
ExecutionOptions executionOptions,
@@ -124,6 +130,7 @@ class ComputeJobFailover {
this.logicalTopologyService = logicalTopologyService;
this.topologyService = topologyService;
this.executor = executor;
+ this.eventLog = eventLog;
this.runningWorkerNode = new AtomicReference<>(workerNode);
this.nextWorkerSelector = nextWorkerSelector;
@@ -138,6 +145,7 @@ class ComputeJobFailover {
LogicalTopologyService logicalTopologyService,
TopologyService topologyService,
Executor executor,
+ EventLog eventLog,
ClusterNode workerNode,
NextWorkerSelector nextWorkerSelector,
ExecutionOptions executionOptions,
@@ -151,6 +159,7 @@ class ComputeJobFailover {
logicalTopologyService,
topologyService,
executor,
+ eventLog,
workerNode,
nextWorkerSelector,
executionOptions,
@@ -211,6 +220,8 @@ class ComputeJobFailover {
if (nextWorker == null) {
LOG.warn("No more worker nodes to restart the job.
Failing the job {}.", jobContext.jobClassName());
+ logJobFailedEvent();
+
failSafeExecution.completeExceptionally(new
IgniteInternalException(Compute.COMPUTE_JOB_FAILED_ERR));
return;
}
@@ -229,5 +240,15 @@ class ComputeJobFailover {
launchJobOn(nextWorker).thenAccept(execution ->
failSafeExecution.updateJobExecution(execution));
});
}
+
+ private void logJobFailedEvent() {
+ // Fill missing fields
+ ComputeEventMetadata eventMetadata = jobContext.metadataBuilder()
+ .jobClassName(jobContext.jobClassName())
+ .targetNode(runningWorkerNode.get().name()) // Use last
worker node
+ .build();
+
+ ComputeEventsFactory.logJobFailedEvent(eventLog, eventMetadata);
+ }
}
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 1cc6fae45a8..441a51601b6 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -180,7 +180,8 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
logicalTopologyService,
jobContextManager,
computeExecutor,
- computeConfiguration
+ computeConfiguration,
+ EventLog.NOOP
);
assertThat(computeComponent.startAsync(new ComponentContext()),
willCompleteSuccessfully());
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/log4j2/EventLogInspector.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/log4j2/EventLogInspector.java
new file mode 100644
index 00000000000..1dbd91419a9
--- /dev/null
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/log4j2/EventLogInspector.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.testframework.log4j2;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.internal.testframework.log4j2.LogInspector.Handler;
+
+/**
+ * Helper class that listens for the messages from the EventLog logger and
puts them into the list.
+ */
+public class EventLogInspector {
+ private final List<String> events = new CopyOnWriteArrayList<>();
+
+ private final LogInspector logInspector = new LogInspector(
+ "EventLog", // From LogSinkConfigurationSchema.criteria default
value
+ new Handler(event -> true, event ->
events.add(event.getMessage().getFormattedMessage()))
+ );
+
+ public void start() {
+ events.clear();
+ logInspector.start();
+ }
+
+ public void stop() {
+ logInspector.stop();
+ }
+
+ public List<String> events() {
+ return events;
+ }
+}
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 4eca1ae0a8d..4e5df51023a 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -1251,7 +1251,8 @@ public class IgniteImpl implements Ignite {
logicalTopologyService,
new JobContextManager(deploymentManagerImpl,
deploymentManagerImpl.deploymentUnitAccessor(), new JobClassLoaderFactory()),
computeExecutor,
- computeCfg
+ computeCfg,
+ eventLog
);
systemViewManager.register(computeComponent);