This is an automated email from the ASF dual-hosted git repository.

mpochatkin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new d49610e2902 IGNITE-26530 Fix ByteArrayMarshaller (#6679)
d49610e2902 is described below

commit d49610e29021a117cb58054cfa1890cb73a27a7d
Author: Vadim Pakhnushev <[email protected]>
AuthorDate: Thu Oct 2 13:16:35 2025 +0300

    IGNITE-26530 Fix ByteArrayMarshaller (#6679)
---
 .../ignite/marshalling/ByteArrayMarshaller.java    |  7 ++-
 .../internal/compute/SharedComputeUtils.java       | 49 ++++++++++++++---
 .../org/apache/ignite/internal/compute/Pojo.java   | 62 ----------------------
 .../ignite/internal/compute/ComputeUtils.java      | 12 ++---
 .../compute/executor/ComputeExecutorImpl.java      |  4 +-
 .../compute/task/TaskExecutionInternal.java        |  4 +-
 6 files changed, 56 insertions(+), 82 deletions(-)

diff --git 
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
 
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
index 94b71a5bf08..a046737ce8f 100644
--- 
a/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
+++ 
b/modules/api/src/main/java/org/apache/ignite/marshalling/ByteArrayMarshaller.java
@@ -78,16 +78,15 @@ public interface ByteArrayMarshaller<T> extends 
Marshaller<T, byte[]> {
              *   deserialization.
              *
              * - Deterministic loader selection:
-             *   We explicitly resolve classes using the ClassLoader that 
defined this ByteArrayMarshaller
-             *   instance. In Ignite, this loader is set up to be the job/unit 
ClassLoader when user code
-             *   is executed or transported. This makes class resolution 
deterministic and aligned with
+             *   We explicitly resolve classes using the ClassLoader set in 
the thread's context. In Ignite, this loader is set to the job/unit
+             *   ClassLoader before the unmarshal call. This makes 
class resolution deterministic and aligned with
              *   the deployment context, mirroring the environment where 
serialization occurred.
              */
             @Override
             protected Class<?> resolveClass(ObjectStreamClass desc) throws 
IOException, ClassNotFoundException {
                 String name = desc.getName();
                 try {
-                    return Class.forName(name, false, 
ByteArrayMarshaller.this.getClass().getClassLoader());
+                    return Class.forName(name, false, 
Thread.currentThread().getContextClassLoader());
                 } catch (ClassNotFoundException ex) {
                     return super.resolveClass(desc);
                 }
diff --git 
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
 
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
index 83aed4be769..a750cee0af3 100644
--- 
a/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
+++ 
b/modules/client-common/src/main/java/org/apache/ignite/internal/compute/SharedComputeUtils.java
@@ -148,7 +148,27 @@ public class SharedComputeUtils {
     public static <T> @Nullable T unmarshalArgOrResult(
             @Nullable ComputeJobDataHolder holder,
             @Nullable Marshaller<?, byte[]> marshaller,
-            @Nullable Class<?> resultClass) {
+            @Nullable Class<?> resultClass
+    ) {
+        return unmarshalArgOrResult(holder, marshaller, resultClass, 
Thread.currentThread().getContextClassLoader());
+    }
+
+    /**
+     * Unmarshals the job argument or result.
+     *
+     * @param holder Data holder.
+     * @param marshaller Optional marshaller.
+     * @param resultClass Optional result class.
+     * @param classLoader Class loader to set before unmarshalling.
+     * @param <T> Type of the object.
+     * @return Unmarshalled object.
+     */
+    public static <T> @Nullable T unmarshalArgOrResult(
+            @Nullable ComputeJobDataHolder holder,
+            @Nullable Marshaller<?, byte[]> marshaller,
+            @Nullable Class<?> resultClass,
+            ClassLoader classLoader
+    ) {
         if (holder == null || holder.data() == null) {
             return null;
         }
@@ -192,11 +212,7 @@ public class SharedComputeUtils {
                 if (marshaller == null) {
                     throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, 
"Marshaller should be defined on the client");
                 }
-                try {
-                    return (T) marshaller.unmarshal(holder.data());
-                } catch (Exception ex) {
-                    throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, 
"Exception in user-defined marshaller", ex);
-                }
+                return unmarshalData(marshaller, classLoader, holder.data());
 
             case TUPLE_COLLECTION:
                 return (T) 
readTupleCollection(ByteBuffer.wrap(holder.data()).order(ByteOrder.LITTLE_ENDIAN));
@@ -234,6 +250,27 @@ public class SharedComputeUtils {
         }
     }
 
+    /**
+     * Unmarshals raw data using a custom marshaller. Sets the specified 
class loader as the thread's context class loader for the call.
+     *
+     * @param marshaller Marshaller.
+     * @param classLoader Class loader to set before unmarshalling.
+     * @param raw Raw representation of the object.
+     * @param <T> Type of the object.
+     * @return Unmarshalled object.
+     */
+    public static <T> @Nullable T unmarshalData(Marshaller<?, byte[]> 
marshaller, ClassLoader classLoader, byte[] raw) {
+        ClassLoader contextClassLoader = 
Thread.currentThread().getContextClassLoader();
+        try {
+            Thread.currentThread().setContextClassLoader(classLoader);
+            return (T) marshaller.unmarshal(raw);
+        } catch (Exception ex) {
+            throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, 
"Exception in user-defined marshaller", ex);
+        } finally {
+            Thread.currentThread().setContextClassLoader(contextClassLoader);
+        }
+    }
+
     private static boolean isNativeType(Class<?> clazz) {
         return NATIVE_TYPES.contains(clazz);
     }
diff --git 
a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/Pojo.java 
b/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/Pojo.java
deleted file mode 100644
index 9e335dbd2cf..00000000000
--- a/modules/compute/src/jobs/java/org/apache/ignite/internal/compute/Pojo.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.compute;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * Pojo that is used to test default java serialization.
- */
-public class Pojo implements Serializable {
-    private String name;
-
-    public Pojo(String name) {
-        this.name = name;
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public void setName(String name) {
-        this.name = name;
-    }
-
-    @Override
-    public String toString() {
-        return "Pojo [name=" + name + "]";
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (this == o) {
-            return true;
-        }
-        if (o == null || getClass() != o.getClass()) {
-            return false;
-        }
-        Pojo pojo = (Pojo) o;
-        return Objects.equals(name, pojo.name);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(name);
-    }
-}
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
index 3c0090edbfb..1431cb4706c 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/ComputeUtils.java
@@ -357,20 +357,22 @@ public class ComputeUtils {
      * @param marshaller Optional marshaller to unmarshal the input.
      * @param input Input object.
      * @param pojoType Pojo type to use when unmarshalling as a pojo.
+     * @param classLoader Class loader to set before unmarshalling.
      * @param <T> Result type.
      * @return Unmarshalled object.
      */
     public static <T> @Nullable T unmarshalOrNotIfNull(
             @Nullable Marshaller<T, byte[]> marshaller,
             @Nullable Object input,
-            @Nullable Class<?> pojoType
+            @Nullable Class<?> pojoType,
+            ClassLoader classLoader
     ) {
         if (input == null) {
             return null;
         }
 
         if (input instanceof ComputeJobDataHolder) {
-            return 
SharedComputeUtils.unmarshalArgOrResult((ComputeJobDataHolder) input, 
marshaller, pojoType);
+            return 
SharedComputeUtils.unmarshalArgOrResult((ComputeJobDataHolder) input, 
marshaller, pojoType, classLoader);
         }
 
         if (marshaller == null) {
@@ -384,11 +386,7 @@ public class ComputeUtils {
         }
 
         if (input instanceof byte[]) {
-            try {
-                return marshaller.unmarshal((byte[]) input);
-            } catch (Exception ex) {
-                throw new ComputeException(MARSHALLING_TYPE_MISMATCH_ERR, 
"Exception in user-defined marshaller: " + ex.getMessage(), ex);
-            }
+            return SharedComputeUtils.unmarshalData(marshaller, classLoader, 
(byte[]) input);
         }
 
         throw new ComputeException(
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
index 605aa598516..2ff5e119aec 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/executor/ComputeExecutorImpl.java
@@ -216,7 +216,9 @@ public class ComputeExecutorImpl implements ComputeExecutor 
{
     ) {
         return () -> {
             CompletableFuture<R> userJobFut = jobInstance.executeAsync(
-                    context, unmarshalOrNotIfNull(inputMarshaller, arg, 
getJobExecuteArgumentType(jobClass)));
+                    context,
+                    unmarshalOrNotIfNull(inputMarshaller, arg, 
getJobExecuteArgumentType(jobClass), jobClass.getClassLoader())
+            );
 
             return userJobFut == null
                     ? null
diff --git 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
index 5424b894192..c23e002e597 100644
--- 
a/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
+++ 
b/modules/compute/src/main/java/org/apache/ignite/internal/compute/task/TaskExecutionInternal.java
@@ -149,8 +149,8 @@ public class TaskExecutionInternal<I, M, T, R> implements 
CancellableTaskExecuti
                     reduceResultMarshallerRef = 
task.reduceJobResultMarshaller();
 
                     Class<?> splitArgumentType = 
getTaskSplitArgumentType(taskClass);
-                    return task.splitAsync(context, 
unmarshalOrNotIfNull(task.splitJobInputMarshaller(), arg, splitArgumentType))
-                            .thenApply(jobs -> new SplitResult<>(task, jobs));
+                    I input = 
unmarshalOrNotIfNull(task.splitJobInputMarshaller(), arg, splitArgumentType, 
taskClass.getClassLoader());
+                    return task.splitAsync(context, input).thenApply(jobs -> 
new SplitResult<>(task, jobs));
                 },
 
                 Integer.MAX_VALUE,

Reply via email to