This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 9fd5145 [Improvement-5773][server] need to support two parameters
related to task (#5774)
9fd5145 is described below
commit 9fd5145b66646f3df847ea3c81bb272621ee86ca
Author: Hua Jiang <[email protected]>
AuthorDate: Fri Jul 9 17:00:32 2021 +0800
[Improvement-5773][server] need to support two parameters related to task
(#5774)
* add some new parameter for task
* restore official properties
* improve imports
* modify a variable's name
Co-authored-by: jiang hua <[email protected]>
---
.../apache/dolphinscheduler/common/Constants.java | 10 +++
.../dolphinscheduler/common/utils/JSONUtils.java | 2 +-
.../dolphinscheduler/common/utils/LoggerUtils.java | 1 +
.../dolphinscheduler/server/utils/ParamUtils.java | 67 +++++++++++++++++
.../server/worker/task/ShellCommandExecutor.java | 10 +--
.../server/worker/task/shell/ShellTask.java | 15 ++--
.../server/utils/ParamUtilsTest.java | 87 +++++++++++++++++++++-
.../worker/shell/ShellCommandExecutorTest.java | 47 ++++++------
8 files changed, 198 insertions(+), 41 deletions(-)
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
index 39f8898..225dc23 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/Constants.java
@@ -661,6 +661,16 @@ public final class Constants {
public static final String PARAMETER_BUSINESS_DATE = "system.biz.date";
/**
+ * the absolute path of current executing task
+ */
+ public static final String PARAMETER_TASK_EXECUTE_PATH =
"system.task.execute.path";
+
+ /**
+ * the instance id of current task
+ */
+ public static final String PARAMETER_TASK_INSTANCE_ID =
"system.task.instance.id";
+
+ /**
* ACCEPTED
*/
public static final String ACCEPTED = "ACCEPTED";
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
index 9e929e3..2a1e2ac 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/JSONUtils.java
@@ -31,12 +31,12 @@ import java.util.List;
import java.util.Map;
import java.util.TimeZone;
-import com.fasterxml.jackson.core.JsonProcessingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
index 904ca97..545bbb2 100644
---
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/LoggerUtils.java
@@ -144,4 +144,5 @@ public class LoggerUtils {
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
+
}
\ No newline at end of file
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
index a49d915..cbf663f 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/utils/ParamUtils.java
@@ -17,13 +17,19 @@
package org.apache.dolphinscheduler.server.utils;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
+import org.apache.dolphinscheduler.common.utils.Preconditions;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
+
+import org.apache.logging.log4j.util.Strings;
import java.util.Date;
import java.util.HashMap;
@@ -96,6 +102,67 @@ public class ParamUtils {
}
/**
+ * Builds the parameter map of a task from its execution context and its
+ * local parameters, and resolves every value that starts with "$".
+ *
+ * The map built from the context's defined params is the base; the local
+ * parameters are put on top of it, so a local parameter replaces an entry
+ * with the same name. Values offered for placeholder substitution are the
+ * business-time values, the context's defined params,
+ * Constants.PARAMETER_TASK_EXECUTE_PATH (only when the execute path is not
+ * blank) and Constants.PARAMETER_TASK_INSTANCE_ID. Resolved values are
+ * written back into the Property objects held by the returned map.
+ *
+ * @param taskExecutionContext the context of this task instance, must not be null
+ * @param parameters the task parameters supplying the local parameters, must not be null
+ * @return the merged parameter map with placeholders resolved, or null when
+ * both the defined-params map and the local-params map are null
+ */
+ public static Map<String,Property> convert(TaskExecutionContext
taskExecutionContext, AbstractParameters parameters) {
+ Preconditions.checkNotNull(taskExecutionContext);
+ Preconditions.checkNotNull(parameters);
+ Map<String,Property> globalParams =
getUserDefParamsMap(taskExecutionContext.getDefinedParams());
+ Map<String,String> globalParamsMap =
taskExecutionContext.getDefinedParams();
+ CommandType commandType =
CommandType.of(taskExecutionContext.getCmdTypeIfComplement());
+ Date scheduleTime = taskExecutionContext.getScheduleTime();
+
+ Map<String,Property> localParams = parameters.getLocalParametersMap();
+
+ if (globalParams == null && localParams == null) {
+ return null;
+ }
+ // the substitution map starts from the business-time values, which
+ // are derived from the command type and the schedule time;
+ // defined params and the task-related system params are added below
+ Map<String,String> params = BusinessTimeUtils
+ .getBusinessTime(commandType,
+ scheduleTime);
+
+ if (globalParamsMap != null) {
+ params.putAll(globalParamsMap);
+ }
+
+ if (Strings.isNotBlank(taskExecutionContext.getExecutePath())) {
+
params.put(Constants.PARAMETER_TASK_EXECUTE_PATH,taskExecutionContext.getExecutePath());
+ }
+
params.put(Constants.PARAMETER_TASK_INSTANCE_ID,Integer.toString(taskExecutionContext.getTaskInstanceId()));
+
+ if (globalParams != null && localParams != null) {
+ globalParams.putAll(localParams);
+ } else if (globalParams == null && localParams != null) {
+ globalParams = localParams;
+ }
+ Iterator<Map.Entry<String, Property>> iter =
globalParams.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<String, Property> en = iter.next();
+ Property property = en.getValue();
+
+ if (StringUtils.isNotEmpty(property.getValue())
+ && property.getValue().startsWith("$")) {
+ /**
+ * a value starting with "$" is treated as a placeholder, e.g. a
+ * local parameter that refers to a global parameter with the
+ * same name.
+ * note: the global parameters of the process instance here are
+ * solidified parameters, and there are no variables in them.
+ */
+ String val = property.getValue();
+ val = ParameterUtils.convertParameterPlaceholders(val,
params);
+ property.setValue(val);
+ }
+ }
+
+ return globalParams;
+ }
+
+ /**
* format convert
* @param paramsMap params map
* @return Map of converted
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index 8f3da45..b547ef9 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -14,12 +14,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.dolphinscheduler.server.worker.task;
-import org.apache.commons.io.FileUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
-import org.slf4j.Logger;
+
+import org.apache.commons.io.FileUtils;
import java.io.File;
import java.io.IOException;
@@ -29,6 +30,8 @@ import java.nio.file.Paths;
import java.util.List;
import java.util.function.Consumer;
+import org.slf4j.Logger;
+
/**
* shell command executor
*/
@@ -78,7 +81,6 @@ public class ShellCommandExecutor extends
AbstractCommandExecutor {
return OSUtils.isWindows() ? CMD : SH;
}
-
/**
* create command file if not exists
* @param execCommand exec command
@@ -117,6 +119,4 @@ public class ShellCommandExecutor extends
AbstractCommandExecutor {
FileUtils.writeStringToFile(new File(commandFile), sb.toString(),
StandardCharsets.UTF_8);
}
}
-
-
}
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
index e193571..f7887df 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/shell/ShellTask.java
@@ -35,8 +35,6 @@ import
org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.apache.dolphinscheduler.server.worker.task.CommandExecuteResult;
import org.apache.dolphinscheduler.server.worker.task.ShellCommandExecutor;
-import org.slf4j.Logger;
-
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -51,6 +49,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+
/**
* shell task
*/
@@ -101,7 +101,8 @@ public class ShellTask extends AbstractTask {
public void handle() throws Exception {
try {
// construct process
- CommandExecuteResult commandExecuteResult =
shellCommandExecutor.run(buildCommand());
+ String command = buildCommand();
+ CommandExecuteResult commandExecuteResult =
shellCommandExecutor.run(command);
setExitStatusCode(commandExecuteResult.getExitStatusCode());
setAppIds(commandExecuteResult.getAppIds());
setProcessId(commandExecuteResult.getProcessId());
@@ -165,12 +166,8 @@ public class ShellTask extends AbstractTask {
private String parseScript(String script) {
// combining local and global parameters
- Map<String, Property> paramsMap =
ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
- taskExecutionContext.getDefinedParams(),
- shellParameters.getLocalParametersMap(),
- shellParameters.getVarPoolMap(),
- CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
- taskExecutionContext.getScheduleTime());
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext,shellParameters);
+
// replace variable TIME with $[YYYYmmddd...] in shell file when
history run job and batch complement job
if (taskExecutionContext.getScheduleTime() != null) {
if (paramsMap == null) {
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
index a9a1b89..99a6eb2 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/utils/ParamUtilsTest.java
@@ -20,22 +20,30 @@ package org.apache.dolphinscheduler.server.utils;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.Direct;
+import org.apache.dolphinscheduler.common.enums.TaskType;
import org.apache.dolphinscheduler.common.process.Property;
+import org.apache.dolphinscheduler.common.task.shell.ShellParameters;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
/**
* Test ParamUtils
*/
@@ -82,7 +90,6 @@ public class ParamUtilsTest {
varProperty.setType(DataType.VARCHAR);
varProperty.setValue("${global_param}");
varPoolParams.put("varPool", varProperty);
-
}
/**
@@ -90,7 +97,6 @@ public class ParamUtilsTest {
*/
@Test
public void testConvert() {
-
//The expected value
String expected =
"{\"varPool\":{\"prop\":\"local_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
+
"\"global_param\":{\"prop\":\"global_param\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"20191229\"},"
@@ -127,6 +133,83 @@ public class ParamUtilsTest {
}
/**
+ * Test some new params related to task
+ */
+ @Test
+ public void testConvertForParamsRelatedTask() throws Exception {
+ // start to form some test data for the new parameters
+ Map<String,Property> globalParams = new HashMap<>();
+ Map<String,String> globalParamsMap = new HashMap<>();
+
+ Property taskInstanceIdProperty = new Property();
+ String propName = "task_execution_id";
+ String paramValue = String.format("${%s}",
Constants.PARAMETER_TASK_INSTANCE_ID);
+ taskInstanceIdProperty.setProp(propName);
+ taskInstanceIdProperty.setDirect(Direct.IN);
+ taskInstanceIdProperty.setType(DataType.VARCHAR);
+ taskInstanceIdProperty.setValue(paramValue);
+ globalParams.put(propName,taskInstanceIdProperty);
+ globalParamsMap.put(propName,paramValue);
+
+ Property taskExecutionPathProperty = new Property();
+ propName = "task_execution_path";
+ paramValue = String.format("${%s}",
Constants.PARAMETER_TASK_EXECUTE_PATH);
+ taskExecutionPathProperty.setProp(propName);
+ taskExecutionPathProperty.setDirect(Direct.IN);
+ taskExecutionPathProperty.setType(DataType.VARCHAR);
+ taskExecutionPathProperty.setValue(paramValue);
+
+ globalParams.put(propName,taskExecutionPathProperty);
+ globalParamsMap.put(propName,paramValue);
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(2019,11,30);
+ Date date = calendar.getTime();
+
+ List<Property> globalParamList =
globalParams.values().stream().collect(Collectors.toList());
+
+ TaskExecutionContext taskExecutionContext = new TaskExecutionContext();
+ taskExecutionContext.setTaskInstanceId(1);
+ taskExecutionContext.setTaskName("params test");
+ taskExecutionContext.setTaskType(TaskType.SHELL.getDesc());
+ taskExecutionContext.setHost("127.0.0.1:1234");
+ taskExecutionContext.setExecutePath("/tmp/test");
+ taskExecutionContext.setLogPath("/log");
+ taskExecutionContext.setProcessInstanceId(1);
+ taskExecutionContext.setExecutorId(1);
+ taskExecutionContext.setCmdTypeIfComplement(0);
+ taskExecutionContext.setScheduleTime(date);
+
taskExecutionContext.setGlobalParams(JSONUtils.toJsonString(globalParamList));
+ taskExecutionContext.setDefinedParams(globalParamsMap);
+ taskExecutionContext.setTaskParams(
+ "{\"rawScript\":\"#!/bin/sh\\necho $[yyyy-MM-dd
HH:mm:ss]\\necho \\\" ${task_execution_id} \\\"\\necho \\\"
${task_execution_path}\\\"\\n\","
+ + "\"localParams\":"
+ +
"[{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"${system.task.instance.id}\"},"
+ +
"{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR"
+ +
"\",\"value\":\"${system.task.execute.path}\"}],\"resourceList\":[]}");
+
+ ShellParameters shellParameters =
JSONUtils.parseObject(taskExecutionContext.getTaskParams(),
ShellParameters.class);
+
+ //The expected value
+ String expected =
"{\"task_execution_id\":{\"prop\":\"task_execution_id\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"1\"},"
+ +
"\"task_execution_path\":{\"prop\":\"task_execution_path\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"/tmp/test\"}}";
+
+ //Convert with both global (defined) params and local params set
+ Map<String, Property> paramsMap =
ParamUtils.convert(taskExecutionContext, shellParameters);
+
+ String result = JSONUtils.toJsonString(paramsMap);
+
+ Map<String,String> resultMap = JSONUtils.parseObject(result,Map.class);
+ Map<String,String> expectedMap =
JSONUtils.parseObject(expected,Map.class);
+
+ result =
JSONUtils.toJsonString(resultMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+ expected =
JSONUtils.toJsonString(expectedMap,SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS);
+
+ assertEquals(expected, result);
+
+ }
+
+ /**
* Test the overload method of convert
*/
@Test
diff --git
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
index e224fa8..b4dfb0e 100644
---
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
+++
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/shell/ShellCommandExecutorTest.java
@@ -14,17 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dolphinscheduler.server.worker.shell;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.lang.reflect.Method;
+package org.apache.dolphinscheduler.server.worker.shell;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.OSUtils;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
@@ -32,6 +27,13 @@ import
org.apache.dolphinscheduler.server.worker.task.TaskProps;
import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.lang.reflect.Method;
+import java.util.Date;
+import java.util.List;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
@@ -45,9 +47,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.ApplicationContext;
-import java.util.Date;
-import java.util.List;
-
/**
* python shell command executor test
*/
@@ -84,21 +83,21 @@ public class ShellCommandExecutorTest {
taskProps.setTaskTimeout(360000);
taskProps.setTaskInstanceId(7657);
-
- TaskInstance taskInstance = processService.findTaskInstanceById(7657);
-
-// TaskNode taskNode = JSON.parseObject(taskJson, TaskNode.class);
-// taskProps.setTaskParams(taskNode.getParams());
-
-
- // custom logger
-// Logger taskLogger =
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
-// taskInstance.getProcessDefinitionId(),
-// taskInstance.getProcessInstanceId(),
-// taskInstance.getId()));
-
-
-// AbstractTask task = TaskManager.newTask(taskInstance.getTaskType(),
taskProps, taskLogger);
+ // TaskInstance taskInstance =
processService.findTaskInstanceById(7657);
+ //
+ // TaskNode taskNode = JSON.parseObject(taskJson,
TaskNode.class);
+ // taskProps.setTaskParams(taskNode.getParams());
+ //
+ //
+ // // custom logger
+ // Logger taskLogger =
LoggerFactory.getLogger(LoggerUtils.buildTaskId(LoggerUtils.TASK_LOGGER_INFO_PREFIX,
+ // taskInstance.getProcessDefine().getCode(),
+ // taskInstance.getProcessDefine().getVersion(),
+ // taskInstance.getProcessInstanceId(),
+ // taskInstance.getId()));
+ //
+ //
+ // AbstractTask task =
TaskManager.newTask(taskInstance.getTaskType(), taskProps, taskLogger);
AbstractTask task = null;