This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new cb4c095f2 [Engine] [Core] Serialize Split before send to other task
(#2704)
cb4c095f2 is described below
commit cb4c095f229fd5146034043fd9c78279fa05835a
Author: Hisoka <[email protected]>
AuthorDate: Tue Sep 13 17:46:26 2022 +0800
[Engine] [Core] Serialize Split before send to other task (#2704)
* [Engine] [Core] Serialize Split before send to other task
* [Engine] [E2E] Fix ci problems
---
.../seatunnel/common/utils/SerializationUtils.java | 19 +++++++++++++++++++
.../engine/server/TaskExecutionService.java | 20 +++++++++++++-------
.../engine/server/execution/TaskGroupContext.java | 2 ++
.../context/SeaTunnelSplitEnumeratorContext.java | 7 ++++---
.../task/operation/source/AssignSplitOperation.java | 16 ++++++++++------
5 files changed, 48 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
index 7e721617e..075139800 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/SerializationUtils.java
@@ -76,4 +76,23 @@ public class SerializationUtils {
}
}
+ public static <T extends Serializable> T deserialize(byte[] bytes, // Java-deserializes bytes into an object, resolving classes through classLoader when one is given
ClassLoader classLoader) {
+ try (ByteArrayInputStream s = new ByteArrayInputStream(bytes);
+ ObjectInputStream in = new ObjectInputStream(s) {
+ @Override
+ protected Class<?> resolveClass(ObjectStreamClass desc)
throws IOException, ClassNotFoundException {
+ // resolve classes with the supplied classLoader; a null classLoader falls back to ObjectInputStream's default resolution
+ if (classLoader == null) {
+ return super.resolveClass(desc);
+ }
+ return Class.forName(desc.getName(), false, classLoader); // loads without initializing; NOTE(review): unlike super.resolveClass, Class.forName cannot resolve primitive type names such as "int" - confirm no serialized Class values need that
+ }
+ }) {
+ @SuppressWarnings("unchecked") final T obj = (T) in.readObject(); // unchecked cast: the caller picks T and nothing here verifies it
+ return obj;
+ } catch (final ClassNotFoundException | IOException ex) {
+ throw new SerializationException(ex); // both failure kinds are rethrown wrapped in SerializationException, keeping the cause
+ }
+ }
+
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
index 81875a852..eb9d1c07f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/TaskExecutionService.java
@@ -142,11 +142,11 @@ public class TaskExecutionService {
TaskGroup taskGroup = null;
try {
Set<URL> jars = taskImmutableInfo.getJars();
-
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
if (!CollectionUtils.isEmpty(jars)) {
+ classLoader = new
SeatunnelChildFirstClassLoader(Lists.newArrayList(jars));
taskGroup =
-
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
- new
SeatunnelChildFirstClassLoader(Lists.newArrayList(jars)),
+
CustomClassLoadedObject.deserializeWithCustomClassLoader(nodeEngine.getSerializationService(),
classLoader,
taskImmutableInfo.getGroup());
} else {
taskGroup =
nodeEngine.getSerializationService().toObject(taskImmutableInfo.getGroup());
@@ -154,7 +154,7 @@ public class TaskExecutionService {
if
(executionContexts.containsKey(taskGroup.getTaskGroupLocation())) {
throw new RuntimeException(String.format("TaskGroupLocation:
%s already exists", taskGroup.getTaskGroupLocation()));
}
- return deployLocalTask(taskGroup, resultFuture);
+ return deployLocalTask(taskGroup, resultFuture, classLoader);
} catch (Throwable t) {
logger.severe(String.format("TaskGroupID : %s deploy error with
Exception: %s",
taskGroup != null && taskGroup.getTaskGroupLocation() != null
? taskGroup.getTaskGroupLocation().toString() : "taskGroupLocation is null",
@@ -165,11 +165,17 @@ public class TaskExecutionService {
return new PassiveCompletableFuture<>(resultFuture);
}
+ public PassiveCompletableFuture<TaskExecutionState> deployLocalTask( // convenience overload: delegates to the three-argument deployLocalTask with the calling thread's context class loader
+ @NonNull TaskGroup taskGroup,
+ @NonNull CompletableFuture<TaskExecutionState> resultFuture) {
+ return deployLocalTask(taskGroup, resultFuture,
Thread.currentThread().getContextClassLoader()); // NOTE(review): getContextClassLoader() may return null, while the delegate declares its classLoader parameter @NonNull - confirm callers always have one set
+ }
+
@SuppressWarnings("checkstyle:MagicNumber")
public PassiveCompletableFuture<TaskExecutionState> deployLocalTask(
@NonNull TaskGroup taskGroup,
- @NonNull CompletableFuture<TaskExecutionState> resultFuture
- ) {
+ @NonNull CompletableFuture<TaskExecutionState> resultFuture,
+ @NonNull ClassLoader classLoader) {
try {
taskGroup.init();
Collection<Task> tasks = taskGroup.getTasks();
@@ -188,7 +194,7 @@ public class TaskExecutionService {
submitThreadShareTask(executionTracker, byCooperation.get(true));
submitBlockingTask(executionTracker, byCooperation.get(false));
taskGroup.setTasksContext(taskExecutionContextMap);
- executionContexts.put(taskGroup.getTaskGroupLocation(), new
TaskGroupContext(taskGroup));
+ executionContexts.put(taskGroup.getTaskGroupLocation(), new
TaskGroupContext(taskGroup, classLoader));
cancellationFutures.put(taskGroup.getTaskGroupLocation(),
cancellationFuture);
} catch (Throwable t) {
logger.severe(ExceptionUtils.getMessage(t));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
index c5b7c74d2..563d736dd 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/execution/TaskGroupContext.java
@@ -22,4 +22,6 @@ import lombok.Data;
@Data
public class TaskGroupContext { // per-task-group state that TaskExecutionService stores in executionContexts, keyed by TaskGroupLocation
final TaskGroup taskGroup;
+
+ final ClassLoader classLoader; // class loader passed in at deployment; AssignSplitOperation reads it to deserialize incoming splits
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
index 5bd282af5..e1fe6c1ff 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/context/SeaTunnelSplitEnumeratorContext.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.server.task.context;
import org.apache.seatunnel.api.source.SourceEvent;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.server.task.SourceSplitEnumeratorTask;
import
org.apache.seatunnel.engine.server.task.operation.source.AssignSplitOperation;
@@ -52,14 +53,14 @@ public class SeaTunnelSplitEnumeratorContext<SplitT extends
SourceSplit> impleme
@Override
public void assignSplit(int subtaskId, List<SplitT> splits) { // sends the given splits, wrapped in an AssignSplitOperation, to the member address looked up for subtaskId
task.getExecutionContext().sendToMember(new
AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId),
- splits), task.getTaskMemberAddr(subtaskId));
+ SerializationUtils.serialize(splits.toArray())), // splits are converted to an Object[] and serialized to bytes before being put into the operation
task.getTaskMemberAddr(subtaskId));
}
@Override
public void signalNoMoreSplits(int subtaskId) { // signals the subtask by sending an AssignSplitOperation whose payload is an empty split array
task.getExecutionContext().sendToMember(
- new
AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId),
Collections.emptyList()),
- task.getTaskMemberAddr(subtaskId));
+ new AssignSplitOperation<>(task.getTaskMemberLocation(subtaskId),
SerializationUtils.serialize(Collections.emptyList().toArray())), // serialized empty Object[], the same payload format assignSplit sends
+ task.getTaskMemberAddr(subtaskId));
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
index a2ccdc4ba..9bcd5bda9 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/AssignSplitOperation.java
@@ -19,6 +19,7 @@ package
org.apache.seatunnel.engine.server.task.operation.source;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.common.utils.RetryUtils;
+import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
import org.apache.seatunnel.engine.server.execution.TaskLocation;
@@ -31,17 +32,18 @@ import
com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.spi.impl.operationservice.Operation;
import java.io.IOException;
-import java.util.List;
+import java.util.Arrays;
+import java.util.stream.Collectors;
public class AssignSplitOperation<SplitT extends SourceSplit> extends
Operation implements IdentifiedDataSerializable {
- private List<SplitT> splits;
+ private byte[] splits;
private TaskLocation taskID;
public AssignSplitOperation() { // no-arg constructor; assigns neither splits nor taskID
}
- public AssignSplitOperation(TaskLocation taskID, List<SplitT> splits) {
+ public AssignSplitOperation(TaskLocation taskID, byte[] splits) { // splits: serialized Object[] of splits; run() deserializes it with the target task group's class loader
this.taskID = taskID;
this.splits = splits;
}
@@ -51,7 +53,9 @@ public class AssignSplitOperation<SplitT extends SourceSplit>
extends Operation
SeaTunnelServer server = getService();
RetryUtils.retryWithException(() -> {
SourceSeaTunnelTask<?, SplitT> task =
server.getTaskExecutionService().getTask(taskID);
- task.receivedSourceSplit(splits);
+ ClassLoader classLoader =
server.getTaskExecutionService().getExecutionContext(taskID.getTaskGroupLocation()).getClassLoader();
+ Object[] o = SerializationUtils.deserialize(splits, classLoader);
+ task.receivedSourceSplit(Arrays.stream(o).map(i -> (SplitT)
i).collect(Collectors.toList()));
return null;
}, new RetryUtils.RetryMaterial(Constant.OPERATION_RETRY_TIME, true,
exception -> exception instanceof NullPointerException,
Constant.OPERATION_RETRY_SLEEP));
@@ -59,13 +63,13 @@ public class AssignSplitOperation<SplitT extends
SourceSplit> extends Operation
@Override
protected void writeInternal(ObjectDataOutput out) throws IOException { // writes the operation's fields: split bytes first, then taskID
- out.writeObject(splits);
+ out.writeByteArray(splits); // the payload is already serialized, so it is written as a raw byte array
taskID.writeData(out); // must stay in the same order as the reads in readInternal
}
@Override
protected void readInternal(ObjectDataInput in) throws IOException { // reads the fields back in the order writeInternal wrote them
- splits = in.readObject();
+ splits = in.readByteArray(); // kept as raw bytes here; run() performs the actual deserialization
taskID.readData(in); // NOTE(review): taskID is dereferenced here but the no-arg constructor shown in this diff never assigns it - confirm it cannot be null at this point
}