This is an automated email from the ASF dual-hosted git repository.
mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d49610e2902 IGNITE-26530 Fix ByteArrayMarshaller (#6679)
d49610e2902 is described below
commit d49610e29021a117cb58054cfa1890cb73a27a7d
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Oct 2 13:16:35 2025 +0300
IGNITE-26530 Fix ByteArrayMarshaller (#6679)
---
.../ignite/marshalling/ByteArrayMarshaller.java | 7 ++-
.../internal/compute/SharedComputeUtils.java | 49 ++++++++++++++---
.../org/apache/ignite/internal/compute/Pojo.java | 62 ----------------------
.../ignite/internal/compute/ComputeUtils.java | 12 ++---
.../compute/executor/ComputeExecutorImpl.java | 4 +-
.../compute/task/TaskExecutionInternal.java | 4 +-
6 files changed, 56 insertions(+), 82 deletions(-)
diff --git
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
index 94b71a5bf08..a046737ce8f 100644
---
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
+++
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
@@ -78,16 +78,15 @@ public interface ByteArrayMarshaller<T> extends
Marshaller<T, byte[]> {
* deserialization.
*
* - Deterministic loader selection:
- * We explicitly resolve classes using the ClassLoader that
defined this ByteArrayMarshaller
- * instance. In Ignite, this loader is set up to be the job/unit
ClassLoader when user code
- * is executed or transported. This makes class resolution
deterministic and aligned with
+ * We explicitly resolve classes using the ClassLoader set in
the thread's context. In Ignite, this loader is set up before
+ * the unmarshal call to the job/unit ClassLoader. This makes
class resolution deterministic and aligned with
* the deployment context, mirroring the environment where
serialization occurred.
*/
@Override
protected Class<?> resolveClass(ObjectStreamClass desc) throws
IOException, ClassNotFoundException {
String name = desc.getName();
try {
- return Class.forName(name, false,
ByteArrayMarshaller.this.getClass().getClassLoader());
+ return Class.forName(name, false,
Thread.currentThread().getContextClassLoader());
} catch (ClassNotFoundException ex) {
return super.resolveClass(desc);
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
index 83aed4be769..a750cee0af3 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
@@ -148,7 +148,27 @@ public class SharedComputeUtils {
public static <T> @Nullable T unmarshalArgOrResult(
@Nullable ComputeJobDataHolder holder,
@Nullable Marshaller<?, byte[]> marshaller,
- @Nullable Class<?> resultClass) {
+ @Nullable Class<?> resultClass
+ ) {
+ return unmarshalArgOrResult(holder, marshaller, resultClass,
Thread.currentThread().getContextClassLoader());
+ }
+
+ /**
+ * Unmarshals the job argument or result.
+ *
+ * @param holder Data holder.
+ * @param marshaller Optional marshaller.
+ * @param resultClass Optional result class.
+ * @param classLoader Class loader to set before unmarshalling.
+ * @param <T> Type of the object.
+ * @return Unmarshalled object.
+ */
+ public static <T> @Nullable T unmarshalArgOrResult(
+ @Nullable ComputeJobDataHolder holder,
+ @Nullable Marshaller<?, byte[]> marshaller,
+ @Nullable Class<?> resultClass,
+ ClassLoader classLoader
+ ) {
if (holder == null || holder.data() == null) {
return null;
}
@@ -192,11 +212,7 @@ public class SharedComputeUtils {
if (marshaller == null) {
throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR,
"Marshaller should be defined on the client");
}
- try {
- return (T) marshaller.unmarshal(holder.data());
- } catch (Exception ex) {
- throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR,
"Exception in user-defined marshaller", ex);
- }
+ return unmarshalData(marshaller, classLoader, holder.data());
case TUPLE_COLLECTION:
return (T)
readTupleCollection(ByteBuffer.wrap(holder.data()).order(ByteOrder.LITTLE_ENDIAN));
@@ -234,6 +250,27 @@ public class SharedComputeUtils {
}
}
+ /**
+ * Unmarshal raw data using custom marshaller. Sets the specified
classloader to the thread's context.
+ *
+ * @param marshaller Marshaller.
+ * @param classLoader Class loader to set before unmarshalling.
+ * @param raw raw presentation of object.
+ * @param <T> Type of the object.
+ * @return Unmarshalled object.
+ */
+ public static <T> @Nullable T unmarshalData(Marshaller<?, byte[]>
marshaller, ClassLoader classLoader, byte[] raw) {
+ ClassLoader contextClassLoader =
Thread.currentThread().getContextClassLoader();
+ try {
+ Thread.currentThread().setContextClassLoader(classLoader);
+ return (T) marshaller.unmarshal(raw);
+ } catch (Exception ex) {
+ throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR,
"Exception in user-defined marshaller", ex);
+ } finally {
+ Thread.currentThread().setContextClassLoader(contextClassLoader);
+ }
+ }
+
private static boolean isNativeType(Class<?> clazz) {
return NATIVE_TYPES.contains(clazz);
}
diff --git
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/Pojo.java
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/Pojo.java
deleted file mode 100644
index 9e335dbd2cf..00000000000
--- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/Pojo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.compute;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * Pojo that is used to test default java serialization.
- */
-public class Pojo implements Serializable {
- private String name;
-
- public Pojo(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- @Override
- public String toString() {
- return "Pojo [name=" + name + "]";
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Pojo pojo = (Pojo) o;
- return Objects.equals(name, pojo.name);
- }
-
- @Override
- public int hashCode() {
- return Objects.hashCode(name);
- }
-}
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 3c0090edbfb..1431cb4706c 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -357,20 +357,22 @@ public class ComputeUtils {
* @param marshaller Optional marshaller to unmarshal the input.
* @param input Input object.
* @param pojoType Pojo type to use when unmarshalling as a pojo.
+ * @param classLoader Class loader to set before unmarshalling.
* @param <T> Result type.
* @return Unmarshalled object.
*/
public static <T> @Nullable T unmarshalOrNotIfNull(
@Nullable Marshaller<T, byte[]> marshaller,
@Nullable Object input,
- @Nullable Class<?> pojoType
+ @Nullable Class<?> pojoType,
+ ClassLoader classLoader
) {
if (input == null) {
return null;
}
if (input instanceof ComputeJobDataHolder) {
- return
SharedComputeUtils.unmarshalArgOrResult((ComputeJobDataHolder) input,
marshaller, pojoType);
+ return
SharedComputeUtils.unmarshalArgOrResult((ComputeJobDataHolder) input,
marshaller, pojoType, classLoader);
}
if (marshaller == null) {
@@ -384,11 +386,7 @@ public class ComputeUtils {
}
if (input instanceof byte[]) {
- try {
- return marshaller.unmarshal((byte[]) input);
- } catch (Exception ex) {
- throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR,
"Exception in user-defined marshaller: " + ex.getMessage(), ex);
- }
+ return SharedComputeUtils.unmarshalData(marshaller, classLoader,
(byte[]) input);
}
throw new ComputeException(
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 605aa598516..2ff5e119aec 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -216,7 +216,9 @@ public class ComputeExecutorImpl implements ComputeExecutor
{
) {
return () -> {
CompletableFuture<R> userJobFut = jobInstance.executeAsync(
- context, unmarshalOrNotIfNull(inputMarshaller, arg,
getJobExecuteArgumentType(jobClass)));
+ context,
+ unmarshalOrNotIfNull(inputMarshaller, arg,
getJobExecuteArgumentType(jobClass), jobClass.getClassLoader())
+ );
return userJobFut == null
? null
diff --git
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 5424b894192..c23e002e597 100644
---
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -149,8 +149,8 @@ public class TaskExecutionInternal<I, M, T, R> implements
CancellableTaskExecuti
reduceResultMarshallerRef =
task.reduceJobResultMarshaller();
Class<?> splitArgumentType =
getTaskSplitArgumentType(taskClass);
- return task.splitAsync(context,
unmarshalOrNotIfNull(task.splitJobInputMarshaller(), arg, splitArgumentType))
- .thenApply(jobs -> new SplitResult<>(task, jobs));
+ I input =
unmarshalOrNotIfNull(task.splitJobInputMarshaller(), arg, splitArgumentType,
taskClass.getClassLoader());
+ return task.splitAsync(context, input).thenApply(jobs ->
new SplitResult<>(task, jobs));
},
Integer.MAX_VALUE,