This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9fd5145  [Improvement-5773][server] need to support two parameters 
related to task (#5774)
9fd5145 is described below

commit 9fd5145b66646f3df847ea3c81bb272621ee86ca
Author: Hua Jiang <[email protected]>
AuthorDate: Fri Jul 9 17:00:32 2021 +0800

    [Improvement-5773][server] need to support two parameters related to task 
(#5774)
    
    * add some new parameter for task
    
    * restore official properties
    
    * improve imports
    
    * modify a variable's name
    
    Co-authored-by: jiang hua <[email protected]>
---
 .../apache/dolphinscheduler/common/Constants.java  | 10 +++
 .../dolphinscheduler/common/utils/JSONUtils.java   |  2 +-
 .../dolphinscheduler/common/utils/LoggerUtils.java |  1 +
 .../dolphinscheduler/server/utils/ParamUtils.java  | 67 +++++++++++++++++
 .../server/worker/task/ShellCommandExecutor.java   | 10 +--
 .../server/worker/task/shell/ShellTask.java        | 15 ++--
 .../server/utils/ParamUtilsTest.java               | 87 +++++++++++++++++++++-
 .../worker/shell/ShellCommandExecutorTest.java     | 47 ++++++------
 8 files changed, 198 insertions(+), 41 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 39f8898..225dc23 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -661,6 +661,16 @@ public final class Constants {
     public static final String PARAMETER_BUSINESS_DATE = "system.biz.date";
 
     /**
+     * the absolute path of current executing task
+     */
+    public static final String PARAMETER_TASK_EXECUTE_PATH = 
"system.task.execute.path";
+
+    /**
+     * the instance id of current task
+     */
+    public static final String PARAMETER_TASK_INSTANCE_ID = 
"system.task.instance.id";
+
+    /**
      * ACCEPTED
      */
     public static final String ACCEPTED = "ACCEPTED";
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 9e929e3..2a1e2ac 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -31,12 +31,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.TimeZone;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.DeserializationContext;
 import com.fasterxml.jackson.databind.JsonDeserializer;
diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 904ca97..545bbb2 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -144,4 +144,5 @@ public class LoggerUtils {
             , String info) {
         optionalLogger.ifPresent((Logger logger) -> logger.info(info));
     }
+
 }
\ No newline at end of file
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index a49d915..cbf663f 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -17,13 +17,19 @@
 
 package org.apache.dolphinscheduler.server.utils;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DataType;
 import org.apache.dolphinscheduler.common.enums.Direct;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
 import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
 import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+
+import org.apache.logging.log4j.util.Strings;
 
 import java.util.Date;
 import java.util.HashMap;
@@ -96,6 +102,67 @@ public class ParamUtils {
     }
 
     /**
+     * parameter conversion
+     * @param taskExecutionContext the context of this task instance
+     * @param parameters the parameters
+     * @return global params
+     */
+    public static Map<String,Property> convert(TaskExecutionContext 
taskExecutionContext, AbstractParameters parameters) {
+        Preconditions.checkNotNull(taskExecutionContext);
+        Preconditions.checkNotNull(parameters);
+        Map<String,Property> globalParams = 
getUserDefParamsMap(taskExecutionContext.getDefinedParams());
+        Map<String,String> globalParamsMap = 
taskExecutionContext.getDefinedParams();
+        CommandType commandType = 
CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
+        Date scheduleTime = taskExecutionContext.getScheduleTime();
+
+        Map<String,Property> localParams = parameters.getLocalParametersMap();
+
+        if (globalParams == null && localParams == null) {
+            return null;
+        }
+        // if it is a complement,
+        // you need to pass in the task instance id to locate the time
+        // of the process instance complement
+        Map<String,String> params = BusinessTimeUtils
+                .getBusinessTime(commandType,
+                        scheduleTime);
+
+        if (globalParamsMap != null) {
+            params.putAll(globalParamsMap);
+        }
+
+        if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) {
+            
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
+        }
+        
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
+
+        if (globalParams != null && localParams != null) {
+            globalParams.putAll(localParams);
+        } else if (globalParams == null && localParams != null) {
+            globalParams = localParams;
+        }
+        Iterator<Map.Entry<String, Property>> iter = 
globalParams.entrySet().iterator();
+        while (iter.hasNext()) {
+            Map.Entry<String, Property> en = iter.next();
+            Property property = en.getValue();
+
+            if (StringUtils.isNotEmpty(property.getValue())
+                    && property.getValue().startsWith("$")) {
+                /**
+                 *  local parameter refers to global parameter with the same 
name
+                 *  note: the global parameters of the process instance here 
are solidified parameters,
+                 *  and there are no variables in them.
+                 */
+                String val = property.getValue();
+                val  = ParameterUtils.convertParameterPlaceholders(val, 
params);
+                property.setValue(val);
+            }
+        }
+
+        return globalParams;
+    }
+
+    /**
      * format convert
      * @param paramsMap params map
      * @return Map of converted
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index 8f3da45..b547ef9 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.dolphinscheduler.server.worker.task;
 
-import org.apache.commons.io.FileUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.slf4j.Logger;
+
+import org.apache.commons.io.FileUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -29,6 +30,8 @@ import java.nio.file.Paths;
 import java.util.List;
 import java.util.function.Consumer;
 
+import org.slf4j.Logger;
+
 /**
  * shell command executor
  */
@@ -78,7 +81,6 @@ public class ShellCommandExecutor extends 
AbstractCommandExecutor {
         return OSUtils.isWindows() ? CMD : SH;
     }
 
-
     /**
      * create command file if not exists
      * @param execCommand   exec command
@@ -117,6 +119,4 @@ public class ShellCommandExecutor extends 
AbstractCommandExecutor {
             FileUtils.writeStringToFile(new File(commandFile), sb.toString(), 
StandardCharsets.UTF_8);
         }
     }
-
-
 }
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index e193571..f7887df 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -35,8 +35,6 @@ import 
org.apache.dolphinscheduler.server.worker.task.AbstractTask;
 import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
 import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
 
-import org.slf4j.Logger;
-
 import java.io.File;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -51,6 +49,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.slf4j.Logger;
+
 /**
  * shell task
  */
@@ -101,7 +101,8 @@ public class ShellTask extends AbstractTask {
     public void handle() throws Exception {
         try {
             // construct process
-            CommandExecuteResult commandExecuteResult = 
shellCommandExecutor.run(buildCommand());
+            String command = buildCommand();
+            CommandExecuteResult commandExecuteResult = 
shellCommandExecutor.run(command);
             setExitStatusCode(commandExecuteResult.getExitStatusCode());
             setAppIds(commandExecuteResult.getAppIds());
             setProcessId(commandExecuteResult.getProcessId());
@@ -165,12 +166,8 @@ public class ShellTask extends AbstractTask {
 
     private String parseScript(String script) {
         // combining local and global parameters
-        Map<String, Property> paramsMap = 
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
-            taskExecutionContext.getDefinedParams(),
-            shellParameters.getLocalParametersMap(),
-            shellParameters.getVarPoolMap(),
-            CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
-            taskExecutionContext.getScheduleTime());
+        Map<String, Property> paramsMap = 
ParamUtils.convert(taskExecutionContext,shellParameters);
+
         // replace variable TIME with $[YYYYmmddd...] in shell file when 
history run job and batch complement job
         if (taskExecutionContext.getScheduleTime() != null) {
             if (paramsMap == null) {
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
index a9a1b89..99a6eb2 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
@@ -20,22 +20,30 @@ package org.apache.dolphinscheduler.server.utils;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
+import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.CommandType;
 import org.apache.dolphinscheduler.common.enums.DataType;
 import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.enums.TaskType;
 import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 
 import java.util.Calendar;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.SerializationFeature;
+
 /**
  * Test ParamUtils
  */
@@ -82,7 +90,6 @@ public class ParamUtilsTest {
         varProperty.setType(DataType.VARCHAR);
         varProperty.setValue("${global_param}");
         varPoolParams.put("varPool", varProperty);
-
     }
 
     /**
@@ -90,7 +97,6 @@ public class ParamUtilsTest {
      */
     @Test
     public void testConvert() {
-
         //The expected value
         String expected = 
"{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
                 + 
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
@@ -127,6 +133,83 @@ public class ParamUtilsTest {
     }
 
     /**
+     * Test some new params related to task
+     */
+    @Test
+    public void testConvertForParamsRelatedTask() throws Exception {
+        // start to form some test data for new paramters
+        Map<String,Property> globalParams = new HashMap<>();
+        Map<String,String> globalParamsMap = new HashMap<>();
+
+        Property taskInstanceIdProperty = new Property();
+        String propName = "task_execution_id";
+        String paramValue = String.format("${%s}", 
Constants.PARAMETER_TASK_INSTANCE_ID);
+        taskInstanceIdProperty.setProp(propName);
+        taskInstanceIdProperty.setDirect(Direct.IN);
+        taskInstanceIdProperty.setType(DataType.VARCHAR);
+        taskInstanceIdProperty.setValue(paramValue);
+        globalParams.put(propName,taskInstanceIdProperty);
+        globalParamsMap.put(propName,paramValue);
+
+        Property taskExecutionPathProperty = new Property();
+        propName = "task_execution_path";
+        paramValue = String.format("${%s}", 
Constants.PARAMETER_TASK_EXECUTE_PATH);
+        taskExecutionPathProperty.setProp(propName);
+        taskExecutionPathProperty.setDirect(Direct.IN);
+        taskExecutionPathProperty.setType(DataType.VARCHAR);
+        taskExecutionPathProperty.setValue(paramValue);
+
+        globalParams.put(propName,taskExecutionPathProperty);
+        globalParamsMap.put(propName,paramValue);
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(2019,11,30);
+        Date date = calendar.getTime();
+
+        List<Property> globalParamList = 
globalParams.values().stream().collect(Collectors.toList());
+
+        TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+        taskExecutionContext.setTaskInstanceId(1);
+        taskExecutionContext.setTaskName("params test");
+        taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
+        taskExecutionContext.setHost("127.0.0.1:1234");
+        taskExecutionContext.setExecutePath("/tmp/test");
+        taskExecutionContext.setLogPath("/log");
+        taskExecutionContext.setProcessInstanceId(1);
+        taskExecutionContext.setExecutorId(1);
+        taskExecutionContext.setCmdTypeIfComplement(0);
+        taskExecutionContext.setScheduleTime(date);
+        
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
+        taskExecutionContext.setDefinedParams(globalParamsMap);
+        taskExecutionContext.setTaskParams(
+                "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd 
HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\" 
${task_execution_path}\\\"\\n\","
+                        + "\"localParams\":"
+                        + 
"[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"},"
+                        + 
"{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+                        + 
"\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}");
+
+        ShellParameters shellParameters = 
JSONUtils.parseObject(taskExecutionContext.getTaskParams(), 
ShellParameters.class);
+
+        //The expected value
+        String expected = 
"{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"},"
+                + 
"\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}";
+
+        //The expected value when globalParams is null but localParams is not 
null
+        Map<String, Property> paramsMap = 
ParamUtils.convert(taskExecutionContext, shellParameters);
+
+        String result = JSONUtils.toJsonString(paramsMap);
+
+        Map<String,String> resultMap = JSONUtils.parseObject(result,Map.class);
+        Map<String,String> expectedMap = 
JSONUtils.parseObject(expected,Map.class);
+
+        result = 
JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+        expected = 
JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+
+        assertEquals(expected, result);
+
+    }
+
+    /**
      * Test the overload method of convert
      */
     @Test
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index e224fa8..b4dfb0e 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -14,17 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dolphinscheduler.server.worker.shell;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
+package org.apache.dolphinscheduler.server.worker.shell;
 
 import org.apache.dolphinscheduler.common.Constants;
 import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
 import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
 import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor;
 import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@@ -32,6 +27,13 @@ import 
org.apache.dolphinscheduler.server.worker.task.TaskProps;
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.util.Date;
+import java.util.List;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Ignore;
@@ -45,9 +47,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.context.ApplicationContext;
 
-import java.util.Date;
-import java.util.List;
-
 /**
  * python shell command executor test
  */
@@ -84,21 +83,21 @@ public class ShellCommandExecutorTest {
         taskProps.setTaskTimeout(360000);
         taskProps.setTaskInstanceId(7657);
 
-
-        TaskInstance taskInstance = processService.findTaskInstanceById(7657);
-
-//        TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
-//        taskProps.setTaskParams(taskNode.getParams());
-
-
-        // custom logger
-//        Logger taskLogger = 
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
-//                taskInstance.getProcessDefinitionId(),
-//                taskInstance.getProcessInstanceId(),
-//                taskInstance.getId()));
-
-
-//        AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(), 
taskProps, taskLogger);
+        //        TaskInstance taskInstance = 
processService.findTaskInstanceById(7657);
+        //
+        //        TaskNode taskNode = JSON.parseObject(taskJson, 
TaskNode.class);
+        //        taskProps.setTaskParams(taskNode.getParams());
+        //
+        //
+        //        // custom logger
+        //        Logger taskLogger = 
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+        //                taskInstance.getProcessDefine().getCode(),
+        //                taskInstance.getProcessDefine().getVersion(),
+        //                taskInstance.getProcessInstanceId(),
+        //                taskInstance.getId()));
+        //
+        //
+        //        AbstractTask task = 
TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
 
         AbstractTask task = null;
 

Reply via email to