This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 016c58124e6 IGNITE-26890 Add serdes tests for Compute with custom
exceptions (#12514)
016c58124e6 is described below
commit 016c58124e6b4ed612853d8cc72465f587d2e851
Author: Maksim Davydov <[email protected]>
AuthorDate: Fri Feb 27 14:14:39 2026 +0300
IGNITE-26890 Add serdes tests for Compute with custom exceptions (#12514)
---
.../ignite/internal/GridJobExecuteResponse.java | 61 ++++----
.../ignite/p2p/GridP2PComputeExceptionTest.java | 154 +++++++++++++++++++++
.../ignite/testsuites/IgniteP2PSelfTestSuite.java | 4 +-
...xternalRunnableWithExternalizableException.java | 83 +++++++++++
.../ExternalRunnableWithSerializableException.java | 42 ++++++
...ternalTaskWithBrokenExceptionSerialization.java | 132 ++++++++++++++++++
6 files changed, 451 insertions(+), 25 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
index 08ebcd08063..aafe0182807 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobExecuteResponse.java
@@ -22,7 +22,6 @@ import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
-import org.apache.ignite.internal.managers.communication.ErrorMessage;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -50,16 +49,13 @@ public class GridJobExecuteResponse implements Message {
@Order(2)
IgniteUuid jobId;
+ /** Job result exception call holder. */
+ @Order(3)
+ @Nullable byte[] gridExBytes;
+
/** */
private IgniteException gridEx;
- /**
- * Serialization call holder for {@code gridEx}. Works with {@link
#marshallUserData(Marshaller)}.
- * Wraps also possible serialization error.
- */
- @Order(value = 3, method = "exceptionMessage")
- @Nullable ErrorMessage gridExMsg;
-
/** Job result serialization call holder. */
@Order(4)
@Nullable byte[] resBytes;
@@ -156,21 +152,13 @@ public class GridJobExecuteResponse implements Message {
}
/** */
- public void exceptionMessage(@Nullable ErrorMessage gridExMsg) {
- if (gridExMsg == null) {
- gridEx = null;
-
- return;
- }
-
- Throwable t = gridExMsg.error();
-
- gridEx = t instanceof IgniteException ? (IgniteException)t : new
IgniteException(t);
+ public void exceptionBytes(@Nullable byte[] gridExBytes) {
+ this.gridExBytes = gridExBytes;
}
/** */
- public @Nullable ErrorMessage exceptionMessage() {
- return gridEx == null ? null : new ErrorMessage(gridEx);
+ public @Nullable byte[] exceptionBytes() {
+ return gridExBytes;
}
/**
@@ -227,7 +215,7 @@ public class GridJobExecuteResponse implements Message {
* Serializes user data to byte[] with provided marshaller.
* Erases non-marshalled data like {@link #getJobAttributes()} or {@link
#getJobResult()}.
*/
- public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log)
{
+ public void marshallUserData(Marshaller marsh, @Nullable IgniteLogger log)
throws IgniteCheckedException {
if (res != null) {
try {
resBytes = U.marshal(marsh, res);
@@ -261,6 +249,25 @@ public class GridJobExecuteResponse implements Message {
jobAttrs = null;
}
+
+ if (gridEx != null) {
+ try {
+ gridExBytes = U.marshal(marsh, gridEx);
+ }
+ catch (IgniteCheckedException e) {
+ String msg = "Failed to serialize job exception [nodeId=" +
nodeId +
+ ", ses=" + sesId + ", jobId=" + jobId +
+ ", msg=\"" + e.getMessage() + "\"]";
+
+ gridEx = new IgniteException(msg);
+
+ U.error(log, msg, e);
+
+ gridExBytes = U.marshal(marsh, gridEx);
+ }
+
+ gridEx = null;
+ }
}
/**
@@ -279,14 +286,20 @@ public class GridJobExecuteResponse implements Message {
resBytes = null;
}
+
+ if (gridExBytes != null) {
+ gridEx = U.unmarshal(marshaller, gridExBytes, clsLdr);
+
+ gridExBytes = null;
+ }
}
/** */
private void wrapSerializationError(IgniteCheckedException e, String msg,
@Nullable IgniteLogger log) {
if (gridEx != null)
- e.addSuppressed(gridEx);
-
- gridEx = U.convertException(e);
+ gridEx.addSuppressed(e);
+ else
+ gridEx = U.convertException(e);
if (log != null && (log.isDebugEnabled() || !X.hasCause(e,
NodeStoppingException.class)))
U.error(log, msg, e);
diff --git
a/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeExceptionTest.java
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeExceptionTest.java
new file mode 100644
index 00000000000..6cfdd35a3d2
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/p2p/GridP2PComputeExceptionTest.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.p2p;
+
+import java.lang.reflect.Constructor;
+import java.net.MalformedURLException;
+import java.net.URL;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.compute.ComputeTask;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.lang.IgniteRunnable;
+import org.apache.ignite.testframework.GridTestExternalClassLoader;
+import org.apache.ignite.testframework.config.GridTestProperties;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+
+/** */
+public class GridP2PComputeExceptionTest extends GridCommonAbstractTest {
+ /** */
+ private static final String RUNNABLE_WITH_SERIALIZABLE_EXCEPTION =
+
"org.apache.ignite.tests.p2p.compute.ExternalRunnableWithSerializableException";
+
+ /** */
+ private static final String RUNNABLE_WITH_EXTERNALIZABLE_EXCEPTION =
+
"org.apache.ignite.tests.p2p.compute.ExternalRunnableWithExternalizableException";
+
+ /** */
+ private static final String TASK_WITH_BROKEN_EXCEPTION =
+
"org.apache.ignite.tests.p2p.compute.ExternalTaskWithBrokenExceptionSerialization";
+
+ /** */
+ public static final String EX_MSG = "Message from Exception";
+
+ /** */
+ private static final String EX_UNMARSHAL_MSG = "Failed to unmarshal object
with optimized marshaller";
+
+ /** */
+ private static final String EX_SERIALIZE_MSG = "Failed to serialize job
exception";
+
+ /** Test class loader. */
+ private static final ClassLoader TEST_CLS_LDR;
+
+ static {
+ try {
+ URL[] urls = new URL[] {new
URL(GridTestProperties.getProperty("p2p.uri.cls"))};
+
+ TEST_CLS_LDR = new GridTestExternalClassLoader(urls);
+ }
+ catch (MalformedURLException e) {
+ throw new RuntimeException("Define property p2p.uri.cls", e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ return
super.getConfiguration(igniteInstanceName).setPeerClassLoadingEnabled(true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testComputeRunnableWithSerializableException() throws
Exception {
+ testComputeRunnableWithException(RUNNABLE_WITH_SERIALIZABLE_EXCEPTION);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testComputeRunnableWithExternalizableException() throws
Exception {
+
testComputeRunnableWithException(RUNNABLE_WITH_EXTERNALIZABLE_EXCEPTION);
+ }
+
+ /** */
+ private void testComputeRunnableWithException(String runnableBinaryName)
throws Exception {
+ try (IgniteEx ignite = startGrids(3); IgniteEx cli =
startClientGrid()) {
+ Object runnable =
TEST_CLS_LDR.loadClass(runnableBinaryName).getConstructor().newInstance();
+ IgniteCompute compute = cli.compute(cli.cluster().forRemotes());
+
+ assertThrows(log, () -> compute.run((IgniteRunnable)runnable),
IgniteException.class, EX_MSG);
+ assertEquals(4, ignite.cluster().topologyVersion());
+ }
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testBrokenExceptionSerialization() throws Exception {
+ testComputeTaskWithBrokenException(TASK_WITH_BROKEN_EXCEPTION, true);
+ }
+
+ /**
+ * @throws Exception if failed.
+ */
+ @Test
+ public void testBrokenExceptionDeserialization() throws Exception {
+ testComputeTaskWithBrokenException(TASK_WITH_BROKEN_EXCEPTION, false);
+ }
+
+ /**
+ * @param taskBinaryName ComputeTask class binary name
+ * @param isWriteBroken {@code True} - if serialization step is broken,
+ * {@code False} - if deserialization step is broken
+ */
+ @SuppressWarnings("unchecked")
+ private void testComputeTaskWithBrokenException(String taskBinaryName,
boolean isWriteBroken) throws Exception {
+ try (IgniteEx ignite = startGrids(3); IgniteEx cli =
startClientGrid()) {
+ Constructor<?> ctor =
TEST_CLS_LDR.loadClass(taskBinaryName).getConstructor(boolean.class);
+ ComputeTask<Object, Object> task = (ComputeTask<Object,
Object>)ctor.newInstance(isWriteBroken);
+
+ IgniteCompute compute = cli.compute(cli.cluster().forRemotes());
+
+ if (isWriteBroken)
+ assertThrows(log, () -> compute.execute(task, null),
IgniteException.class, EX_SERIALIZE_MSG);
+ else {
+ // Ignite internally wraps the unmarshaller exceptions:
+ // OptimizedMarshaller throw IgniteCheckedException when
unmarshalling fails which is then wrapped
+ // into BinaryObjectException
+ assertThrows(log, () -> compute.execute(task, null),
BinaryObjectException.class, EX_UNMARSHAL_MSG);
+ }
+
+ assertEquals(4, ignite.cluster().topologyVersion());
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
index 1bf0b31d861..fffa334aca2 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteP2PSelfTestSuite.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.internal.managers.deployment.P2PCacheOperationIntoCompu
import
org.apache.ignite.internal.managers.deployment.P2PClassLoadingIssuesTest;
import org.apache.ignite.p2p.DeploymentClassLoaderCallableTest;
import org.apache.ignite.p2p.GridP2PClassLoadingSelfTest;
+import org.apache.ignite.p2p.GridP2PComputeExceptionTest;
import org.apache.ignite.p2p.GridP2PComputeWithNestedEntryProcessorTest;
import org.apache.ignite.p2p.GridP2PContinuousDeploymentClientDisconnectTest;
import org.apache.ignite.p2p.GridP2PContinuousDeploymentSelfTest;
@@ -82,7 +83,8 @@ import org.junit.runners.Suite;
GridDifferentLocalDeploymentSelfTest.class,
P2PUnsupportedClassVersionTest.class,
P2PClassLoadingFailureHandlingTest.class,
- P2PClassLoadingIssuesTest.class
+ P2PClassLoadingIssuesTest.class,
+ GridP2PComputeExceptionTest.class
})
public class IgniteP2PSelfTestSuite {
}
diff --git
a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalRunnableWithExternalizableException.java
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalRunnableWithExternalizableException.java
new file mode 100644
index 00000000000..7d6005e1625
--- /dev/null
+++
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalRunnableWithExternalizableException.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Field;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/** */
+public class ExternalRunnableWithExternalizableException implements
IgniteRunnable {
+ /** */
+ private static final String EX_MSG = "Message from Exception";
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ throw new ExternalizableException(EX_MSG);
+ }
+
+ /** Custom {@link Externalizable} Exception */
+ public static class ExternalizableException extends IgniteException
implements Externalizable {
+ /** */
+ public ExternalizableException() {
+ // No-op.
+ }
+
+ /** */
+ public ExternalizableException(String msg) {
+ super(msg);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ out.writeObject(getMessage());
+ out.writeObject(getStackTrace());
+ out.writeObject(getCause());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ String msg = (String)in.readObject();
+ setMessage(msg);
+
+ setStackTrace((StackTraceElement[])in.readObject());
+
+ Throwable cause = (Throwable)in.readObject();
+
+ if (cause != null)
+ initCause(cause);
+ }
+
+ /** */
+ private void setMessage(String msg) {
+ try {
+ Field detailMsg =
Throwable.class.getDeclaredField("detailMessage");
+
+ detailMsg.setAccessible(true);
+ detailMsg.set(this, msg);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Deserialization for exception is
broken!", e);
+ }
+ }
+ }
+}
diff --git
a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalRunnableWithSerializableException.java
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalRunnableWithSerializableException.java
new file mode 100644
index 00000000000..c786e71c10c
--- /dev/null
+++
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalRunnableWithSerializableException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+import java.io.Externalizable;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteRunnable;
+
+/** */
+public class ExternalRunnableWithSerializableException implements
IgniteRunnable {
+ /** */
+ private static final String EX_MSG = "Message from Exception";
+
+
+ /** {@inheritDoc} */
+ @Override public void run() {
+ throw new SerializableException(EX_MSG);
+ }
+
+ /** Custom {@link Externalizable} Exception */
+ public static class SerializableException extends IgniteException {
+ /** */
+ public SerializableException(String msg) {
+ super(msg);
+ }
+ }
+}
diff --git
a/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalTaskWithBrokenExceptionSerialization.java
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalTaskWithBrokenExceptionSerialization.java
new file mode 100644
index 00000000000..6c2c84e4255
--- /dev/null
+++
b/modules/extdata/p2p/src/main/java/org/apache/ignite/tests/p2p/compute/ExternalTaskWithBrokenExceptionSerialization.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tests.p2p.compute;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.compute.ComputeJob;
+import org.apache.ignite.compute.ComputeJobAdapter;
+import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.compute.ComputeTaskAdapter;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/** */
+public class ExternalTaskWithBrokenExceptionSerialization extends
ComputeTaskAdapter<Object, Object> {
+ /** */
+ private static final String EX_MSG = "Message from Exception";
+
+ /** */
+ private static final String BROKEN_EX_MSG = "Exception occurred on
serialization step";
+
+ /** */
+ private final boolean isWriteBroken;
+
+ /** */
+ public ExternalTaskWithBrokenExceptionSerialization(boolean isWriteBroken)
{
+ this.isWriteBroken = isWriteBroken;
+ }
+
+ /** {@inheritDoc} */
+ @Override @NotNull public Map<? extends ComputeJob, ClusterNode>
map(List<ClusterNode> subgrid, @Nullable Object arg) {
+ return subgrid.stream().filter(g ->
!g.isClient()).collect(Collectors.toMap(ignored -> job(), srv -> srv));
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Object reduce(List<ComputeJobResult> results)
throws IgniteException {
+ return null;
+ }
+
+ /** */
+ private ComputeJobAdapter job() {
+ return new ComputeJobAdapter() {
+ @Override public Object execute() throws IgniteException {
+ throw new
ExternalizableExceptionWithBrokenSerialization(EX_MSG, isWriteBroken);
+ }
+ };
+ }
+
+ /** Custom {@link Externalizable} Exception */
+ public static class ExternalizableExceptionWithBrokenSerialization extends
IgniteException implements Externalizable {
+ /** */
+ private boolean isWriteBroken;
+
+ /** */
+ public ExternalizableExceptionWithBrokenSerialization() {
+ // No-op.
+ }
+
+ /** */
+ public ExternalizableExceptionWithBrokenSerialization(String msg,
boolean isWriteBroken) {
+ super(msg);
+
+ this.isWriteBroken = isWriteBroken;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws
IOException {
+ if (isWriteBroken)
+ throw new IgniteException(BROKEN_EX_MSG);
+
+ out.writeBoolean(false);
+ out.writeObject(getMessage());
+ out.writeObject(getStackTrace());
+ out.writeObject(getCause());
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
+ isWriteBroken = in.readBoolean();
+
+ if (!isWriteBroken)
+ throw new IgniteException(BROKEN_EX_MSG);
+
+ String msg = (String)in.readObject();
+
+ setMessage(msg);
+
+ setStackTrace((StackTraceElement[])in.readObject());
+
+ Throwable cause = (Throwable)in.readObject();
+
+ if (cause != null)
+ initCause(cause);
+ }
+
+ /** */
+ private void setMessage(String msg) {
+ try {
+ Field detailMsg =
Throwable.class.getDeclaredField("detailMessage");
+
+ detailMsg.setAccessible(true);
+ detailMsg.set(this, msg);
+ }
+ catch (Exception e) {
+ throw new RuntimeException("Unexpected deserialization
exception is caught!", e);
+ }
+ }
+ }
+}