This is an automated email from the ASF dual-hosted git repository.
xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new d38d504332 [Fix] Fix failure when running a task instance through the API
(#14433)
d38d504332 is described below
commit d38d504332042fcf60b6560284e4eb729600fbbe
Author: 旺阳 <[email protected]>
AuthorDate: Thu Jul 6 11:38:26 2023 +0800
[Fix] Fix failure when running a task instance through the API (#14433)
* update logic
* split method
---
.../master/runner/StreamTaskExecuteRunnable.java | 7 ++++-
.../execute/TaskExecutionContextFactory.java | 33 ++++++++++++++--------
2 files changed, 27 insertions(+), 13 deletions(-)
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 5f64a3c099..40ef9e5df6 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -54,6 +54,7 @@ import
org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
import
org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
import
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -104,6 +105,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
protected TaskExecuteStartMessage taskExecuteStartMessage;
+ protected TaskExecutionContextFactory taskExecutionContextFactory;
+
/**
* task event queue
*/
@@ -122,6 +125,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
this.taskDefinition = taskDefinition;
this.taskExecuteStartMessage = taskExecuteStartMessage;
+ this.taskExecutionContextFactory =
SpringApplicationContext.getBean(TaskExecutionContextFactory.class);
}
public TaskInstance getTaskInstance() {
@@ -337,7 +341,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
taskExecutionContext.setProcessDefineVersion(processDefinition.getVersion());
// process instance id default 0
taskExecutionContext.setProcessInstanceId(0);
-
+
taskExecutionContextFactory.setDataQualityTaskExecutionContext(taskExecutionContext,
taskInstance, tenantCode);
+
taskExecutionContextFactory.setK8sTaskRelatedInfo(taskExecutionContext,
taskInstance);
return taskExecutionContext;
}
diff --git
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
index 03005ff70c..0153c81364 100644
---
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
+++
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
@@ -121,33 +121,42 @@ public class TaskExecutionContextFactory {
.orElse(null);
setTaskResourceInfo(resources);
- // TODO to be optimized
- DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
- if
(TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
- dataQualityTaskExecutionContext = new
DataQualityTaskExecutionContext();
- setDataQualityTaskRelation(dataQualityTaskExecutionContext,
taskInstance, workflowInstance.getTenantCode());
- }
-
- K8sTaskExecutionContext k8sTaskExecutionContext =
setK8sTaskRelation(taskInstance);
-
Map<String, Property> businessParamsMap =
curingParamsService.preBuildBusinessParams(workflowInstance);
AbstractParameters baseParam =
taskPluginManager.getParameters(ParametersNode.builder()
.taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
Map<String, Property> propertyMap =
curingParamsService.paramParsingPreparation(taskInstance,
baseParam, workflowInstance);
- return TaskExecutionContextBuilder.get()
+ TaskExecutionContext taskExecutionContext =
TaskExecutionContextBuilder.get()
.buildWorkflowInstanceHost(masterConfig.getMasterAddress())
.buildTaskInstanceRelatedInfo(taskInstance)
.buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
.buildResourceParametersInfo(resources)
-
.buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
- .buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
.buildBusinessParamsMap(businessParamsMap)
.buildParamInfo(propertyMap)
.create();
+
+ setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance,
workflowInstance.getTenantCode());
+ setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
+ return taskExecutionContext;
+ }
+
+ public void setDataQualityTaskExecutionContext(TaskExecutionContext
taskExecutionContext, TaskInstance taskInstance,
+ String tenantCode) {
+ // TODO to be optimized
+ DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
+ if
(TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
+ dataQualityTaskExecutionContext = new
DataQualityTaskExecutionContext();
+ setDataQualityTaskRelation(dataQualityTaskExecutionContext,
taskInstance, tenantCode);
+ }
+
taskExecutionContext.setDataQualityTaskExecutionContext(dataQualityTaskExecutionContext);
+ }
+
+ public void setK8sTaskRelatedInfo(TaskExecutionContext
taskExecutionContext, TaskInstance taskInstance) {
+ K8sTaskExecutionContext k8sTaskExecutionContext =
setK8sTaskRelation(taskInstance);
+
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
}
private Map<String, String> getResourceFullNames(TaskInstance
taskInstance) {