This is an automated email from the ASF dual-hosted git repository.
leonbao pushed a commit to branch 2.0.2-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/2.0.2-prepare by this push:
new 255670f [Feature][dev-2.0.2-prepare] cherry-pick pull requests from
dev to 2.0.2-prepare #7357 (#7358)
255670f is described below
commit 255670fda9cbd4ed17b9d7f4da83ede42faaff9b
Author: zwZjut <[email protected]>
AuthorDate: Mon Dec 13 12:24:23 2021 +0800
[Feature][dev-2.0.2-prepare] cherry-pick pull requests from dev to
2.0.2-prepare #7357 (#7358)
* [Feature][dolphinscheduler-api] parse traceId in http header for Cross
system delivery to #7237 (#7238)
* to #7237
* rerun test
Co-authored-by: honghuo.zw <[email protected]>
* chery-pick 05aef27 and handle conflicts
* to #7065: fix ExecutorService and schedulerService (#7072)
Co-authored-by: honghuo.zw <[email protected]>
* [Feature][dolphinscheduler-api] access control of taskDefinition and
taskInstance in project to #7081 (#7082)
* to #7081
* fix #7081
* to #7081
Co-authored-by: honghuo.zw <[email protected]>
* chery-pick 8ebe060 and handle conflicts
* cherry-pick 1f18444 and handle conflicts
* fix #6807: dolphinscheduler.zookeeper.env_vars - >
dolphinscheduler.registry.env_vars (#6808)
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
* add default constructor (#6780)
Co-authored-by: honghuo.zw <[email protected]>
* to #7108 (#7109)
Co-authored-by: honghuo.zw <[email protected]>
Co-authored-by: Kirs <[email protected]>
---
.../templates/deployment-dolphinscheduler-api.yaml | 2 +-
.../statefulset-dolphinscheduler-master.yaml | 2 +-
.../statefulset-dolphinscheduler-worker.yaml | 2 +-
.../api/aspect/AccessLogAspect.java | 17 +-
.../api/controller/LoggerController.java | 58 ++++++-
.../controller/ProcessDefinitionController.java | 2 +-
.../api/controller/ProcessInstanceController.java | 5 +-
.../api/service/ExecutorService.java | 3 +-
.../api/service/LoggerService.java | 24 +++
.../api/service/ProcessDefinitionService.java | 3 +-
.../api/service/ProcessInstanceService.java | 7 +-
.../api/service/impl/ExecutorServiceImpl.java | 9 +-
.../api/service/impl/LoggerServiceImpl.java | 173 +++++++++++++++++----
.../service/impl/ProcessDefinitionServiceImpl.java | 39 +++--
.../service/impl/ProcessInstanceServiceImpl.java | 79 ++++++++--
.../api/service/impl/SchedulerServiceImpl.java | 10 +-
.../service/impl/TaskDefinitionServiceImpl.java | 12 +-
.../api/service/impl/TaskInstanceServiceImpl.java | 11 ++
.../dolphinscheduler/api/utils/PageInfo.java | 4 +
.../ProcessDefinitionControllerTest.java | 2 +-
.../controller/ProcessInstanceControllerTest.java | 25 ++-
.../api/service/ExecutorServiceTest.java | 1 +
.../api/service/LoggerServiceTest.java | 98 ++++++++++++
.../api/service/ProcessDefinitionServiceTest.java | 8 +-
.../api/service/ProcessInstanceServiceTest.java | 13 +-
.../api/service/SchedulerServiceTest.java | 1 +
.../api/service/TaskDefinitionServiceImplTest.java | 1 +
.../api/service/TaskInstanceServiceTest.java | 8 +
.../dao/mapper/ProcessDefinitionLogMapper.java | 3 +-
.../dao/mapper/ProcessInstanceMapper.java | 7 +-
.../dao/mapper/TaskDefinitionLogMapper.java | 3 +-
.../dao/mapper/ProcessDefinitionLogMapper.xml | 3 +
.../dao/mapper/ProcessInstanceMapper.xml | 3 +
.../dao/mapper/TaskDefinitionLogMapper.xml | 3 +
.../dao/mapper/ProcessDefinitionLogMapperTest.java | 2 +-
.../dao/mapper/ProcessInstanceMapperTest.java | 2 +-
.../master/runner/WorkflowExecuteThread.java | 2 +-
37 files changed, 541 insertions(+), 106 deletions(-)
diff --git
a/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
b/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
index 65b5dea..b2838df 100644
---
a/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
+++
b/docker/kubernetes/dolphinscheduler/templates/deployment-dolphinscheduler-api.yaml
@@ -68,7 +68,7 @@ spec:
- name: TZ
value: {{ .Values.timezone }}
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
- {{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }}
+ {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }}
envFrom:
- configMapRef:
diff --git
a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml
b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml
index 735d3cd..686d260 100644
---
a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml
+++
b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-master.yaml
@@ -65,7 +65,7 @@ spec:
- name: TZ
value: {{ .Values.timezone }}
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
- {{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }}
+ {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }}
envFrom:
- configMapRef:
diff --git
a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
index 60b6753..392ae07 100644
---
a/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
+++
b/docker/kubernetes/dolphinscheduler/templates/statefulset-dolphinscheduler-worker.yaml
@@ -69,7 +69,7 @@ spec:
- name: ALERT_LISTEN_HOST
value: {{ include "dolphinscheduler.fullname" . }}-alert
{{- include "dolphinscheduler.database.env_vars" . | nindent 12 }}
- {{- include "dolphinscheduler.zookeeper.env_vars" . | nindent 12 }}
+ {{- include "dolphinscheduler.registry.env_vars" . | nindent 12 }}
{{- include "dolphinscheduler.fs_s3a.env_vars" . | nindent 12 }}
envFrom:
- configMapRef:
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java
index a34d041..14d75c7 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/aspect/AccessLogAspect.java
@@ -19,11 +19,11 @@ package org.apache.dolphinscheduler.api.aspect;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.spi.utils.StringUtils;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
-import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
@@ -46,6 +46,8 @@ import
org.springframework.web.context.request.ServletRequestAttributes;
public class AccessLogAspect {
private static final Logger logger =
LoggerFactory.getLogger(AccessLogAspect.class);
+ private static final String TRACE_ID = "traceId";
+
@Pointcut("@annotation(org.apache.dolphinscheduler.api.aspect.AccessLogAnnotation)")
public void logPointCut(){
// Do nothing because of it's a pointcut
@@ -60,21 +62,24 @@ public class AccessLogAspect {
Method method = sign.getMethod();
AccessLogAnnotation annotation =
method.getAnnotation(AccessLogAnnotation.class);
- String tranceId = UUID.randomUUID().toString();
+ String traceId = UUID.randomUUID().toString();
// log request
if (!annotation.ignoreRequest()) {
ServletRequestAttributes attributes = (ServletRequestAttributes)
RequestContextHolder.getRequestAttributes();
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
-
+ String traceIdFromHeader = request.getHeader(TRACE_ID);
+ if (!StringUtils.isEmpty(traceIdFromHeader)) {
+ traceId = traceIdFromHeader;
+ }
// handle login info
String userName = parseLoginInfo(request);
// handle args
String argsString = parseArgs(proceedingJoinPoint, annotation);
- logger.info("REQUEST TRANCE_ID:{}, LOGIN_USER:{}, URI:{},
METHOD:{}, HANDLER:{}, ARGS:{}",
- tranceId,
+ logger.info("REQUEST TRACE_ID:{}, LOGIN_USER:{}, URI:{},
METHOD:{}, HANDLER:{}, ARGS:{}",
+ traceId,
userName,
request.getRequestURI(),
request.getMethod(),
@@ -88,7 +93,7 @@ public class AccessLogAspect {
// log response
if (!annotation.ignoreResponse()) {
- logger.info("RESPONSE TRANCE_ID:{}, BODY:{}, REQUEST DURATION:{}
milliseconds", tranceId, ob, (System.currentTimeMillis() - startTime));
+ logger.info("RESPONSE TRACE_ID:{}, BODY:{}, REQUEST DURATION:{}
milliseconds", traceId, ob, (System.currentTimeMillis() - startTime));
}
return ob;
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
index 3800264..88c715f 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java
@@ -32,6 +32,7 @@ import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestAttribute;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
@@ -43,6 +44,7 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiImplicitParams;
import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
import springfox.documentation.annotations.ApiIgnore;
/**
@@ -82,7 +84,6 @@ public class LoggerController extends BaseController {
return loggerService.queryLog(taskInstanceId, skipNum, limit);
}
-
/**
* download log file
*
@@ -107,4 +108,59 @@ public class LoggerController extends BaseController {
.body(logBytes);
}
+ /**
+ * query task log in specified project
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstanceId task instance id
+ * @param skipNum skip number
+ * @param limit limit
+ * @return task log content
+ */
+ @ApiOperation(value = "queryLogInSpecifiedProject", notes =
"QUERY_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE",
required = true, type = "Long"),
+ @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required
= true, dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "skipLineNum", value = "SKIP_LINE_NUM",
required = true, dataType = "Int", example = "100"),
+ @ApiImplicitParam(name = "limit", value = "LIMIT", required = true,
dataType = "Int", example = "100")
+ })
+ @GetMapping(value = "/{projectCode}/detail")
+ @ResponseStatus(HttpStatus.OK)
+ @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public Result<String> queryLog(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @RequestParam(value = "taskInstanceId") int
taskInstanceId,
+ @RequestParam(value = "skipLineNum") int
skipNum,
+ @RequestParam(value = "limit") int limit) {
+ return returnDataList(loggerService.queryLog(loginUser, projectCode,
taskInstanceId, skipNum, limit));
+ }
+
+ /**
+ * download log file
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstanceId task instance id
+ * @return log file content
+ */
+ @ApiOperation(value = "downloadTaskLogInSpecifiedProject", notes =
"DOWNLOAD_TASK_INSTANCE_LOG_IN_SPECIFIED_PROJECT_NOTES")
+ @ApiImplicitParams({
+ @ApiImplicitParam(name = "projectCode", value = "PROJECT_CODE",
required = true, type = "Long"),
+ @ApiImplicitParam(name = "taskInstanceId", value = "TASK_ID", required
= true, dataType = "Int", example = "100")
+ })
+ @GetMapping(value = "/{projectCode}/download-log")
+ @ResponseBody
+ @ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR)
+ @AccessLogAnnotation(ignoreRequestArgs = "loginUser")
+ public ResponseEntity downloadTaskLog(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode",
value = "PROJECT_CODE", required = true) @PathVariable long projectCode,
+ @RequestParam(value =
"taskInstanceId") int taskInstanceId) {
+ byte[] logBytes = loggerService.getLogBytes(loginUser, projectCode,
taskInstanceId);
+ return ResponseEntity
+ .ok()
+ .header(HttpHeaders.CONTENT_DISPOSITION, "attachment;
filename=\"" + System.currentTimeMillis() + ".log" + "\"")
+ .body(logBytes);
+ }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
index 3134a48..1d7b485 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionController.java
@@ -515,7 +515,7 @@ public class ProcessDefinitionController extends
BaseController {
@ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("code") long code,
@RequestParam("limit") Integer limit) {
- Map<String, Object> result = processDefinitionService.viewTree(code,
limit);
+ Map<String, Object> result =
processDefinitionService.viewTree(projectCode, code, limit);
return returnDataList(result);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
index d7ec405..b69320c 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceController.java
@@ -344,8 +344,9 @@ public class ProcessInstanceController extends
BaseController {
@ApiException(QUERY_PROCESS_INSTANCE_ALL_VARIABLES_ERROR)
@AccessLogAnnotation
public Result viewVariables(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
+ @ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) {
- Map<String, Object> result = processInstanceService.viewVariables(id);
+ Map<String, Object> result =
processInstanceService.viewVariables(projectCode, id);
return returnDataList(result);
}
@@ -368,7 +369,7 @@ public class ProcessInstanceController extends
BaseController {
public Result viewTree(@ApiIgnore @RequestAttribute(value =
Constants.SESSION_USER) User loginUser,
@ApiParam(name = "projectCode", value =
"PROJECT_CODE", required = true) @PathVariable long projectCode,
@PathVariable("id") Integer id) throws Exception {
- Map<String, Object> result = processInstanceService.viewGantt(id);
+ Map<String, Object> result =
processInstanceService.viewGantt(projectCode, id);
return returnDataList(result);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
index 72d1892..2fa065b 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ExecutorService.java
@@ -68,11 +68,12 @@ public interface ExecutorService {
/**
* check whether the process definition can be executed
*
+ * @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @return check result code
*/
- Map<String, Object> checkProcessDefinitionValid(ProcessDefinition
processDefinition, long processDefineCode);
+ Map<String, Object> checkProcessDefinitionValid(long projectCode,
ProcessDefinition processDefinition, long processDefineCode);
/**
* do action to process instanceļ¼pause, stop, repeat, recover from pause,
recover from stop
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
index ef30a40..b252522 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java
@@ -18,6 +18,9 @@
package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.dao.entity.User;
+
+import java.util.Map;
/**
* logger service
@@ -43,4 +46,25 @@ public interface LoggerService {
*/
byte[] getLogBytes(int taskInstId);
+ /**
+ * query log
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstId task instance id
+ * @param skipLineNum skip line number
+ * @param limit limit
+ * @return log string data
+ */
+ Map<String, Object> queryLog(User loginUser, long projectCode, int
taskInstId, int skipLineNum, int limit);
+
+ /**
+ * get log bytes
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstId task instance id
+ * @return log byte array
+ */
+ byte[] getLogBytes(User loginUser, long projectCode, int taskInstId);
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
index c29e65d..755d697 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionService.java
@@ -282,11 +282,12 @@ public interface ProcessDefinitionService {
/**
* Encapsulates the TreeView structure
*
+ * @param projectCode project code
* @param code process definition code
* @param limit limit
* @return tree view json data
*/
- Map<String, Object> viewTree(long code, Integer limit);
+ Map<String, Object> viewTree(long projectCode, long code, Integer limit);
/**
* switch the defined process definition version
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
index 3dbf46d..073e1f3 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/ProcessInstanceService.java
@@ -21,7 +21,6 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.enums.DependResult;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -165,19 +164,21 @@ public interface ProcessInstanceService {
/**
* view process instance variables
*
+ * @param projectCode project code
* @param processInstanceId process instance id
* @return variables data
*/
- Map<String, Object> viewVariables(Integer processInstanceId);
+ Map<String, Object> viewVariables(long projectCode, Integer
processInstanceId);
/**
* encapsulation gantt structure
*
+ * @param projectCode project code
* @param processInstanceId process instance id
* @return gantt tree data
* @throws Exception exception when json parse
*/
- Map<String, Object> viewGantt(Integer processInstanceId) throws Exception;
+ Map<String, Object> viewGantt(long projectCode, Integer processInstanceId)
throws Exception;
/**
* query process instance by processDefinitionCode and stateArray
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
index 6a4f179..18eba02 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ExecutorServiceImpl.java
@@ -124,7 +124,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
// check process define release state
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
- result = checkProcessDefinitionValid(processDefinition,
processDefinitionCode);
+ result = checkProcessDefinitionValid(projectCode, processDefinition,
processDefinitionCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -179,14 +179,15 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
/**
* check whether the process definition can be executed
*
+ * @param projectCode project code
* @param processDefinition process definition
* @param processDefineCode process definition code
* @return check result code
*/
@Override
- public Map<String, Object> checkProcessDefinitionValid(ProcessDefinition
processDefinition, long processDefineCode) {
+ public Map<String, Object> checkProcessDefinitionValid(long projectCode,
ProcessDefinition processDefinition, long processDefineCode) {
Map<String, Object> result = new HashMap<>();
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
// check process definition exists
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
} else if (processDefinition.getReleaseState() != ReleaseState.ONLINE)
{
@@ -230,7 +231,7 @@ public class ExecutorServiceImpl extends BaseServiceImpl
implements ExecutorServ
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
if (executeType != ExecuteType.STOP && executeType !=
ExecuteType.PAUSE) {
- result = checkProcessDefinitionValid(processDefinition,
processInstance.getProcessDefinitionCode());
+ result = checkProcessDefinitionValid(projectCode,
processDefinition, processInstance.getProcessDefinitionCode());
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
index c9965fe..88c5a94 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java
@@ -17,32 +17,43 @@
package org.apache.dolphinscheduler.api.service.impl;
-import com.google.common.primitives.Bytes;
-import org.apache.commons.lang.StringUtils;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.exceptions.ServiceException;
import org.apache.dolphinscheduler.api.service.LoggerService;
+import org.apache.dolphinscheduler.api.service.ProjectService;
import org.apache.dolphinscheduler.api.utils.Result;
import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.remote.utils.Host;
import org.apache.dolphinscheduler.service.log.LogClientService;
import org.apache.dolphinscheduler.service.process.ProcessService;
+
+import org.apache.commons.lang.StringUtils;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.Objects;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import java.nio.charset.StandardCharsets;
-import java.util.Objects;
+import com.google.common.primitives.Bytes;
/**
* logger service impl
*/
@Service
-public class LoggerServiceImpl implements LoggerService {
+public class LoggerServiceImpl extends BaseServiceImpl implements
LoggerService {
private static final Logger logger =
LoggerFactory.getLogger(LoggerServiceImpl.class);
@@ -53,6 +64,15 @@ public class LoggerServiceImpl implements LoggerService {
private LogClientService logClient;
+ @Autowired
+ ProjectMapper projectMapper;
+
+ @Autowired
+ ProjectService projectService;
+
+ @Autowired
+ TaskDefinitionMapper taskDefinitionMapper;
+
@PostConstruct
public void init() {
if (Objects.isNull(this.logClient)) {
@@ -84,10 +104,117 @@ public class LoggerServiceImpl implements LoggerService {
if (taskInstance == null ||
StringUtils.isBlank(taskInstance.getHost())) {
return Result.error(Status.TASK_INSTANCE_NOT_FOUND);
}
+ Result<String> result = new Result<>(Status.SUCCESS.getCode(),
Status.SUCCESS.getMsg());
+ String log = queryLog(taskInstance,skipLineNum,limit);
+ result.setData(log);
+ return result;
+ }
- String host = getHost(taskInstance.getHost());
- Result<String> result = new Result<>(Status.SUCCESS.getCode(),
Status.SUCCESS.getMsg());
+ /**
+ * get log size
+ *
+ * @param taskInstId task instance id
+ * @return log byte array
+ */
+ @Override
+ public byte[] getLogBytes(int taskInstId) {
+ TaskInstance taskInstance =
processService.findTaskInstanceById(taskInstId);
+ if (taskInstance == null ||
StringUtils.isBlank(taskInstance.getHost())) {
+ throw new ServiceException("task instance is null or host is
null");
+ }
+ return getLogBytes(taskInstance);
+ }
+
+ /**
+ * query log
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstId task instance id
+ * @param skipLineNum skip line number
+ * @param limit limit
+ * @return log string data
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public Map<String, Object> queryLog(User loginUser, long projectCode, int
taskInstId, int skipLineNum, int limit) {
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ return result;
+ }
+ // check whether the task instance can be found
+ TaskInstance task = processService.findTaskInstanceById(taskInstId);
+ if (task == null || StringUtils.isBlank(task.getHost())) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_FOUND);
+ return result;
+ }
+
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(task.getTaskCode());
+ if (taskDefinition != null && projectCode !=
taskDefinition.getProjectCode()) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstId);
+ return result;
+ }
+ String log = queryLog(task, skipLineNum, limit);
+ result.put(Constants.DATA_LIST, log);
+ return result;
+ }
+
+ /**
+ * get log bytes
+ *
+ * @param loginUser login user
+ * @param projectCode project code
+ * @param taskInstId task instance id
+ * @return log byte array
+ */
+ @Override
+ public byte[] getLogBytes(User loginUser, long projectCode, int
taskInstId) {
+ Project project = projectMapper.queryByCode(projectCode);
+ //check user access for project
+ Map<String, Object> result =
projectService.checkProjectAndAuth(loginUser, project, projectCode);
+ if (result.get(Constants.STATUS) != Status.SUCCESS) {
+ throw new ServiceException("user has no permission");
+ }
+ // check whether the task instance can be found
+ TaskInstance task = processService.findTaskInstanceById(taskInstId);
+ if (task == null || StringUtils.isBlank(task.getHost())) {
+ throw new ServiceException("task instance is null or host is
null");
+ }
+
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(task.getTaskCode());
+ if (taskDefinition != null && projectCode !=
taskDefinition.getProjectCode()) {
+ throw new ServiceException("task instance does not exist in
project");
+ }
+ return getLogBytes(task);
+ }
+
+ /**
+ * get host
+ *
+ * @param address address
+ * @return old version return true ,otherwise return false
+ */
+ private String getHost(String address) {
+ if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
+ return address;
+ }
+ return Host.of(address).getIp();
+ }
+
+ /**
+ * query log
+ *
+ * @param taskInstance task instance
+ * @param skipLineNum skip line number
+ * @param limit limit
+ * @return log string data
+ */
+ private String queryLog(TaskInstance taskInstance, int skipLineNum, int
limit) {
+
+ String host = getHost(taskInstance.getHost());
logger.info("log host : {} , logPath : {} , logServer port : {}",
host, taskInstance.getLogPath(),
Constants.RPC_PORT);
@@ -104,23 +231,16 @@ public class LoggerServiceImpl implements LoggerService {
log.append(logClient
.rollViewLog(host, Constants.RPC_PORT,
taskInstance.getLogPath(), skipLineNum, limit));
- result.setData(log.toString());
- return result;
+ return log.toString();
}
-
/**
- * get log size
+ * get log bytes
*
- * @param taskInstId task instance id
+ * @param taskInstance task instance
* @return log byte array
*/
- @Override
- public byte[] getLogBytes(int taskInstId) {
- TaskInstance taskInstance =
processService.findTaskInstanceById(taskInstId);
- if (taskInstance == null ||
StringUtils.isBlank(taskInstance.getHost())) {
- throw new ServiceException("task instance is null or host is
null");
- }
+ private byte[] getLogBytes(TaskInstance taskInstance) {
String host = getHost(taskInstance.getHost());
byte[] head = String.format(LOG_HEAD_FORMAT,
taskInstance.getLogPath(),
@@ -129,17 +249,4 @@ public class LoggerServiceImpl implements LoggerService {
return Bytes.concat(head,
logClient.getLogBytes(host, Constants.RPC_PORT,
taskInstance.getLogPath()));
}
-
- /**
- * get host
- *
- * @param address address
- * @return old version return true ,otherwise return false
- */
- private String getHost(String address) {
- if (Boolean.TRUE.equals(Host.isOldVersion(address))) {
- return address;
- }
- return Host.of(address).getIp();
- }
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
index b5e8453..9dff036 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessDefinitionServiceImpl.java
@@ -445,7 +445,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else {
Tenant tenant =
tenantMapper.queryById(processDefinition.getTenantId());
@@ -538,7 +538,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
// check process definition exists
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@@ -640,7 +640,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@@ -718,7 +718,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
@@ -773,7 +773,12 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
Set<Long> defineCodeSet =
Lists.newArrayList(codes.split(Constants.COMMA)).stream().map(Long::parseLong).collect(Collectors.toSet());
List<ProcessDefinition> processDefinitionList =
processDefinitionMapper.queryByCodes(defineCodeSet);
- List<DagDataSchedule> dagDataSchedules =
processDefinitionList.stream().map(this::exportProcessDagData).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(processDefinitionList)) {
+ return;
+ }
+ // check processDefinition exist in project
+ List<ProcessDefinition> processDefinitionListInProject =
processDefinitionList.stream().filter(o -> projectCode ==
o.getProjectCode()).collect(Collectors.toList());
+ List<DagDataSchedule> dagDataSchedules =
processDefinitionListInProject.stream().map(this::exportProcessDagData).collect(Collectors.toList());
if (CollectionUtils.isNotEmpty(dagDataSchedules)) {
downloadProcessDefinitionFile(response, dagDataSchedules);
}
@@ -1055,7 +1060,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
return result;
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
@@ -1091,8 +1096,15 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
return result;
}
+ // check processDefinition exist in project
+ List<ProcessDefinition> processDefinitionListInProject =
processDefinitionList.stream().
+ filter(o -> projectCode ==
o.getProjectCode()).collect(Collectors.toList());
+ if (CollectionUtils.isEmpty(processDefinitionListInProject)) {
+ putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, codes);
+ return result;
+ }
Map<Long, List<TaskDefinition>> taskNodeMap = new HashMap<>();
- for (ProcessDefinition processDefinition : processDefinitionList) {
+ for (ProcessDefinition processDefinition :
processDefinitionListInProject) {
DagData dagData = processService.genDagData(processDefinition);
taskNodeMap.put(processDefinition.getCode(),
dagData.getTaskDefinitionList());
}
@@ -1129,15 +1141,16 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
/**
* Encapsulates the TreeView structure
*
+ * @param projectCode project code
* @param code process definition code
* @param limit limit
* @return tree view json data
*/
@Override
- public Map<String, Object> viewTree(long code, Integer limit) {
+ public Map<String, Object> viewTree(long projectCode, long code, Integer
limit) {
Map<String, Object> result = new HashMap<>();
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (null == processDefinition) {
+ if (null == processDefinition || projectCode !=
processDefinition.getProjectCode()) {
logger.info("process define not exists");
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
@@ -1414,7 +1427,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (Objects.isNull(processDefinition)) {
+ if (Objects.isNull(processDefinition) || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result,
Status.SWITCH_PROCESS_DEFINITION_VERSION_NOT_EXIST_PROCESS_DEFINITION_ERROR,
code);
return result;
}
@@ -1478,7 +1491,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
PageInfo<ProcessDefinitionLog> pageInfo = new PageInfo<>(pageNo,
pageSize);
Page<ProcessDefinitionLog> page = new Page<>(pageNo, pageSize);
- IPage<ProcessDefinitionLog> processDefinitionVersionsPaging =
processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code);
+ IPage<ProcessDefinitionLog> processDefinitionVersionsPaging =
processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, code,
projectCode);
List<ProcessDefinitionLog> processDefinitionLogs =
processDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(processDefinitionLogs);
@@ -1509,7 +1522,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
} else {
if (processDefinition.getVersion() == version) {
@@ -1693,7 +1706,7 @@ public class ProcessDefinitionServiceImpl extends
BaseServiceImpl implements Pro
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(code);
// check process definition exists
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, code);
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
index 4c97374..2b98e14 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/ProcessInstanceServiceImpl.java
@@ -17,6 +17,14 @@
package org.apache.dolphinscheduler.api.service.impl;
+import static org.apache.dolphinscheduler.common.Constants.DATA_LIST;
+import static org.apache.dolphinscheduler.common.Constants.DEPENDENT_SPLIT;
+import static org.apache.dolphinscheduler.common.Constants.GLOBAL_PARAMS;
+import static org.apache.dolphinscheduler.common.Constants.LOCAL_PARAMS;
+import static
org.apache.dolphinscheduler.common.Constants.PROCESS_INSTANCE_STATE;
+import static org.apache.dolphinscheduler.common.Constants.TASK_LIST;
+import static org.apache.dolphinscheduler.common.Constants.WARNING_GROUP_NAME;
+
import org.apache.dolphinscheduler.api.dto.gantt.GanttDto;
import org.apache.dolphinscheduler.api.dto.gantt.Task;
import org.apache.dolphinscheduler.api.enums.Status;
@@ -43,8 +51,25 @@ import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.placeholder.BusinessTimeUtils;
-import org.apache.dolphinscheduler.dao.entity.*;
-import org.apache.dolphinscheduler.dao.mapper.*;
+import org.apache.dolphinscheduler.dao.entity.AlertGroup;
+import org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
+import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
+import org.apache.dolphinscheduler.dao.entity.ProcessTaskRelationLog;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinitionLog;
+import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.Tenant;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.AlertGroupMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionLogMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
+import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.commons.collections.CollectionUtils;
@@ -72,8 +97,6 @@ import
org.springframework.transaction.annotation.Transactional;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import static org.apache.dolphinscheduler.common.Constants.*;
-
/**
* process instance service impl
*/
@@ -125,6 +148,9 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
@Autowired
private TenantMapper tenantMapper;
+ @Autowired
+ TaskDefinitionMapper taskDefinitionMapper;
+
/**
* return top n SUCCESS process instance order by running time which
started between startTime and endTime
*/
@@ -160,7 +186,7 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
return result;
}
- List<ProcessInstance> processInstances =
processInstanceMapper.queryTopNProcessInstance(size, start, end,
ExecutionStatus.SUCCESS);
+ List<ProcessInstance> processInstances =
processInstanceMapper.queryTopNProcessInstance(size, start, end,
ExecutionStatus.SUCCESS, projectCode);
result.put(DATA_LIST, processInstances);
putMsg(result, Status.SUCCESS);
return result;
@@ -187,7 +213,7 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
ProcessDefinition processDefinition =
processService.findProcessDefinition(processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion());
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processId);
} else {
processInstance.setWarningGroupId(processDefinition.getWarningGroupId());
@@ -295,12 +321,16 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
return result;
}
ProcessInstance processInstance =
processService.findProcessInstanceDetailById(processId);
+ ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ if (processDefinition != null && projectCode !=
processDefinition.getProjectCode()) {
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST, processId);
+ return result;
+ }
List<TaskInstance> taskInstanceList =
processService.findValidTaskListByProcessId(processId);
addDependResultForTaskList(taskInstanceList);
Map<String, Object> resultMap = new HashMap<>();
resultMap.put(PROCESS_INSTANCE_STATE,
processInstance.getState().toString());
- ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
if (null != processDefinition && processDefinition.getWarningGroupId()
!= 0) {
//check if exist
@@ -386,6 +416,13 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
return result;
}
+
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskInstance.getTaskCode());
+ if (taskDefinition != null && projectCode !=
taskDefinition.getProjectCode()) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_EXISTS, taskId);
+ return result;
+ }
+
if (!taskInstance.isSubProcess()) {
putMsg(result, Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE,
taskInstance.getName());
return result;
@@ -437,6 +474,12 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
return result;
}
+ //check process instance exists in project
+ ProcessDefinition processDefinition0 =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ if (processDefinition0 != null && projectCode !=
processDefinition0.getProjectCode()) {
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
+ return result;
+ }
//check process instance status
if (!processInstance.getState().typeIsFinished()) {
putMsg(result, Status.PROCESS_INSTANCE_STATE_OPERATION_ERROR,
@@ -590,6 +633,12 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
return result;
}
+ ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ if (processDefinition != null && projectCode !=
processDefinition.getProjectCode()) {
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
+ return result;
+ }
+
try {
processService.removeTaskLogFile(processInstanceId);
} catch (Exception e) {
@@ -614,11 +663,12 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
/**
* view process instance variables
*
+ * @param projectCode project code
* @param processInstanceId process instance id
* @return variables data
*/
@Override
- public Map<String, Object> viewVariables(Integer processInstanceId) {
+ public Map<String, Object> viewVariables(long projectCode, Integer
processInstanceId) {
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance =
processInstanceMapper.queryDetailById(processInstanceId);
@@ -627,6 +677,12 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
throw new RuntimeException("workflow instance is null");
}
+ ProcessDefinition processDefinition =
processDefineMapper.queryByCode(processInstance.getProcessDefinitionCode());
+ if (processDefinition != null && projectCode !=
processDefinition.getProjectCode()) {
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
+ return result;
+ }
+
Map<String, String> timeParams = BusinessTimeUtils
.getBusinessTime(processInstance.getCmdTypeIfComplement(),
processInstance.getScheduleTime());
@@ -686,12 +742,13 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
/**
* encapsulation gantt structure
*
+ * @param projectCode project code
* @param processInstanceId process instance id
* @return gantt tree data
* @throws Exception exception when json parse
*/
@Override
- public Map<String, Object> viewGantt(Integer processInstanceId) throws
Exception {
+ public Map<String, Object> viewGantt(long projectCode, Integer
processInstanceId) throws Exception {
Map<String, Object> result = new HashMap<>();
ProcessInstance processInstance =
processInstanceMapper.queryDetailById(processInstanceId);
@@ -704,6 +761,10 @@ public class ProcessInstanceServiceImpl extends
BaseServiceImpl implements Proce
processInstance.getProcessDefinitionCode(),
processInstance.getProcessDefinitionVersion()
);
+ if (processDefinition != null && projectCode !=
processDefinition.getProjectCode()) {
+ putMsg(result, Status.PROCESS_INSTANCE_NOT_EXIST,
processInstanceId);
+ return result;
+ }
GanttDto ganttDto = new GanttDto();
DAG<String, TaskNode, TaskNodeRelation> dag =
processService.genDagGraph(processDefinition);
//topological sort
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
index 4954fb9..c9af32d 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/SchedulerServiceImpl.java
@@ -146,7 +146,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
// check work flow define release state
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefineCode);
- result =
executorService.checkProcessDefinitionValid(processDefinition,
processDefineCode);
+ result =
executorService.checkProcessDefinitionValid(projectCode,processDefinition,
processDefineCode);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
return result;
}
@@ -247,7 +247,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(schedule.getProcessDefinitionCode());
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
schedule.getProcessDefinitionCode());
return result;
}
@@ -296,7 +296,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
return result;
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
scheduleObj.getProcessDefinitionCode());
return result;
}
@@ -396,7 +396,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefineCode);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, processDefineCode);
return result;
}
@@ -606,7 +606,7 @@ public class SchedulerServiceImpl extends BaseServiceImpl
implements SchedulerSe
}
ProcessDefinition processDefinition =
processDefinitionMapper.queryByCode(processDefinitionCode);
- if (processDefinition == null) {
+ if (processDefinition == null || projectCode !=
processDefinition.getProjectCode()) {
putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST,
processDefinitionCode);
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
index 238b1be..97ecb6c 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskDefinitionServiceImpl.java
@@ -191,6 +191,10 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
+ if (taskDefinition == null || projectCode !=
taskDefinition.getProjectCode()) {
+ putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
+ return result;
+ }
if (taskDefinition.getFlag() == Flag.YES) {
putMsg(result, Status.TASK_DEFINE_STATE_ONLINE, taskCode);
return result;
@@ -342,7 +346,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
- if (taskDefinition == null) {
+ if (taskDefinition == null || projectCode !=
taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
return result;
}
@@ -377,7 +381,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
PageInfo<TaskDefinitionLog> pageInfo = new PageInfo<>(pageNo,
pageSize);
Page<TaskDefinitionLog> page = new Page<>(pageNo, pageSize);
- IPage<TaskDefinitionLog> taskDefinitionVersionsPaging =
taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode);
+ IPage<TaskDefinitionLog> taskDefinitionVersionsPaging =
taskDefinitionLogMapper.queryTaskDefinitionVersionsPaging(page, taskCode,
projectCode);
List<TaskDefinitionLog> taskDefinitionLogs =
taskDefinitionVersionsPaging.getRecords();
pageInfo.setTotalList(taskDefinitionLogs);
@@ -424,7 +428,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
}
TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(taskCode);
- if (taskDefinition == null) {
+ if (taskDefinition == null || projectCode !=
taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, taskCode);
} else {
result.put(Constants.DATA_LIST, taskDefinition);
@@ -520,7 +524,7 @@ public class TaskDefinitionServiceImpl extends
BaseServiceImpl implements TaskDe
return result;
}
TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(code);
- if (taskDefinition == null) {
+ if (taskDefinition == null || projectCode !=
taskDefinition.getProjectCode()) {
putMsg(result, Status.TASK_DEFINE_NOT_EXIST, code);
return result;
}
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
index 9f4b8cb..64ba304 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java
@@ -29,9 +29,11 @@ import
org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -71,6 +73,9 @@ public class TaskInstanceServiceImpl extends BaseServiceImpl
implements TaskInst
@Autowired
UsersService usersService;
+ @Autowired
+ TaskDefinitionMapper taskDefinitionMapper;
+
/**
* query task list by project, process instance, task name, task start
time, task end time, task status, keyword paging
*
@@ -171,6 +176,12 @@ public class TaskInstanceServiceImpl extends
BaseServiceImpl implements TaskInst
return result;
}
+ TaskDefinition taskDefinition =
taskDefinitionMapper.queryByCode(task.getTaskCode());
+ if (taskDefinition != null && projectCode !=
taskDefinition.getProjectCode()) {
+ putMsg(result, Status.TASK_INSTANCE_NOT_FOUND, taskInstanceId);
+ return result;
+ }
+
// check whether the task instance state type is failure or cancel
if (!task.getState().typeIsFailure() &&
!task.getState().typeIsCancel()) {
putMsg(result, Status.TASK_INSTANCE_STATE_OPERATION_ERROR,
taskInstanceId, task.getState().toString());
diff --git
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java
index 95405a4..5b443dc 100644
---
a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java
+++
b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/utils/PageInfo.java
@@ -51,6 +51,10 @@ public class PageInfo<T> {
*/
private Integer pageNo;
+ public PageInfo() {
+
+ }
+
public PageInfo(Integer currentPage, Integer pageSize) {
if (currentPage == null) {
currentPage = 1;
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
index 4737c2f..d141a56 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessDefinitionControllerTest.java
@@ -339,7 +339,7 @@ public class ProcessDefinitionControllerTest {
Map<String, Object> result = new HashMap<>();
putMsg(result, Status.SUCCESS);
- Mockito.when(processDefinitionService.viewTree(processId,
limit)).thenReturn(result);
+ Mockito.when(processDefinitionService.viewTree(projectCode, processId,
limit)).thenReturn(result);
Result response = processDefinitionController.viewTree(user,
projectCode, processId, limit);
Assert.assertTrue(response != null && response.isSuccess());
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
index 7d6fabc..27665ae 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/controller/ProcessInstanceControllerTest.java
@@ -23,12 +23,19 @@ import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.
import static
org.springframework.test.web.servlet.result.MockMvcResultMatchers.status;
import org.apache.dolphinscheduler.api.enums.Status;
+import org.apache.dolphinscheduler.api.service.ProcessInstanceService;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.ExecutionStatus;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import java.util.HashMap;
+import java.util.Map;
+
import org.junit.Assert;
import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+import org.springframework.boot.test.mock.mockito.MockBean;
import org.springframework.http.MediaType;
import org.springframework.test.web.servlet.MvcResult;
import org.springframework.util.LinkedMultiValueMap;
@@ -39,6 +46,9 @@ import org.springframework.util.MultiValueMap;
*/
public class ProcessInstanceControllerTest extends AbstractControllerTest {
+ @MockBean(name = "processInstanceService")
+ private ProcessInstanceService processInstanceService;
+
@Test
public void testQueryProcessInstanceList() throws Exception {
MultiValueMap<String, String> paramsMap = new LinkedMultiValueMap<>();
@@ -146,13 +156,14 @@ public class ProcessInstanceControllerTest extends
AbstractControllerTest {
@Test
public void testViewVariables() throws Exception {
- MvcResult mvcResult =
mockMvc.perform(get("/projects/{projectCode}/instance/view-variables",
"cxc_1113")
- .header(SESSION_ID, sessionId)
- .param("processInstanceId", "1204"))
- .andExpect(status().isOk())
- .andExpect(content().contentType(MediaType.APPLICATION_JSON_UTF8))
- .andReturn();
-
+ Map<String, Object> mockResult = new HashMap<>();
+ mockResult.put(Constants.STATUS, Status.SUCCESS);
+
PowerMockito.when(processInstanceService.viewVariables(1113L,123)).thenReturn(mockResult);
+ MvcResult mvcResult =
mockMvc.perform(get("/projects/{projectCode}/process-instances/{id}/view-variables",
"1113", "123")
+ .header(SESSION_ID, sessionId))
+ .andExpect(status().isOk())
+ .andExpect(content().contentType(MediaType.APPLICATION_JSON))
+ .andReturn();
Result result =
JSONUtils.parseObject(mvcResult.getResponse().getContentAsString(),
Result.class);
Assert.assertNotNull(result);
Assert.assertEquals(Status.SUCCESS.getCode(),
result.getCode().intValue());
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
index bb279ea..da942fb 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ExecutorServiceTest.java
@@ -122,6 +122,7 @@ public class ExecutorServiceTest {
processDefinition.setUserId(userId);
processDefinition.setVersion(1);
processDefinition.setCode(1L);
+ processDefinition.setProjectCode(projectCode);
// processInstance
processInstance.setId(processInstanceId);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
index bd8aa72..bc46d24 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java
@@ -20,9 +20,20 @@ package org.apache.dolphinscheduler.api.service;
import org.apache.dolphinscheduler.api.enums.Status;
import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl;
import org.apache.dolphinscheduler.api.utils.Result;
+import org.apache.dolphinscheduler.common.Constants;
+import org.apache.dolphinscheduler.common.enums.UserType;
+import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
+import org.apache.dolphinscheduler.dao.entity.User;
+import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -51,6 +62,15 @@ public class LoggerServiceTest {
@Mock
private ProcessService processService;
+ @Mock
+ private ProjectMapper projectMapper;
+
+ @Mock
+ private ProjectService projectService;
+
+ @Mock
+ private TaskDefinitionMapper taskDefinitionMapper;
+
@Before
public void init() {
this.loggerService.init();
@@ -113,9 +133,87 @@ public class LoggerServiceTest {
}
+ @Test
+ public void testQueryLogInSpecifiedProject() {
+ long projectCode = 1L;
+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
+ Project project = getProject(projectCode);
+
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.SUCCESS, projectCode);
+ TaskInstance taskInstance = new TaskInstance();
+
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setCode(1L);
+ //SUCCESS
+ taskInstance.setTaskCode(1L);
+ taskInstance.setId(1);
+ taskInstance.setHost("127.0.0.1:8080");
+ taskInstance.setLogPath("/temp/log");
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
+
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
+ result = loggerService.queryLog(loginUser, projectCode, 1, 1, 1);
+ Assert.assertEquals(Status.SUCCESS.getCode(), ((Status)
result.get(Constants.STATUS)).getCode());
+ }
+
+ @Test
+ public void testGetLogBytesInSpecifiedProject() {
+ long projectCode = 1L;
+
Mockito.when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode));
+ Project project = getProject(projectCode);
+
+ User loginUser = new User();
+ loginUser.setId(-1);
+ loginUser.setUserType(UserType.GENERAL_USER);
+ Map<String, Object> result = new HashMap<>();
+ putMsg(result, Status.SUCCESS, projectCode);
+ TaskInstance taskInstance = new TaskInstance();
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(projectCode);
+ taskDefinition.setCode(1L);
+ //SUCCESS
+ taskInstance.setTaskCode(1L);
+ taskInstance.setId(1);
+ taskInstance.setHost("127.0.0.1:8080");
+ taskInstance.setLogPath("/temp/log");
+ Mockito.when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
+
Mockito.when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+
Mockito.when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
+ loggerService.getLogBytes(loginUser, projectCode, 1);
+ }
+
+
@After
public void close() {
this.loggerService.close();
}
+ /**
+ * get mock Project
+ *
+ * @param projectCode projectCode
+ * @return Project
+ */
+ private Project getProject(long projectCode) {
+ Project project = new Project();
+ project.setCode(projectCode);
+ project.setId(1);
+ project.setName("test");
+ project.setUserId(1);
+ return project;
+ }
+
+ private void putMsg(Map<String, Object> result, Status status, Object...
statusParams) {
+ result.put(Constants.STATUS, status);
+ if (statusParams != null && statusParams.length > 0) {
+ result.put(Constants.MSG, MessageFormat.format(status.getMsg(),
statusParams));
+ } else {
+ result.put(Constants.MSG, status.getMsg());
+ }
+ }
}
\ No newline at end of file
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
index 2229eaf..68fc815 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessDefinitionServiceTest.java
@@ -573,17 +573,17 @@ public class ProcessDefinitionServiceTest {
public void testViewTree() {
//process definition not exist
ProcessDefinition processDefinition = getProcessDefinition();
- Map<String, Object> processDefinitionNullRes =
processDefinitionService.viewTree(46, 10);
+ Map<String, Object> processDefinitionNullRes =
processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Assert.assertEquals(Status.PROCESS_DEFINE_NOT_EXIST,
processDefinitionNullRes.get(Constants.STATUS));
//task instance not exist
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new
DAG<>());
- Map<String, Object> taskNullRes =
processDefinitionService.viewTree(46, 10);
+ Map<String, Object> taskNullRes =
processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Assert.assertEquals(Status.SUCCESS, taskNullRes.get(Constants.STATUS));
//task instance exist
- Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(46, 10);
+ Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(processDefinition.getProjectCode(),46, 10);
Assert.assertEquals(Status.SUCCESS,
taskNotNuLLRes.get(Constants.STATUS));
}
@@ -592,7 +592,7 @@ public class ProcessDefinitionServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
Mockito.when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
Mockito.when(processService.genDagGraph(processDefinition)).thenReturn(new
DAG<>());
- Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(46, 10);
+ Map<String, Object> taskNotNuLLRes =
processDefinitionService.viewTree(processDefinition.getProjectCode(), 46, 10);
Assert.assertEquals(Status.SUCCESS,
taskNotNuLLRes.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
index ccce682..eba085b 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/ProcessInstanceServiceTest.java
@@ -40,6 +40,7 @@ import
org.apache.dolphinscheduler.dao.entity.ProcessDefinition;
import org.apache.dolphinscheduler.dao.entity.ProcessDefinitionLog;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.Tenant;
import org.apache.dolphinscheduler.dao.entity.User;
@@ -48,6 +49,7 @@ import
org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionLogMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.ProcessInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.dao.mapper.TenantMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -111,6 +113,8 @@ public class ProcessInstanceServiceTest {
@Mock
TenantMapper tenantMapper;
+ @Mock
+ TaskDefinitionMapper taskDefinitionMapper;
private String shellJson =
"[{\"name\":\"\",\"preTaskCode\":0,\"preTaskVersion\":0,\"postTaskCode\":123456789,"
+
"\"postTaskVersion\":1,\"conditionType\":0,\"conditionParams\":\"{}\"},{\"name\":\"\",\"preTaskCode\":123456789,"
@@ -255,6 +259,7 @@ public class ProcessInstanceServiceTest {
ProcessInstance processInstance = getProcessInstance();
putMsg(result, Status.SUCCESS, projectCode);
ProcessDefinition processDefinition = getProcessDefinition();
+ processDefinition.setProjectCode(projectCode);
when(projectMapper.queryByCode(projectCode)).thenReturn(project);
when(projectService.checkProjectAndAuth(loginUser, project,
projectCode)).thenReturn(result);
when(processService.findProcessInstanceDetailById(processInstance.getId())).thenReturn(processInstance);
@@ -347,6 +352,9 @@ public class ProcessInstanceServiceTest {
taskInstance.setProcessInstanceId(1);
putMsg(result, Status.SUCCESS, projectCode);
when(processService.findTaskInstanceById(1)).thenReturn(taskInstance);
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(projectCode);
+
when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition);
Map<String, Object> notSubprocessRes =
processInstanceService.querySubProcessInstanceByTaskId(loginUser, projectCode,
1);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_SUB_WORKFLOW_INSTANCE,
notSubprocessRes.get(Constants.STATUS));
@@ -410,6 +418,7 @@ public class ProcessInstanceServiceTest {
ProcessDefinition processDefinition = getProcessDefinition();
processDefinition.setId(1);
processDefinition.setUserId(1);
+ processDefinition.setProjectCode(projectCode);
Tenant tenant = getTenant();
when(processDefineMapper.queryByCode(46L)).thenReturn(processDefinition);
when(tenantMapper.queryByTenantCode("root")).thenReturn(tenant);
@@ -497,7 +506,7 @@ public class ProcessInstanceServiceTest {
processInstance.setScheduleTime(new Date());
processInstance.setGlobalParams("");
when(processInstanceMapper.queryDetailById(1)).thenReturn(processInstance);
- Map<String, Object> successRes =
processInstanceService.viewVariables(1);
+ Map<String, Object> successRes =
processInstanceService.viewVariables(1L,1);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
@@ -522,7 +531,7 @@ public class ProcessInstanceServiceTest {
when(processService.genDagGraph(Mockito.any(ProcessDefinition.class)))
.thenReturn(graph);
- Map<String, Object> successRes = processInstanceService.viewGantt(1);
+ Map<String, Object> successRes = processInstanceService.viewGantt(0L,
1);
Assert.assertEquals(Status.SUCCESS, successRes.get(Constants.STATUS));
}
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
index 238d1d0..2f8079c 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/SchedulerServiceTest.java
@@ -103,6 +103,7 @@ public class SchedulerServiceTest {
Project project = getProject(projectName, projectCode);
ProcessDefinition processDefinition = new ProcessDefinition();
+ processDefinition.setProjectCode(projectCode);
Schedule schedule = new Schedule();
schedule.setId(1);
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
index d8852b4..92f1e5f 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskDefinitionServiceImplTest.java
@@ -317,6 +317,7 @@ public class TaskDefinitionServiceImplTest {
Mockito.when(taskDefinitionMapper.queryByCode(taskCode)).thenReturn(taskDefinition);
TaskDefinitionLog taskDefinitionLog = new
TaskDefinitionLog(taskDefinition);
Mockito.when(taskDefinitionLogMapper.queryByDefinitionCodeAndVersion(taskCode,
taskDefinition.getVersion())).thenReturn(taskDefinitionLog);
+
Map<String, Object> offlineTaskResult =
taskDefinitionService.releaseTaskDefinition(loginUser, projectCode, taskCode,
ReleaseState.OFFLINE);
Assert.assertEquals(Status.SUCCESS,
offlineTaskResult.get(Constants.STATUS));
diff --git
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
index 08fdf5f..5544a18 100644
---
a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
+++
b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/TaskInstanceServiceTest.java
@@ -32,9 +32,11 @@ import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.dao.entity.ProcessInstance;
import org.apache.dolphinscheduler.dao.entity.Project;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.mapper.ProjectMapper;
+import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper;
import org.apache.dolphinscheduler.dao.mapper.TaskInstanceMapper;
import org.apache.dolphinscheduler.service.process.ProcessService;
@@ -81,6 +83,9 @@ public class TaskInstanceServiceTest {
@Mock
UsersService usersService;
+ @Mock
+ TaskDefinitionMapper taskDefinitionMapper;
+
@Test
public void queryTaskListPaging() {
long projectCode = 1L;
@@ -249,6 +254,9 @@ public class TaskInstanceServiceTest {
// test task not found
when(projectService.checkProjectAndAuth(user, project,
projectCode)).thenReturn(mockSuccess);
when(taskInstanceMapper.selectById(Mockito.anyInt())).thenReturn(null);
+ TaskDefinition taskDefinition = new TaskDefinition();
+ taskDefinition.setProjectCode(projectCode);
+
when(taskDefinitionMapper.queryByCode(task.getTaskCode())).thenReturn(taskDefinition);
Map<String, Object> taskNotFoundRes =
taskInstanceService.forceTaskSuccess(user, projectCode, taskId);
Assert.assertEquals(Status.TASK_INSTANCE_NOT_FOUND,
taskNotFoundRes.get(Constants.STATUS));
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
index 038ed5d..51ccb73 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.java
@@ -73,9 +73,10 @@ public interface ProcessDefinitionLogMapper extends
BaseMapper<ProcessDefinition
*
* @param page pagination info
* @param code process definition code
+ * @param projectCode project code
* @return the paging process definition version list
*/
- IPage<ProcessDefinitionLog>
queryProcessDefinitionVersionsPaging(Page<ProcessDefinitionLog> page,
@Param("code") long code);
+ IPage<ProcessDefinitionLog>
queryProcessDefinitionVersionsPaging(Page<ProcessDefinitionLog> page,
@Param("code") long code, @Param("projectCode") long projectCode);
/**
* delete the certain process definition version by process definition id
and version number
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
index 7be58a7..51f6029 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.java
@@ -213,14 +213,19 @@ public interface ProcessInstanceMapper extends
BaseMapper<ProcessInstance> {
/**
* query top n process instance order by running duration
*
+ * @param size size
+ * @param startTime start time
+ * @param startTime end time
* @param status process instance status
+ * @param projectCode project code
* @return ProcessInstance list
*/
List<ProcessInstance> queryTopNProcessInstance(@Param("size") int size,
@Param("startTime") Date
startTime,
@Param("endTime") Date
endTime,
- @Param("status")
ExecutionStatus status);
+ @Param("status")
ExecutionStatus status,
+ @Param("projectCode") long
projectCode);
/**
* query process instance by processDefinitionCode and stateArray
diff --git
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
index 70ca9f7..ab2620f 100644
---
a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
+++
b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.java
@@ -78,8 +78,9 @@ public interface TaskDefinitionLogMapper extends
BaseMapper<TaskDefinitionLog> {
* query the paging task definition version list by pagination info
*
* @param page pagination info
+ * @param projectCode project code
* @param code process definition code
* @return the paging task definition version list
*/
- IPage<TaskDefinitionLog>
queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code")
long code);
+ IPage<TaskDefinitionLog>
queryTaskDefinitionVersionsPaging(Page<TaskDefinitionLog> page, @Param("code")
long code, @Param("projectCode") long projectCode);
}
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
index 40afa04..3abff6c 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapper.xml
@@ -70,6 +70,9 @@
<include refid="baseSql"/>
from t_ds_process_definition_log
where code = #{code}
+ <if test="projectCode != 0">
+ and project_code = #{projectCode}
+ </if>
order by version desc
</select>
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
index 08db3af..77d96b5 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapper.xml
@@ -51,6 +51,9 @@
<include refid="baseSql"/>
from t_ds_process_instance
where state = #{status}
+ <if test="projectCode != 0">
+ and project_code = #{projectCode}
+ </if>
and start_time between
#{startTime} and #{endTime}
order by end_time-start_time desc
diff --git
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
index da6c0ea..0615167 100644
---
a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
+++
b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskDefinitionLogMapper.xml
@@ -73,6 +73,9 @@
<include refid="baseSql"/>
from t_ds_task_definition_log
where code = #{code}
+ <if test="projectCode != 0">
+ and project_code = #{projectCode}
+ </if>
order by version desc
</select>
</mapper>
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java
index f0981b0..5ce86b1 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessDefinitionLogMapperTest.java
@@ -159,7 +159,7 @@ public class ProcessDefinitionLogMapperTest {
public void testQueryProcessDefinitionVersionsPaging() {
insertOne();
Page<ProcessDefinitionLog> page = new Page(1, 3);
- IPage<ProcessDefinitionLog> processDefinitionLogs =
processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, 1L);
+ IPage<ProcessDefinitionLog> processDefinitionLogs =
processDefinitionLogMapper.queryProcessDefinitionVersionsPaging(page, 1L,1L);
Assert.assertNotEquals(processDefinitionLogs.getTotal(), 0);
}
diff --git
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
index a492beb..613bb8b 100644
---
a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
+++
b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/ProcessInstanceMapperTest.java
@@ -393,7 +393,7 @@ public class ProcessInstanceMapperTest {
ProcessInstance processInstance3 = insertOne(startTime3, endTime3);
Date start = new Date(2020, 1, 1, 1, 1, 1);
Date end = new Date(2021, 1, 1, 1, 1, 1);
- List<ProcessInstance> processInstances =
processInstanceMapper.queryTopNProcessInstance(2, start, end,
ExecutionStatus.SUCCESS);
+ List<ProcessInstance> processInstances =
processInstanceMapper.queryTopNProcessInstance(2, start, end,
ExecutionStatus.SUCCESS,0L);
Assert.assertEquals(2, processInstances.size());
Assert.assertTrue(isSortedByDuration(processInstances));
for (ProcessInstance processInstance : processInstances) {
diff --git
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
index 4e3349a..640abb7 100644
---
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
+++
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteThread.java
@@ -241,7 +241,7 @@ public class WorkflowExecuteThread implements Runnable {
}
private void handleEvents() {
- while (this.stateEvents.size() > 0) {
+ while (!this.stateEvents.isEmpty()) {
try {
StateEvent stateEvent = this.stateEvents.peek();