This is an automated email from the ASF dual-hosted git repository. apkhmv pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 6f31157466 IGNITE-20858 Improve error handling in compute jobs (#3121) 6f31157466 is described below commit 6f3115746698273b3b22dd4f59bf0ad87a67d982 Author: Ivan Gagarkin <gagarkin....@gmail.com> AuthorDate: Thu Feb 15 11:22:16 2024 +0100 IGNITE-20858 Improve error handling in compute jobs (#3121) --- .../ignite/internal/compute/ItComputeBaseTest.java | 51 +++-- .../internal/compute/ItComputeTestEmbedded.java | 99 ++++++-- .../internal/compute/ItComputeTestStandalone.java | 31 ++- .../ignite/internal/compute/ComputeUtils.java | 31 +++ .../ignite/internal/compute/IgniteComputeImpl.java | 8 +- .../compute/JobExecutionFutureWrapper.java | 3 +- .../internal/compute/JobExecutionWrapper.java | 3 +- .../ignite/internal/IgniteExceptionTestUtils.java} | 46 ++-- .../Apache.Ignite.Tests/Compute/ComputeTests.cs | 6 +- .../runner/app/client/ItThinClientComputeTest.java | 250 +++++++++++++++------ .../internal/ClusterPerClassIntegrationTest.java | 3 +- .../internal/ClusterPerTestIntegrationTest.java | 8 +- 12 files changed, 394 insertions(+), 145 deletions(-) diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java index abf492e3e8..38e36152c7 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java @@ -20,11 +20,10 @@ package org.apache.ignite.internal.compute; import static java.util.stream.Collectors.toList; import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.FAILED; -import static org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException; +import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; -import static org.apache.ignite.lang.ErrorGroups.Common.COMMON_ERR_GROUP; -import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.aMapWithSize; import static org.hamcrest.Matchers.containsString; @@ -43,10 +42,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import org.apache.ignite.Ignite; +import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; -import org.apache.ignite.internal.ClusterPerTestIntegrationTest; +import org.apache.ignite.internal.ClusterPerClassIntegrationTest; import org.apache.ignite.internal.app.IgniteImpl; +import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.lang.IgniteException; import org.apache.ignite.lang.TableNotFoundException; import org.apache.ignite.network.ClusterNode; @@ -57,7 +58,7 @@ import org.junit.jupiter.api.Test; /** * Base integration tests for Compute functionality. 
*/ -public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { +public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest { protected abstract List<DeploymentUnit> units(); protected abstract String concatJobClassName(); @@ -76,6 +77,10 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { assertThat(result, is("a42")); } + IgniteImpl node(int i) { + return CLUSTER.node(i); + } + @Test void executesJobLocallyAsync() { IgniteImpl entryNode = node(0); @@ -140,7 +145,7 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() .execute(Set.of(entryNode.node()), units(), failingJobClassName())); - assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops"); + assertComputeException(ex, "JobException", "Oops"); } @Test @@ -152,7 +157,7 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { ExecutionException ex = assertThrows(ExecutionException.class, () -> execution.resultAsync().get(1, TimeUnit.SECONDS)); - assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops"); + assertComputeException(ex, "JobException", "Oops"); assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); assertThat(execution.cancelAsync(), willBe(false)); @@ -165,7 +170,7 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() .execute(Set.of(node(1).node(), node(2).node()), units(), failingJobClassName())); - assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops"); + assertComputeException(ex, "JobException", "Oops"); } @Test @@ -177,7 +182,7 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { ExecutionException ex = assertThrows(ExecutionException.class, () -> execution.resultAsync().get(1, TimeUnit.SECONDS)); - 
assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops"); + assertComputeException(ex, "JobException", "Oops"); assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); assertThat(execution.cancelAsync(), willBe(false)); @@ -232,7 +237,7 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { .get(1, TimeUnit.SECONDS); assertThat(result, is(instanceOf(CompletionException.class))); - assertPublicException(result, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops"); + assertComputeException(result, "JobException", "Oops"); assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); assertThat(execution.cancelAsync(), willBe(false)); @@ -258,7 +263,7 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { IgniteImpl entryNode = node(0); JobExecution<String> execution = entryNode.compute() - .<String>executeColocatedAsync("test", Tuple.create(Map.of("k", 1)), units(), getNodeNameJobClassName()); + .executeColocatedAsync("test", Tuple.create(Map.of("k", 1)), units(), getNodeNameJobClassName()); assertThat(execution.resultAsync(), willBe(in(allNodeNames()))); assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); @@ -277,9 +282,10 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { assertThat(ex.getCause().getMessage(), containsString("The table does not exist [name=\"PUBLIC\".\"bad-table\"]")); } - private void createTestTableWithOneRow() { - executeSql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY (k))"); - executeSql("INSERT INTO test(k, v) VALUES (1, 101)"); + private static void createTestTableWithOneRow() { + sql("DROP TABLE IF EXISTS test"); + sql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY (k))"); + sql("INSERT INTO test(k, v) VALUES (1, 101)"); } private List<String> allNodeNames() { @@ -308,10 +314,25 @@ public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest { IgniteImpl 
entryNode = node(0); JobExecution<String> execution = entryNode.compute() - .<Integer, String>executeColocatedAsync("test", 1, Mapper.of(Integer.class), units(), getNodeNameJobClassName()); + .executeColocatedAsync("test", 1, Mapper.of(Integer.class), units(), getNodeNameJobClassName()); assertThat(execution.resultAsync(), willBe(in(allNodeNames()))); assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); assertThat(execution.cancelAsync(), willBe(false)); } + + protected static void assertComputeException(Exception ex, Throwable cause) { + assertComputeException(ex, cause.getClass().getName(), cause.getMessage()); + } + + protected static void assertComputeException(Exception ex, Class<?> cause, String causeMsgSubstring) { + assertComputeException(ex, cause.getName(), causeMsgSubstring); + } + + private static void assertComputeException(Exception ex, String causeClass, String causeMsgSubstring) { + assertTraceableException(ex, ComputeException.class, COMPUTE_JOB_FAILED_ERR, "Job execution failed:"); + Throwable cause = ExceptionUtils.unwrapCause(ex); + assertThat(cause.getCause().getClass().getName(), containsString(causeClass)); + assertThat(cause.getCause().getMessage(), containsString(causeMsgSubstring)); + } } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java index 67a9a569b4..292a790e7e 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java @@ -18,12 +18,14 @@ package org.apache.ignite.internal.compute; import static java.util.stream.Collectors.joining; -import static org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException; +import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException; +import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicException; +import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; +import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR; -import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_ERR_GROUP; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -37,6 +39,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.ignite.Ignite; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.ComputeJob; @@ -46,7 +49,10 @@ import org.apache.ignite.compute.JobExecutionContext; import org.apache.ignite.compute.JobExecutionOptions; import org.apache.ignite.compute.JobState; import org.apache.ignite.internal.app.IgniteImpl; -import org.apache.ignite.lang.ErrorGroup; +import org.apache.ignite.internal.lang.IgniteInternalCheckedException; +import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.util.ExceptionUtils; +import org.apache.ignite.lang.IgniteCheckedException; import org.apache.ignite.lang.IgniteException; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -81,18 +87,18 @@ class 
ItComputeTestEmbedded extends ItComputeBaseTest { @ParameterizedTest @MethodSource("wrongJobClassArguments") - void executesWrongJobClassLocally(String jobClassName, ErrorGroup errorGroup, int errorCode, String msg) { + void executesWrongJobClassLocally(String jobClassName, int errorCode, String msg) { IgniteImpl entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() .execute(Set.of(entryNode.node()), units(), jobClassName)); - assertPublicException(ex, ComputeException.class, errorGroup, errorCode, msg); + assertTraceableException(ex, ComputeException.class, errorCode, msg); } @ParameterizedTest @MethodSource("wrongJobClassArguments") - void executesWrongJobClassLocallyAsync(String jobClassName, ErrorGroup errorGroup, int errorCode, String msg) { + void executesWrongJobClassLocallyAsync(String jobClassName, int errorCode, String msg) { IgniteImpl entryNode = node(0); ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute() @@ -100,23 +106,23 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { .resultAsync() .get(1, TimeUnit.SECONDS)); - assertPublicException(ex, ComputeException.class, errorGroup, errorCode, msg); + assertTraceableException(ex, ComputeException.class, errorCode, msg); } @ParameterizedTest @MethodSource("wrongJobClassArguments") - void executesWrongJobClassOnRemoteNodes(String jobClassName, ErrorGroup errorGroup, int errorCode, String msg) { + void executesWrongJobClassOnRemoteNodes(String jobClassName, int errorCode, String msg) { Ignite entryNode = node(0); IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() .execute(Set.of(node(1).node(), node(2).node()), units(), jobClassName)); - assertPublicException(ex, ComputeException.class, errorGroup, errorCode, msg); + assertTraceableException(ex, ComputeException.class, errorCode, msg); } @ParameterizedTest @MethodSource("wrongJobClassArguments") - void 
executesWrongJobClassOnRemoteNodesAsync(String jobClassName, ErrorGroup errorGroup, int errorCode, String msg) { + void executesWrongJobClassOnRemoteNodesAsync(String jobClassName, int errorCode, String msg) { Ignite entryNode = node(0); ExecutionException ex = assertThrows(ExecutionException.class, () -> entryNode.compute() @@ -124,7 +130,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { .resultAsync() .get(1, TimeUnit.SECONDS)); - assertPublicException(ex, ComputeException.class, errorGroup, errorCode, msg); + assertTraceableException(ex, ComputeException.class, errorCode, msg); } @Test @@ -164,6 +170,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { await().until(execution::statusAsync, willBe(jobStatusWithState(JobState.EXECUTING))); assertThat(execution.changePriorityAsync(2), willBe(false)); + assertThat(execution.cancelAsync(), willBe(true)); } @Test @@ -175,6 +182,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { await().until(execution::statusAsync, willBe(jobStatusWithState(JobState.EXECUTING))); assertThat(execution.changePriorityAsync(2), willBe(false)); + assertThat(execution.cancelAsync(), willBe(true)); } @Test @@ -211,11 +219,12 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { // Tasks 1 and 3 completed successfully assertThat(execution1.resultAsync(), willCompleteSuccessfully()); assertThat(execution3.resultAsync(), willCompleteSuccessfully()); - assertThat(execution1.resultAsync().isDone(), is(true)); - assertThat(execution3.resultAsync().isDone(), is(true)); // Task 2 is not completed assertThat(execution2.resultAsync().isDone(), is(false)); + + // Finish task 2 + assertThat(execution2.cancelAsync(), willBe(true)); } @Test @@ -254,8 +263,6 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { // Tasks 1 and 3 completed successfully assertThat(execution1.resultAsync(), willCompleteSuccessfully()); assertThat(execution3.resultAsync(), willCompleteSuccessfully()); - 
assertThat(execution1.resultAsync().isDone(), is(true)); - assertThat(execution3.resultAsync().isDone(), is(true)); // Task 3 should be executed 2 times assertEquals(2, WaitLatchThrowExceptionOnFirstExecutionJob.counter.get()); @@ -264,7 +271,51 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { assertThat(execution2.resultAsync().isDone(), is(false)); // Cancel task2 - execution2.cancelAsync().join(); + assertThat(execution2.cancelAsync(), willBe(true)); + } + + @Test + void shouldNotConvertIgniteException() { + IgniteImpl entryNode = node(0); + + IgniteException exception = new IgniteException(INTERNAL_ERR, "Test exception"); + + IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() + .execute(Set.of(entryNode.node()), units(), CustomFailingJob.class.getName(), exception)); + + assertPublicException(ex, exception.code(), exception.getMessage()); + } + + @Test + void shouldNotConvertIgniteCheckedException() { + IgniteImpl entryNode = node(0); + + IgniteCheckedException exception = new IgniteCheckedException(INTERNAL_ERR, "Test exception"); + + IgniteCheckedException ex = assertThrows(IgniteCheckedException.class, () -> entryNode.compute() + .execute(Set.of(entryNode.node()), units(), CustomFailingJob.class.getName(), exception)); + + assertPublicCheckedException(ex, exception.code(), exception.getMessage()); + } + + private static Stream<Arguments> privateExceptions() { + return Stream.of( + Arguments.of(new IgniteInternalException(INTERNAL_ERR, "Test exception")), + Arguments.of(new IgniteInternalCheckedException(INTERNAL_ERR, "Test exception")), + Arguments.of(new RuntimeException("Test exception")), + Arguments.of(new Exception("Test exception")) + ); + } + + @ParameterizedTest + @MethodSource("privateExceptions") + void shouldConvertToComputeException(Throwable throwable) { + IgniteImpl entryNode = node(0); + + IgniteException ex = assertThrows(IgniteException.class, () -> entryNode.compute() + 
.execute(Set.of(entryNode.node()), units(), CustomFailingJob.class.getName(), throwable)); + + assertComputeException(ex, throwable); } private static class ConcatJob implements ComputeJob<String> { @@ -285,6 +336,14 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { } } + private static class CustomFailingJob implements ComputeJob<String> { + /** {@inheritDoc} */ + @Override + public String execute(JobExecutionContext context, Object... args) { + throw ExceptionUtils.sneakyThrow((Throwable) args[0]); + } + } + private static class FailingJob implements ComputeJob<String> { /** {@inheritDoc} */ @Override @@ -301,11 +360,9 @@ class ItComputeTestEmbedded extends ItComputeBaseTest { private static List<Arguments> wrongJobClassArguments() { return List.of( - Arguments.of("org.example.NonExistentJob", COMPUTE_ERR_GROUP, CLASS_INITIALIZATION_ERR, "Cannot load job class by name"), - Arguments.of(NonComputeJob.class.getName(), COMPUTE_ERR_GROUP, CLASS_INITIALIZATION_ERR, - "does not implement ComputeJob interface"), - Arguments.of(NonEmptyConstructorJob.class.getName(), COMPUTE_ERR_GROUP, CLASS_INITIALIZATION_ERR, - "Cannot instantiate job") + Arguments.of("org.example.NonExistentJob", CLASS_INITIALIZATION_ERR, "Cannot load job class by name"), + Arguments.of(NonComputeJob.class.getName(), CLASS_INITIALIZATION_ERR, "does not implement ComputeJob interface"), + Arguments.of(NonEmptyConstructorJob.class.getName(), CLASS_INITIALIZATION_ERR, "Cannot instantiate job") ); } diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java index 419bcbd896..a924bc4ede 100644 --- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java +++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java @@ -17,14 +17,11 @@ 
package org.apache.ignite.internal.compute; -import static org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException; import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED; import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE; import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; -import static org.apache.ignite.lang.ErrorGroups.Common.COMMON_ERR_GROUP; -import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.nullValue; @@ -56,7 +53,18 @@ class ItComputeTestStandalone extends ItComputeBaseTest { @BeforeEach void setUp() throws IOException { - deployJar(node(0), unit.name(), unit.version(), "ignite-it-jobs-1.0-SNAPSHOT.jar"); + IgniteImpl entryNode = node(0); + //TODO https://issues.apache.org/jira/browse/IGNITE-19757 + try { + entryNode.deployment().undeployAsync(unit.name(), unit.version()).join(); + } catch (Exception ignored) { + // ignored + } + await().until( + () -> entryNode.deployment().clusterStatusAsync(unit.name(), unit.version()), + willBe(nullValue()) + ); + deployJar(entryNode, unit.name(), unit.version(), "ignite-it-jobs-1.0-SNAPSHOT.jar"); } @Override @@ -111,8 +119,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest { CompletionException ex0 = assertThrows(CompletionException.class, result::join); - assertPublicException(ex0, COMMON_ERR_GROUP, INTERNAL_ERR, - "org.example.ConcatJob. Deployment unit non-existing:1.0.0 doesn't exist"); + assertComputeException( + ex0, + ClassNotFoundException.class, + "org.example.ConcatJob. 
Deployment unit non-existing:1.0.0 doesn't exist" + ); } @Test @@ -175,9 +186,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest { .resultAsync(); CompletionException ex0 = assertThrows(CompletionException.class, failedJob::join); - assertPublicException(ex0, COMMON_ERR_GROUP, INTERNAL_ERR, - "org.example.SleepJob. Deployment unit jobs:1.0.0 can't be used: " - + "[clusterStatus = OBSOLETE, nodeStatus = OBSOLETE]"); + assertComputeException( + ex0, + ClassNotFoundException.class, + "Deployment unit jobs:1.0.0 can't be used: [clusterStatus = OBSOLETE, nodeStatus = OBSOLETE]" + ); assertThat(successJob, willCompleteSuccessfully()); } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java index cf63bab9cc..28e48a6023 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java @@ -19,13 +19,16 @@ package org.apache.ignite.internal.compute; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR; +import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR; import java.lang.reflect.Constructor; import java.util.Collection; import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.ComputeJob; @@ -39,6 +42,8 @@ import org.apache.ignite.internal.compute.message.JobChangePriorityResponse; import org.apache.ignite.internal.compute.message.JobResultResponse; import 
org.apache.ignite.internal.compute.message.JobStatusResponse; import org.apache.ignite.internal.compute.message.JobStatusesResponse; +import org.apache.ignite.lang.IgniteCheckedException; +import org.apache.ignite.lang.IgniteException; import org.jetbrains.annotations.Nullable; /** @@ -216,4 +221,30 @@ public class ComputeUtils { .map(it -> new DeploymentUnit(it.name(), Version.parseVersion(it.version()))) .collect(Collectors.toList()); } + + /** + * Returns a new CompletableFuture that, when the given {@code origin} future completes exceptionally, maps the origin's exception to a + * public Compute exception if it is needed. + * + * @param origin The future to use to create a new stage. + * @param <R> Type of result. + * @return New CompletableFuture. + */ + public static <R> CompletableFuture<R> convertToComputeFuture(CompletableFuture<R> origin) { + return origin.handle((res, err) -> { + if (err != null) { + throw new CompletionException(mapToComputeException(unwrapCause(err))); + } + + return res; + }); + } + + private static Throwable mapToComputeException(Throwable origin) { + if (origin instanceof IgniteException || origin instanceof IgniteCheckedException) { + return origin; + } else { + return new ComputeException(COMPUTE_JOB_FAILED_ERR, "Job execution failed: " + origin, origin); + } + } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java index 14d7354d3a..e7d6c43c03 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java @@ -20,7 +20,9 @@ package org.apache.ignite.internal.compute; import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.function.Function.identity; import static java.util.stream.Collectors.toUnmodifiableMap; +import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import java.util.Collection; import java.util.HashSet; @@ -147,7 +149,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal { try { return this.<R>executeAsync(nodes, units, jobClassName, options, args).resultAsync().join(); } catch (CompletionException e) { - throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); + throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e))); } } @@ -268,7 +270,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal { try { return this.<R>executeColocatedAsync(tableName, key, units, jobClassName, options, args).resultAsync().join(); } catch (CompletionException e) { - throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); + throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e))); } } @@ -287,7 +289,7 @@ public class IgniteComputeImpl implements IgniteComputeInternal { return this.<K, R>executeColocatedAsync(tableName, key, keyMapper, units, jobClassName, options, args).resultAsync() .join(); } catch (CompletionException e) { - throw ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e)); + throw ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e))); } } diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java index fa74709eaf..ec61c43e94 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute; +import static 
org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture; import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture; import java.util.concurrent.CompletableFuture; @@ -38,7 +39,7 @@ class JobExecutionFutureWrapper<R> implements JobExecution<R> { @Override public CompletableFuture<R> resultAsync() { - return convertToPublicFuture(delegate.thenCompose(JobExecution::resultAsync)); + return convertToComputeFuture(delegate.thenCompose(JobExecution::resultAsync)); } @Override diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java index 24127cb16a..1a9e3adbc9 100644 --- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java +++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.compute; +import static org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture; import static org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture; import java.util.concurrent.CompletableFuture; @@ -38,7 +39,7 @@ class JobExecutionWrapper<R> implements JobExecution<R> { @Override public CompletableFuture<R> resultAsync() { - return convertToPublicFuture(delegate.resultAsync()); + return convertToComputeFuture(delegate.resultAsync()); } @Override diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/ComputeTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/IgniteExceptionTestUtils.java similarity index 59% rename from modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/ComputeTestUtils.java rename to modules/core/src/testFixtures/java/org/apache/ignite/internal/IgniteExceptionTestUtils.java index e7bd2d5eab..5488c87c11 100644 --- 
a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/ComputeTestUtils.java +++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/IgniteExceptionTestUtils.java @@ -15,64 +15,76 @@ * limitations under the License. */ -package org.apache.ignite.internal.compute.utils; +package org.apache.ignite.internal; +import static org.apache.ignite.lang.ErrorGroups.extractGroupCode; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import org.apache.ignite.internal.util.ExceptionUtils; -import org.apache.ignite.lang.ErrorGroup; +import org.apache.ignite.lang.IgniteCheckedException; import org.apache.ignite.lang.IgniteException; -import org.hamcrest.Matchers; +import org.apache.ignite.lang.TraceableException; /** - * Test utils for Compute. + * Test utils for checking public exceptions. */ -public class ComputeTestUtils { +public class IgniteExceptionTestUtils { /** * <em>Assert</em> that passed throwable is a public exception with expected error group, code and message. * * @param throwable exception to check. - * @param expectedErrorGroup - expected {@link ErrorGroup}. * @param expectedErrorCode - expected error code. * @param containMessage - message that exception should contain. */ public static void assertPublicException( Throwable throwable, - ErrorGroup expectedErrorGroup, int expectedErrorCode, String containMessage ) { - assertPublicException(throwable, IgniteException.class, expectedErrorGroup, expectedErrorCode, containMessage); + assertTraceableException(throwable, IgniteException.class, expectedErrorCode, containMessage); } /** - * <em>Assert</em> that passed throwable is a public exception with expected type, error group, code and message. 
+ * <em>Assert</em> that passed throwable is a public checked exception with expected error group, code and message. + * + * @param throwable exception to check. + * @param expectedErrorCode - expected error code. + * @param containMessage - message that exception should contain. + */ + public static void assertPublicCheckedException( + Throwable throwable, + int expectedErrorCode, + String containMessage + ) { + assertTraceableException(throwable, IgniteCheckedException.class, expectedErrorCode, containMessage); + } + + /** + * <em>Assert</em> that passed throwable is a traceable exception with expected type, error group, code and message. * * @param throwable - exception to check. * @param expectedType - expected public exception type. - * @param expectedErrorGroup - expected {@link ErrorGroup}. * @param expectedErrorCode - expected error code. * @param containMessage - message that exception should contain. */ - public static void assertPublicException( + public static void assertTraceableException( Throwable throwable, - Class<? extends IgniteException> expectedType, - ErrorGroup expectedErrorGroup, + Class<? 
extends TraceableException> expectedType, int expectedErrorCode, String containMessage ) { Throwable cause = ExceptionUtils.unwrapCause(throwable); assertThat(cause, instanceOf(expectedType)); - IgniteException ex = expectedType.cast(cause); + TraceableException ex = expectedType.cast(cause); - assertThat(ex.groupCode(), is(expectedErrorGroup.groupCode())); - assertThat(ex.groupName(), is(expectedErrorGroup.name())); + assertThat(ex.groupCode(), is(extractGroupCode(expectedErrorCode))); assertThat(ex.code(), is(expectedErrorCode)); assertThat(ex.traceId(), is(notNullValue())); - assertThat(ex.getMessage(), Matchers.containsString(containMessage)); + assertThat(cause.getMessage(), containsString(containMessage)); } } diff --git a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs index 838265a57d..5b534d95bb 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs @@ -347,15 +347,13 @@ namespace Apache.Ignite.Tests.Compute public async Task TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace() { var jobExecution = await Client.Compute.ExecuteAsync<object>(await GetNodeAsync(1), Units, ExceptionJob, "foo-bar"); - var ex = Assert.ThrowsAsync<IgniteException>(async () => await jobExecution.GetResultAsync()); + var ex = Assert.ThrowsAsync<ComputeException>(async () => await jobExecution.GetResultAsync()); - Assert.AreEqual("Test exception: foo-bar", ex!.Message); + Assert.AreEqual("Job execution failed: java.lang.RuntimeException: Test exception: foo-bar", ex!.Message); Assert.IsNotNull(ex.InnerException); var str = ex.ToString(); - // TODO IGNITE-20858: Fix once user errors are handled properly - StringAssert.Contains("Apache.Ignite.IgniteException: Test exception: foo-bar", str); StringAssert.Contains( "at 
org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.execute(PlatformTestNodeRunner.java:", str); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java index 6473062580..373206bfd2 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java @@ -22,11 +22,13 @@ import static org.apache.ignite.compute.JobState.COMPLETED; import static org.apache.ignite.compute.JobState.EXECUTING; import static org.apache.ignite.compute.JobState.FAILED; import static org.apache.ignite.compute.JobState.QUEUED; +import static org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe; import static org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR; import static org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR; import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; @@ -57,10 +59,12 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.function.Supplier; import java.util.stream.Collectors; import 
org.apache.ignite.client.IgniteClient; import org.apache.ignite.client.IgniteClient.Builder; import org.apache.ignite.client.IgniteClientConnectionException; +import org.apache.ignite.compute.ComputeException; import org.apache.ignite.compute.ComputeJob; import org.apache.ignite.compute.DeploymentUnit; import org.apache.ignite.compute.JobExecution; @@ -72,7 +76,6 @@ import org.apache.ignite.table.mapper.Mapper; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; -import org.junit.jupiter.params.provider.ValueSource; /** * Thin client compute integration test. @@ -282,30 +285,24 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { assertThat(execution.statusAsync(), willBe(jobStatusWithState(COMPLETED))); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceId(boolean async) { - IgniteException cause; - - if (async) { - JobExecution<String> execution = client().compute() - .executeAsync(Set.of(node(0)), List.of(), IgniteExceptionJob.class.getName()); - - CompletionException ex = assertThrows( - CompletionException.class, - () -> execution.resultAsync().join() - ); - - assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); + @Test + void testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceIdAsync() { + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeAsync(Set.of(node(0)), List.of(), IgniteExceptionJob.class.getName()) + ); - cause = (IgniteException) ex.getCause(); - } else { - IgniteException ex = assertThrows( - IgniteException.class, - () -> client().compute().<String>execute(Set.of(node(0)), List.of(), IgniteExceptionJob.class.getName())); + assertThat(cause.getMessage(), containsString("Custom job error")); + assertEquals(TRACE_ID, cause.traceId()); + assertEquals(COLUMN_ALREADY_EXISTS_ERR, 
cause.code()); + assertInstanceOf(CustomException.class, cause); + assertNull(cause.getCause()); // No stack trace by default. + } - cause = (IgniteException) ex.getCause(); - } + @Test + void testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceIdSync() { + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().execute(Set.of(node(0)), List.of(), IgniteExceptionJob.class.getName()) + ); assertThat(cause.getMessage(), containsString("Custom job error")); assertEquals(TRACE_ID, cause.traceId()); @@ -314,70 +311,187 @@ public class ItThinClientComputeTest extends ItAbstractThinClientTest { assertNull(cause.getCause()); // No stack trace by default. } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testExceptionInJobPropagatesToClientWithClassAndMessage(boolean async) { - IgniteException cause; + @Test + void testExceptionInJobPropagatesToClientWithClassAndMessageAsync() { + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeAsync(Set.of(node(0)), List.of(), ExceptionJob.class.getName()) + ); - if (async) { - JobExecution<String> execution = client().compute().executeAsync(Set.of(node(0)), List.of(), ExceptionJob.class.getName()); + assertComputeExceptionWithClassAndMessage(cause); + } - CompletionException ex = assertThrows( - CompletionException.class, - () -> execution.resultAsync().join() - ); + @Test + void testExceptionInJobPropagatesToClientWithClassAndMessageSync() { + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().execute(Set.of(node(0)), List.of(), ExceptionJob.class.getName()) + ); - assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); + assertComputeExceptionWithClassAndMessage(cause); + } - cause = (IgniteException) ex.getCause(); - } else { - IgniteException ex = assertThrows( - IgniteException.class, - () -> client().compute().<String>execute(Set.of(node(0)), List.of(), ExceptionJob.class.getName())); 
+ @Test + void testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync() { + // Second node has sendServerExceptionStackTraceToClient enabled. + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeAsync(Set.of(node(1)), List.of(), ExceptionJob.class.getName()) + ); - cause = (IgniteException) ex.getCause(); - } + assertComputeExceptionWithStackTrace(cause); + } - // TODO IGNITE-20858: Once user errors are handled properly, make sure the cause is ArithmeticException - assertThat(cause.getMessage(), containsString("math err")); - assertEquals(INTERNAL_ERR, cause.code()); - assertNull(cause.getCause()); // No stack trace by default. + @Test + void testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync() { + // Second node has sendServerExceptionStackTraceToClient enabled. + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().execute(Set.of(node(1)), List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithStackTrace(cause); } - @ParameterizedTest - @ValueSource(booleans = {true, false}) - void testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace(boolean async) { + @Test + void testExceptionInBroadcastJobPropagatesToClient() { + Map<ClusterNode, JobExecution<String>> executions = client().compute().broadcastAsync( + Set.of(node(0), node(1)), List.of(), ExceptionJob.class.getName() + ); + + assertComputeExceptionWithClassAndMessage(getExceptionInJobExecutionAsync(executions.get(node(0)))); + // Second node has sendServerExceptionStackTraceToClient enabled. 
- IgniteException cause; + assertComputeExceptionWithStackTrace(getExceptionInJobExecutionAsync(executions.get(node(1)))); + } - if (async) { - JobExecution<String> execution = client().compute().executeAsync(Set.of(node(1)), List.of(), ExceptionJob.class.getName()); + @Test + void testExceptionInColocatedTupleJobPropagatesToClientWithClassAndMessageAsync() { + var key = Tuple.create().set(COLUMN_KEY, 1); - CompletionException ex = assertThrows( - CompletionException.class, - () -> execution.resultAsync().join() - ); + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeColocatedAsync(TABLE_NAME, key, List.of(), ExceptionJob.class.getName() + )); - assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); + assertComputeExceptionWithClassAndMessage(cause); + } - cause = (IgniteException) ex.getCause(); - } else { - IgniteException ex = assertThrows( - IgniteException.class, - () -> client().compute().execute(Set.of(node(1)), List.of(), ExceptionJob.class.getName())); + @Test + void testExceptionInColocatedTupleJobPropagatesToClientWithClassAndMessageSync() { + var key = Tuple.create().set(COLUMN_KEY, 1); - cause = (IgniteException) ex.getCause(); - } + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().executeColocated(TABLE_NAME, key, List.of(), ExceptionJob.class.getName()) + ); - // TODO IGNITE-20858: Once user errors are handled properly, make sure the cause is ArithmeticException - assertThat(cause.getMessage(), containsString("math err")); - assertEquals(INTERNAL_ERR, cause.code()); + assertComputeExceptionWithClassAndMessage(cause); + } + + @Test + void testExceptionInColocatedTupleJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync() { + // Second node has sendServerExceptionStackTraceToClient enabled. 
+ var key = Tuple.create().set(COLUMN_KEY, 2); + + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeColocatedAsync(TABLE_NAME, key, List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithStackTrace(cause); + } + + @Test + void testExceptionInColocatedTupleJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync() { + // Second node has sendServerExceptionStackTraceToClient enabled. + var key = Tuple.create().set(COLUMN_KEY, 2); + + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().executeColocated(TABLE_NAME, key, List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithStackTrace(cause); + } + + @Test + void testExceptionInColocatedPojoJobPropagatesToClientWithClassAndMessageAsync() { + var key = new TestPojo(1); + Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); + + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeColocatedAsync(TABLE_NAME, key, mapper, List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithClassAndMessage(cause); + } + + @Test + void testExceptionInColocatedPojoJobPropagatesToClientWithClassAndMessageSync() { + var key = new TestPojo(1); + Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); + + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().executeColocated(TABLE_NAME, key, mapper, List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithClassAndMessage(cause); + } + + @Test + void testExceptionInColocatedPojoJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync() { + // Second node has sendServerExceptionStackTraceToClient enabled. 
+ var key = new TestPojo(2); + Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); + + IgniteException cause = getExceptionInJobExecutionAsync( + client().compute().executeColocatedAsync(TABLE_NAME, key, mapper, List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithStackTrace(cause); + } + + @Test + void testExceptionInColocatedPojoJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync() { + // Second node has sendServerExceptionStackTraceToClient enabled. + var key = new TestPojo(2); + Mapper<TestPojo> mapper = Mapper.of(TestPojo.class); + + IgniteException cause = getExceptionInJobExecutionSync( + () -> client().compute().executeColocated(TABLE_NAME, key, mapper, List.of(), ExceptionJob.class.getName()) + ); + + assertComputeExceptionWithStackTrace(cause); + } + + private static IgniteException getExceptionInJobExecutionAsync(JobExecution<String> execution) { + CompletionException ex = assertThrows( + CompletionException.class, + () -> execution.resultAsync().join() + ); + + assertThat(execution.statusAsync(), willBe(jobStatusWithState(FAILED))); + + return (IgniteException) ex.getCause(); + } + + private static IgniteException getExceptionInJobExecutionSync(Supplier<String> execution) { + IgniteException ex = assertThrows(IgniteException.class, execution::get); + + return (IgniteException) ex.getCause(); + } + + private static void assertComputeExceptionWithClassAndMessage(IgniteException cause) { + String expectedMessage = "Job execution failed: java.lang.ArithmeticException: math err"; + assertTraceableException(cause, ComputeException.class, COMPUTE_JOB_FAILED_ERR, expectedMessage); + + assertNull(cause.getCause()); // No stack trace by default. 
+ } + + private static void assertComputeExceptionWithStackTrace(IgniteException cause) { + String expectedMessage = "Job execution failed: java.lang.ArithmeticException: math err"; + assertTraceableException(cause, ComputeException.class, COMPUTE_JOB_FAILED_ERR, expectedMessage); assertNotNull(cause.getCause()); assertThat(cause.getCause().getMessage(), containsString( - "at org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$" - + "ExceptionJob.execute(ItThinClientComputeTest.java:")); + "Caused by: java.lang.ArithmeticException: math err" + System.lineSeparator() + + "\tat org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$" + + "ExceptionJob.execute(ItThinClientComputeTest.java:") + ); } @ParameterizedTest diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java index 0dba6e079b..15f3b15784 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java @@ -75,7 +75,8 @@ public abstract class ClusterPerClassIntegrationTest extends IgniteIntegrationTe + " }\n" + " },\n" + " clientConnector: { port:{} },\n" - + " rest.port: {}\n" + + " rest.port: {},\n" + + " compute.threadPoolSize: 1\n" + "}"; /** Cluster nodes. 
*/ diff --git a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java index e21b6f4ee7..d115619117 100644 --- a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java +++ b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java @@ -47,11 +47,9 @@ public abstract class ClusterPerTestIntegrationTest extends IgniteIntegrationTes + " }\n" + " },\n" + " clientConnector: { port:{} },\n" - + " rest.port: {}\n" - + " compute: {\n" - + " threadPoolSize: 1\n" - + " }\n" - + "}\n"; + + " rest.port: {},\n" + + " compute.threadPoolSize: 1\n" + + "}"; /** Template for node bootstrap config with Scalecube settings for fast failure detection. */ public static final String FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"