This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 3b58145  [Feature]Add Python task "task variable / result transfer" 
implementation (#3659)
3b58145 is described below

commit 3b581455fc572c01c93a9d5ecfb955a966a20263
Author: guyinyou <36399867+guyin...@users.noreply.github.com>
AuthorDate: Fri Sep 25 13:20:42 2020 +0800

    [Feature]Add Python task "task variable / result transfer" implementation 
(#3659)
    
    * 增加Python Task的“任务变量/结果传递”实现
    Signed-off-by: 古崟佑
    
    * add two files license
    Signed-off-by: 古崟佑
    
    * fix 'server/worker/task/AbstractCommandExecutor.java' code style
    Signed-off-by: 1941815847Cy4 <1941815847...@kuaishou.com>
    
    * update DB
    Signed-off-by: 古崟佑
    
    * update DB -- 2
    Signed-off-by: 古崟佑
    
    * fix codeStyle
    Signed-off-by: 古崟佑
    
    * fix codestyle
    Signed-off-by: 古崟佑
    
    * fix codeStyle
    Signed-off-by: 古崟佑
    
    * fix codeStyle
    Signed-off-by: 古崟佑
    
    * fix codeStyle
    Signed-off-by: 古崟佑
    
    * add VarPoolUtils Test
    Signed-off-by: 古崟佑
    
    * fix VarPoolUtilsTest codeStyle
    Signed-off-by: 古崟佑
    
    * fix VarPoolUtilsTest codeStyle
    Signed-off-by: 古崟佑
    
    * fix VarPoolUtilsTest codeStyle
    Signed-off-by: 古崟佑
    
    * fix VarPoolUtilsTest codeStyle
    Signed-off-by: 古崟佑
    
    * fix VarPoolUtilsTest codeStyle
    Signed-off-by: 古崟佑
    
    * add test config for VarPoolUtilsTest
    Signed-off-by: 古崟佑
    
    * fix unit test
    Signed-off-by: 古崟佑
    
    * fix codeStyle
    Signed-off-by: 古崟佑
    
    * fix VarPoolUtilsTest.java
    Signed-off-by: 古崟佑
    
    * fix
    Signed-off-by: 古崟佑
    
    * change the test class path
    Signed-off-by: 古崟佑
    
    * fix
    Signed-off-by: 古崟佑
    
    * fix "print the error message"
    Signed-off-by: 古崟佑
    
    * fix bug
    Signed-off-by: 古崟佑
    
    * fix
    Signed-off-by: 古崟佑
    
    Co-authored-by: 1941815847Cy4 <1941815847...@kuaishou.com>
---
 .../dolphinscheduler/common/task/TaskParams.java   |  78 +++++++++
 .../common/utils/VarPoolUtils.java                 | 124 ++++++++++++++
 .../common/utils/VarPoolUtilsTest.java             |  73 ++++++++
 .../dao/entity/ProcessInstance.java                |  13 ++
 .../dolphinscheduler/dao/entity/TaskInstance.java  |  12 ++
 .../remote/command/TaskExecuteResponseCommand.java |  12 ++
 .../master/processor/TaskResponseProcessor.java    |   3 +-
 .../master/processor/queue/TaskResponseEvent.java  |  18 +-
 .../processor/queue/TaskResponseService.java       |   3 +-
 .../server/master/runner/MasterExecThread.java     |  15 +-
 .../server/worker/runner/TaskExecuteThread.java    |   1 +
 .../worker/task/AbstractCommandExecutor.java       |  15 +-
 .../server/worker/task/AbstractTask.java           |  13 ++
 .../server/worker/task/python/PythonTask.java      | 189 +++++++++++----------
 .../service/process/ProcessService.java            |   5 +-
 pom.xml                                            |   1 +
 sql/dolphinscheduler-postgre.sql                   |   2 +
 sql/dolphinscheduler_mysql.sql                     |   2 +
 .../1.3.3_schema/mysql/dolphinscheduler_ddl.sql    |  40 +++++
 .../postgresql/dolphinscheduler_ddl.sql            |  36 ++++
 20 files changed, 555 insertions(+), 100 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
new file mode 100644
index 0000000..abea2d9
--- /dev/null
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/task/TaskParams.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.task;
+
+import java.util.Map;
+
+/**
+ * Minimal view of a task node's params JSON: the raw script plus the task's
+ * local parameters.
+ *
+ * <p>Every element of {@code localParams} is a string map describing one
+ * parameter. This class only looks at the {@code "prop"} entry (the parameter
+ * name) and reads or overwrites the {@code "value"} entry; any other entries
+ * of the map are left untouched.
+ */
+public class TaskParams {
+
+    /** Key of the parameter name inside a local-param map. */
+    private static final String PROP_KEY = "prop";
+
+    /** Key of the parameter value inside a local-param map. */
+    private static final String VALUE_KEY = "value";
+
+    private String rawScript;
+    private Map<String, String>[] localParams;
+
+    public void setRawScript(String rawScript) {
+        this.rawScript = rawScript;
+    }
+
+    public void setLocalParams(Map<String, String>[] localParams) {
+        this.localParams = localParams;
+    }
+
+    public String getRawScript() {
+        return rawScript;
+    }
+
+    /**
+     * Overwrites the value of every local parameter named {@code prop}.
+     *
+     * <p>Does nothing when there are no local parameters or when
+     * {@code prop} or {@code value} is {@code null}.
+     *
+     * @param prop  name of the local parameter to update
+     * @param value new value; stored as {@code value.toString()} so that
+     *              non-String values do not fail with a ClassCastException
+     */
+    public void setLocalParamValue(String prop, Object value) {
+        if (localParams == null || prop == null || value == null) {
+            return;
+        }
+        String text = value.toString();
+        for (Map<String, String> localParam : localParams) {
+            if (isNamed(localParam, prop)) {
+                localParam.put(VALUE_KEY, text);
+            }
+        }
+    }
+
+    /**
+     * Overwrites the value of every local parameter whose name is a key of
+     * {@code propToValue}.
+     *
+     * <p>Entries without a {@code "prop"} name are skipped; looking up a
+     * {@code null} key would throw on maps such as ConcurrentHashMap.
+     *
+     * @param propToValue parameter name to new value; values are stored via
+     *                    {@code toString()}, a {@code null} value is stored
+     *                    as {@code null}
+     */
+    public void setLocalParamValue(Map<String, Object> propToValue) {
+        if (localParams == null || propToValue == null) {
+            return;
+        }
+        for (Map<String, String> localParam : localParams) {
+            if (localParam == null) {
+                continue;
+            }
+            String prop = localParam.get(PROP_KEY);
+            if (prop == null || !propToValue.containsKey(prop)) {
+                continue;
+            }
+            Object value = propToValue.get(prop);
+            localParam.put(VALUE_KEY, value == null ? null : value.toString());
+        }
+    }
+
+    /**
+     * Looks up the value of the first local parameter named {@code prop}.
+     *
+     * @param prop name of the local parameter
+     * @return the stored value, or {@code null} when there are no local
+     *         parameters, {@code prop} is {@code null}, or nothing matches
+     */
+    public String getLocalParamValue(String prop) {
+        if (localParams == null || prop == null) {
+            return null;
+        }
+        for (Map<String, String> localParam : localParams) {
+            if (isNamed(localParam, prop)) {
+                return localParam.get(VALUE_KEY);
+            }
+        }
+        return null;
+    }
+
+    public Map<String, String>[] getLocalParams() {
+        return localParams;
+    }
+
+    /**
+     * Tells whether {@code localParam} is a non-null entry whose name equals
+     * {@code prop}; {@code prop} must not be {@code null}.
+     */
+    private static boolean isNamed(Map<String, String> localParam, String prop) {
+        return localParam != null && prop.equals(localParam.get(PROP_KEY));
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
new file mode 100644
index 0000000..837e96f
--- /dev/null
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/VarPoolUtils.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.model.TaskNode;
+import org.apache.dolphinscheduler.common.task.TaskParams;
+
+import java.text.ParseException;
+import java.util.Map;
+
+/**
+ * Helpers for the "variable pool": values published by one task that are
+ * copied into the local parameters of later tasks.
+ *
+ * <p>The pool travels as one string of {@code prop,value} entries separated
+ * by the {@code $VarPool$} delimiter.
+ *
+ * <p>NOTE(review): the task node params are round-tripped through
+ * {@link TaskParams}, which only declares {@code rawScript} and
+ * {@code localParams}. Presumably any other field of the params JSON is not
+ * written back by the setters below - confirm against JSONUtils and callers.
+ */
+public class VarPoolUtils {
+
+    /** Separator written after every {@code prop,value} entry of a pool. */
+    private static final String VAR_POOL_DELIMITER = "$VarPool$";
+
+    /** Opening of a {@code ${setShareVar(${prop},value)}} placeholder. */
+    private static final String SET_SHARE_VAR_PREFIX = "${setShareVar(${";
+
+    /** Closing of a {@code ${setShareVar(${prop},value)}} placeholder. */
+    private static final String SET_SHARE_VAR_SUFFIX = ")}";
+
+    /** Python statement that prints {@code ${setValue(prop,value)}}. */
+    private static final String SET_VALUE_PRINT_FORMAT =
+        "print(\"${{setValue({},{})}}\".format(\"%s\",%s))";
+
+    private VarPoolUtils() {
+        // static helpers only
+    }
+
+    /**
+     * getTaskNodeLocalParam
+     * @param taskNode taskNode
+     * @param prop prop
+     * @return value of the local param named {@code prop}, or {@code null}
+     *         when the node, its params or the param itself is missing
+     */
+    public static Object getTaskNodeLocalParam(TaskNode taskNode, String prop) {
+        TaskParams taskParams = parseTaskParams(taskNode);
+        if (taskParams == null) {
+            return null;
+        }
+        return taskParams.getLocalParamValue(prop);
+    }
+
+    /**
+     * setTaskNodeLocalParams
+     * @param taskNode taskNode; left untouched when it or its params is missing
+     * @param prop LocalParamName
+     * @param value LocalParamValue
+     */
+    public static void setTaskNodeLocalParams(TaskNode taskNode, String prop, Object value) {
+        TaskParams taskParams = parseTaskParams(taskNode);
+        if (taskParams == null) {
+            return;
+        }
+        taskParams.setLocalParamValue(prop, value);
+        taskNode.setParams(JSONUtils.toJsonString(taskParams));
+    }
+
+    /**
+     * setTaskNodeLocalParams
+     * @param taskNode taskNode; left untouched when it or its params is missing
+     * @param propToValue propToValue
+     */
+    public static void setTaskNodeLocalParams(TaskNode taskNode, Map<String, Object> propToValue) {
+        TaskParams taskParams = parseTaskParams(taskNode);
+        if (taskParams == null) {
+            return;
+        }
+        taskParams.setLocalParamValue(propToValue);
+        taskNode.setParams(JSONUtils.toJsonString(taskParams));
+    }
+
+    /**
+     * convertVarPoolToMap
+     *
+     * <p>Empty segments are skipped, so an empty pool and leading, trailing
+     * or doubled delimiters are accepted. Each entry is split at its FIRST
+     * comma only, so a value may itself contain commas or be empty.
+     *
+     * @param propToValue map that receives the parsed entries; entries parsed
+     *                    before a failure stay in the map
+     * @param varPool varPool
+     * @throws ParseException when a non-empty entry contains no comma; the
+     *                        error offset is the entry's index in {@code varPool}
+     */
+    public static void convertVarPoolToMap(Map<String, Object> propToValue, String varPool) throws ParseException {
+        if (varPool == null || propToValue == null) {
+            return;
+        }
+        int entryStart = 0;
+        while (true) {
+            int delimiter = varPool.indexOf(VAR_POOL_DELIMITER, entryStart);
+            int entryEnd = delimiter == -1 ? varPool.length() : delimiter;
+            if (entryEnd > entryStart) {
+                String entry = varPool.substring(entryStart, entryEnd);
+                int comma = entry.indexOf(',');
+                if (comma == -1) {
+                    throw new ParseException(
+                        "var pool entry '" + entry + "' is not of the form 'prop,value'", entryStart);
+                }
+                propToValue.put(entry.substring(0, comma), entry.substring(comma + 1));
+            }
+            if (delimiter == -1) {
+                return;
+            }
+            entryStart = delimiter + VAR_POOL_DELIMITER.length();
+        }
+    }
+
+    /**
+     * convertPythonScriptPlaceholders
+     *
+     * <p>Rewrites every {@code ${setShareVar(${prop},value)}} placeholder into
+     * a Python {@code print} statement that outputs
+     * {@code ${setValue(prop,value)}}. The value runs up to the closing
+     * {@code )}} of the placeholder, so it may contain parentheses of its own
+     * (for example {@code len(x)}).
+     *
+     * @param rawScript rawScript; {@code null} is returned unchanged
+     * @return the script with all placeholders replaced
+     * @throws StringIndexOutOfBoundsException when a placeholder is not closed
+     *         properly (missing {@code }}, {@code ,} or {@code )}})
+     */
+    public static String convertPythonScriptPlaceholders(String rawScript) throws StringIndexOutOfBoundsException {
+        if (rawScript == null) {
+            return null;
+        }
+        StringBuilder converted = new StringBuilder(rawScript.length());
+        int cursor = 0;
+        int placeholderStart;
+        while ((placeholderStart = rawScript.indexOf(SET_SHARE_VAR_PREFIX, cursor)) != -1) {
+            int propStart = placeholderStart + SET_SHARE_VAR_PREFIX.length();
+            int propEnd = rawScript.indexOf('}', propStart);
+            int comma = propEnd == -1 ? -1 : rawScript.indexOf(',', propEnd);
+            int valueEnd = comma == -1 ? -1 : rawScript.indexOf(SET_SHARE_VAR_SUFFIX, comma + 1);
+            if (valueEnd == -1) {
+                // fail loudly instead of feeding -1 into substring()
+                throw new StringIndexOutOfBoundsException(
+                    "malformed setShareVar placeholder at index " + placeholderStart);
+            }
+            String prop = rawScript.substring(propStart, propEnd);
+            String value = rawScript.substring(comma + 1, valueEnd);
+
+            converted.append(rawScript, cursor, placeholderStart);
+            converted.append(String.format(SET_VALUE_PRINT_FORMAT, prop, value));
+            cursor = valueEnd + SET_SHARE_VAR_SUFFIX.length();
+        }
+        return converted.append(rawScript, cursor, rawScript.length()).toString();
+    }
+
+    /**
+     * Parses the params JSON of {@code taskNode}; returns {@code null} when
+     * the node is {@code null} or JSONUtils yields no object.
+     */
+    private static TaskParams parseTaskParams(TaskNode taskNode) {
+        if (taskNode == null) {
+            return null;
+        }
+        return JSONUtils.parseObject(taskNode.getParams(), TaskParams.class);
+    }
+}
\ No newline at end of file
diff --git 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
new file mode 100644
index 0000000..e47203c
--- /dev/null
+++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/VarPoolUtilsTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.utils;
+
+import org.apache.dolphinscheduler.common.model.TaskNode;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Unit tests for {@link VarPoolUtils}.
+ */
+public class VarPoolUtilsTest {
+    
+    private static final Logger logger = LoggerFactory.getLogger(VarPoolUtilsTest.class);
+    
+    /**
+     * Setting a local param (single value and map form) must be visible
+     * through getTaskNodeLocalParam on the same task node.
+     */
+    @Test
+    public void testSetTaskNodeLocalParams() {
+        String taskJson = 
"{\"conditionResult\":\"{\\\"successNode\\\":[\\\"\\\"],\\\"failedNode\\\":[\\\"\\\"]}\","
+            + 
"\"conditionsTask\":false,\"depList\":[],\"dependence\":\"{}\",\"forbidden\":false,\"id\":\"tasks-75298\",\"maxRetryTimes\":0,\"name\":\"a1\","
+            + "\"params\":\"{\\\"rawScript\\\":\\\"print(\\\\\\\"this is 
python task \\\\\\\",${p0})\\\","
+            + 
"\\\"localParams\\\":[{\\\"prop\\\":\\\"p1\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"1\\\"}],"
+            + 
"\\\"resourceList\\\":[]}\",\"preTasks\":\"[]\",\"retryInterval\":1,\"runFlag\":\"NORMAL\",\"taskInstancePriority\":\"MEDIUM\","
+            + 
"\"taskTimeoutParameter\":{\"enable\":false,\"interval\":0},\"timeout\":\"{\\\"enable\\\":false,\\\"strategy\\\":\\\"\\\"}\","
+            + "\"type\":\"PYTHON\",\"workerGroup\":\"default\"}";
+        TaskNode taskNode = JSONUtils.parseObject(taskJson, TaskNode.class);
+        
+        // JUnit's assertEquals takes (expected, actual)
+        VarPoolUtils.setTaskNodeLocalParams(taskNode, "p1", "test1");
+        Assert.assertEquals("test1", VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"));
+        
+        ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<>();
+        propToValue.put("p1", "test2");
+        
+        VarPoolUtils.setTaskNodeLocalParams(taskNode, propToValue);
+        Assert.assertEquals("test2", VarPoolUtils.getTaskNodeLocalParam(taskNode, "p1"));
+    }
+    
+    /**
+     * A pool string of two delimiter-terminated entries yields both values.
+     */
+    @Test
+    public void testConvertVarPoolToMap() throws Exception {
+        String varPool = "p1,66$VarPool$p2,69$VarPool$";
+        ConcurrentHashMap<String, Object> propToValue = new ConcurrentHashMap<>();
+        VarPoolUtils.convertVarPoolToMap(propToValue, varPool);
+        Assert.assertEquals("66", propToValue.get("p1"));
+        Assert.assertEquals("69", propToValue.get("p2"));
+        logger.info(propToValue.toString());
+    }
+    
+    /**
+     * setShareVar placeholders become print statements; other ${...}
+     * references in the script are left alone.
+     */
+    @Test
+    public void testConvertPythonScriptPlaceholders() throws Exception {
+        String rawScript = "print(${p1});\n${setShareVar(${p1},3)};\n${setShareVar(${p2},4)};";
+        rawScript = VarPoolUtils.convertPythonScriptPlaceholders(rawScript);
+        Assert.assertEquals("print(${p1});\n"
+            + "print(\"${{setValue({},{})}}\".format(\"p1\",3));\n"
+            + "print(\"${{setValue({},{})}}\".format(\"p2\",4));", rawScript);
+        logger.info(rawScript);
+    }
+}
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
index 3d1a756..e3a3f11 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/ProcessInstance.java
@@ -225,6 +225,11 @@ public class ProcessInstance {
     private int tenantId;
 
     /**
+     * varPool string
+     */
+    private String varPool;
+    
+    /**
      * receivers for api
      */
     @TableField(exist = false)
@@ -256,6 +261,14 @@ public class ProcessInstance {
                 DateUtils.getCurrentTimeStamp();
     }
 
+    public String getVarPool() {
+        return varPool;
+    }
+
+    public void setVarPool(String varPool) {
+        this.varPool = varPool;
+    }
+    
     public ProcessDefinition getProcessDefinition() {
         return processDefinition;
     }
diff --git 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
index 9688200..b13ca87 100644
--- 
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
+++ 
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java
@@ -212,6 +212,11 @@ public class TaskInstance implements Serializable {
     private int executorId;
 
     /**
+     * varPool string
+     */
+    private String varPool;
+    
+    /**
      * executor name
      */
     @TableField(exist = false)
@@ -232,7 +237,14 @@ public class TaskInstance implements Serializable {
         this.executePath = executePath;
     }
 
+    public String getVarPool() {
+        return varPool;
+    }
 
+    public void setVarPool(String varPool) {
+        this.varPool = varPool;
+    }
+    
     public ProcessInstance getProcessInstance() {
         return processInstance;
     }
diff --git 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
index e559334..7f6ee66 100644
--- 
a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
+++ 
b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskExecuteResponseCommand.java
@@ -63,7 +63,19 @@ public class TaskExecuteResponseCommand implements 
Serializable {
      */
     private String appIds;
 
+    /**
+     * varPool string
+     */
+    private String varPool;
 
+    public void setVarPool(String varPool) {
+        this.varPool = varPool;
+    }
+
+    public String getVarPool() {
+        return varPool;
+    }
+    
     public int getTaskInstanceId() {
         return taskInstanceId;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
index b04b930..2633ccd 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskResponseProcessor.java
@@ -90,7 +90,8 @@ public class TaskResponseProcessor implements 
NettyRequestProcessor {
                 responseCommand.getEndTime(),
                 responseCommand.getProcessId(),
                 responseCommand.getAppIds(),
-                responseCommand.getTaskInstanceId());
+                responseCommand.getTaskInstanceId(),
+                responseCommand.getVarPool());
 
         taskResponseService.addResponse(taskResponseEvent);
 
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
index 051cc38..ba07be5 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseEvent.java
@@ -79,7 +79,12 @@ public class TaskResponseEvent {
      */
     private Event event;
 
-    public static TaskResponseEvent newAck(ExecutionStatus state, Date 
startTime, String workerAddress, String executePath, String logPath, int 
taskInstanceId){
+    /**
+     * varPool
+     */
+    private String varPool;
+    
+    public static TaskResponseEvent newAck(ExecutionStatus state, Date 
startTime, String workerAddress, String executePath, String logPath, int 
taskInstanceId) {
         TaskResponseEvent event = new TaskResponseEvent();
         event.setState(state);
         event.setStartTime(startTime);
@@ -91,7 +96,7 @@ public class TaskResponseEvent {
         return event;
     }
 
-    public static TaskResponseEvent newResult(ExecutionStatus state, Date 
endTime, int processId, String appIds, int taskInstanceId){
+    public static TaskResponseEvent newResult(ExecutionStatus state, Date 
endTime, int processId, String appIds, int taskInstanceId, String varPool) {
         TaskResponseEvent event = new TaskResponseEvent();
         event.setState(state);
         event.setEndTime(endTime);
@@ -99,9 +104,18 @@ public class TaskResponseEvent {
         event.setAppIds(appIds);
         event.setTaskInstanceId(taskInstanceId);
         event.setEvent(Event.RESULT);
+        event.setVarPool(varPool);
         return event;
     }
 
+    public String getVarPool() {
+        return varPool;
+    }
+
+    public void setVarPool(String varPool) {
+        this.varPool = varPool;
+    }
+    
     public int getTaskInstanceId() {
         return taskInstanceId;
     }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
index ba07313..6434db7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/processor/queue/TaskResponseService.java
@@ -139,7 +139,8 @@ public class TaskResponseService {
                         taskResponseEvent.getEndTime(),
                         taskResponseEvent.getProcessId(),
                         taskResponseEvent.getAppIds(),
-                        taskResponseEvent.getTaskInstanceId());
+                        taskResponseEvent.getTaskInstanceId(),
+                        taskResponseEvent.getVarPool());
                 break;
             default:
                 throw new IllegalArgumentException("invalid event type : " + 
event);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index 788b306..3c28e16 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -45,6 +45,7 @@ import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
+import org.apache.dolphinscheduler.common.utils.VarPoolUtils;
 import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
 import org.apache.dolphinscheduler.dao.entity.Schedule;
 import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -59,6 +60,7 @@ import org.apache.commons.io.FileUtils;
 
 import java.io.File;
 import java.io.IOException;
+import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -651,14 +653,23 @@ public class MasterExecThread implements Runnable {
      * submit post node
      * @param parentNodeName parent node name
      */
+    private Map<String,Object> propToValue = new ConcurrentHashMap<String, 
Object>();
     private void submitPostNode(String parentNodeName){
 
         List<String> submitTaskNodeList = parsePostNodeList(parentNodeName);
 
         List<TaskInstance> taskInstances = new ArrayList<>();
         for(String taskNode : submitTaskNodeList){
+            try {
+                VarPoolUtils.convertVarPoolToMap(propToValue, 
processInstance.getVarPool());
+            } catch (ParseException e) {
+                logger.error("parse {} exception", 
processInstance.getVarPool(), e);
+                throw new RuntimeException();
+            }
+            TaskNode taskNodeObject = dag.getNode(taskNode);
+            VarPoolUtils.setTaskNodeLocalParams(taskNodeObject, propToValue);
             taskInstances.add(createTaskInstance(processInstance, taskNode,
-                    dag.getNode(taskNode)));
+                taskNodeObject));
         }
 
         // if previous node success , post node submit
@@ -999,6 +1010,8 @@ public class MasterExecThread implements Runnable {
                         task.getName(), task.getId(), task.getState());
                 // node success , post node submit
                 if(task.getState() == ExecutionStatus.SUCCESS){
+                    processInstance.setVarPool(task.getVarPool());
+                    processService.updateProcessInstance(processInstance);
                     completeTaskList.put(task.getName(), task);
                     submitPostNode(task.getName());
                     continue;
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index 3ba4945..58f7433 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -155,6 +155,7 @@ public class TaskExecuteThread implements Runnable {
             responseCommand.setEndTime(new Date());
             responseCommand.setProcessId(task.getProcessId());
             responseCommand.setAppIds(task.getAppIds());
+            responseCommand.setVarPool(task.getVarPool());
             logger.info("task instance id : {},task final status : {}", 
taskExecutionContext.getTaskInstanceId(), task.getExitStatus());
         } catch (Exception e) {
             logger.error("task scheduler failure", e);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 3dedece..dddd1a6 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -66,6 +66,7 @@ public abstract class AbstractCommandExecutor {
      */
     protected static final Pattern APPLICATION_REGEX = 
Pattern.compile(Constants.APPLICATION_REGEX);
 
+    protected StringBuilder varPool = new StringBuilder();
     /**
      *  process
      */
@@ -234,7 +235,10 @@ public abstract class AbstractCommandExecutor {
         return result;
     }
 
-
+    public String getVarPool() {
+        return varPool.toString();
+    }
+    
     /**
      * cancel application
      * @throws Exception exception
@@ -347,8 +351,13 @@ public abstract class AbstractCommandExecutor {
                     long lastFlushTime = System.currentTimeMillis();
 
                     while ((line = inReader.readLine()) != null) {
-                        logBuffer.add(line);
-                        lastFlushTime = flush(lastFlushTime);
+                        if (line.startsWith("${setValue(")) {
+                            
varPool.append(line.substring("${setValue(".length(), line.length() - 2));
+                            varPool.append("$VarPool$");
+                        } else {
+                            logBuffer.add(line);
+                            lastFlushTime = flush(lastFlushTime);
+                        }
                     }
                 } catch (Exception e) {
                     logger.error(e.getMessage(),e);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index ae03932..1a66349 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -48,6 +48,11 @@ import java.util.Map;
 public abstract class AbstractTask {
 
     /**
+     * varPool string
+     */
+    protected String varPool;
+    
+    /**
      * taskExecutionContext
      **/
     TaskExecutionContext taskExecutionContext;
@@ -121,6 +126,14 @@ public abstract class AbstractTask {
         logger.info(" -> {}", String.join("\n\t", logs));
     }
 
+    public void setVarPool(String varPool) {
+        this.varPool = varPool;
+    }
+
+    public String getVarPool() {
+        return varPool;
+    }
+    
     /**
      * get exit status code
      * @return  exit status code
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
index 367da80..6e561c1 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/python/PythonTask.java
@@ -38,103 +38,110 @@ import java.util.Map;
  */
 public class PythonTask extends AbstractTask {
 
-  /**
-   *  python parameters
-   */
-  private PythonParameters pythonParameters;
-
-  /**
-   *  task dir
-   */
-  private String taskDir;
-
-  /**
-   * python command executor
-   */
-  private PythonCommandExecutor pythonCommandExecutor;
-
-  /**
-   * taskExecutionContext
-   */
-  private TaskExecutionContext taskExecutionContext;
-
-  /**
-   * constructor
-   * @param taskExecutionContext taskExecutionContext
-   * @param logger    logger
-   */
-  public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) {
-    super(taskExecutionContext, logger);
-    this.taskExecutionContext = taskExecutionContext;
-
-    this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
-            taskExecutionContext,
-            logger);
-  }
-
-  @Override
-  public void init() {
-    logger.info("python task params {}", taskExecutionContext.getTaskParams());
-
-    pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class);
-
-    if (!pythonParameters.checkParameters()) {
-      throw new RuntimeException("python task params is not valid");
+    /**
+     *  python parameters
+     */
+    private PythonParameters pythonParameters;
+
+    /**
+     *    task dir
+     */
+    private String taskDir;
+
+    /**
+     * python command executor
+     */
+    private PythonCommandExecutor pythonCommandExecutor;
+
+    /**
+     * taskExecutionContext
+     */
+    private TaskExecutionContext taskExecutionContext;
+
+    /**
+     * constructor
+     * @param taskExecutionContext taskExecutionContext
+     * @param logger        logger
+     */
+    public PythonTask(TaskExecutionContext taskExecutionContext, Logger logger) {
+        super(taskExecutionContext, logger);
+        this.taskExecutionContext = taskExecutionContext;
+
+        this.pythonCommandExecutor = new PythonCommandExecutor(this::logHandle,
+                        taskExecutionContext,
+                        logger);
     }
-  }
 
-  @Override
-  public void handle() throws Exception {
-    try {
-      //  construct process
-      CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand());
+    @Override
+    public void init() {
+        logger.info("python task params {}", taskExecutionContext.getTaskParams());
 
-      setExitStatusCode(commandExecuteResult.getExitStatusCode());
-      setAppIds(commandExecuteResult.getAppIds());
-      setProcessId(commandExecuteResult.getProcessId());
-    }
-    catch (Exception e) {
-      logger.error("python task failure", e);
-      setExitStatusCode(Constants.EXIT_CODE_FAILURE);
-      throw e;
-    }
-  }
-
-  @Override
-  public void cancelApplication(boolean cancelApplication) throws Exception {
-    // cancel process
-    pythonCommandExecutor.cancelApplication();
-  }
-
-  /**
-   * build command
-   * @return raw python script
-   * @throws Exception exception
-   */
-  private String buildCommand() throws Exception {
-    String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
-
-    // replace placeholder
-    Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-            taskExecutionContext.getDefinedParams(),
-            pythonParameters.getLocalParametersMap(),
-            CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-            taskExecutionContext.getScheduleTime());
-    if (paramsMap != null){
-      rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
-    }
+        pythonParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), PythonParameters.class);
 
-    logger.info("raw python script : {}", pythonParameters.getRawScript());
-    logger.info("task dir : {}", taskDir);
-
-    return rawPythonScript;
-  }
+        if (!pythonParameters.checkParameters()) {
+            throw new RuntimeException("python task params is not valid");
+        }
+    }
 
-  @Override
-  public AbstractParameters getParameters() {
-    return pythonParameters;
-  }
+    @Override
+    public void handle() throws Exception {
+        try {
+            //    construct process
+            CommandExecuteResult commandExecuteResult = pythonCommandExecutor.run(buildCommand());
+
+            setExitStatusCode(commandExecuteResult.getExitStatusCode());
+            setAppIds(commandExecuteResult.getAppIds());
+            setProcessId(commandExecuteResult.getProcessId());
+            setVarPool(pythonCommandExecutor.getVarPool());
+        }
+        catch (Exception e) {
+            logger.error("python task failure", e);
+            setExitStatusCode(Constants.EXIT_CODE_FAILURE);
+            throw e;
+        }
+    }
 
+    @Override
+    public void cancelApplication(boolean cancelApplication) throws Exception {
+        // cancel process
+        pythonCommandExecutor.cancelApplication();
+    }
 
+    /**
+     * build command
+     * @return raw python script
+     * @throws Exception exception
+     */
+    private String buildCommand() throws Exception {
+        String rawPythonScript = pythonParameters.getRawScript().replaceAll("\\r\\n", "\n");
+
+        // replace placeholder
+        Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
+                        taskExecutionContext.getDefinedParams(),
+                        pythonParameters.getLocalParametersMap(),
+                        CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
+                        taskExecutionContext.getScheduleTime());
+        
+        try {
+            rawPythonScript = VarPoolUtils.convertPythonScriptPlaceholders(rawPythonScript);
+        }
+        catch (StringIndexOutOfBoundsException e) {
+            logger.error("setShareVar field format error, raw python script : {}", rawPythonScript);
+        }
+        
+        if (paramsMap != null) {
+            rawPythonScript = ParameterUtils.convertParameterPlaceholders(rawPythonScript, ParamUtils.convert(paramsMap));
+        }
+
+        logger.info("raw python script : {}", pythonParameters.getRawScript());
+        logger.info("task dir : {}", taskDir);
+
+        return rawPythonScript;
+    }
 
+    @Override
+    public AbstractParameters getParameters() {
+        return pythonParameters;
+    }
+    
 }
diff --git a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
index 6f64267..7344cf1 100644
--- a/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
+++ b/dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/process/ProcessService.java
@@ -1464,17 +1464,20 @@ public class ProcessService {
      * @param state state
      * @param endTime endTime
      * @param taskInstId taskInstId
+     * @param varPool varPool
      */
     public void changeTaskState(ExecutionStatus state,
                                 Date endTime,
                                 int processId,
                                 String appIds,
-                                int taskInstId) {
+                                int taskInstId,
+                                String varPool) {
         TaskInstance taskInstance = taskInstanceMapper.selectById(taskInstId);
         taskInstance.setPid(processId);
         taskInstance.setAppLink(appIds);
         taskInstance.setState(state);
         taskInstance.setEndTime(endTime);
+        taskInstance.setVarPool(varPool);
         saveTaskInstance(taskInstance);
     }
 
diff --git a/pom.xml b/pom.xml
index c8cb5a9..e895b01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -785,6 +785,7 @@
                         <include>**/common/utils/StringTest.java</include>
                         <include>**/common/utils/StringUtilsTest.java</include>
                         <include>**/common/utils/TaskParametersUtilsTest.java</include>
+                        <include>**/common/utils/VarPoolUtilsTest.java</include>
                         <include>**/common/utils/HadoopUtilsTest.java</include>
                         <include>**/common/utils/HttpUtilsTest.java</include>
                         <include>**/common/utils/KerberosHttpClientTest.java</include>
diff --git a/sql/dolphinscheduler-postgre.sql b/sql/dolphinscheduler-postgre.sql
index 5ae37e1..e2f5ebd 100644
--- a/sql/dolphinscheduler-postgre.sql
+++ b/sql/dolphinscheduler-postgre.sql
@@ -377,6 +377,7 @@ CREATE TABLE t_ds_process_instance (
   worker_group varchar(64) ,
   timeout int DEFAULT '0' ,
   tenant_id int NOT NULL DEFAULT '-1' ,
+  var_pool text ,
   PRIMARY KEY (id)
 ) ;
   create index process_instance_index on t_ds_process_instance (process_definition_id,id);
@@ -595,6 +596,7 @@ CREATE TABLE t_ds_task_instance (
   executor_id int DEFAULT NULL ,
   first_submit_time timestamp DEFAULT NULL ,
   delay_time int DEFAULT '0' ,
+  var_pool text ,
   PRIMARY KEY (id)
 ) ;
 
diff --git a/sql/dolphinscheduler_mysql.sql b/sql/dolphinscheduler_mysql.sql
index 61e6975..9039a19 100644
--- a/sql/dolphinscheduler_mysql.sql
+++ b/sql/dolphinscheduler_mysql.sql
@@ -487,6 +487,7 @@ CREATE TABLE `t_ds_process_instance` (
   `worker_group` varchar(64) DEFAULT NULL COMMENT 'worker group id',
   `timeout` int(11) DEFAULT '0' COMMENT 'time out',
   `tenant_id` int(11) NOT NULL DEFAULT '-1' COMMENT 'tenant id',
+  `var_pool` longtext COMMENT 'var_pool',
   PRIMARY KEY (`id`),
   KEY `process_instance_index` (`process_definition_id`,`id`) USING BTREE,
   KEY `start_time_index` (`start_time`) USING BTREE
@@ -737,6 +738,7 @@ CREATE TABLE `t_ds_task_instance` (
   `executor_id` int(11) DEFAULT NULL,
   `first_submit_time` datetime DEFAULT NULL COMMENT 'task first submit time',
   `delay_time` int(4) DEFAULT '0' COMMENT 'task delay execution time',
+  `var_pool` longtext COMMENT 'var_pool',
   PRIMARY KEY (`id`),
   KEY `process_instance_id` (`process_instance_id`) USING BTREE,
   KEY `task_instance_index` (`process_definition_id`,`process_instance_id`) USING BTREE,
diff --git a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
index 4348827..ae66da9 100644
--- a/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.3_schema/mysql/dolphinscheduler_ddl.sql
@@ -56,6 +56,46 @@ delimiter ;
 CALL uc_dolphin_T_t_ds_task_instance_A_delay_time();
 DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_delay_time;
 
+-- uc_dolphin_T_t_ds_task_instance_A_var_pool
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool()
+   BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_task_instance'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='var_pool')
+   THEN
+         ALTER TABLE t_ds_task_instance ADD `var_pool` longtext NULL;
+       END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_task_instance_A_var_pool();
+DROP PROCEDURE uc_dolphin_T_t_ds_task_instance_A_var_pool;
+
+-- uc_dolphin_T_t_ds_process_instance_A_var_pool
+drop PROCEDURE if EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool;
+delimiter d//
+CREATE PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool()
+   BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+           WHERE TABLE_NAME='t_ds_process_instance'
+           AND TABLE_SCHEMA=(SELECT DATABASE())
+           AND COLUMN_NAME ='var_pool')
+   THEN
+         ALTER TABLE t_ds_process_instance ADD `var_pool` longtext NULL;
+       END IF;
+ END;
+
+d//
+
+delimiter ;
+CALL uc_dolphin_T_t_ds_process_instance_A_var_pool();
+DROP PROCEDURE uc_dolphin_T_t_ds_process_instance_A_var_pool;
+
 -- uc_dolphin_T_t_ds_process_definition_A_modify_by
 drop PROCEDURE if EXISTS ct_dolphin_T_t_ds_process_definition_version;
 delimiter d//
diff --git a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
index e276761..3351cac 100644
--- a/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
+++ b/sql/upgrade/1.3.3_schema/postgresql/dolphinscheduler_ddl.sql
@@ -51,6 +51,42 @@ delimiter ;
 SELECT uc_dolphin_T_t_ds_task_instance_A_delay_time();
 DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_delay_time();
 
+-- uc_dolphin_T_t_ds_process_instance_A_var_pool
+delimiter d//
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_process_instance_A_var_pool() RETURNS void AS $$
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+          WHERE TABLE_NAME='t_ds_process_instance'
+                            AND COLUMN_NAME ='var_pool')
+      THEN
+         ALTER TABLE t_ds_process_instance ADD COLUMN var_pool text;
+       END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+
+delimiter ;
+SELECT uc_dolphin_T_t_ds_process_instance_A_var_pool();
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_process_instance_A_var_pool();
+
+-- uc_dolphin_T_t_ds_task_instance_A_var_pool
+delimiter d//
+CREATE OR REPLACE FUNCTION uc_dolphin_T_t_ds_task_instance_A_var_pool() RETURNS void AS $$
+BEGIN
+       IF NOT EXISTS (SELECT 1 FROM information_schema.COLUMNS
+          WHERE TABLE_NAME='t_ds_task_instance'
+                            AND COLUMN_NAME ='var_pool')
+      THEN
+         ALTER TABLE t_ds_task_instance ADD COLUMN var_pool text;
+       END IF;
+END;
+$$ LANGUAGE plpgsql;
+d//
+
+delimiter ;
+SELECT uc_dolphin_T_t_ds_task_instance_A_var_pool();
+DROP FUNCTION IF EXISTS uc_dolphin_T_t_ds_task_instance_A_var_pool();
+
 -- uc_dolphin_T_t_ds_process_definition_A_modify_by
 delimiter d//
 CREATE OR REPLACE FUNCTION ct_dolphin_T_t_ds_process_definition_version() RETURNS void AS $$

Reply via email to