This is an automated email from the ASF dual-hosted git repository.

apkhmv pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f31157466 IGNITE-20858 Improve error handling in compute jobs (#3121)
6f31157466 is described below

commit 6f3115746698273b3b22dd4f59bf0ad87a67d982
Author: Ivan Gagarkin <gagarkin....@gmail.com>
AuthorDate: Thu Feb 15 11:22:16 2024 +0100

    IGNITE-20858 Improve error handling in compute jobs (#3121)
---
 .../ignite/internal/compute/ItComputeBaseTest.java |  51 +++--
 .../internal/compute/ItComputeTestEmbedded.java    |  99 ++++++--
 .../internal/compute/ItComputeTestStandalone.java  |  31 ++-
 .../ignite/internal/compute/ComputeUtils.java      |  31 +++
 .../ignite/internal/compute/IgniteComputeImpl.java |   8 +-
 .../compute/JobExecutionFutureWrapper.java         |   3 +-
 .../internal/compute/JobExecutionWrapper.java      |   3 +-
 .../ignite/internal/IgniteExceptionTestUtils.java} |  46 ++--
 .../Apache.Ignite.Tests/Compute/ComputeTests.cs    |   6 +-
 .../runner/app/client/ItThinClientComputeTest.java | 250 +++++++++++++++------
 .../internal/ClusterPerClassIntegrationTest.java   |   3 +-
 .../internal/ClusterPerTestIntegrationTest.java    |   8 +-
 12 files changed, 394 insertions(+), 145 deletions(-)

diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
index abf492e3e8..38e36152c7 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeBaseTest.java
@@ -20,11 +20,10 @@ package org.apache.ignite.internal.compute;
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.FAILED;
-import static 
org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException;
+import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
-import static org.apache.ignite.lang.ErrorGroups.Common.COMMON_ERR_GROUP;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.aMapWithSize;
 import static org.hamcrest.Matchers.containsString;
@@ -43,10 +42,12 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
-import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
 import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.lang.TableNotFoundException;
 import org.apache.ignite.network.ClusterNode;
@@ -57,7 +58,7 @@ import org.junit.jupiter.api.Test;
 /**
  * Base integration tests for Compute functionality.
  */
-public abstract class ItComputeBaseTest extends ClusterPerTestIntegrationTest {
+public abstract class ItComputeBaseTest extends ClusterPerClassIntegrationTest 
{
     protected abstract List<DeploymentUnit> units();
 
     protected abstract String concatJobClassName();
@@ -76,6 +77,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
         assertThat(result, is("a42"));
     }
 
+    IgniteImpl node(int i) {
+        return CLUSTER.node(i);
+    }
+
     @Test
     void executesJobLocallyAsync() {
         IgniteImpl entryNode = node(0);
@@ -140,7 +145,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
         IgniteException ex = assertThrows(IgniteException.class, () -> 
entryNode.compute()
                 .execute(Set.of(entryNode.node()), units(), 
failingJobClassName()));
 
-        assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops");
+        assertComputeException(ex, "JobException", "Oops");
     }
 
     @Test
@@ -152,7 +157,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
 
         ExecutionException ex = assertThrows(ExecutionException.class, () -> 
execution.resultAsync().get(1, TimeUnit.SECONDS));
 
-        assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops");
+        assertComputeException(ex, "JobException", "Oops");
 
         assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
         assertThat(execution.cancelAsync(), willBe(false));
@@ -165,7 +170,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
         IgniteException ex = assertThrows(IgniteException.class, () -> 
entryNode.compute()
                 .execute(Set.of(node(1).node(), node(2).node()), units(), 
failingJobClassName()));
 
-        assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops");
+        assertComputeException(ex, "JobException", "Oops");
     }
 
     @Test
@@ -177,7 +182,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
 
         ExecutionException ex = assertThrows(ExecutionException.class, () -> 
execution.resultAsync().get(1, TimeUnit.SECONDS));
 
-        assertPublicException(ex, COMMON_ERR_GROUP, INTERNAL_ERR, "Oops");
+        assertComputeException(ex, "JobException", "Oops");
 
         assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
         assertThat(execution.cancelAsync(), willBe(false));
@@ -232,7 +237,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
                     .get(1, TimeUnit.SECONDS);
 
             assertThat(result, is(instanceOf(CompletionException.class)));
-            assertPublicException(result, COMMON_ERR_GROUP, INTERNAL_ERR, 
"Oops");
+            assertComputeException(result, "JobException", "Oops");
 
             assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
             assertThat(execution.cancelAsync(), willBe(false));
@@ -258,7 +263,7 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
         IgniteImpl entryNode = node(0);
 
         JobExecution<String> execution = entryNode.compute()
-                .<String>executeColocatedAsync("test", 
Tuple.create(Map.of("k", 1)), units(), getNodeNameJobClassName());
+                .executeColocatedAsync("test", Tuple.create(Map.of("k", 1)), 
units(), getNodeNameJobClassName());
 
         assertThat(execution.resultAsync(), willBe(in(allNodeNames())));
         assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(COMPLETED)));
@@ -277,9 +282,10 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
         assertThat(ex.getCause().getMessage(), containsString("The table does 
not exist [name=\"PUBLIC\".\"bad-table\"]"));
     }
 
-    private void createTestTableWithOneRow() {
-        executeSql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY 
(k))");
-        executeSql("INSERT INTO test(k, v) VALUES (1, 101)");
+    private static void createTestTableWithOneRow() {
+        sql("DROP TABLE IF EXISTS test");
+        sql("CREATE TABLE test (k int, v int, CONSTRAINT PK PRIMARY KEY (k))");
+        sql("INSERT INTO test(k, v) VALUES (1, 101)");
     }
 
     private List<String> allNodeNames() {
@@ -308,10 +314,25 @@ public abstract class ItComputeBaseTest extends 
ClusterPerTestIntegrationTest {
         IgniteImpl entryNode = node(0);
 
         JobExecution<String> execution = entryNode.compute()
-                .<Integer, String>executeColocatedAsync("test", 1, 
Mapper.of(Integer.class), units(), getNodeNameJobClassName());
+                .executeColocatedAsync("test", 1, Mapper.of(Integer.class), 
units(), getNodeNameJobClassName());
 
         assertThat(execution.resultAsync(), willBe(in(allNodeNames())));
         assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(COMPLETED)));
         assertThat(execution.cancelAsync(), willBe(false));
     }
+
+    protected static void assertComputeException(Exception ex, Throwable 
cause) {
+        assertComputeException(ex, cause.getClass().getName(), 
cause.getMessage());
+    }
+
+    protected static void assertComputeException(Exception ex, Class<?> cause, 
String causeMsgSubstring) {
+        assertComputeException(ex, cause.getName(), causeMsgSubstring);
+    }
+
+    private static void assertComputeException(Exception ex, String 
causeClass, String causeMsgSubstring) {
+        assertTraceableException(ex, ComputeException.class, 
COMPUTE_JOB_FAILED_ERR, "Job execution failed:");
+        Throwable cause = ExceptionUtils.unwrapCause(ex);
+        assertThat(cause.getCause().getClass().getName(), 
containsString(causeClass));
+        assertThat(cause.getCause().getMessage(), 
containsString(causeMsgSubstring));
+    }
 }
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
index 67a9a569b4..292a790e7e 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestEmbedded.java
@@ -18,12 +18,14 @@
 package org.apache.ignite.internal.compute;
 
 import static java.util.stream.Collectors.joining;
-import static 
org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException;
+import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicCheckedException;
+import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertPublicException;
+import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
+import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
-import static org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_ERR_GROUP;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.is;
@@ -37,6 +39,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.ComputeJob;
@@ -46,7 +49,10 @@ import org.apache.ignite.compute.JobExecutionContext;
 import org.apache.ignite.compute.JobExecutionOptions;
 import org.apache.ignite.compute.JobState;
 import org.apache.ignite.internal.app.IgniteImpl;
-import org.apache.ignite.lang.ErrorGroup;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -81,18 +87,18 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     @ParameterizedTest
     @MethodSource("wrongJobClassArguments")
-    void executesWrongJobClassLocally(String jobClassName, ErrorGroup 
errorGroup, int errorCode, String msg) {
+    void executesWrongJobClassLocally(String jobClassName, int errorCode, 
String msg) {
         IgniteImpl entryNode = node(0);
 
         IgniteException ex = assertThrows(IgniteException.class, () -> 
entryNode.compute()
                 .execute(Set.of(entryNode.node()), units(), jobClassName));
 
-        assertPublicException(ex, ComputeException.class, errorGroup, 
errorCode, msg);
+        assertTraceableException(ex, ComputeException.class, errorCode, msg);
     }
 
     @ParameterizedTest
     @MethodSource("wrongJobClassArguments")
-    void executesWrongJobClassLocallyAsync(String jobClassName, ErrorGroup 
errorGroup, int errorCode, String msg) {
+    void executesWrongJobClassLocallyAsync(String jobClassName, int errorCode, 
String msg) {
         IgniteImpl entryNode = node(0);
 
         ExecutionException ex = assertThrows(ExecutionException.class, () -> 
entryNode.compute()
@@ -100,23 +106,23 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
                 .resultAsync()
                 .get(1, TimeUnit.SECONDS));
 
-        assertPublicException(ex, ComputeException.class, errorGroup, 
errorCode, msg);
+        assertTraceableException(ex, ComputeException.class, errorCode, msg);
     }
 
     @ParameterizedTest
     @MethodSource("wrongJobClassArguments")
-    void executesWrongJobClassOnRemoteNodes(String jobClassName, ErrorGroup 
errorGroup, int errorCode, String msg) {
+    void executesWrongJobClassOnRemoteNodes(String jobClassName, int 
errorCode, String msg) {
         Ignite entryNode = node(0);
 
         IgniteException ex = assertThrows(IgniteException.class, () -> 
entryNode.compute()
                 .execute(Set.of(node(1).node(), node(2).node()), units(), 
jobClassName));
 
-        assertPublicException(ex, ComputeException.class, errorGroup, 
errorCode, msg);
+        assertTraceableException(ex, ComputeException.class, errorCode, msg);
     }
 
     @ParameterizedTest
     @MethodSource("wrongJobClassArguments")
-    void executesWrongJobClassOnRemoteNodesAsync(String jobClassName, 
ErrorGroup errorGroup, int errorCode, String msg) {
+    void executesWrongJobClassOnRemoteNodesAsync(String jobClassName, int 
errorCode, String msg) {
         Ignite entryNode = node(0);
 
         ExecutionException ex = assertThrows(ExecutionException.class, () -> 
entryNode.compute()
@@ -124,7 +130,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
                 .resultAsync()
                 .get(1, TimeUnit.SECONDS));
 
-        assertPublicException(ex, ComputeException.class, errorGroup, 
errorCode, msg);
+        assertTraceableException(ex, ComputeException.class, errorCode, msg);
     }
 
     @Test
@@ -164,6 +170,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
 
         assertThat(execution.changePriorityAsync(2), willBe(false));
+        assertThat(execution.cancelAsync(), willBe(true));
     }
 
     @Test
@@ -175,6 +182,7 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         await().until(execution::statusAsync, 
willBe(jobStatusWithState(JobState.EXECUTING)));
 
         assertThat(execution.changePriorityAsync(2), willBe(false));
+        assertThat(execution.cancelAsync(), willBe(true));
     }
 
     @Test
@@ -211,11 +219,12 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         // Tasks 1 and 3 completed successfully
         assertThat(execution1.resultAsync(), willCompleteSuccessfully());
         assertThat(execution3.resultAsync(), willCompleteSuccessfully());
-        assertThat(execution1.resultAsync().isDone(), is(true));
-        assertThat(execution3.resultAsync().isDone(), is(true));
 
         // Task 2 is not completed
         assertThat(execution2.resultAsync().isDone(), is(false));
+
+        // Finish task 2
+        assertThat(execution2.cancelAsync(), willBe(true));
     }
 
     @Test
@@ -254,8 +263,6 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         // Tasks 1 and 3 completed successfully
         assertThat(execution1.resultAsync(), willCompleteSuccessfully());
         assertThat(execution3.resultAsync(), willCompleteSuccessfully());
-        assertThat(execution1.resultAsync().isDone(), is(true));
-        assertThat(execution3.resultAsync().isDone(), is(true));
 
         // Task 3 should be executed 2 times
         assertEquals(2, 
WaitLatchThrowExceptionOnFirstExecutionJob.counter.get());
@@ -264,7 +271,51 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         assertThat(execution2.resultAsync().isDone(), is(false));
 
         // Cancel task2
-        execution2.cancelAsync().join();
+        assertThat(execution2.cancelAsync(), willBe(true));
+    }
+
+    @Test
+    void shouldNotConvertIgniteException() {
+        IgniteImpl entryNode = node(0);
+
+        IgniteException exception = new IgniteException(INTERNAL_ERR, "Test 
exception");
+
+        IgniteException ex = assertThrows(IgniteException.class, () -> 
entryNode.compute()
+                .execute(Set.of(entryNode.node()), units(), 
CustomFailingJob.class.getName(), exception));
+
+        assertPublicException(ex, exception.code(), exception.getMessage());
+    }
+
+    @Test
+    void shouldNotConvertIgniteCheckedException() {
+        IgniteImpl entryNode = node(0);
+
+        IgniteCheckedException exception = new 
IgniteCheckedException(INTERNAL_ERR, "Test exception");
+
+        IgniteCheckedException ex = assertThrows(IgniteCheckedException.class, 
() -> entryNode.compute()
+                .execute(Set.of(entryNode.node()), units(), 
CustomFailingJob.class.getName(), exception));
+
+        assertPublicCheckedException(ex, exception.code(), 
exception.getMessage());
+    }
+
+    private static Stream<Arguments> privateExceptions() {
+        return Stream.of(
+                Arguments.of(new IgniteInternalException(INTERNAL_ERR, "Test 
exception")),
+                Arguments.of(new IgniteInternalCheckedException(INTERNAL_ERR, 
"Test exception")),
+                Arguments.of(new RuntimeException("Test exception")),
+                Arguments.of(new Exception("Test exception"))
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("privateExceptions")
+    void shouldConvertToComputeException(Throwable throwable) {
+        IgniteImpl entryNode = node(0);
+
+        IgniteException ex = assertThrows(IgniteException.class, () -> 
entryNode.compute()
+                .execute(Set.of(entryNode.node()), units(), 
CustomFailingJob.class.getName(), throwable));
+
+        assertComputeException(ex, throwable);
     }
 
     private static class ConcatJob implements ComputeJob<String> {
@@ -285,6 +336,14 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
         }
     }
 
+    private static class CustomFailingJob implements ComputeJob<String> {
+        /** {@inheritDoc} */
+        @Override
+        public String execute(JobExecutionContext context, Object... args) {
+            throw ExceptionUtils.sneakyThrow((Throwable) args[0]);
+        }
+    }
+
     private static class FailingJob implements ComputeJob<String> {
         /** {@inheritDoc} */
         @Override
@@ -301,11 +360,9 @@ class ItComputeTestEmbedded extends ItComputeBaseTest {
 
     private static List<Arguments> wrongJobClassArguments() {
         return List.of(
-                Arguments.of("org.example.NonExistentJob", COMPUTE_ERR_GROUP, 
CLASS_INITIALIZATION_ERR, "Cannot load job class by name"),
-                Arguments.of(NonComputeJob.class.getName(), COMPUTE_ERR_GROUP, 
CLASS_INITIALIZATION_ERR,
-                        "does not implement ComputeJob interface"),
-                Arguments.of(NonEmptyConstructorJob.class.getName(), 
COMPUTE_ERR_GROUP, CLASS_INITIALIZATION_ERR,
-                        "Cannot instantiate job")
+                Arguments.of("org.example.NonExistentJob", 
CLASS_INITIALIZATION_ERR, "Cannot load job class by name"),
+                Arguments.of(NonComputeJob.class.getName(), 
CLASS_INITIALIZATION_ERR, "does not implement ComputeJob interface"),
+                Arguments.of(NonEmptyConstructorJob.class.getName(), 
CLASS_INITIALIZATION_ERR, "Cannot instantiate job")
         );
     }
 
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
index 419bcbd896..a924bc4ede 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
+++ b/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/ItComputeTestStandalone.java
@@ -17,14 +17,11 @@
 
 package org.apache.ignite.internal.compute;
 
-import static 
org.apache.ignite.internal.compute.utils.ComputeTestUtils.assertPublicException;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.DEPLOYED;
 import static org.apache.ignite.internal.deployunit.DeploymentStatus.OBSOLETE;
 import static org.apache.ignite.internal.deployunit.InitialDeployMode.MAJORITY;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
-import static org.apache.ignite.lang.ErrorGroups.Common.COMMON_ERR_GROUP;
-import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.nullValue;
@@ -56,7 +53,18 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
 
     @BeforeEach
     void setUp() throws IOException {
-        deployJar(node(0), unit.name(), unit.version(), 
"ignite-it-jobs-1.0-SNAPSHOT.jar");
+        IgniteImpl entryNode = node(0);
+        //TODO https://issues.apache.org/jira/browse/IGNITE-19757
+        try {
+            entryNode.deployment().undeployAsync(unit.name(), 
unit.version()).join();
+        } catch (Exception ignored) {
+            // ignored
+        }
+        await().until(
+                () -> entryNode.deployment().clusterStatusAsync(unit.name(), 
unit.version()),
+                willBe(nullValue())
+        );
+        deployJar(entryNode, unit.name(), unit.version(), 
"ignite-it-jobs-1.0-SNAPSHOT.jar");
     }
 
     @Override
@@ -111,8 +119,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
 
         CompletionException ex0 = assertThrows(CompletionException.class, 
result::join);
 
-        assertPublicException(ex0, COMMON_ERR_GROUP, INTERNAL_ERR,
-                "org.example.ConcatJob. Deployment unit non-existing:1.0.0 
doesn't exist");
+        assertComputeException(
+                ex0,
+                ClassNotFoundException.class,
+                "org.example.ConcatJob. Deployment unit non-existing:1.0.0 
doesn't exist"
+        );
     }
 
     @Test
@@ -175,9 +186,11 @@ class ItComputeTestStandalone extends ItComputeBaseTest {
                 .resultAsync();
 
         CompletionException ex0 = assertThrows(CompletionException.class, 
failedJob::join);
-        assertPublicException(ex0, COMMON_ERR_GROUP, INTERNAL_ERR,
-                "org.example.SleepJob. Deployment unit jobs:1.0.0 can't be 
used: "
-                + "[clusterStatus = OBSOLETE, nodeStatus = OBSOLETE]");
+        assertComputeException(
+                ex0,
+                ClassNotFoundException.class,
+                "Deployment unit jobs:1.0.0 can't be used: [clusterStatus = 
OBSOLETE, nodeStatus = OBSOLETE]"
+        );
 
         assertThat(successJob, willCompleteSuccessfully());
     }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index cf63bab9cc..28e48a6023 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -19,13 +19,16 @@ package org.apache.ignite.internal.compute;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Compute.CLASS_INITIALIZATION_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 
 import java.lang.reflect.Constructor;
 import java.util.Collection;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.stream.Collectors;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.ComputeJob;
@@ -39,6 +42,8 @@ import 
org.apache.ignite.internal.compute.message.JobChangePriorityResponse;
 import org.apache.ignite.internal.compute.message.JobResultResponse;
 import org.apache.ignite.internal.compute.message.JobStatusResponse;
 import org.apache.ignite.internal.compute.message.JobStatusesResponse;
+import org.apache.ignite.lang.IgniteCheckedException;
+import org.apache.ignite.lang.IgniteException;
 import org.jetbrains.annotations.Nullable;
 
 /**
@@ -216,4 +221,30 @@ public class ComputeUtils {
                 .map(it -> new DeploymentUnit(it.name(), 
Version.parseVersion(it.version())))
                 .collect(Collectors.toList());
     }
+
+    /**
+     * Returns a new CompletableFuture that, when the given {@code origin} 
future completes exceptionally, maps the origin's exception to a
+     * public Compute exception if it is needed.
+     *
+     * @param origin The future to use to create a new stage.
+     * @param <R> Type of result.
+     * @return New CompletableFuture.
+     */
+    public static <R> CompletableFuture<R> 
convertToComputeFuture(CompletableFuture<R> origin) {
+        return origin.handle((res, err) -> {
+            if (err != null) {
+                throw new 
CompletionException(mapToComputeException(unwrapCause(err)));
+            }
+
+            return res;
+        });
+    }
+
+    private static Throwable mapToComputeException(Throwable origin) {
+        if (origin instanceof IgniteException || origin instanceof 
IgniteCheckedException) {
+            return origin;
+        } else {
+            return new ComputeException(COMPUTE_JOB_FAILED_ERR, "Job execution 
failed: " + origin, origin);
+        }
+    }
 }
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 14d7354d3a..e7d6c43c03 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -20,7 +20,9 @@ package org.apache.ignite.internal.compute;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.function.Function.identity;
 import static java.util.stream.Collectors.toUnmodifiableMap;
+import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.mapToPublicException;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 
 import java.util.Collection;
 import java.util.HashSet;
@@ -147,7 +149,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal {
         try {
             return this.<R>executeAsync(nodes, units, jobClassName, options, 
args).resultAsync().join();
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw 
ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
         }
     }
 
@@ -268,7 +270,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal {
         try {
             return this.<R>executeColocatedAsync(tableName, key, units, 
jobClassName, options, args).resultAsync().join();
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw 
ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
         }
     }
 
@@ -287,7 +289,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal {
             return this.<K, R>executeColocatedAsync(tableName, key, keyMapper, 
units, jobClassName, options, args).resultAsync()
                     .join();
         } catch (CompletionException e) {
-            throw 
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+            throw 
ExceptionUtils.sneakyThrow(mapToPublicException(unwrapCause(e)));
         }
     }
 
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
index fa74709eaf..ec61c43e94 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionFutureWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
+import static 
org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
 
 import java.util.concurrent.CompletableFuture;
@@ -38,7 +39,7 @@ class JobExecutionFutureWrapper<R> implements JobExecution<R> 
{
 
     @Override
     public CompletableFuture<R> resultAsync() {
-        return 
convertToPublicFuture(delegate.thenCompose(JobExecution::resultAsync));
+        return 
convertToComputeFuture(delegate.thenCompose(JobExecution::resultAsync));
     }
 
     @Override
diff --git a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
index 24127cb16a..1a9e3adbc9 100644
--- a/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
+++ b/modules/compute/src/main/java/org/apache/ignite/internal/compute/JobExecutionWrapper.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.compute;
 
+import static 
org.apache.ignite.internal.compute.ComputeUtils.convertToComputeFuture;
 import static 
org.apache.ignite.internal.lang.IgniteExceptionMapperUtil.convertToPublicFuture;
 
 import java.util.concurrent.CompletableFuture;
@@ -38,7 +39,7 @@ class JobExecutionWrapper<R> implements JobExecution<R> {
 
     @Override
     public CompletableFuture<R> resultAsync() {
-        return convertToPublicFuture(delegate.resultAsync());
+        return convertToComputeFuture(delegate.resultAsync());
     }
 
     @Override
diff --git a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/ComputeTestUtils.java b/modules/core/src/testFixtures/java/org/apache/ignite/internal/IgniteExceptionTestUtils.java
similarity index 59%
rename from modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/ComputeTestUtils.java
rename to modules/core/src/testFixtures/java/org/apache/ignite/internal/IgniteExceptionTestUtils.java
index e7bd2d5eab..5488c87c11 100644
--- a/modules/compute/src/integrationTest/java/org/apache/ignite/internal/compute/utils/ComputeTestUtils.java
+++ b/modules/core/src/testFixtures/java/org/apache/ignite/internal/IgniteExceptionTestUtils.java
@@ -15,64 +15,76 @@
  * limitations under the License.
  */
 
-package org.apache.ignite.internal.compute.utils;
+package org.apache.ignite.internal;
 
+import static org.apache.ignite.lang.ErrorGroups.extractGroupCode;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 
 import org.apache.ignite.internal.util.ExceptionUtils;
-import org.apache.ignite.lang.ErrorGroup;
+import org.apache.ignite.lang.IgniteCheckedException;
 import org.apache.ignite.lang.IgniteException;
-import org.hamcrest.Matchers;
+import org.apache.ignite.lang.TraceableException;
 
 /**
- * Test utils for Compute.
+ * Test utils for checking public exceptions.
  */
-public class ComputeTestUtils {
+public class IgniteExceptionTestUtils {
     /**
      * <em>Assert</em> that passed throwable is a public exception with 
expected error group, code and message.
      *
      * @param throwable exception to check.
-     * @param expectedErrorGroup - expected {@link ErrorGroup}.
      * @param expectedErrorCode - expected error code.
      * @param containMessage - message that exception should contain.
      */
     public static void assertPublicException(
             Throwable throwable,
-            ErrorGroup expectedErrorGroup,
             int expectedErrorCode,
             String containMessage
     ) {
-        assertPublicException(throwable, IgniteException.class, 
expectedErrorGroup, expectedErrorCode, containMessage);
+        assertTraceableException(throwable, IgniteException.class, 
expectedErrorCode, containMessage);
     }
 
     /**
-     * <em>Assert</em> that passed throwable is a public exception with 
expected type, error group, code and message.
+     * <em>Assert</em> that passed throwable is a public checked exception 
with expected error group, code and message.
+     *
+     * @param throwable exception to check.
+     * @param expectedErrorCode - expected error code.
+     * @param containMessage - message that exception should contain.
+     */
+    public static void assertPublicCheckedException(
+            Throwable throwable,
+            int expectedErrorCode,
+            String containMessage
+    ) {
+        assertTraceableException(throwable, IgniteCheckedException.class, 
expectedErrorCode, containMessage);
+    }
+
+    /**
+     * <em>Assert</em> that passed throwable is a traceable exception with 
expected type, error group, code and message.
      *
      * @param throwable - exception to check.
      * @param expectedType - expected public exception type.
-     * @param expectedErrorGroup - expected {@link ErrorGroup}.
      * @param expectedErrorCode - expected error code.
      * @param containMessage - message that exception should contain.
      */
-    public static void assertPublicException(
+    public static void assertTraceableException(
             Throwable throwable,
-            Class<? extends IgniteException> expectedType,
-            ErrorGroup expectedErrorGroup,
+            Class<? extends TraceableException> expectedType,
             int expectedErrorCode,
             String containMessage
     ) {
         Throwable cause = ExceptionUtils.unwrapCause(throwable);
 
         assertThat(cause, instanceOf(expectedType));
-        IgniteException ex = expectedType.cast(cause);
+        TraceableException ex = expectedType.cast(cause);
 
-        assertThat(ex.groupCode(), is(expectedErrorGroup.groupCode()));
-        assertThat(ex.groupName(), is(expectedErrorGroup.name()));
+        assertThat(ex.groupCode(), is(extractGroupCode(expectedErrorCode)));
         assertThat(ex.code(), is(expectedErrorCode));
         assertThat(ex.traceId(), is(notNullValue()));
-        assertThat(ex.getMessage(), Matchers.containsString(containMessage));
+        assertThat(cause.getMessage(), containsString(containMessage));
     }
 }
diff --git 
a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs 
b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
index 838265a57d..5b534d95bb 100644
--- a/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
+++ b/modules/platforms/dotnet/Apache.Ignite.Tests/Compute/ComputeTests.cs
@@ -347,15 +347,13 @@ namespace Apache.Ignite.Tests.Compute
         public async Task 
TestExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace()
         {
             var jobExecution = await Client.Compute.ExecuteAsync<object>(await 
GetNodeAsync(1), Units, ExceptionJob, "foo-bar");
-            var ex = Assert.ThrowsAsync<IgniteException>(async () => await 
jobExecution.GetResultAsync());
+            var ex = Assert.ThrowsAsync<ComputeException>(async () => await 
jobExecution.GetResultAsync());
 
-            Assert.AreEqual("Test exception: foo-bar", ex!.Message);
+            Assert.AreEqual("Job execution failed: java.lang.RuntimeException: 
Test exception: foo-bar", ex!.Message);
             Assert.IsNotNull(ex.InnerException);
 
             var str = ex.ToString();
 
-            // TODO IGNITE-20858: Fix once user errors are handled properly
-            StringAssert.Contains("Apache.Ignite.IgniteException: Test 
exception: foo-bar", str);
             StringAssert.Contains(
                 "at 
org.apache.ignite.internal.runner.app.PlatformTestNodeRunner$ExceptionJob.execute(PlatformTestNodeRunner.java:",
                 str);
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
index 6473062580..373206bfd2 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTest.java
@@ -22,11 +22,13 @@ import static org.apache.ignite.compute.JobState.COMPLETED;
 import static org.apache.ignite.compute.JobState.EXECUTING;
 import static org.apache.ignite.compute.JobState.FAILED;
 import static org.apache.ignite.compute.JobState.QUEUED;
+import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.assertTraceableException;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.will;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStatusMatcher.jobStatusWithState;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static 
org.apache.ignite.lang.ErrorGroups.Compute.COMPUTE_JOB_FAILED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Table.COLUMN_ALREADY_EXISTS_ERR;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -57,10 +59,12 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.ignite.client.IgniteClient;
 import org.apache.ignite.client.IgniteClient.Builder;
 import org.apache.ignite.client.IgniteClientConnectionException;
+import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.DeploymentUnit;
 import org.apache.ignite.compute.JobExecution;
@@ -72,7 +76,6 @@ import org.apache.ignite.table.mapper.Mapper;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
-import org.junit.jupiter.params.provider.ValueSource;
 
 /**
  * Thin client compute integration test.
@@ -282,30 +285,24 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(COMPLETED)));
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void 
testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceId(boolean 
async) {
-        IgniteException cause;
-
-        if (async) {
-            JobExecution<String> execution = client().compute()
-                    .executeAsync(Set.of(node(0)), List.of(), 
IgniteExceptionJob.class.getName());
-
-            CompletionException ex = assertThrows(
-                    CompletionException.class,
-                    () -> execution.resultAsync().join()
-            );
-
-            assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
+    @Test
+    void 
testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceIdAsync() {
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeAsync(Set.of(node(0)), List.of(), 
IgniteExceptionJob.class.getName())
+        );
 
-            cause = (IgniteException) ex.getCause();
-        } else {
-            IgniteException ex = assertThrows(
-                    IgniteException.class,
-                    () -> client().compute().<String>execute(Set.of(node(0)), 
List.of(), IgniteExceptionJob.class.getName()));
+        assertThat(cause.getMessage(), containsString("Custom job error"));
+        assertEquals(TRACE_ID, cause.traceId());
+        assertEquals(COLUMN_ALREADY_EXISTS_ERR, cause.code());
+        assertInstanceOf(CustomException.class, cause);
+        assertNull(cause.getCause()); // No stack trace by default.
+    }
 
-            cause = (IgniteException) ex.getCause();
-        }
+    @Test
+    void 
testIgniteExceptionInJobPropagatesToClientWithMessageAndCodeAndTraceIdSync() {
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().execute(Set.of(node(0)), List.of(), 
IgniteExceptionJob.class.getName())
+        );
 
         assertThat(cause.getMessage(), containsString("Custom job error"));
         assertEquals(TRACE_ID, cause.traceId());
@@ -314,70 +311,187 @@ public class ItThinClientComputeTest extends 
ItAbstractThinClientTest {
         assertNull(cause.getCause()); // No stack trace by default.
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void testExceptionInJobPropagatesToClientWithClassAndMessage(boolean 
async) {
-        IgniteException cause;
+    @Test
+    void testExceptionInJobPropagatesToClientWithClassAndMessageAsync() {
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeAsync(Set.of(node(0)), List.of(), 
ExceptionJob.class.getName())
+        );
 
-        if (async) {
-            JobExecution<String> execution = 
client().compute().executeAsync(Set.of(node(0)), List.of(), 
ExceptionJob.class.getName());
+        assertComputeExceptionWithClassAndMessage(cause);
+    }
 
-            CompletionException ex = assertThrows(
-                    CompletionException.class,
-                    () -> execution.resultAsync().join()
-            );
+    @Test
+    void testExceptionInJobPropagatesToClientWithClassAndMessageSync() {
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().execute(Set.of(node(0)), List.of(), 
ExceptionJob.class.getName())
+        );
 
-            assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
+        assertComputeExceptionWithClassAndMessage(cause);
+    }
 
-            cause = (IgniteException) ex.getCause();
-        } else {
-            IgniteException ex = assertThrows(
-                    IgniteException.class,
-                    () -> client().compute().<String>execute(Set.of(node(0)), 
List.of(), ExceptionJob.class.getName()));
+    @Test
+    void 
testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync()
 {
+        // Second node has sendServerExceptionStackTraceToClient enabled.
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeAsync(Set.of(node(1)), List.of(), 
ExceptionJob.class.getName())
+        );
 
-            cause = (IgniteException) ex.getCause();
-        }
+        assertComputeExceptionWithStackTrace(cause);
+    }
 
-        // TODO IGNITE-20858: Once user errors are handled properly, make sure 
the cause is ArithmeticException
-        assertThat(cause.getMessage(), containsString("math err"));
-        assertEquals(INTERNAL_ERR, cause.code());
-        assertNull(cause.getCause()); // No stack trace by default.
+    @Test
+    void 
testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync()
 {
+        // Second node has sendServerExceptionStackTraceToClient enabled.
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().execute(Set.of(node(1)), List.of(), 
ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithStackTrace(cause);
     }
 
-    @ParameterizedTest
-    @ValueSource(booleans = {true, false})
-    void 
testExceptionInJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTrace(boolean
 async) {
+    @Test
+    void testExceptionInBroadcastJobPropagatesToClient() {
+        Map<ClusterNode, JobExecution<String>> executions = 
client().compute().broadcastAsync(
+                Set.of(node(0), node(1)), List.of(), 
ExceptionJob.class.getName()
+        );
+
+        
assertComputeExceptionWithClassAndMessage(getExceptionInJobExecutionAsync(executions.get(node(0))));
+
         // Second node has sendServerExceptionStackTraceToClient enabled.
-        IgniteException cause;
+        
assertComputeExceptionWithStackTrace(getExceptionInJobExecutionAsync(executions.get(node(1))));
+    }
 
-        if (async) {
-            JobExecution<String> execution = 
client().compute().executeAsync(Set.of(node(1)), List.of(), 
ExceptionJob.class.getName());
+    @Test
+    void 
testExceptionInColocatedTupleJobPropagatesToClientWithClassAndMessageAsync() {
+        var key = Tuple.create().set(COLUMN_KEY, 1);
 
-            CompletionException ex = assertThrows(
-                    CompletionException.class,
-                    () -> execution.resultAsync().join()
-            );
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeColocatedAsync(TABLE_NAME, key, 
List.of(), ExceptionJob.class.getName()
+        ));
 
-            assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
+        assertComputeExceptionWithClassAndMessage(cause);
+    }
 
-            cause = (IgniteException) ex.getCause();
-        } else {
-            IgniteException ex = assertThrows(
-                    IgniteException.class,
-                    () -> client().compute().execute(Set.of(node(1)), 
List.of(), ExceptionJob.class.getName()));
+    @Test
+    void 
testExceptionInColocatedTupleJobPropagatesToClientWithClassAndMessageSync() {
+        var key = Tuple.create().set(COLUMN_KEY, 1);
 
-            cause = (IgniteException) ex.getCause();
-        }
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().executeColocated(TABLE_NAME, key, 
List.of(), ExceptionJob.class.getName())
+        );
 
-        // TODO IGNITE-20858: Once user errors are handled properly, make sure 
the cause is ArithmeticException
-        assertThat(cause.getMessage(), containsString("math err"));
-        assertEquals(INTERNAL_ERR, cause.code());
+        assertComputeExceptionWithClassAndMessage(cause);
+    }
+
+    @Test
+    void 
testExceptionInColocatedTupleJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync()
 {
+        // Second node has sendServerExceptionStackTraceToClient enabled.
+        var key = Tuple.create().set(COLUMN_KEY, 2);
+
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeColocatedAsync(TABLE_NAME, key, 
List.of(), ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithStackTrace(cause);
+    }
+
+    @Test
+    void 
testExceptionInColocatedTupleJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync()
 {
+        // Second node has sendServerExceptionStackTraceToClient enabled.
+        var key = Tuple.create().set(COLUMN_KEY, 2);
+
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().executeColocated(TABLE_NAME, key, 
List.of(), ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithStackTrace(cause);
+    }
+
+    @Test
+    void 
testExceptionInColocatedPojoJobPropagatesToClientWithClassAndMessageAsync() {
+        var key = new TestPojo(1);
+        Mapper<TestPojo> mapper = Mapper.of(TestPojo.class);
+
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeColocatedAsync(TABLE_NAME, key, 
mapper, List.of(), ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithClassAndMessage(cause);
+    }
+
+    @Test
+    void 
testExceptionInColocatedPojoJobPropagatesToClientWithClassAndMessageSync() {
+        var key = new TestPojo(1);
+        Mapper<TestPojo> mapper = Mapper.of(TestPojo.class);
+
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().executeColocated(TABLE_NAME, key, 
mapper, List.of(), ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithClassAndMessage(cause);
+    }
+
+    @Test
+    void 
testExceptionInColocatedPojoJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceAsync()
 {
+        // Second node has sendServerExceptionStackTraceToClient enabled.
+        var key = new TestPojo(2);
+        Mapper<TestPojo> mapper = Mapper.of(TestPojo.class);
+
+        IgniteException cause = getExceptionInJobExecutionAsync(
+                client().compute().executeColocatedAsync(TABLE_NAME, key, 
mapper, List.of(), ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithStackTrace(cause);
+    }
+
+    @Test
+    void 
testExceptionInColocatedPojoJobWithSendServerExceptionStackTraceToClientPropagatesToClientWithStackTraceSync()
 {
+        // Second node has sendServerExceptionStackTraceToClient enabled.
+        var key = new TestPojo(2);
+        Mapper<TestPojo> mapper = Mapper.of(TestPojo.class);
+
+        IgniteException cause = getExceptionInJobExecutionSync(
+                () -> client().compute().executeColocated(TABLE_NAME, key, 
mapper, List.of(), ExceptionJob.class.getName())
+        );
+
+        assertComputeExceptionWithStackTrace(cause);
+    }
+
+    private static IgniteException 
getExceptionInJobExecutionAsync(JobExecution<String> execution) {
+        CompletionException ex = assertThrows(
+                CompletionException.class,
+                () -> execution.resultAsync().join()
+        );
+
+        assertThat(execution.statusAsync(), 
willBe(jobStatusWithState(FAILED)));
+
+        return (IgniteException) ex.getCause();
+    }
+
+    private static IgniteException 
getExceptionInJobExecutionSync(Supplier<String> execution) {
+        IgniteException ex = assertThrows(IgniteException.class, 
execution::get);
+
+        return (IgniteException) ex.getCause();
+    }
+
+    private static void 
assertComputeExceptionWithClassAndMessage(IgniteException cause) {
+        String expectedMessage = "Job execution failed: 
java.lang.ArithmeticException: math err";
+        assertTraceableException(cause, ComputeException.class, 
COMPUTE_JOB_FAILED_ERR, expectedMessage);
+
+        assertNull(cause.getCause()); // No stack trace by default.
+    }
+
+    private static void assertComputeExceptionWithStackTrace(IgniteException 
cause) {
+        String expectedMessage = "Job execution failed: 
java.lang.ArithmeticException: math err";
+        assertTraceableException(cause, ComputeException.class, 
COMPUTE_JOB_FAILED_ERR, expectedMessage);
 
         assertNotNull(cause.getCause());
 
         assertThat(cause.getCause().getMessage(), containsString(
-                "at 
org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$"
-                        + 
"ExceptionJob.execute(ItThinClientComputeTest.java:"));
+                "Caused by: java.lang.ArithmeticException: math err" + 
System.lineSeparator()
+                        + "\tat 
org.apache.ignite.internal.runner.app.client.ItThinClientComputeTest$"
+                        + "ExceptionJob.execute(ItThinClientComputeTest.java:")
+        );
     }
 
     @ParameterizedTest
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
index 0dba6e079b..15f3b15784 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerClassIntegrationTest.java
@@ -75,7 +75,8 @@ public abstract class ClusterPerClassIntegrationTest extends 
IgniteIntegrationTe
             + "    }\n"
             + "  },\n"
             + "  clientConnector: { port:{} },\n"
-            + "  rest.port: {}\n"
+            + "  rest.port: {},\n"
+            + "  compute.threadPoolSize: 1\n"
             + "}";
 
     /** Cluster nodes. */
diff --git 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
index e21b6f4ee7..d115619117 100644
--- 
a/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
+++ 
b/modules/runner/src/testFixtures/java/org/apache/ignite/internal/ClusterPerTestIntegrationTest.java
@@ -47,11 +47,9 @@ public abstract class ClusterPerTestIntegrationTest extends 
IgniteIntegrationTes
             + "    }\n"
             + "  },\n"
             + "  clientConnector: { port:{} },\n"
-            + "  rest.port: {}\n"
-            + "  compute: {\n"
-            + "    threadPoolSize: 1\n"
-            + "  }\n"
-            + "}\n";
+            + "  rest.port: {},\n"
+            + "  compute.threadPoolSize: 1\n"
+            + "}";
 
     /** Template for node bootstrap config with Scalecube settings for fast 
failure detection. */
     public static final String 
FAST_FAILURE_DETECTION_NODE_BOOTSTRAP_CFG_TEMPLATE = "{\n"

Reply via email to