This is an automated email from the ASF dual-hosted git repository.

xincheng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new d38d504332 [Fix] Fix running task instance throught api gots failed 
(#14433)
d38d504332 is described below

commit d38d504332042fcf60b6560284e4eb729600fbbe
Author: 旺阳 <[email protected]>
AuthorDate: Thu Jul 6 11:38:26 2023 +0800

    [Fix] Fix running task instance throught api gots failed (#14433)
    
    * update logic
    
    * split method
---
 .../master/runner/StreamTaskExecuteRunnable.java   |  7 ++++-
 .../execute/TaskExecutionContextFactory.java       | 33 ++++++++++++++--------
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
index 5f64a3c099..40ef9e5df6 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/StreamTaskExecuteRunnable.java
@@ -54,6 +54,7 @@ import 
org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
 import 
org.apache.dolphinscheduler.server.master.runner.dispatcher.WorkerTaskDispatcher;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnable;
 import 
org.apache.dolphinscheduler.server.master.runner.execute.DefaultTaskExecuteRunnableFactory;
+import 
org.apache.dolphinscheduler.server.master.runner.execute.TaskExecutionContextFactory;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.spi.enums.ResourceType;
@@ -104,6 +105,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
 
     protected TaskExecuteStartMessage taskExecuteStartMessage;
 
+    protected TaskExecutionContextFactory taskExecutionContextFactory;
+
     /**
      * task event queue
      */
@@ -122,6 +125,7 @@ public class StreamTaskExecuteRunnable implements Runnable {
                 
SpringApplicationContext.getBean(StreamTaskInstanceExecCacheManager.class);
         this.taskDefinition = taskDefinition;
         this.taskExecuteStartMessage = taskExecuteStartMessage;
+        this.taskExecutionContextFactory = 
SpringApplicationContext.getBean(TaskExecutionContextFactory.class);
     }
 
     public TaskInstance getTaskInstance() {
@@ -337,7 +341,8 @@ public class StreamTaskExecuteRunnable implements Runnable {
         
taskExecutionContext.setProcessDefineVersion(processDefinition.getVersion());
         // process instance id default 0
         taskExecutionContext.setProcessInstanceId(0);
-
+        
taskExecutionContextFactory.setDataQualityTaskExecutionContext(taskExecutionContext,
 taskInstance, tenantCode);
+        
taskExecutionContextFactory.setK8sTaskRelatedInfo(taskExecutionContext, 
taskInstance);
         return taskExecutionContext;
     }
 
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
index 03005ff70c..0153c81364 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
+++ 
b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/execute/TaskExecutionContextFactory.java
@@ -121,33 +121,42 @@ public class TaskExecutionContextFactory {
                         .orElse(null);
         setTaskResourceInfo(resources);
 
-        // TODO to be optimized
-        DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
-        if 
(TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
-            dataQualityTaskExecutionContext = new 
DataQualityTaskExecutionContext();
-            setDataQualityTaskRelation(dataQualityTaskExecutionContext, 
taskInstance, workflowInstance.getTenantCode());
-        }
-
-        K8sTaskExecutionContext k8sTaskExecutionContext = 
setK8sTaskRelation(taskInstance);
-
         Map<String, Property> businessParamsMap = 
curingParamsService.preBuildBusinessParams(workflowInstance);
 
         AbstractParameters baseParam = 
taskPluginManager.getParameters(ParametersNode.builder()
                 
.taskType(taskInstance.getTaskType()).taskParams(taskInstance.getTaskParams()).build());
         Map<String, Property> propertyMap =
                 curingParamsService.paramParsingPreparation(taskInstance, 
baseParam, workflowInstance);
-        return TaskExecutionContextBuilder.get()
+        TaskExecutionContext taskExecutionContext = 
TaskExecutionContextBuilder.get()
                 .buildWorkflowInstanceHost(masterConfig.getMasterAddress())
                 .buildTaskInstanceRelatedInfo(taskInstance)
                 .buildTaskDefinitionRelatedInfo(taskInstance.getTaskDefine())
                 
.buildProcessInstanceRelatedInfo(taskInstance.getProcessInstance())
                 
.buildProcessDefinitionRelatedInfo(taskInstance.getProcessDefine())
                 .buildResourceParametersInfo(resources)
-                
.buildDataQualityTaskExecutionContext(dataQualityTaskExecutionContext)
-                .buildK8sTaskRelatedInfo(k8sTaskExecutionContext)
                 .buildBusinessParamsMap(businessParamsMap)
                 .buildParamInfo(propertyMap)
                 .create();
+
+        setDataQualityTaskExecutionContext(taskExecutionContext, taskInstance, 
workflowInstance.getTenantCode());
+        setK8sTaskRelatedInfo(taskExecutionContext, taskInstance);
+        return taskExecutionContext;
+    }
+
+    public void setDataQualityTaskExecutionContext(TaskExecutionContext 
taskExecutionContext, TaskInstance taskInstance,
+                                                   String tenantCode) {
+        // TODO to be optimized
+        DataQualityTaskExecutionContext dataQualityTaskExecutionContext = null;
+        if 
(TASK_TYPE_DATA_QUALITY.equalsIgnoreCase(taskInstance.getTaskType())) {
+            dataQualityTaskExecutionContext = new 
DataQualityTaskExecutionContext();
+            setDataQualityTaskRelation(dataQualityTaskExecutionContext, 
taskInstance, tenantCode);
+        }
+        
taskExecutionContext.setDataQualityTaskExecutionContext(dataQualityTaskExecutionContext);
+    }
+
+    public void setK8sTaskRelatedInfo(TaskExecutionContext 
taskExecutionContext, TaskInstance taskInstance) {
+        K8sTaskExecutionContext k8sTaskExecutionContext = 
setK8sTaskRelation(taskInstance);
+        
taskExecutionContext.setK8sTaskExecutionContext(k8sTaskExecutionContext);
     }
 
     private Map<String, String> getResourceFullNames(TaskInstance 
taskInstance) {

Reply via email to