This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cc021bd2c [Zeta] [Fix] Fix some operation didn't use classloader which belong task (#4611)
cc021bd2c is described below
commit cc021bd2c6de55f39e1ce4f3e3db50ad6d438439
Author: Jia Fan <[email protected]>
AuthorDate: Tue Apr 18 21:49:26 2023 +0800
[Zeta] [Fix] Fix some operation didn't use classloader which belong task (#4611)
---
.../engine/server/task/operation/source/RequestSplitOperation.java | 7 +++++++
.../server/task/operation/source/SourceNoMoreElementOperation.java | 7 +++++++
.../server/task/operation/source/SourceRegisterOperation.java | 7 +++++++
3 files changed, 21 insertions(+)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
index 45da5b64f..5750046d0 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/RequestSplitOperation.java
@@ -51,9 +51,16 @@ public class RequestSplitOperation extends Operation implements IdentifiedDataSe
RetryUtils.retryWithException(
() -> {
+ ClassLoader classLoader =
+ server.getTaskExecutionService()
+
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
+ .getClassLoader();
+ ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
task.requestSplit(taskID.getTaskIndex());
+
Thread.currentThread().setContextClassLoader(oldClassLoader);
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
index 184d36abd..fec6afcb9 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceNoMoreElementOperation.java
@@ -49,9 +49,16 @@ public class SourceNoMoreElementOperation extends Operation implements Identifie
SeaTunnelServer server = getService();
RetryUtils.retryWithException(
() -> {
+ ClassLoader classLoader =
+ server.getTaskExecutionService()
+
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
+ .getClassLoader();
+ ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
task.readerFinished(currentTaskID.getTaskID());
+
Thread.currentThread().setContextClassLoader(oldClassLoader);
return null;
},
new RetryUtils.RetryMaterial(
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
index 8ac8a795f..7e0d5ce3a 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/source/SourceRegisterOperation.java
@@ -55,9 +55,16 @@ public class SourceRegisterOperation extends Operation implements IdentifiedData
Address readerAddress = getCallerAddress();
RetryUtils.retryWithException(
() -> {
+ ClassLoader classLoader =
+ server.getTaskExecutionService()
+
.getExecutionContext(enumeratorTaskID.getTaskGroupLocation())
+ .getClassLoader();
+ ClassLoader oldClassLoader =
Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(classLoader);
SourceSplitEnumeratorTask<?> task =
server.getTaskExecutionService().getTask(enumeratorTaskID);
task.receivedReader(readerTaskID, readerAddress);
+
Thread.currentThread().setContextClassLoader(oldClassLoader);
return null;
},
new RetryUtils.RetryMaterial(