This is an automated email from the ASF dual-hosted git repository.
kerwin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 76b1eefb68 [Feature-14321][API] Support to complement data in descending or ascending order of date (#14450)
76b1eefb68 is described below
commit 76b1eefb68a7a6333c6b93987e5d3e842eabb2db
Author: calvin <[email protected]>
AuthorDate: Wed Jul 5 14:35:18 2023 +0800
[Feature-14321][API] Support to complement data in descending or ascending order of date (#14450)
---
.../api/controller/ExecutorController.java | 19 +-
.../apache/dolphinscheduler/api/enums/Status.java | 3 +
.../dolphinscheduler/api/python/PythonGateway.java | 5 +-
.../api/service/ExecutorService.java | 4 +-
.../api/service/impl/ExecutorServiceImpl.java | 259 ++++++++++-----------
.../controller/ExecuteFunctionControllerTest.java | 16 +-
.../api/service/ExecuteFunctionServiceTest.java | 47 ++--
.../common/enums/ExecutionOrder.java | 50 ++++
dolphinscheduler-ui/src/locales/en_US/project.ts | 3 +
dolphinscheduler-ui/src/locales/zh_CN/project.ts | 3 +
.../workflow/definition/components/start-modal.tsx | 15 ++
.../workflow/definition/components/use-form.ts | 3 +-
12 files changed, 256 insertions(+), 171 deletions(-)
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
index 0cd3a59346..5151665b34 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ExecutorController.java
@@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -101,6 +102,7 @@ public class ExecutorController extends BaseController {
* @param timeout timeout
* @param expectedParallelismNumber the expected parallelism number when
execute complement in parallel mode
* @param testFlag testFlag
+ * @param executionOrder complement data in some kind of order
* @return start process result code
*/
@Operation(summary = "startProcessInstance", description =
"RUN_PROCESS_INSTANCE_NOTES")
@@ -123,7 +125,8 @@ public class ExecutorController extends BaseController {
@Parameter(name = "dryRun", description = "DRY_RUN", schema =
@Schema(implementation = int.class, example = "0")),
@Parameter(name = "testFlag", description = "TEST_FLAG", schema =
@Schema(implementation = int.class, example = "0")),
@Parameter(name = "complementDependentMode", description =
"COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation =
ComplementDependentMode.class)),
- @Parameter(name = "allLevelDependent", description =
"ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example
= "false"))
+ @Parameter(name = "allLevelDependent", description =
"ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example
= "false")),
+ @Parameter(name = "executionOrder", description =
"EXECUTION_ORDER", schema = @Schema(implementation = ExecutionOrder.class))
})
@PostMapping(value = "start-process-instance")
@ResponseStatus(HttpStatus.OK)
@@ -151,7 +154,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "testFlag",
defaultValue = "0") int testFlag,
@RequestParam(value =
"complementDependentMode", required = false) ComplementDependentMode
complementDependentMode,
@RequestParam(value = "version",
required = false) Integer version,
- @RequestParam(value =
"allLevelDependent", required = false, defaultValue = "false") boolean
allLevelDependent) {
+ @RequestParam(value =
"allLevelDependent", required = false, defaultValue = "false") boolean
allLevelDependent,
+ @RequestParam(value = "executionOrder",
required = false) ExecutionOrder executionOrder) {
if (timeout == null) {
timeout = Constants.MAX_TASK_TIMEOUT;
@@ -170,7 +174,7 @@ public class ExecutorController extends BaseController {
startNodeList, taskDependType, warningType, warningGroupId,
runMode, processInstancePriority,
workerGroup, tenantCode, environmentCode, timeout,
startParamMap, expectedParallelismNumber, dryRun,
testFlag,
- complementDependentMode, version, allLevelDependent);
+ complementDependentMode, version, allLevelDependent,
executionOrder);
return returnDataList(result);
}
@@ -196,6 +200,7 @@ public class ExecutorController extends BaseController {
* @param timeout timeout
* @param expectedParallelismNumber the expected parallelism number when
execute complement in parallel mode
* @param testFlag testFlag
+ * @param executionOrder complement data in some kind of order
* @return start process result code
*/
@Operation(summary = "batchStartProcessInstance", description =
"BATCH_RUN_PROCESS_INSTANCE_NOTES")
@@ -218,7 +223,8 @@ public class ExecutorController extends BaseController {
@Parameter(name = "dryRun", description = "DRY_RUN", schema =
@Schema(implementation = int.class, example = "0")),
@Parameter(name = "testFlag", description = "TEST_FLAG", schema =
@Schema(implementation = int.class, example = "0")),
@Parameter(name = "complementDependentMode", description =
"COMPLEMENT_DEPENDENT_MODE", schema = @Schema(implementation =
ComplementDependentMode.class)),
- @Parameter(name = "allLevelDependent", description =
"ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example
= "false"))
+ @Parameter(name = "allLevelDependent", description =
"ALL_LEVEL_DEPENDENT", schema = @Schema(implementation = boolean.class, example
= "false")),
+ @Parameter(name = "executionOrder", description =
"EXECUTION_ORDER", schema = @Schema(implementation = ExecutionOrder.class))
})
@PostMapping(value = "batch-start-process-instance")
@ResponseStatus(HttpStatus.OK)
@@ -245,7 +251,8 @@ public class ExecutorController extends BaseController {
@RequestParam(value = "dryRun",
defaultValue = "0", required = false) int dryRun,
@RequestParam(value = "testFlag",
defaultValue = "0") int testFlag,
@RequestParam(value =
"complementDependentMode", required = false) ComplementDependentMode
complementDependentMode,
- @RequestParam(value =
"allLevelDependent", required = false, defaultValue = "false") boolean
allLevelDependent) {
+ @RequestParam(value =
"allLevelDependent", required = false, defaultValue = "false") boolean
allLevelDependent,
+ @RequestParam(value =
"executionOrder", required = false) ExecutionOrder executionOrder) {
if (timeout == null) {
log.debug("Parameter timeout set to {} due to null.",
Constants.MAX_TASK_TIMEOUT);
@@ -275,7 +282,7 @@ public class ExecutorController extends BaseController {
startNodeList, taskDependType, warningType,
warningGroupId, runMode, processInstancePriority,
workerGroup, tenantCode, environmentCode, timeout,
startParamMap, expectedParallelismNumber, dryRun,
testFlag,
- complementDependentMode, null, allLevelDependent);
+ complementDependentMode, null, allLevelDependent,
executionOrder);
if (!Status.SUCCESS.equals(result.get(Constants.STATUS))) {
log.error("Process definition start failed, projectCode:{},
processDefinitionCode:{}.", projectCode,
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
index eaa6300ac2..7cc5c22c93 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/enums/Status.java
@@ -413,6 +413,9 @@ public enum Status {
WORKFLOW_INSTANCE_IS_NOT_FINISHED(50071, "the workflow instance is not
finished, can not do this operation",
"工作流实例未结束,不能执行此操作"),
+ TASK_PARALLELISM_PARAMS_ERROR(50080, "task parallelism parameter is not
valid", "任务并行度参数无效"),
+ TASK_COMPLEMENT_DATA_DATE_ERROR(50081, "The range of date for
complementing date is not valid", "补数选择的日期范围无效"),
+
HDFS_NOT_STARTUP(60001, "hdfs not startup", "hdfs未启用"),
STORAGE_NOT_STARTUP(60002, "storage not startup", "存储未启用"),
S3_CANNOT_RENAME(60003, "directory cannot be renamed", "S3无法重命名文件夹"),
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
index a40a89db30..fe57f12d16 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/python/PythonGateway.java
@@ -34,6 +34,7 @@ import org.apache.dolphinscheduler.api.service.UsersService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ProcessExecutionTypeEnum;
@@ -94,6 +95,7 @@ public class PythonGateway {
private static final TaskDependType DEFAULT_TASK_DEPEND_TYPE =
TaskDependType.TASK_POST;
private static final RunMode DEFAULT_RUN_MODE = RunMode.RUN_MODE_SERIAL;
+ private static final ExecutionOrder DEFAULT_EXECUTION_ORDER =
ExecutionOrder.DESC_ORDER;
private static final int DEFAULT_DRY_RUN = 0;
private static final int DEFAULT_TEST_FLAG = 0;
private static final ComplementDependentMode COMPLEMENT_DEPENDENT_MODE =
ComplementDependentMode.OFF_MODE;
@@ -405,7 +407,8 @@ public class PythonGateway {
DEFAULT_TEST_FLAG,
COMPLEMENT_DEPENDENT_MODE,
processDefinition.getVersion(),
- false);
+ false,
+ DEFAULT_EXECUTION_ORDER);
}
// side object
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 9e8ee66de0..c67f790c34 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -21,6 +21,7 @@ import
org.apache.dolphinscheduler.api.dto.workflowInstance.WorkflowExecuteRespo
import org.apache.dolphinscheduler.api.enums.ExecuteType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -58,6 +59,7 @@ public interface ExecutorService {
* @param timeout timeout
* @param startParams the global param values which pass to new process
instance
* @param expectedParallelismNumber the expected parallelism number when
execute complement in parallel mode
+ * @param executionOrder the execution order when complementing data
* @return execute process instance code
*/
Map<String, Object> execProcessInstance(User loginUser, long projectCode,
@@ -72,7 +74,7 @@ public interface ExecutorService {
Map<String, String> startParams,
Integer expectedParallelismNumber,
int dryRun, int testFlag,
ComplementDependentMode
complementDependentMode, Integer version,
- boolean allLevelDependent);
+ boolean allLevelDependent,
ExecutionOrder executionOrder);
/**
* check whether the process definition can be executed
diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index a707beab0f..ba996a46eb 100644
--- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -48,6 +48,7 @@ import
org.apache.dolphinscheduler.common.enums.ApiTriggerType;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
import org.apache.dolphinscheduler.common.enums.CycleEnum;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.Priority;
@@ -104,10 +105,12 @@ import org.apache.commons.lang3.StringUtils;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
@@ -118,7 +121,7 @@ import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
-import com.google.common.collect.Lists;
+import com.google.common.base.Splitter;
/**
* executor service impl
@@ -207,6 +210,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* @param startParams the global param values which pass to
new process instance
* @param expectedParallelismNumber the expected parallelism number when
execute complement in parallel mode
* @param testFlag testFlag
+ * @param executionOrder the execution order when complementing data
* @return execute process instance code
*/
@Override
@@ -222,7 +226,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
Map<String, String>
startParams, Integer expectedParallelismNumber,
int dryRun, int testFlag,
ComplementDependentMode
complementDependentMode, Integer version,
- boolean allLevelDependent) {
+ boolean allLevelDependent,
ExecutionOrder executionOrder) {
Project project = projectMapper.queryByCode(projectCode);
// check user access for project
Map<String, Object> result =
@@ -236,6 +240,14 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
putMsg(result, Status.TASK_TIMEOUT_PARAMS_ERROR);
return result;
}
+
+ if (Objects.nonNull(expectedParallelismNumber) &&
expectedParallelismNumber <= 0) {
+ log.warn("Parameter expectedParallelismNumber is invalid,
expectedParallelismNumber:{}.",
+ expectedParallelismNumber);
+ putMsg(result, Status.TASK_PARALLELISM_PARAMS_ERROR);
+ return result;
+ }
+
checkValidTenant(tenantCode);
ProcessDefinition processDefinition;
if (null != version) {
@@ -264,7 +276,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
cronTime, warningType, loginUser.getId(),
warningGroupId, runMode, processInstancePriority,
workerGroup, tenantCode,
environmentCode, startParams,
expectedParallelismNumber, dryRun, testFlag,
- complementDependentMode, allLevelDependent);
+ complementDependentMode, allLevelDependent,
executionOrder);
if (create > 0) {
processDefinition.setWarningGroupId(warningGroupId);
@@ -738,6 +750,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
* @param testFlag testFlag
* @param environmentCode environmentCode
* @param allLevelDependent allLevelDependent
+ * @param executionOrder executionOrder
* @return command id
*/
private int createCommand(Long triggerCode, CommandType commandType, long
processDefineCode, TaskDependType nodeDep,
@@ -747,7 +760,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
Long environmentCode,
Map<String, String> startParams, Integer
expectedParallelismNumber, int dryRun,
int testFlag, ComplementDependentMode
complementDependentMode,
- boolean allLevelDependent) {
+ boolean allLevelDependent, ExecutionOrder
executionOrder) {
/**
* instantiate command schedule instance
@@ -806,7 +819,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
log.info("Start to create {} command,
processDefinitionCode:{}.",
command.getCommandType().getDescp(),
processDefineCode);
return createComplementCommandList(triggerCode, schedule,
runMode, command, expectedParallelismNumber,
- complementDependentMode, allLevelDependent);
+ complementDependentMode, allLevelDependent,
executionOrder);
} catch (CronParseException cronParseException) {
// We catch the exception here just to make compiler happy,
since we have already validated the schedule
// cron expression before
@@ -822,174 +835,138 @@ public class ExecutorServiceImpl extends
BaseServiceImpl implements ExecutorServ
}
}
+ private int createComplementCommand(Long triggerCode, Command command,
Map<String, String> cmdParam,
+ List<ZonedDateTime> dateTimeList,
List<Schedule> schedules,
+ ComplementDependentMode
complementDependentMode, boolean allLevelDependent) {
+
+ String dateTimeListStr = dateTimeList.stream()
+ .map(item -> DateUtils.dateToString(item))
+ .collect(Collectors.joining(COMMA));
+
+ cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
dateTimeListStr);
+ command.setCommandParam(JSONUtils.toJsonString(cmdParam));
+
+ log.info("Creating command, commandInfo:{}.", command);
+ int createCount = commandService.createCommand(command);
+
+ if (createCount > 0) {
+ log.info("Create {} command complete, processDefinitionCode:{}",
+ command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
+ } else {
+ log.error("Create {} command error, processDefinitionCode:{}",
+ command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
+ }
+
+ if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
+ log.info(
+ "Complement dependent mode is off mode or Scheduler is
empty, so skip create complement dependent command, processDefinitionCode:{}.",
+ command.getProcessDefinitionCode());
+ } else {
+ log.info(
+ "Complement dependent mode is all dependent and Scheduler
is not empty, need create complement dependent command,
processDefinitionCode:{}.",
+ command.getProcessDefinitionCode());
+ createComplementDependentCommand(schedules, command,
allLevelDependent);
+ }
+
+ if (createCount > 0) {
+ triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND,
triggerCode, command.getId());
+ }
+ return createCount;
+ }
+
/**
* create complement command
* close left and close right
*
* @param scheduleTimeParam
* @param runMode
+ * @param executionOrder
* @return
*/
protected int createComplementCommandList(Long triggerCode, String
scheduleTimeParam, RunMode runMode,
Command command,
Integer
expectedParallelismNumber,
ComplementDependentMode
complementDependentMode,
- boolean allLevelDependent)
throws CronParseException {
+ boolean allLevelDependent,
+ ExecutionOrder executionOrder)
throws CronParseException {
int createCount = 0;
- String startDate = null;
- String endDate = null;
- String dateList = null;
int dependentProcessDefinitionCreateCount = 0;
runMode = (runMode == null) ? RunMode.RUN_MODE_SERIAL : runMode;
Map<String, String> cmdParam =
JSONUtils.toMap(command.getCommandParam());
Map<String, String> scheduleParam = JSONUtils.toMap(scheduleTimeParam);
- if
(scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
- dateList =
scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
- dateList = removeDuplicates(dateList);
+
+ if (Objects.isNull(executionOrder)) {
+ executionOrder = ExecutionOrder.DESC_ORDER;
}
+
+ List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(
+ command.getProcessDefinitionCode());
+
+ List<ZonedDateTime> listDate = new ArrayList<>();
if (scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_START_DATE) &&
scheduleParam.containsKey(
CMD_PARAM_COMPLEMENT_DATA_END_DATE)) {
- startDate =
scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
- endDate = scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
+ String startDate =
scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_START_DATE);
+ String endDate =
scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_END_DATE);
+ if (startDate != null && endDate != null) {
+ listDate = CronUtils.getSelfFireDateList(
+ DateUtils.stringToZoneDateTime(startDate),
+ DateUtils.stringToZoneDateTime(endDate),
+ schedules);
+ }
}
+
+ if
(scheduleParam.containsKey(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST)) {
+ String dateList =
scheduleParam.get(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST);
+
+ if (StringUtils.isNotBlank(dateList)) {
+ listDate = Splitter.on(COMMA).splitToStream(dateList)
+ .map(item ->
DateUtils.stringToZoneDateTime(item.trim()))
+ .distinct()
+ .collect(Collectors.toList());
+ }
+ }
+
+ if (CollectionUtils.isEmpty(listDate)) {
+ throw new ServiceException(Status.TASK_COMPLEMENT_DATA_DATE_ERROR);
+ }
+
+ if (executionOrder.equals(ExecutionOrder.DESC_ORDER)) {
+ Collections.sort(listDate, Collections.reverseOrder());
+ } else {
+ Collections.sort(listDate);
+ }
+
switch (runMode) {
case RUN_MODE_SERIAL: {
log.info("RunMode of {} command is serial run,
processDefinitionCode:{}.",
command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- if (StringUtils.isNotEmpty(dateList)) {
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST,
dateList);
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- log.info("Creating command, commandInfo:{}.", command);
- createCount = commandService.createCommand(command);
- if (createCount > 0) {
- log.info("Create {} command complete,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- } else {
- log.error("Create {} command error,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- }
- }
- if (startDate != null && endDate != null) {
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE,
startDate);
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE, endDate);
- command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- log.info("Creating command, commandInfo:{}.", command);
- createCount = commandService.createCommand(command);
- if (createCount > 0) {
- log.info("Create {} command complete,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- } else {
- log.error("Create {} command error,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- }
- // dependent process definition
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(
- command.getProcessDefinitionCode());
-
- if (schedules.isEmpty() || complementDependentMode ==
ComplementDependentMode.OFF_MODE) {
- log.info(
- "Complement dependent mode is off mode or
Scheduler is empty, so skip create complement dependent command,
processDefinitionCode:{}.",
- command.getProcessDefinitionCode());
- } else {
- log.info(
- "Complement dependent mode is all dependent
and Scheduler is not empty, need create complement dependent command,
processDefinitionCode:{}.",
- command.getProcessDefinitionCode());
- dependentProcessDefinitionCreateCount +=
- createComplementDependentCommand(schedules,
command, allLevelDependent);
- }
- }
- if (createCount > 0) {
-
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode,
command.getId());
- }
+ createCount = createComplementCommand(triggerCode, command,
cmdParam, listDate, schedules,
+ complementDependentMode, allLevelDependent);
break;
}
case RUN_MODE_PARALLEL: {
log.info("RunMode of {} command is parallel run,
processDefinitionCode:{}.",
command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- if (startDate != null && endDate != null) {
- List<Schedule> schedules =
processService.queryReleaseSchedulerListByProcessDefinitionCode(
- command.getProcessDefinitionCode());
- List<ZonedDateTime> listDate =
CronUtils.getSelfFireDateList(
- DateUtils.stringToZoneDateTime(startDate),
- DateUtils.stringToZoneDateTime(endDate),
- schedules);
- int listDateSize = listDate.size();
- createCount = listDate.size();
- if (!CollectionUtils.isEmpty(listDate)) {
- if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
- createCount = Math.min(createCount,
expectedParallelismNumber);
- }
- log.info("Complement command run in parallel mode,
current expectedParallelismNumber:{}.",
- createCount);
-
- // Distribute the number of tasks equally to each
command.
- // The last command with insufficient quantity will be
assigned to the remaining tasks.
- int itemsPerCommand = (listDateSize / createCount);
- int remainingItems = (listDateSize % createCount);
- int startDateIndex = 0;
- int endDateIndex = 0;
-
- for (int i = 1; i <= createCount; i++) {
- int extra = (i <= remainingItems) ? 1 : 0;
- int singleCommandItems = (itemsPerCommand + extra);
-
- if (i == 1) {
- endDateIndex += singleCommandItems - 1;
- } else {
- startDateIndex = endDateIndex + 1;
- endDateIndex += singleCommandItems;
- }
-
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_START_DATE,
-
DateUtils.dateToString(listDate.get(startDateIndex)));
- cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_END_DATE,
-
DateUtils.dateToString(listDate.get(endDateIndex)));
-
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- log.info("Creating command, commandInfo:{}.",
command);
- if (commandService.createCommand(command) > 0) {
- log.info("Create {} command complete,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
-
triggerRelationService.saveTriggerToDb(ApiTriggerType.COMMAND, triggerCode,
- command.getId());
- } else {
- log.error("Create {} command error,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- }
- if (schedules.isEmpty() || complementDependentMode
== ComplementDependentMode.OFF_MODE) {
- log.info(
- "Complement dependent mode is off mode
or Scheduler is empty, so skip create complement dependent command,
processDefinitionCode:{}.",
- command.getProcessDefinitionCode());
- } else {
- log.info(
- "Complement dependent mode is all
dependent and Scheduler is not empty, need create complement dependent command,
processDefinitionCode:{}.",
- command.getProcessDefinitionCode());
- dependentProcessDefinitionCreateCount +=
-
createComplementDependentCommand(schedules, command, allLevelDependent);
- }
- }
+
+ int queueNum = 0;
+ if (CollectionUtils.isNotEmpty(listDate)) {
+ queueNum = listDate.size();
+ if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
+ queueNum = Math.min(queueNum,
expectedParallelismNumber);
}
- }
- if (StringUtils.isNotEmpty(dateList)) {
- List<String> listDate =
Arrays.asList(dateList.split(COMMA));
- createCount = listDate.size();
- if (!CollectionUtils.isEmpty(listDate)) {
- if (expectedParallelismNumber != null &&
expectedParallelismNumber != 0) {
- createCount = Math.min(createCount,
expectedParallelismNumber);
- }
- log.info("Complement command run in parallel mode,
current expectedParallelismNumber:{}.",
- createCount);
- for (List<String> stringDate :
Lists.partition(listDate, createCount)) {
-
cmdParam.put(CMD_PARAM_COMPLEMENT_DATA_SCHEDULE_DATE_LIST, String.join(COMMA,
stringDate));
-
command.setCommandParam(JSONUtils.toJsonString(cmdParam));
- log.info("Creating command, commandInfo:{}.",
command);
- if (commandService.createCommand(command) > 0) {
- log.info("Create {} command complete,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- } else {
- log.error("Create {} command error,
processDefinitionCode:{}",
- command.getCommandType().getDescp(),
command.getProcessDefinitionCode());
- }
+ log.info("Complement command run in parallel mode, current
expectedParallelismNumber:{}.",
+ queueNum);
+ List[] queues = new List[queueNum];
+
+ for (int i = 0; i < listDate.size(); i++) {
+ if (Objects.isNull(queues[i % queueNum])) {
+ queues[i % queueNum] = new ArrayList();
}
+ queues[i % queueNum].add(listDate.get(i));
+ }
+ for (List queue : queues) {
+ createCount = createComplementCommand(triggerCode,
command, cmdParam, queue, schedules,
+ complementDependentMode, allLevelDependent);
}
}
break;
diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
index 83f66dbeac..7cd97a5551 100644
--- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
+++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ExecuteFunctionControllerTest.java
@@ -31,6 +31,7 @@ import
org.apache.dolphinscheduler.api.service.ExecutorService;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.RunMode;
@@ -68,6 +69,7 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
final WarningType warningType = WarningType.NONE;
final int warningGroupId = 3;
final RunMode runMode = RunMode.RUN_MODE_SERIAL;
+ final ExecutionOrder executionOrder = ExecutionOrder.DESC_ORDER;
final Priority processInstancePriority = Priority.HIGH;
final String workerGroup = "workerGroup";
final String tenantCode = "root";
@@ -112,6 +114,7 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
paramsMap.add("expectedParallelismNumber",
String.valueOf(expectedParallelismNumber));
paramsMap.add("dryRun", String.valueOf(dryRun));
paramsMap.add("testFlag", String.valueOf(testFlag));
+ paramsMap.add("executionOrder", String.valueOf(executionOrder));
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy),
eq(startNodeList), eq(taskDependType),
@@ -120,7 +123,7 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
eq(environmentCode),
eq(timeout), eq(startParams), eq(expectedParallelismNumber),
eq(dryRun), eq(testFlag),
eq(complementDependentMode), eq(version),
- eq(allLevelDependent)))
+ eq(allLevelDependent), eq(executionOrder)))
.thenReturn(executeServiceResult);
// When
@@ -158,6 +161,7 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
paramsMap.add("expectedParallelismNumber",
String.valueOf(expectedParallelismNumber));
paramsMap.add("dryRun", String.valueOf(dryRun));
paramsMap.add("testFlag", String.valueOf(testFlag));
+ paramsMap.add("executionOrder", String.valueOf(executionOrder));
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy),
eq(startNodeList), eq(taskDependType),
@@ -166,7 +170,8 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
eq(environmentCode),
eq(Constants.MAX_TASK_TIMEOUT), eq(startParams),
eq(expectedParallelismNumber), eq(dryRun),
eq(testFlag),
- eq(complementDependentMode), eq(version),
eq(allLevelDependent))).thenReturn(executeServiceResult);
+ eq(complementDependentMode), eq(version),
eq(allLevelDependent), eq(executionOrder)))
+ .thenReturn(executeServiceResult);
// When
final MvcResult mvcResult = mockMvc
@@ -203,6 +208,7 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
paramsMap.add("expectedParallelismNumber",
String.valueOf(expectedParallelismNumber));
paramsMap.add("dryRun", String.valueOf(dryRun));
paramsMap.add("testFlag", String.valueOf(testFlag));
+ paramsMap.add("executionOrder", String.valueOf(executionOrder));
when(executorService.execProcessInstance(any(User.class),
eq(projectCode), eq(processDefinitionCode),
eq(scheduleTime), eq(execType), eq(failureStrategy),
eq(startNodeList), eq(taskDependType),
@@ -210,7 +216,8 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
eq(warningGroupId), eq(runMode), eq(processInstancePriority),
eq(workerGroup), eq(tenantCode),
eq(environmentCode),
eq(timeout), eq(null), eq(expectedParallelismNumber),
eq(dryRun), eq(testFlag),
- eq(complementDependentMode), eq(version),
eq(allLevelDependent))).thenReturn(executeServiceResult);
+ eq(complementDependentMode), eq(version),
eq(allLevelDependent), eq(executionOrder)))
+ .thenReturn(executeServiceResult);
// When
final MvcResult mvcResult = mockMvc
@@ -239,7 +246,8 @@ public class ExecuteFunctionControllerTest extends
AbstractControllerTest {
eq(scheduleTime), eq(null), eq(failureStrategy), eq(null),
eq(null), eq(warningType),
eq(null), eq(null), eq(null), eq("default"), eq("default"),
eq(-1L),
eq(Constants.MAX_TASK_TIMEOUT), eq(null), eq(null), eq(0),
eq(0),
- eq(complementDependentMode), eq(version),
eq(allLevelDependent))).thenReturn(executeServiceResult);
+ eq(complementDependentMode), eq(version),
eq(allLevelDependent), eq(null)))
+ .thenReturn(executeServiceResult);
// When
final MvcResult mvcResult = mockMvc
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
index eba1f34e17..ff2aad42cc 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecuteFunctionServiceTest.java
@@ -38,6 +38,7 @@ import
org.apache.dolphinscheduler.api.service.impl.ProjectServiceImpl;
import org.apache.dolphinscheduler.common.constants.Constants;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.ComplementDependentMode;
+import org.apache.dolphinscheduler.common.enums.ExecutionOrder;
import org.apache.dolphinscheduler.common.enums.FailureStrategy;
import org.apache.dolphinscheduler.common.enums.Priority;
import org.apache.dolphinscheduler.common.enums.ReleaseState;
@@ -280,10 +281,12 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 10, null, 0, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 10, null, null,
+ Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(1)).createCommand(any(Command.class));
@@ -305,10 +308,12 @@ public class ExecuteFunctionServiceTest {
null, "123456789,987654321",
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, null,
+ Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(1)).createCommand(any(Command.class));
@@ -332,7 +337,8 @@ public class ExecuteFunctionServiceTest {
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
} catch (ServiceException e) {
Assertions.assertEquals(Status.START_NODE_NOT_EXIST_IN_LAST_PROCESS.getCode(),
e.getCode());
}
@@ -413,10 +419,11 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
Assertions.assertEquals(Status.START_PROCESS_INSTANCE_ERROR,
result.get(Constants.STATUS));
verify(commandService, times(0)).createCommand(any(Command.class));
}
@@ -437,10 +444,12 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_SERIAL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, null,
+ Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(1)).createCommand(any(Command.class));
}
@@ -461,13 +470,14 @@ public class ExecuteFunctionServiceTest {
null, null,
null, null, null,
RunMode.RUN_MODE_PARALLEL,
- Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 0, Constants.DRY_RUN_FLAG_NO,
+ Priority.LOW, Constants.DEFAULT_WORKER_GROUP, tenantCode,
100L, 110, null, 2, Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
- Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
- verify(commandService, times(31)).createCommand(any(Command.class));
+ false,
+ ExecutionOrder.DESC_ORDER);
+ Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
+ verify(commandService, times(2)).createCommand(any(Command.class));
}
/**
@@ -490,7 +500,8 @@ public class ExecuteFunctionServiceTest {
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
verify(commandService, times(15)).createCommand(any(Command.class));
@@ -518,11 +529,12 @@ public class ExecuteFunctionServiceTest {
100L,
110,
null,
- 0,
+ null,
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_NO,
ComplementDependentMode.OFF_MODE, null,
- false));
+ false,
+ ExecutionOrder.DESC_ORDER));
}
@Test
@@ -555,7 +567,8 @@ public class ExecuteFunctionServiceTest {
Constants.DRY_RUN_FLAG_NO,
Constants.TEST_FLAG_YES,
ComplementDependentMode.OFF_MODE, null,
- false);
+ false,
+ ExecutionOrder.DESC_ORDER);
Assertions.assertEquals(Status.SUCCESS, result.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java
new file mode 100644
index 0000000000..4c82d291dc
--- /dev/null
+++
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/enums/ExecutionOrder.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.common.enums;
+
+import com.baomidou.mybatisplus.annotation.EnumValue;
+
+/**
+ * Order, by date, in which complement data is executed.
+ */
+public enum ExecutionOrder {
+
+ /**
+ * DESC_ORDER (code 0): complement data in descending order
+ * ASC_ORDER (code 1): complement data in ascending order
+ */
+ DESC_ORDER(0, "descending order"),
+ ASC_ORDER(1, "ascending order");
+
+ ExecutionOrder(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ @EnumValue
+ private final int code;
+ private final String desc;
+
+ public int getCode() {
+ return code;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+}
diff --git a/dolphinscheduler-ui/src/locales/en_US/project.ts
b/dolphinscheduler-ui/src/locales/en_US/project.ts
index fa69010413..091af80c99 100644
--- a/dolphinscheduler-ui/src/locales/en_US/project.ts
+++ b/dolphinscheduler-ui/src/locales/en_US/project.ts
@@ -140,6 +140,9 @@ export default {
parallelism: 'Parallelism',
custom_parallelism: 'Custom Parallelism',
please_enter_parallelism: 'Please enter Parallelism',
+ order_of_execution: 'Order of execution',
+ ascending_order: 'In ascending order',
+ descending_order: 'In descending order',
please_choose: 'Please Choose',
start_time: 'Start Time',
end_time: 'End Time',
diff --git a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
index 20d1bb8993..c62f792bfe 100644
--- a/dolphinscheduler-ui/src/locales/zh_CN/project.ts
+++ b/dolphinscheduler-ui/src/locales/zh_CN/project.ts
@@ -141,6 +141,9 @@ export default {
parallelism: '并行度',
custom_parallelism: '自定义并行度',
please_enter_parallelism: '请输入并行度',
+ order_of_execution: '执行顺序',
+ ascending_order: '按日期升序执行',
+ descending_order: '按日期降序执行',
please_choose: '请选择',
start_time: '开始时间',
end_time: '结束时间',
diff --git
a/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx
b/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx
index 45e24e1506..f2c939a261 100644
---
a/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx
+++
b/dolphinscheduler-ui/src/views/projects/workflow/definition/components/start-modal.tsx
@@ -432,6 +432,21 @@ export default defineComponent({
/>
</NFormItem>
)}
+ <NFormItem
+ label={t('project.workflow.order_of_execution')}
+ path='executionOrder'
+ >
+ <NRadioGroup v-model:value={this.startForm.executionOrder}>
+ <NSpace>
+ <NRadio value={'DESC_ORDER'}>
+ {t('project.workflow.descending_order')}
+ </NRadio>
+ <NRadio value={'ASC_ORDER'}>
+ {t('project.workflow.ascending_order')}
+ </NRadio>
+ </NSpace>
+ </NRadioGroup>
+ </NFormItem>
<NFormItem
label={t('project.workflow.schedule_date')}
path={
diff --git
a/dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts
b/dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts
index aa07d9ad83..2c5cf6820a 100644
---
a/dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts
+++
b/dolphinscheduler-ui/src/views/projects/workflow/definition/components/use-form.ts
@@ -70,7 +70,8 @@ export const useForm = () => {
dryRun: 0,
testFlag: 0,
version: null,
- allLevelDependent: 'false'
+ allLevelDependent: 'false',
+ executionOrder: 'DESC_ORDER',
},
saving: false,
rules: {