This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new b973ad80b55 IGNITE-27686 Improve compute error message on marshaller 
mismatch  (#7626)
b973ad80b55 is described below

commit b973ad80b55effbde8527d20caae25529d246435
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Feb 25 11:51:48 2026 +0300

    IGNITE-27686 Improve compute error message on marshaller mismatch  (#7626)
---
 .../ignite/marshalling/ByteArrayMarshaller.java    |  3 +-
 .../marshalling/ByteArrayMarshallerTest.java       |  6 +-
 .../client/proto/ClientComputeJobUnpacker.java     |  2 +-
 .../internal/compute/SharedComputeUtils.java       | 85 +++++++++++++++----
 .../proto/ClientComputeJobPackerUnpackerTest.java  |  4 +-
 .../apache/ignite/client/fakes/FakeCompute.java    |  4 +-
 .../ignite/internal/compute/ComputeUtils.java      |  2 +-
 .../ignite/internal/compute/IgniteComputeImpl.java |  2 +-
 .../compute/ResultUnmarshallingJobExecution.java   |  4 +-
 .../internal/compute/ComputeComponentImplTest.java |  6 +-
 .../compute/executor/ComputeExecutorTest.java      |  2 +-
 ...tThinClientComputeTypeCheckMarshallingTest.java | 98 +++++++++++++++++-----
 12 files changed, 168 insertions(+), 50 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
 
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
index a046737ce8f..08085254793 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
@@ -55,7 +55,8 @@ public interface ByteArrayMarshaller<T> extends Marshaller<T, 
byte[]> {
             }
         }
 
-        throw new UnsupportedObjectTypeMarshallingException(object.getClass());
+        String msg = object.getClass() + " must be Serializable to be used 
with ByteArrayMarshaller.";
+        throw new UnsupportedObjectTypeMarshallingException(msg);
     }
 
     @Override
diff --git 
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
 
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
index 5a6a3d3a55c..24ed94ee79d 100644
--- 
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
+++ 
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
@@ -17,10 +17,10 @@
 
 package org.apache.ignite.marshalling;
 
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.math.BigDecimal;
 import java.sql.Timestamp;
@@ -148,7 +148,9 @@ class ByteArrayMarshallerTest {
 
         assertThrows(
                 UnsupportedObjectTypeMarshallingException.class,
-                () -> notSerializableMarshaller.marshal(notSerializable)
+                () -> notSerializableMarshaller.marshal(notSerializable),
+                
"org.apache.ignite.marshalling.ByteArrayMarshallerTest$NotSerializable"
+                        + " must be Serializable to be used with 
ByteArrayMarshaller."
         );
     }
 
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
index c049baf0c0a..5722f3867ff 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
@@ -49,7 +49,7 @@ public final class ClientComputeJobUnpacker {
     ) {
         ComputeJobDataHolder holder = 
unpackJobArgumentWithoutMarshaller(unpacker, false);
 
-        return SharedComputeUtils.unmarshalArgOrResult(holder, marshaller, 
resultClass);
+        return SharedComputeUtils.unmarshalResult(holder, marshaller, 
resultClass);
     }
 
     /** Unpacks compute job argument without marshaller. */
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
index 8fde6c25bc0..83f2342b256 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
@@ -138,37 +138,92 @@ public class SharedComputeUtils {
     }
 
     /**
-     * Unmarshals the job argument or result.
+     * Unmarshals the job argument.
      *
      * @param holder Data holder.
      * @param marshaller Optional marshaller.
-     * @param resultClass Optional result class.
+     * @param pojoType Optional pojo type. Required if holder contains POJO.
      * @param <T> Type of the object.
      * @return Unmarshalled object.
      */
-    public static <T> @Nullable T unmarshalArgOrResult(
+    public static <T> @Nullable T unmarshalArg(
             @Nullable ComputeJobDataHolder holder,
             @Nullable Marshaller<?, byte[]> marshaller,
-            @Nullable Class<?> resultClass
+            @Nullable Class<?> pojoType
     ) {
-        return unmarshalArgOrResult(holder, marshaller, resultClass, 
Thread.currentThread().getContextClassLoader());
+        return unmarshalArg(holder, marshaller, pojoType, 
Thread.currentThread().getContextClassLoader());
     }
 
     /**
-     * Unmarshals the job argument or result.
+     * Unmarshals the job argument.
      *
      * @param holder Data holder.
      * @param marshaller Optional marshaller.
-     * @param resultClass Optional result class.
+     * @param pojoType Optional pojo type. Required if holder contains POJO.
      * @param classLoader Class loader to set before unmarshalling.
      * @param <T> Type of the object.
      * @return Unmarshalled object.
      */
-    public static <T> @Nullable T unmarshalArgOrResult(
+    public static <T> @Nullable T unmarshalArg(
             @Nullable ComputeJobDataHolder holder,
             @Nullable Marshaller<?, byte[]> marshaller,
-            @Nullable Class<?> resultClass,
+            @Nullable Class<?> pojoType,
             ClassLoader classLoader
+    ) {
+        return unmarshalArgOrResult(
+                holder,
+                marshaller,
+                pojoType,
+                classLoader,
+                "JobDescriptor.argumentMarshaller is defined, but the 
ComputeJob.inputMarshaller is not defined.",
+                "ComputeJob.inputMarshaller is defined, but the 
JobDescriptor.argumentMarshaller is not defined."
+        );
+    }
+
+    /**
+     * Unmarshals the job result.
+     *
+     * @param holder Data holder.
+     * @param marshaller Optional marshaller.
+     * @param pojoType Optional pojo type. Required if holder contains POJO.
+     * @param <T> Type of the object.
+     * @return Unmarshalled object.
+     */
+    public static <T> @Nullable T unmarshalResult(
+            @Nullable ComputeJobDataHolder holder,
+            @Nullable Marshaller<?, byte[]> marshaller,
+            @Nullable Class<?> pojoType
+    ) {
+        // No need to pass classloader when unmarshalling result because only 
the argument is unmarshalled in the isolated job classloader.
+        return unmarshalArgOrResult(
+                holder,
+                marshaller,
+                pojoType,
+                Thread.currentThread().getContextClassLoader(),
+                "ComputeJob.resultMarshaller is defined, but the 
JobDescriptor.resultMarshaller is not defined.",
+                "JobDescriptor.resultMarshaller is defined, but the 
ComputeJob.resultMarshaller is not defined."
+        );
+    }
+
+    /**
+     * Unmarshals the job argument or result.
+     *
+     * @param holder Data holder.
+     * @param marshaller Optional marshaller.
+     * @param pojoType Optional pojo type. Required if holder contains POJO.
+     * @param classLoader Class loader to set before unmarshalling.
+     * @param missingMarshallerErrorMessage Message to throw if marshaller is 
not defined and holder contains marshalled data.
+     * @param unexpectedMarshallerErrorMessage Message to throw if marshaller 
is defined but the holder doesn't contain marshalled data.
+     * @param <T> Type of the object.
+     * @return Unmarshalled object.
+     */
+    private static <T> @Nullable T unmarshalArgOrResult(
+            @Nullable ComputeJobDataHolder holder,
+            @Nullable Marshaller<?, byte[]> marshaller,
+            @Nullable Class<?> pojoType,
+            ClassLoader classLoader,
+            String missingMarshallerErrorMessage,
+            String unexpectedMarshallerErrorMessage
     ) {
         if (holder == null || holder.data() == null) {
             return null;
@@ -178,9 +233,9 @@ public class SharedComputeUtils {
         if (type != MARSHALLED_CUSTOM && marshaller != null) {
             throw new ComputeException(
                     MARSHALLING_TYPE_MISMATCH_ERR,
-                    "Marshaller is defined on the server, but the argument was 
not marshalled on the client. "
+                    unexpectedMarshallerErrorMessage + " "
                             + "If you want to use default marshalling 
strategy, "
-                            + "then you should not define your marshaller in 
the job. "
+                            + "then you should not define your marshaller only 
in one place. "
                             + "If you would like to use your own marshaller, 
then double-check "
                             + "that both of them are defined in the client and 
in the server."
             );
@@ -197,7 +252,7 @@ public class SharedComputeUtils {
                 return (T) TupleWithSchemaMarshalling.unmarshal(holder.data());
 
             case POJO:
-                if (resultClass == null) {
+                if (pojoType == null) {
                     throw new ComputeException(
                             MARSHALLING_TYPE_MISMATCH_ERR,
                             "JobDescriptor.resultClass is not defined, but the 
job result is packed as a POJO");
@@ -205,13 +260,13 @@ public class SharedComputeUtils {
 
                 Tuple tuple = 
TupleWithSchemaMarshalling.unmarshal(holder.data());
 
-                return resultClass == Tuple.class
+                return pojoType == Tuple.class
                         ? (T) tuple
-                        : (T) unmarshalPojo(resultClass, tuple);
+                        : (T) unmarshalPojo(pojoType, tuple);
 
             case MARSHALLED_CUSTOM:
                 if (marshaller == null) {
-                    throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, 
"Marshaller should be defined on the client");
+                    throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, 
missingMarshallerErrorMessage);
                 }
                 return unmarshalData(marshaller, classLoader, holder.data());
 
diff --git 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
index 893608f8602..eedca7b506e 100644
--- 
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
+++ 
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
@@ -209,7 +209,7 @@ class ClientComputeJobPackerUnpackerTest {
             assertThrows(
                     ComputeException.class,
                     () -> unpackJobResult(messageUnpacker, null, null),
-                    "Marshaller should be defined on the client"
+                    "ComputeJob.resultMarshaller is defined, but the 
JobDescriptor.resultMarshaller is not defined."
             );
         }
     }
@@ -230,7 +230,7 @@ class ClientComputeJobPackerUnpackerTest {
             assertThrows(
                     ComputeException.class,
                     () -> unpackJobResult(messageUnpacker, new 
TestStringMarshaller(), null),
-                    "Marshaller is defined on the server, but the argument was 
not marshalled on the client"
+                    "JobDescriptor.resultMarshaller is defined, but the 
ComputeJob.resultMarshaller is not defined."
             );
         }
     }
diff --git 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index fa5060dccb3..cddd07268cc 100644
--- 
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++ 
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -137,7 +137,7 @@ public class FakeCompute implements IgniteComputeInternal {
             ComputeJob<Object, Object> job = 
ComputeUtils.instantiateJob(jobClass);
             CompletableFuture<Object> jobFut = job.executeAsync(
                     new JobExecutionContextImpl(ignite, new AtomicBoolean(), 
jobClassLoader, null),
-                    
SharedComputeUtils.unmarshalArgOrResult(executionContext.arg(), null, null));
+                    SharedComputeUtils.unmarshalArg(executionContext.arg(), 
null, null));
 
             return jobExecution(jobFut != null ? jobFut : 
nullCompletedFuture());
         }
@@ -235,7 +235,7 @@ public class FakeCompute implements IgniteComputeInternal {
             @Override
             public CompletableFuture<R> resultAsync() {
                 return internalExecution.resultAsync()
-                        .thenApply(r -> 
SharedComputeUtils.unmarshalArgOrResult(
+                        .thenApply(r -> SharedComputeUtils.unmarshalResult(
                                 r, descriptor.resultMarshaller(), 
descriptor.resultClass()));
             }
 
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index bc2c81ac68f..3b16c59dd9f 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -372,7 +372,7 @@ public class ComputeUtils {
         }
 
         if (input instanceof ComputeJobDataHolder) {
-            return 
SharedComputeUtils.unmarshalArgOrResult((ComputeJobDataHolder) input, 
marshaller, pojoType, classLoader);
+            return SharedComputeUtils.unmarshalArg((ComputeJobDataHolder) 
input, marshaller, pojoType, classLoader);
         }
 
         if (marshaller == null) {
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 8c48e308eac..533f0b927a4 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -705,7 +705,7 @@ public class IgniteComputeImpl implements 
IgniteComputeInternal, StreamerReceive
                         ExceptionUtils.sneakyThrow(err);
                     }
 
-                    byte[] resBytes = 
SharedComputeUtils.unmarshalArgOrResult(res, null, null);
+                    byte[] resBytes = SharedComputeUtils.unmarshalResult(res, 
null, null);
 
                     return new IgniteBiTuple<>(resBytes, 
res.observableTimestamp());
                 });
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
index 3c59f026052..9159f2620f1 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
@@ -54,7 +54,7 @@ public class ResultUnmarshallingJobExecution<R> implements 
JobExecution<R> {
                 r -> {
                     updateTimestamp(r);
 
-                    return SharedComputeUtils.unmarshalArgOrResult(r, 
resultUnmarshaller, resultClass);
+                    return SharedComputeUtils.unmarshalResult(r, 
resultUnmarshaller, resultClass);
                 });
     }
 
@@ -83,7 +83,7 @@ public class ResultUnmarshallingJobExecution<R> implements 
JobExecution<R> {
             updateTimestamp(r);
 
             return new IgniteBiTuple<>(
-                    SharedComputeUtils.unmarshalArgOrResult(r, 
resultUnmarshaller, resultClass),
+                    SharedComputeUtils.unmarshalResult(r, resultUnmarshaller, 
resultClass),
                     r.observableTimestamp());
         });
     }
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 5a4a062b7a6..1db8c5dd93a 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -427,7 +427,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
         ExecuteRequest capturedRequest = 
invokeAndCaptureRequest(ExecuteRequest.class);
 
         assertThat(capturedRequest.jobClassName(), is(jobClassName));
-        
assertThat(SharedComputeUtils.unmarshalArgOrResult(capturedRequest.input(), 
null, null), is(equalTo(arg)));
+        assertThat(SharedComputeUtils.unmarshalArg(capturedRequest.input(), 
null, null), is(equalTo(arg)));
     }
 
     private void assertThatJobResultRequestWasSent(UUID jobId) {
@@ -495,7 +495,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
                 .build();
         JobResultResponse jobResultResponse = 
sendRequestAndCaptureResponse(jobResultRequest, testNode, 456L);
 
-        
assertThat(SharedComputeUtils.unmarshalArgOrResult(jobResultResponse.result(), 
null, null), is("jobResponse"));
+        
assertThat(SharedComputeUtils.unmarshalResult(jobResultResponse.result(), null, 
null), is("jobResponse"));
         assertThat(jobResultResponse.throwable(), is(nullValue()));
     }
 
@@ -741,7 +741,7 @@ class ComputeComponentImplTest extends 
BaseIgniteAbstractTest {
     }
 
     private static CompletableFuture<String> 
unwrapResult(JobExecution<ComputeJobDataHolder> execution) {
-        return execution.resultAsync().thenApply(r -> 
SharedComputeUtils.unmarshalArgOrResult(r, null, null));
+        return execution.resultAsync().thenApply(r -> 
SharedComputeUtils.unmarshalResult(r, null, null));
     }
 
     private static class SimpleJob implements ComputeJob<String, String> {
diff --git 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index a44f9819fa9..cf735092688 100644
--- 
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++ 
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -236,7 +236,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
 
         await().until(execution::state, jobStateWithStatus(COMPLETED));
 
-        assertThat(execution.resultAsync().thenApply(h -> 
SharedComputeUtils.unmarshalArgOrResult(h, null, null)), willBe(1));
+        assertThat(execution.resultAsync().thenApply(h -> 
SharedComputeUtils.unmarshalResult(h, null, null)), willBe(1));
         assertThat(JobSuccess.runTimes.get(), is(1));
     }
 
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
index 9cb5feb8bac..d7c815c8572 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
@@ -20,18 +20,17 @@ package org.apache.ignite.internal.runner.app.client;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static org.apache.ignite.compute.JobStatus.COMPLETED;
 import static org.apache.ignite.compute.JobStatus.FAILED;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.IgniteExceptionTestUtils.hasMessage;
+import static 
org.apache.ignite.internal.IgniteExceptionTestUtils.traceableException;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
 import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import org.apache.ignite.compute.ComputeException;
 import org.apache.ignite.compute.ComputeJob;
 import org.apache.ignite.compute.JobDescriptor;
@@ -44,17 +43,17 @@ import org.apache.ignite.lang.ErrorGroups.Compute;
 import org.apache.ignite.marshalling.ByteArrayMarshaller;
 import org.apache.ignite.marshalling.Marshaller;
 import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+import org.hamcrest.Matcher;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test for exceptions that are thrown when marshallers are defined in a wrong 
way or throw an exception.
  */
-@SuppressWarnings("resource")
 public class ItThinClientComputeTypeCheckMarshallingTest extends 
ItAbstractThinClientTest {
     @Test
     void argumentMarshallerDefinedOnlyInJob() {
-        // When submit job with custom marshaller that is defined in job but
+        // When submit job with custom argument marshaller that is defined in 
job but
         // client JobDescriptor does not declare the argument marshaller.
         JobExecution<String> result = submit(
                 JobTarget.node(node(1)),
@@ -63,12 +62,15 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
         );
 
         await().until(result::stateAsync, willBe(jobStateWithStatus(FAILED)));
-        assertResultFailsWithErr(Compute.MARSHALLING_TYPE_MISMATCH_ERR, 
result);
+        assertResultFailsWithErr(
+                result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+                "ComputeJob.inputMarshaller is defined, but the 
JobDescriptor.argumentMarshaller is not defined."
+        );
     }
 
     @Test
     void resultMarshallerDefinedOnlyInJob() {
-        // When submit job with custom marshaller that is defined in job but
+        // When submit job with custom result marshaller that is defined in 
job but
         // client JobDescriptor does not declare the result marshaller.
         JobExecution<String> result = submit(
                 JobTarget.node(node(1)),
@@ -77,7 +79,47 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
         );
 
         await().until(result::stateAsync, 
willBe(jobStateWithStatus(COMPLETED)));
-        assertThat(result.resultAsync(), willThrow(ComputeException.class));
+        assertResultFailsWithErr(
+                result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+                "ComputeJob.resultMarshaller is defined, but the 
JobDescriptor.resultMarshaller is not defined."
+        );
+    }
+
+    @Test
+    void argumentMarshallerDefinedOnlyInDescriptor() {
+        // When submit job with custom argument marshaller that is defined in 
client JobDescriptor but
+        // job class does not declare the input marshaller.
+        JobExecution<String> result = submit(
+                JobTarget.node(node(1)),
+                
JobDescriptor.builder(ResultMarshallingJob.class).argumentMarshaller(ByteArrayMarshaller.create()).build(),
+                "Input"
+        );
+
+        await().until(result::stateAsync, willBe(jobStateWithStatus(FAILED)));
+        assertResultFailsWithErr(
+                result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+                "JobDescriptor.argumentMarshaller is defined, but the 
ComputeJob.inputMarshaller is not defined."
+        );
+    }
+
+    @Test
+    void resultMarshallerDefinedOnlyInDescriptor() {
+        // When submit job with custom result marshaller that is defined in 
client JobDescriptor but
+        // job class does not declare the result marshaller.
+        JobExecution<String> result = submit(
+                JobTarget.node(node(1)),
+                JobDescriptor.builder(ArgMarshallingJob.class)
+                        .argumentMarshaller(ByteArrayMarshaller.create())
+                        .resultMarshaller(ByteArrayMarshaller.create())
+                        .build(),
+                "Input"
+        );
+
+        await().until(result::stateAsync, 
willBe(jobStateWithStatus(COMPLETED)));
+        assertResultFailsWithErr(
+                result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+                "JobDescriptor.resultMarshaller is defined, but the 
ComputeJob.resultMarshaller is not defined."
+        );
     }
 
     @Test
@@ -87,14 +129,18 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
         JobExecution<Integer> result = submit(
                 JobTarget.node(node(1)),
                 // The descriptor does not match actual job arguments.
-                JobDescriptor.<Integer, 
Integer>builder(ArgumentTypeCheckingmarshallingJob.class.getName())
+                JobDescriptor.<Integer, 
Integer>builder(ArgumentTypeCheckingMarshallingJob.class.getName())
                         .argumentMarshaller(new IntegerMarshaller())
                         .build(),
                 1
         );
 
         await().until(result::stateAsync, willBe(jobStateWithStatus(FAILED)));
-        assertResultFailsWithErr(Compute.MARSHALLING_TYPE_MISMATCH_ERR, 
result);
+        assertResultFailsWithErr(
+                result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+                "Exception in user-defined marshaller",
+                hasMessage(containsString("java.lang.RuntimeException: User 
defined error."))
+        );
     }
 
     @Test
@@ -112,11 +158,15 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
 
         await().until(result::stateAsync, 
willBe(jobStateWithStatus(COMPLETED)));
 
-        // The job has completed successfully, but result was not unmarshaled
-        assertThrowsWithCause(() -> result.resultAsync().join(), 
ClassCastException.class);
+        // The job has completed successfully, but result was not unmarshalled 
correctly
+        assertResultFailsWithErr(
+                result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+                "Exception in user-defined marshaller",
+                instanceOf(ClassCastException.class)
+        );
     }
 
-    static class ArgumentTypeCheckingmarshallingJob implements 
ComputeJob<String, String> {
+    static class ArgumentTypeCheckingMarshallingJob implements 
ComputeJob<String, String> {
         @Override
         public CompletableFuture<String> executeAsync(JobExecutionContext 
context, @Nullable String arg) {
             return completedFuture(arg);
@@ -136,7 +186,7 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
                         return (String) obj;
                     }
 
-                    throw new 
UnsupportedObjectTypeMarshallingException(obj.getClass());
+                    throw new RuntimeException("User defined error.");
                 }
             };
         }
@@ -155,9 +205,19 @@ public class ItThinClientComputeTypeCheckMarshallingTest 
extends ItAbstractThinC
         }
     }
 
-    private static void assertResultFailsWithErr(int errCode, JobExecution<?> 
result) {
-        var ex = assertThrows(CompletionException.class, () -> 
result.resultAsync().join());
-        assertThat(ex.getCause(), instanceOf(ComputeException.class));
-        assertThat(((ComputeException) ex.getCause()).code(), 
equalTo(errCode));
+    private static void assertResultFailsWithErr(JobExecution<?> result, int 
errCode, String expectedMessage) {
+        assertResultFailsWithErr(result, errCode, expectedMessage, null);
+    }
+
+    private static void assertResultFailsWithErr(
+            JobExecution<?> result,
+            int errCode,
+            String expectedMessage,
+            @Nullable Matcher<? extends Throwable> causeMatcher
+    ) {
+        assertThat(
+                result.resultAsync(),
+                willThrow(traceableException(ComputeException.class, errCode, 
expectedMessage).withCause(causeMatcher))
+        );
     }
 }

Reply via email to