This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/st-engine by this push:
     new cb4c095f2 [Engine] [Core] Serialize Split before send to other task (#2704)
cb4c095f2 is described below

commit cb4c095f229fd5146034043fd9c78279fa05835a
Author: Hisoka <[email protected]>
AuthorDate: Tue Sep 13 17:46:26 2022 +0800

    [Engine] [Core] Serialize Split before send to other task (#2704)
    
    * [Engine] [Core] Serialize Split before send to other task
    
    * [Engine] [E2E] Fix ci problems
---
 .../seatunnel/common/utils/SerializationUtils.java   | 19 +++++++++++++++++++
 .../engine/server/TaskExecutionService.java          | 20 +++++++++++++-------
 .../engine/server/execution/TaskGroupContext.java    |  2 ++
 .../context/SeaTunnelSplitEnumeratorContext.java     |  7 ++++---
 .../task/operation/source/AssignSplitOperation.java  | 16 ++++++++++------
 5 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 7e721617e..075139800 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -76,4 +76,23 @@ public class SerializationUtils {
         }
     }
 
+    /**
+     * Deserializes an object, resolving the classes in the stream through the given class loader.
+     *
+     * <p>Each class descriptor is first looked up with {@code classLoader}. If that lookup fails,
+     * resolution falls back to {@link ObjectInputStream#resolveClass}, which also maps primitive
+     * type names (such as {@code int}) that {@code Class.forName} cannot resolve. When
+     * {@code classLoader} is {@code null}, only the default resolution is used.
+     *
+     * <p>NOTE(review): this is Java native deserialization; only pass bytes that come from a
+     * trusted source.
+     *
+     * @param bytes       the serialized object
+     * @param classLoader the class loader used to resolve classes, or {@code null} for the default
+     * @param <T>         the expected type of the deserialized object
+     * @return the deserialized object
+     * @throws SerializationException if the bytes cannot be read or a class cannot be resolved
+     */
+    public static <T extends Serializable> T deserialize(byte[] bytes, ClassLoader classLoader) {
+        try (ByteArrayInputStream s = new ByteArrayInputStream(bytes);
+             ObjectInputStream in = new ObjectInputStream(s) {
+                 @Override
+                 protected Class<?> resolveClass(ObjectStreamClass desc)
+                         throws IOException, ClassNotFoundException {
+                     // Resolve with the caller-supplied loader (e.g. a task group's child-first
+                     // loader) instead of the default ObjectInputStream lookup.
+                     if (classLoader == null) {
+                         return super.resolveClass(desc);
+                     }
+                     try {
+                         return Class.forName(desc.getName(), false, classLoader);
+                     } catch (ClassNotFoundException notInLoader) {
+                         // Class.forName cannot resolve primitive type names; the default
+                         // implementation maps those to their Class objects.
+                         return super.resolveClass(desc);
+                     }
+                 }
+             }) {
+            @SuppressWarnings("unchecked") final T obj = (T) in.readObject();
+            return obj;
+        } catch (final ClassNotFoundException | IOException ex) {
+            throw new SerializationException(ex);
+        }
+    }
+
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 81875a852..eb9d1c07f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -142,11 +142,11 @@ public class TaskExecutionService {
         TaskGroup taskGroup = null;
         try {
             Set<URL> jars = taskImmutableInfo.getJars();
-
+            ClassLoader classLoader = 
Thread.currentThread().getContextClassLoader();
             if (!CollectionUtils.isEmpty(jars)) {
+                classLoader = new 
SeatunnelChildFirstClassLoader(Lists.newArrayList(jars));
                 taskGroup =
-                    
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
-                        new 
SeatunnelChildFirstClassLoader(Lists.newArrayList(jars)),
+                    
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
 classLoader,
                         taskImmutableInfo.getGroup());
             } else {
                 taskGroup = 
nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
@@ -154,7 +154,7 @@ public class TaskExecutionService {
             if 
(executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
                 throw new RuntimeException(String.format("TaskGroupLocation: 
%s already exists", taskGroup.getTaskGroupLocation()));
             }
-            return deployLocalTask(taskGroup, resultFuture);
+            return deployLocalTask(taskGroup, resultFuture, classLoader);
         } catch (Throwable t) {
             logger.severe(String.format("TaskGroupID : %s  deploy error with 
Exception: %s",
                 taskGroup != null && taskGroup.getTaskGroupLocation() != null 
? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
@@ -165,11 +165,17 @@ public class TaskExecutionService {
         return new PassiveCompletableFuture<>(resultFuture);
     }
 
+    /**
+     * Deploys {@code taskGroup}, recording the current thread's context class loader as the
+     * task group's class loader.
+     */
+    public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
+        @NonNull TaskGroup taskGroup,
+        @NonNull CompletableFuture<TaskExecutionState> resultFuture) {
+        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
+        return deployLocalTask(taskGroup, resultFuture, contextClassLoader);
+    }
+
     @SuppressWarnings("checkstyle:MagicNumber")
     public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
         @NonNull TaskGroup taskGroup,
-        @NonNull CompletableFuture<TaskExecutionState> resultFuture
-    ) {
+        @NonNull CompletableFuture<TaskExecutionState> resultFuture,
+        @NonNull ClassLoader classLoader) {
         try {
             taskGroup.init();
             Collection<Task> tasks = taskGroup.getTasks();
@@ -188,7 +194,7 @@ public class TaskExecutionService {
             submitThreadShareTask(executionTracker, byCooperation.get(true));
             submitBlockingTask(executionTracker, byCooperation.get(false));
             taskGroup.setTasksContext(taskExecutionContextMap);
-            executionContexts.put(taskGroup.getTaskGroupLocation(), new 
TaskGroupContext(taskGroup));
+            executionContexts.put(taskGroup.getTaskGroupLocation(), new 
TaskGroupContext(taskGroup, classLoader));
             cancellationFutures.put(taskGroup.getTaskGroupLocation(), 
cancellationFuture);
         } catch (Throwable t) {
             logger.severe(ExceptionUtils.getMessage(t));
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
index c5b7c74d2..563d736dd 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
@@ -22,4 +22,6 @@ import lombok.Data;
+/**
+ * Context that TaskExecutionService stores for each deployed task group.
+ */
 @Data
 public class TaskGroupContext {
     final TaskGroup taskGroup;
+
+    // Class loader supplied when the task group was deployed: a child-first loader built from the
+    // task's jars, or the thread context class loader otherwise. Used, for example, to deserialize
+    // splits assigned to this group's tasks.
     final ClassLoader classLoader;
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index 5bd282af5..e1fe6c1ff 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.context;
 import org.apache.seatunnel.api.source.SourceEvent;
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
 import 
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
 
@@ -52,14 +53,14 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends SourceSplit> impleme
+    /**
+     * Sends {@code splits} to the task of {@code subtaskId}. The splits are serialized here with
+     * {@link SerializationUtils#serialize} so that {@link AssignSplitOperation} can deserialize
+     * them on the receiving side with the target task group's class loader.
+     */
     @Override
     public void assignSplit(int subtaskId, List<SplitT> splits) {
         task.getExecutionContext().sendToMember(new 
AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId),
-                splits), task.getTaskMemberAddr(subtaskId));
+            SerializationUtils.serialize(splits.toArray())), 
task.getTaskMemberAddr(subtaskId));
     }
 
+    /**
+     * Sends the task of {@code subtaskId} an {@link AssignSplitOperation} that carries an empty,
+     * serialized split array.
+     * NOTE(review): presumably the receiving task treats this empty assignment as "no more
+     * splits"; confirm in SourceSeaTunnelTask#receivedSourceSplit.
+     */
     @Override
     public void signalNoMoreSplits(int subtaskId) {
         task.getExecutionContext().sendToMember(
-                new 
AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId), 
Collections.emptyList()),
-                task.getTaskMemberAddr(subtaskId));
+            new AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId), 
SerializationUtils.serialize(Collections.emptyList().toArray())),
+            task.getTaskMemberAddr(subtaskId));
     }
 
     @Override
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index a2ccdc4ba..9bcd5bda9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.engine.server.task.operation.source;
 
 import org.apache.seatunnel.api.source.SourceSplit;
 import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SerializationUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -31,17 +32,18 @@ import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Arrays;
+import java.util.stream.Collectors;
 
 public class AssignSplitOperation<SplitT extends SourceSplit> extends 
Operation implements IdentifiedDataSerializable {
 
-    private List<SplitT> splits;
+    private byte[] splits;
     private TaskLocation taskID;
 
+    // No-arg constructor; leaves splits and taskID unset (null). Presumably used by Hazelcast to
+    // instantiate the operation before readInternal on the receiving member - confirm.
     public AssignSplitOperation() {
     }
 
-    public AssignSplitOperation(TaskLocation taskID, List<SplitT> splits) {
+    /**
+     * @param taskID location of the source task that should receive the splits
+     * @param splits the splits as an {@code Object[]} serialized with
+     *               {@code SerializationUtils.serialize}; the array is stored without copying
+     */
+    public AssignSplitOperation(TaskLocation taskID, byte[] splits) {
         this.taskID = taskID;
         this.splits = splits;
     }
@@ -51,7 +53,9 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
         SeaTunnelServer server = getService();
         RetryUtils.retryWithException(() -> {
             SourceSeaTunnelTask<?, SplitT> task = 
server.getTaskExecutionService().getTask(taskID);
-            task.receivedSourceSplit(splits);
+            ClassLoader classLoader = 
server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupLocation()).getClassLoader();
+            Object[] o = SerializationUtils.deserialize(splits, classLoader);
+            task.receivedSourceSplit(Arrays.stream(o).map(i -> (SplitT) 
i).collect(Collectors.toList()));
             return null;
         }, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
             exception -> exception instanceof NullPointerException, 
Constant.OPERATION_RETRY_SLEEP));
@@ -59,13 +63,13 @@ public class AssignSplitOperation<SplitT extends SourceSplit> extends Operation
 
+    /** Writes the serialized split bytes, followed by the task location. */
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
-        out.writeObject(splits);
+        out.writeByteArray(splits);
         taskID.writeData(out);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
-        splits = in.readObject();
+        splits = in.readByteArray();
         taskID.readData(in);
     }
 

Reply via email to