github-advanced-security[bot] commented on code in PR #16523: URL: https://github.com/apache/dolphinscheduler/pull/16523#discussion_r1732311026
########## dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java: ########## @@ -93,47 +96,60 @@ } log.info("In parallel mode, current expectedParallelismNumber:{}", expectedParallelismNumber); + final List<Integer> workflowInstanceIdList = Lists.newArrayList(); for (List<ZonedDateTime> stringDate : Lists.partition(listDate, expectedParallelismNumber)) { - final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder() - .commandParams(backfillWorkflowDTO.getStartParamList()) - .startNodes(backfillWorkflowDTO.getStartNodes()) - .backfillTimeList(stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList())) - .timeZone(DateUtils.getTimezone()) - .build(); - doCreateCommand(backfillWorkflowDTO, backfillWorkflowCommandParam); + final Integer workflowInstanceId = doBackfillWorkflow( + backfillWorkflowDTO, + stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList())); + workflowInstanceIdList.add(workflowInstanceId); } + return workflowInstanceIdList; } - private void doCreateCommand(final BackfillWorkflowDTO backfillWorkflowDTO, - final BackfillWorkflowCommandParam backfillWorkflowCommandParam) { - List<String> backfillTimeList = backfillWorkflowCommandParam.getBackfillTimeList(); - final Command command = Command.builder() - .commandType(backfillWorkflowDTO.getExecType()) - .processDefinitionCode(backfillWorkflowDTO.getWorkflowDefinition().getCode()) - .processDefinitionVersion(backfillWorkflowDTO.getWorkflowDefinition().getVersion()) - .executorId(backfillWorkflowDTO.getLoginUser().getId()) - .scheduleTime(DateUtils.stringToDate(backfillTimeList.get(0))) - .commandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam)) - .taskDependType(backfillWorkflowDTO.getTaskDependType()) + private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, + final List<String> backfillTimeList) { + final Server 
masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null); + if (masterServer == null) { + throw new ServiceException("no master server available"); + } + + final ProcessDefinition workflowDefinition = backfillWorkflowDTO.getWorkflowDefinition(); + final WorkflowBackfillTriggerRequest backfillTriggerRequest = WorkflowBackfillTriggerRequest.builder() + .userId(backfillWorkflowDTO.getLoginUser().getId()) + .backfillTimeList(backfillTimeList) + .workflowCode(workflowDefinition.getCode()) + .workflowVersion(workflowDefinition.getVersion()) + .startNodes(backfillWorkflowDTO.getStartNodes()) .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) + .taskDependType(backfillWorkflowDTO.getTaskDependType()) + .execType(backfillWorkflowDTO.getExecType()) .warningType(backfillWorkflowDTO.getWarningType()) .warningGroupId(backfillWorkflowDTO.getWarningGroupId()) - .startTime(new Date()) - .processInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) - .updateTime(new Date()) + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) .workerGroup(backfillWorkflowDTO.getWorkerGroup()) .tenantCode(backfillWorkflowDTO.getTenantCode()) - .dryRun(backfillWorkflowDTO.getDryRun().getCode()) - .testFlag(backfillWorkflowDTO.getTestFlag().getCode()) + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) + .startParamList(backfillWorkflowDTO.getStartParamList()) + .dryRun(backfillWorkflowDTO.getDryRun()) + .testFlag(backfillWorkflowDTO.getTestFlag()) .build(); - commandDao.insert(command); + + final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients + .withService(IWorkflowInstanceController.class) + .withHost(masterServer.getHost() + ":" + masterServer.getPort()) + .backfillTriggerWorkflow(backfillTriggerRequest); + if (!backfillTriggerResponse.isSuccess()) { + throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage()); + } final BackfillWorkflowDTO.BackfillParamsDTO 
backfillParams = backfillWorkflowDTO.getBackfillParams(); if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) { - doBackfillDependentWorkflow(backfillWorkflowCommandParam, command); + doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList); } + return backfillTriggerResponse.getWorkflowInstanceId(); } - private void doBackfillDependentWorkflow(final BackfillWorkflowCommandParam backfillWorkflowCommandParam, - final Command backfillCommand) { + private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, Review Comment: ## Useless parameter The parameter 'backfillWorkflowDTO' is never used. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4568) ########## dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/workflow/BackfillWorkflowExecutorDelegate.java: ########## @@ -93,47 +96,60 @@ } log.info("In parallel mode, current expectedParallelismNumber:{}", expectedParallelismNumber); + final List<Integer> workflowInstanceIdList = Lists.newArrayList(); for (List<ZonedDateTime> stringDate : Lists.partition(listDate, expectedParallelismNumber)) { - final BackfillWorkflowCommandParam backfillWorkflowCommandParam = BackfillWorkflowCommandParam.builder() - .commandParams(backfillWorkflowDTO.getStartParamList()) - .startNodes(backfillWorkflowDTO.getStartNodes()) - .backfillTimeList(stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList())) - .timeZone(DateUtils.getTimezone()) - .build(); - doCreateCommand(backfillWorkflowDTO, backfillWorkflowCommandParam); + final Integer workflowInstanceId = doBackfillWorkflow( + backfillWorkflowDTO, + stringDate.stream().map(DateUtils::dateToString).collect(Collectors.toList())); + workflowInstanceIdList.add(workflowInstanceId); } + return workflowInstanceIdList; } - private void doCreateCommand(final BackfillWorkflowDTO backfillWorkflowDTO, - final BackfillWorkflowCommandParam 
backfillWorkflowCommandParam) { - List<String> backfillTimeList = backfillWorkflowCommandParam.getBackfillTimeList(); - final Command command = Command.builder() - .commandType(backfillWorkflowDTO.getExecType()) - .processDefinitionCode(backfillWorkflowDTO.getWorkflowDefinition().getCode()) - .processDefinitionVersion(backfillWorkflowDTO.getWorkflowDefinition().getVersion()) - .executorId(backfillWorkflowDTO.getLoginUser().getId()) - .scheduleTime(DateUtils.stringToDate(backfillTimeList.get(0))) - .commandParam(JSONUtils.toJsonString(backfillWorkflowCommandParam)) - .taskDependType(backfillWorkflowDTO.getTaskDependType()) + private Integer doBackfillWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, + final List<String> backfillTimeList) { + final Server masterServer = registryClient.getRandomServer(RegistryNodeType.MASTER).orElse(null); + if (masterServer == null) { + throw new ServiceException("no master server available"); + } + + final ProcessDefinition workflowDefinition = backfillWorkflowDTO.getWorkflowDefinition(); + final WorkflowBackfillTriggerRequest backfillTriggerRequest = WorkflowBackfillTriggerRequest.builder() + .userId(backfillWorkflowDTO.getLoginUser().getId()) + .backfillTimeList(backfillTimeList) + .workflowCode(workflowDefinition.getCode()) + .workflowVersion(workflowDefinition.getVersion()) + .startNodes(backfillWorkflowDTO.getStartNodes()) .failureStrategy(backfillWorkflowDTO.getFailureStrategy()) + .taskDependType(backfillWorkflowDTO.getTaskDependType()) + .execType(backfillWorkflowDTO.getExecType()) .warningType(backfillWorkflowDTO.getWarningType()) .warningGroupId(backfillWorkflowDTO.getWarningGroupId()) - .startTime(new Date()) - .processInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) - .updateTime(new Date()) + .workflowInstancePriority(backfillWorkflowDTO.getWorkflowInstancePriority()) .workerGroup(backfillWorkflowDTO.getWorkerGroup()) .tenantCode(backfillWorkflowDTO.getTenantCode()) - 
.dryRun(backfillWorkflowDTO.getDryRun().getCode()) - .testFlag(backfillWorkflowDTO.getTestFlag().getCode()) + .environmentCode(backfillWorkflowDTO.getEnvironmentCode()) + .startParamList(backfillWorkflowDTO.getStartParamList()) + .dryRun(backfillWorkflowDTO.getDryRun()) + .testFlag(backfillWorkflowDTO.getTestFlag()) .build(); - commandDao.insert(command); + + final WorkflowBackfillTriggerResponse backfillTriggerResponse = Clients + .withService(IWorkflowInstanceController.class) + .withHost(masterServer.getHost() + ":" + masterServer.getPort()) + .backfillTriggerWorkflow(backfillTriggerRequest); + if (!backfillTriggerResponse.isSuccess()) { + throw new ServiceException("Backfill workflow failed: " + backfillTriggerResponse.getMessage()); + } final BackfillWorkflowDTO.BackfillParamsDTO backfillParams = backfillWorkflowDTO.getBackfillParams(); if (backfillParams.getBackfillDependentMode() == ComplementDependentMode.ALL_DEPENDENT) { - doBackfillDependentWorkflow(backfillWorkflowCommandParam, command); + doBackfillDependentWorkflow(backfillWorkflowDTO, backfillTimeList); } + return backfillTriggerResponse.getWorkflowInstanceId(); } - private void doBackfillDependentWorkflow(final BackfillWorkflowCommandParam backfillWorkflowCommandParam, - final Command backfillCommand) { + private void doBackfillDependentWorkflow(final BackfillWorkflowDTO backfillWorkflowDTO, + final List<String> backfillTimeList) { Review Comment: ## Useless parameter The parameter 'backfillTimeList' is never used. [Show more details](https://github.com/apache/dolphinscheduler/security/code-scanning/4569) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org