This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new b973ad80b55 IGNITE-27686 Improve compute error message on marshaller
mismatch (#7626)
b973ad80b55 is described below
commit b973ad80b55effbde8527d20caae25529d246435
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Wed Feb 25 11:51:48 2026 +0300
IGNITE-27686 Improve compute error message on marshaller mismatch (#7626)
---
.../ignite/marshalling/ByteArrayMarshaller.java | 3 +-
.../marshalling/ByteArrayMarshallerTest.java | 6 +-
.../client/proto/ClientComputeJobUnpacker.java | 2 +-
.../internal/compute/SharedComputeUtils.java | 85 +++++++++++++++----
.../proto/ClientComputeJobPackerUnpackerTest.java | 4 +-
.../apache/ignite/client/fakes/FakeCompute.java | 4 +-
.../ignite/internal/compute/ComputeUtils.java | 2 +-
.../ignite/internal/compute/IgniteComputeImpl.java | 2 +-
.../compute/ResultUnmarshallingJobExecution.java | 4 +-
.../internal/compute/ComputeComponentImplTest.java | 6 +-
.../compute/executor/ComputeExecutorTest.java | 2 +-
...tThinClientComputeTypeCheckMarshallingTest.java | 98 +++++++++++++++++-----
12 files changed, 168 insertions(+), 50 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
index a046737ce8f..08085254793 100644
---
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
+++
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
@@ -55,7 +55,8 @@ public interface ByteArrayMarshaller<T> extends Marshaller<T,
byte[]> {
}
}
- throw new UnsupportedObjectTypeMarshallingException(object.getClass());
+ String msg = object.getClass() + " must be Serializable to be used
with ByteArrayMarshaller.";
+ throw new UnsupportedObjectTypeMarshallingException(msg);
}
@Override
diff --git
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
index 5a6a3d3a55c..24ed94ee79d 100644
---
a/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
+++
b/modules/api/src/test/java/org/apache/ignite/marshalling/ByteArrayMarshallerTest.java
@@ -17,10 +17,10 @@
package org.apache.ignite.marshalling;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import java.math.BigDecimal;
import java.sql.Timestamp;
@@ -148,7 +148,9 @@ class ByteArrayMarshallerTest {
assertThrows(
UnsupportedObjectTypeMarshallingException.class,
- () -> notSerializableMarshaller.marshal(notSerializable)
+ () -> notSerializableMarshaller.marshal(notSerializable),
+
"org.apache.ignite.marshalling.ByteArrayMarshallerTest$NotSerializable"
+ + " must be Serializable to be used with
ByteArrayMarshaller."
);
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
index c049baf0c0a..5722f3867ff 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientComputeJobUnpacker.java
@@ -49,7 +49,7 @@ public final class ClientComputeJobUnpacker {
) {
ComputeJobDataHolder holder =
unpackJobArgumentWithoutMarshaller(unpacker, false);
- return SharedComputeUtils.unmarshalArgOrResult(holder, marshaller,
resultClass);
+ return SharedComputeUtils.unmarshalResult(holder, marshaller,
resultClass);
}
/** Unpacks compute job argument without marshaller. */
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
index 8fde6c25bc0..83f2342b256 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
@@ -138,37 +138,92 @@ public class SharedComputeUtils {
}
/**
- * Unmarshals the job argument or result.
+ * Unmarshals the job argument.
*
* @param holder Data holder.
* @param marshaller Optional marshaller.
- * @param resultClass Optional result class.
+ * @param pojoType Optional pojo type. Required if holder contains POJO.
* @param <T> Type of the object.
* @return Unmarshalled object.
*/
- public static <T> @Nullable T unmarshalArgOrResult(
+ public static <T> @Nullable T unmarshalArg(
@Nullable ComputeJobDataHolder holder,
@Nullable Marshaller<?, byte[]> marshaller,
- @Nullable Class<?> resultClass
+ @Nullable Class<?> pojoType
) {
- return unmarshalArgOrResult(holder, marshaller, resultClass,
Thread.currentThread().getContextClassLoader());
+ return unmarshalArg(holder, marshaller, pojoType,
Thread.currentThread().getContextClassLoader());
}
/**
- * Unmarshals the job argument or result.
+ * Unmarshals the job argument.
*
* @param holder Data holder.
* @param marshaller Optional marshaller.
- * @param resultClass Optional result class.
+ * @param pojoType Optional pojo type. Required if holder contains POJO.
* @param classLoader Class loader to set before unmarshalling.
* @param <T> Type of the object.
* @return Unmarshalled object.
*/
- public static <T> @Nullable T unmarshalArgOrResult(
+ public static <T> @Nullable T unmarshalArg(
@Nullable ComputeJobDataHolder holder,
@Nullable Marshaller<?, byte[]> marshaller,
- @Nullable Class<?> resultClass,
+ @Nullable Class<?> pojoType,
ClassLoader classLoader
+ ) {
+ return unmarshalArgOrResult(
+ holder,
+ marshaller,
+ pojoType,
+ classLoader,
+ "JobDescriptor.argumentMarshaller is defined, but the
ComputeJob.inputMarshaller is not defined.",
+ "ComputeJob.inputMarshaller is defined, but the
JobDescriptor.argumentMarshaller is not defined."
+ );
+ }
+
+ /**
+ * Unmarshals the job result.
+ *
+ * @param holder Data holder.
+ * @param marshaller Optional marshaller.
+ * @param pojoType Optional pojo type. Required if holder contains POJO.
+ * @param <T> Type of the object.
+ * @return Unmarshalled object.
+ */
+ public static <T> @Nullable T unmarshalResult(
+ @Nullable ComputeJobDataHolder holder,
+ @Nullable Marshaller<?, byte[]> marshaller,
+ @Nullable Class<?> pojoType
+ ) {
+ // No need to pass classloader when unmarshalling result because only
the argument is unmarshalled in the isolated job classloader.
+ return unmarshalArgOrResult(
+ holder,
+ marshaller,
+ pojoType,
+ Thread.currentThread().getContextClassLoader(),
+ "ComputeJob.resultMarshaller is defined, but the
JobDescriptor.resultMarshaller is not defined.",
+ "JobDescriptor.resultMarshaller is defined, but the
ComputeJob.resultMarshaller is not defined."
+ );
+ }
+
+ /**
+ * Unmarshals the job argument or result.
+ *
+ * @param holder Data holder.
+ * @param marshaller Optional marshaller.
+ * @param pojoType Optional pojo type. Required if holder contains POJO.
+ * @param classLoader Class loader to set before unmarshalling.
+ * @param missingMarshallerErrorMessage Message to throw if marshaller is
not defined and holder contains marshalled data.
+ * @param unexpectedMarshallerErrorMessage Message to throw if marshaller
is defined but the holder doesn't contain marshalled data.
+ * @param <T> Type of the object.
+ * @return Unmarshalled object.
+ */
+ private static <T> @Nullable T unmarshalArgOrResult(
+ @Nullable ComputeJobDataHolder holder,
+ @Nullable Marshaller<?, byte[]> marshaller,
+ @Nullable Class<?> pojoType,
+ ClassLoader classLoader,
+ String missingMarshallerErrorMessage,
+ String unexpectedMarshallerErrorMessage
) {
if (holder == null || holder.data() == null) {
return null;
@@ -178,9 +233,9 @@ public class SharedComputeUtils {
if (type != MARSHALLED_CUSTOM && marshaller != null) {
throw new ComputeException(
MARSHALLING_TYPE_MISMATCH_ERR,
- "Marshaller is defined on the server, but the argument was
not marshalled on the client. "
+ unexpectedMarshallerErrorMessage + " "
+ "If you want to use default marshalling
strategy, "
- + "then you should not define your marshaller in
the job. "
+ + "then you should not define your marshaller only
in one place. "
+ "If you would like to use your own marshaller,
then double-check "
+ "that both of them are defined in the client and
in the server."
);
@@ -197,7 +252,7 @@ public class SharedComputeUtils {
return (T) TupleWithSchemaMarshalling.unmarshal(holder.data());
case POJO:
- if (resultClass == null) {
+ if (pojoType == null) {
throw new ComputeException(
MARSHALLING_TYPE_MISMATCH_ERR,
"JobDescriptor.resultClass is not defined, but the
job result is packed as a POJO");
@@ -205,13 +260,13 @@ public class SharedComputeUtils {
Tuple tuple =
TupleWithSchemaMarshalling.unmarshal(holder.data());
- return resultClass == Tuple.class
+ return pojoType == Tuple.class
? (T) tuple
- : (T) unmarshalPojo(resultClass, tuple);
+ : (T) unmarshalPojo(pojoType, tuple);
case MARSHALLED_CUSTOM:
if (marshaller == null) {
- throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR,
"Marshaller should be defined on the client");
+ throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR,
missingMarshallerErrorMessage);
}
return unmarshalData(marshaller, classLoader, holder.data());
diff --git
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
index 893608f8602..eedca7b506e 100644
---
a/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
+++
b/modules/client-common/src/test/java/org/apache/ignite/internal/client/proto/ClientComputeJobPackerUnpackerTest.java
@@ -209,7 +209,7 @@ class ClientComputeJobPackerUnpackerTest {
assertThrows(
ComputeException.class,
() -> unpackJobResult(messageUnpacker, null, null),
- "Marshaller should be defined on the client"
+ "ComputeJob.resultMarshaller is defined, but the
JobDescriptor.resultMarshaller is not defined."
);
}
}
@@ -230,7 +230,7 @@ class ClientComputeJobPackerUnpackerTest {
assertThrows(
ComputeException.class,
() -> unpackJobResult(messageUnpacker, new
TestStringMarshaller(), null),
- "Marshaller is defined on the server, but the argument was
not marshalled on the client"
+ "JobDescriptor.resultMarshaller is defined, but the
ComputeJob.resultMarshaller is not defined."
);
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
index fa5060dccb3..cddd07268cc 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeCompute.java
@@ -137,7 +137,7 @@ public class FakeCompute implements IgniteComputeInternal {
ComputeJob<Object, Object> job =
ComputeUtils.instantiateJob(jobClass);
CompletableFuture<Object> jobFut = job.executeAsync(
new JobExecutionContextImpl(ignite, new AtomicBoolean(),
jobClassLoader, null),
-
SharedComputeUtils.unmarshalArgOrResult(executionContext.arg(), null, null));
+ SharedComputeUtils.unmarshalArg(executionContext.arg(),
null, null));
return jobExecution(jobFut != null ? jobFut :
nullCompletedFuture());
}
@@ -235,7 +235,7 @@ public class FakeCompute implements IgniteComputeInternal {
@Override
public CompletableFuture<R> resultAsync() {
return internalExecution.resultAsync()
- .thenApply(r ->
SharedComputeUtils.unmarshalArgOrResult(
+ .thenApply(r -> SharedComputeUtils.unmarshalResult(
r, descriptor.resultMarshaller(),
descriptor.resultClass()));
}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index bc2c81ac68f..3b16c59dd9f 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -372,7 +372,7 @@ public class ComputeUtils {
}
if (input instanceof ComputeJobDataHolder) {
- return
SharedComputeUtils.unmarshalArgOrResult((ComputeJobDataHolder) input,
marshaller, pojoType, classLoader);
+ return SharedComputeUtils.unmarshalArg((ComputeJobDataHolder)
input, marshaller, pojoType, classLoader);
}
if (marshaller == null) {
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
index 8c48e308eac..533f0b927a4 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/IgniteComputeImpl.java
@@ -705,7 +705,7 @@ public class IgniteComputeImpl implements
IgniteComputeInternal, StreamerReceive
ExceptionUtils.sneakyThrow(err);
}
- byte[] resBytes =
SharedComputeUtils.unmarshalArgOrResult(res, null, null);
+ byte[] resBytes = SharedComputeUtils.unmarshalResult(res,
null, null);
return new IgniteBiTuple<>(resBytes,
res.observableTimestamp());
});
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
index 3c59f026052..9159f2620f1 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ResultUnmarshallingJobExecution.java
@@ -54,7 +54,7 @@ public class ResultUnmarshallingJobExecution<R> implements
JobExecution<R> {
r -> {
updateTimestamp(r);
- return SharedComputeUtils.unmarshalArgOrResult(r,
resultUnmarshaller, resultClass);
+ return SharedComputeUtils.unmarshalResult(r,
resultUnmarshaller, resultClass);
});
}
@@ -83,7 +83,7 @@ public class ResultUnmarshallingJobExecution<R> implements
JobExecution<R> {
updateTimestamp(r);
return new IgniteBiTuple<>(
- SharedComputeUtils.unmarshalArgOrResult(r,
resultUnmarshaller, resultClass),
+ SharedComputeUtils.unmarshalResult(r, resultUnmarshaller,
resultClass),
r.observableTimestamp());
});
}
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
index 5a4a062b7a6..1db8c5dd93a 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/ComputeComponentImplTest.java
@@ -427,7 +427,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
ExecuteRequest capturedRequest =
invokeAndCaptureRequest(ExecuteRequest.class);
assertThat(capturedRequest.jobClassName(), is(jobClassName));
-
assertThat(SharedComputeUtils.unmarshalArgOrResult(capturedRequest.input(),
null, null), is(equalTo(arg)));
+ assertThat(SharedComputeUtils.unmarshalArg(capturedRequest.input(),
null, null), is(equalTo(arg)));
}
private void assertThatJobResultRequestWasSent(UUID jobId) {
@@ -495,7 +495,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
.build();
JobResultResponse jobResultResponse =
sendRequestAndCaptureResponse(jobResultRequest, testNode, 456L);
-
assertThat(SharedComputeUtils.unmarshalArgOrResult(jobResultResponse.result(),
null, null), is("jobResponse"));
+
assertThat(SharedComputeUtils.unmarshalResult(jobResultResponse.result(), null,
null), is("jobResponse"));
assertThat(jobResultResponse.throwable(), is(nullValue()));
}
@@ -741,7 +741,7 @@ class ComputeComponentImplTest extends
BaseIgniteAbstractTest {
}
private static CompletableFuture<String>
unwrapResult(JobExecution<ComputeJobDataHolder> execution) {
- return execution.resultAsync().thenApply(r ->
SharedComputeUtils.unmarshalArgOrResult(r, null, null));
+ return execution.resultAsync().thenApply(r ->
SharedComputeUtils.unmarshalResult(r, null, null));
}
private static class SimpleJob implements ComputeJob<String, String> {
diff --git
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
index a44f9819fa9..cf735092688 100644
---
a/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
+++
b/modules/compute/src/test/java/org/apache/ignite/internal/compute/executor/ComputeExecutorTest.java
@@ -236,7 +236,7 @@ class ComputeExecutorTest extends BaseIgniteAbstractTest {
await().until(execution::state, jobStateWithStatus(COMPLETED));
- assertThat(execution.resultAsync().thenApply(h ->
SharedComputeUtils.unmarshalArgOrResult(h, null, null)), willBe(1));
+ assertThat(execution.resultAsync().thenApply(h ->
SharedComputeUtils.unmarshalResult(h, null, null)), willBe(1));
assertThat(JobSuccess.runTimes.get(), is(1));
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
index 9cb5feb8bac..d7c815c8572 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientComputeTypeCheckMarshallingTest.java
@@ -20,18 +20,17 @@ package org.apache.ignite.internal.runner.app.client;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.compute.JobStatus.COMPLETED;
import static org.apache.ignite.compute.JobStatus.FAILED;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
+import static org.apache.ignite.internal.IgniteExceptionTestUtils.hasMessage;
+import static
org.apache.ignite.internal.IgniteExceptionTestUtils.traceableException;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.JobStateMatcher.jobStateWithStatus;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
import org.apache.ignite.compute.ComputeException;
import org.apache.ignite.compute.ComputeJob;
import org.apache.ignite.compute.JobDescriptor;
@@ -44,17 +43,17 @@ import org.apache.ignite.lang.ErrorGroups.Compute;
import org.apache.ignite.marshalling.ByteArrayMarshaller;
import org.apache.ignite.marshalling.Marshaller;
import org.apache.ignite.marshalling.UnsupportedObjectTypeMarshallingException;
+import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.Test;
/**
* Test for exceptions that are thrown when marshallers are defined in a wrong
way or throw an exception.
*/
-@SuppressWarnings("resource")
public class ItThinClientComputeTypeCheckMarshallingTest extends
ItAbstractThinClientTest {
@Test
void argumentMarshallerDefinedOnlyInJob() {
- // When submit job with custom marshaller that is defined in job but
+ // When submit job with custom argument marshaller that is defined in
job but
// client JobDescriptor does not declare the argument marshaller.
JobExecution<String> result = submit(
JobTarget.node(node(1)),
@@ -63,12 +62,15 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
);
await().until(result::stateAsync, willBe(jobStateWithStatus(FAILED)));
- assertResultFailsWithErr(Compute.MARSHALLING_TYPE_MISMATCH_ERR,
result);
+ assertResultFailsWithErr(
+ result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+ "ComputeJob.inputMarshaller is defined, but the
JobDescriptor.argumentMarshaller is not defined."
+ );
}
@Test
void resultMarshallerDefinedOnlyInJob() {
- // When submit job with custom marshaller that is defined in job but
+ // When submit job with custom result marshaller that is defined in
job but
// client JobDescriptor does not declare the result marshaller.
JobExecution<String> result = submit(
JobTarget.node(node(1)),
@@ -77,7 +79,47 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
);
await().until(result::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
- assertThat(result.resultAsync(), willThrow(ComputeException.class));
+ assertResultFailsWithErr(
+ result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+ "ComputeJob.resultMarshaller is defined, but the
JobDescriptor.resultMarshaller is not defined."
+ );
+ }
+
+ @Test
+ void argumentMarshallerDefinedOnlyInDescriptor() {
+ // When submit job with custom argument marshaller that is defined in
client JobDescriptor but
+ // job class does not declare the input marshaller.
+ JobExecution<String> result = submit(
+ JobTarget.node(node(1)),
+
JobDescriptor.builder(ResultMarshallingJob.class).argumentMarshaller(ByteArrayMarshaller.create()).build(),
+ "Input"
+ );
+
+ await().until(result::stateAsync, willBe(jobStateWithStatus(FAILED)));
+ assertResultFailsWithErr(
+ result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+ "JobDescriptor.argumentMarshaller is defined, but the
ComputeJob.inputMarshaller is not defined."
+ );
+ }
+
+ @Test
+ void resultMarshallerDefinedOnlyInDescriptor() {
+ // When submit job with custom result marshaller that is defined in
client JobDescriptor but
+ // job class does not declare the result marshaller.
+ JobExecution<String> result = submit(
+ JobTarget.node(node(1)),
+ JobDescriptor.builder(ArgMarshallingJob.class)
+ .argumentMarshaller(ByteArrayMarshaller.create())
+ .resultMarshaller(ByteArrayMarshaller.create())
+ .build(),
+ "Input"
+ );
+
+ await().until(result::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
+ assertResultFailsWithErr(
+ result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+ "JobDescriptor.resultMarshaller is defined, but the
ComputeJob.resultMarshaller is not defined."
+ );
}
@Test
@@ -87,14 +129,18 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
JobExecution<Integer> result = submit(
JobTarget.node(node(1)),
// The descriptor does not match actual job arguments.
- JobDescriptor.<Integer,
Integer>builder(ArgumentTypeCheckingmarshallingJob.class.getName())
+ JobDescriptor.<Integer,
Integer>builder(ArgumentTypeCheckingMarshallingJob.class.getName())
.argumentMarshaller(new IntegerMarshaller())
.build(),
1
);
await().until(result::stateAsync, willBe(jobStateWithStatus(FAILED)));
- assertResultFailsWithErr(Compute.MARSHALLING_TYPE_MISMATCH_ERR,
result);
+ assertResultFailsWithErr(
+ result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+ "Exception in user-defined marshaller",
+ hasMessage(containsString("java.lang.RuntimeException: User
defined error."))
+ );
}
@Test
@@ -112,11 +158,15 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
await().until(result::stateAsync,
willBe(jobStateWithStatus(COMPLETED)));
- // The job has completed successfully, but result was not unmarshaled
- assertThrowsWithCause(() -> result.resultAsync().join(),
ClassCastException.class);
+ // The job has completed successfully, but result was not unmarshalled
correctly
+ assertResultFailsWithErr(
+ result, Compute.MARSHALLING_TYPE_MISMATCH_ERR,
+ "Exception in user-defined marshaller",
+ instanceOf(ClassCastException.class)
+ );
}
- static class ArgumentTypeCheckingmarshallingJob implements
ComputeJob<String, String> {
+ static class ArgumentTypeCheckingMarshallingJob implements
ComputeJob<String, String> {
@Override
public CompletableFuture<String> executeAsync(JobExecutionContext
context, @Nullable String arg) {
return completedFuture(arg);
@@ -136,7 +186,7 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
return (String) obj;
}
- throw new
UnsupportedObjectTypeMarshallingException(obj.getClass());
+ throw new RuntimeException("User defined error.");
}
};
}
@@ -155,9 +205,19 @@ public class ItThinClientComputeTypeCheckMarshallingTest
extends ItAbstractThinC
}
}
- private static void assertResultFailsWithErr(int errCode, JobExecution<?>
result) {
- var ex = assertThrows(CompletionException.class, () ->
result.resultAsync().join());
- assertThat(ex.getCause(), instanceOf(ComputeException.class));
- assertThat(((ComputeException) ex.getCause()).code(),
equalTo(errCode));
+ private static void assertResultFailsWithErr(JobExecution<?> result, int
errCode, String expectedMessage) {
+ assertResultFailsWithErr(result, errCode, expectedMessage, null);
+ }
+
+ private static void assertResultFailsWithErr(
+ JobExecution<?> result,
+ int errCode,
+ String expectedMessage,
+ @Nullable Matcher<? extends Throwable> causeMatcher
+ ) {
+ assertThat(
+ result.resultAsync(),
+ willThrow(traceableException(ComputeException.class, errCode,
expectedMessage).withCause(causeMatcher))
+ );
}
}