This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new 492b318 [Fix][Server] Fix clear task execute path is related to master (#5123)
492b318 is described below
commit 492b318bd321d35247488e1f181e3ea9d1259963
Author: Shiwen Cheng <[email protected]>
AuthorDate: Fri Mar 26 10:11:56 2021 +0800
[Fix][Server] Fix clear task execute path is related to master (#5123)
---
.../dolphinscheduler/common/utils/FileUtils.java | 23 +----------
.../common/utils/FileUtilsTest.java | 4 +-
.../server/master/runner/MasterExecThread.java | 30 ---------------
.../worker/processor/TaskExecuteProcessor.java | 2 +-
.../server/worker/runner/TaskExecuteThread.java | 45 ++++++++++++++++++----
.../server/master/MasterExecThreadTest.java | 6 +--
6 files changed, 43 insertions(+), 67 deletions(-)
diff --git a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index 0dcfbdd..ae6291a 100644
--- a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -121,27 +121,8 @@ public class FileUtils {
* @return directory of process execution
*/
public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId, int taskInstanceId) {
- String fileName = String.format("%s/exec/process/%s/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
- Integer.toString(processDefineId), Integer.toString(processInstanceId), Integer.toString(taskInstanceId));
- File file = new File(fileName);
- if (!file.getParentFile().exists()) {
- file.getParentFile().mkdirs();
- }
-
- return fileName;
- }
-
- /**
- * directory of process instances
- *
- * @param projectId project id
- * @param processDefineId process definition id
- * @param processInstanceId process instance id
- * @return directory of process instances
- */
- public static String getProcessExecDir(int projectId, int processDefineId, int processInstanceId) {
- String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
- Integer.toString(processDefineId), Integer.toString(processInstanceId));
+ String fileName = String.format("%s/exec/process/%d/%d/%d/%d", DATA_BASEDIR,
+ projectId, processDefineId, processInstanceId, taskInstanceId);
File file = new File(fileName);
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
diff --git a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
index a4a39ae..a1ddef1 100644
--- a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
+++ b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
@@ -60,10 +60,8 @@ public class FileUtilsTest {
@Test
public void testGetProcessExecDir() {
- String dir = FileUtils.getProcessExecDir(1,2,3, 4);
+ String dir = FileUtils.getProcessExecDir(1, 2, 3, 4);
Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir);
- dir = FileUtils.getProcessExecDir(1,2,3);
- Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3", dir);
}
@Test
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index b7a4d00..b9ad8f3 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.thread.Stopper;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
import org.apache.dolphinscheduler.common.utils.DateUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -61,16 +60,11 @@ import org.apache.dolphinscheduler.service.process.ProcessService;
import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -78,7 +72,6 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -236,8 +229,6 @@ public class MasterExecThread implements Runnable {
processService.updateProcessInstance(processInstance);
} finally {
taskExecService.shutdown();
- // post handle
- postHandle();
}
}
@@ -428,27 +419,6 @@ public class MasterExecThread implements Runnable {
}
/**
- * process post handle
- */
- private void postHandle() {
- logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
-
- if (!CommonUtils.isDevelopMode()) {
- // get exec dir
- String execLocalPath = org.apache.dolphinscheduler.common.utils.FileUtils
- .getProcessExecDir(processInstance.getProcessDefinition().getProjectId(),
- processInstance.getProcessDefinitionId(),
- processInstance.getId());
-
- try {
- FileUtils.deleteDirectory(new File(execLocalPath));
- } catch (IOException e) {
- logger.error("delete exec dir failed ", e);
- }
- }
- }
-
- /**
* submit task to execute
*
* @param taskInstance task instance
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index cfd2c3f..f03d86b 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -141,7 +141,7 @@ public class TaskExecuteProcessor implements NettyRequestProcessor {
// local execute path
String execLocalPath = getExecLocalPath(taskExecutionContext);
- logger.info("task instance local execute path : {} ", execLocalPath);
+ logger.info("task instance local execute path : {}", execLocalPath);
taskExecutionContext.setExecutePath(execLocalPath);
FileUtils.taskLoggerThreadLocal.set(taskLogger);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index c036ac9..409c2b7 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.OSUtils;
import org.apache.dolphinscheduler.common.utils.RetryerUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.remote.command.Command;
import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -45,6 +46,7 @@ import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
import org.apache.commons.collections.MapUtils;
import java.io.File;
+import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -71,17 +73,17 @@ public class TaskExecuteThread implements Runnable, Delayed {
private final Logger logger = LoggerFactory.getLogger(TaskExecuteThread.class);
/**
- * task instance
+ * task instance
*/
private TaskExecutionContext taskExecutionContext;
/**
- * abstract task
+ * abstract task
*/
private AbstractTask task;
/**
- * task callback service
+ * task callback service
*/
private TaskCallbackService taskCallbackService;
@@ -185,9 +187,38 @@ public class TaskExecuteThread implements Runnable, Delayed {
responseCommand.setAppIds(task.getAppIds());
} finally {
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
+ ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command(), Event.RESULT);
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), responseCommand.convert2Command());
+ clearTaskExecPath();
+ }
+ }
+
+ /**
+ * when task finish, clear execute path.
+ */
+ private void clearTaskExecPath() {
+ logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
+
+ if (!CommonUtils.isDevelopMode()) {
+ // get exec dir
+ String execLocalPath = taskExecutionContext.getExecutePath();
+
+ if (StringUtils.isEmpty(execLocalPath)) {
+ logger.warn("task: {} exec local path is empty.", taskExecutionContext.getTaskName());
+ return;
+ }
+ if ("/".equals(execLocalPath)) {
+ logger.warn("task: {} exec local path is '/', direct deletion is not allowed", taskExecutionContext.getTaskName());
+ return;
+ }
+
+ try {
+ org.apache.commons.io.FileUtils.deleteDirectory(new File(execLocalPath));
+ logger.info("exec local path: {} cleared.", execLocalPath);
+ } catch (IOException e) {
+ logger.error("delete exec dir failed : {}", e.getMessage(), e);
+ }
}
}
@@ -196,7 +227,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
* @return
*/
private Map<String, String> getGlobalParamsMap() {
- Map<String,String> globalParamsMap = new HashMap<>(16);
+ Map<String, String> globalParamsMap = new HashMap<>(16);
// global params string
String globalParamsStr = taskExecutionContext.getGlobalParams();
@@ -241,7 +272,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
}
/**
- * kill task
+ * kill task
*/
public void kill() {
if (task != null) {
@@ -261,7 +292,7 @@ public class TaskExecuteThread implements Runnable, Delayed {
* @param logger
*/
private void downloadResource(String execLocalPath,
- Map<String,String> projectRes,
+ Map<String, String> projectRes,
Logger logger) throws Exception {
if (MapUtils.isEmpty(projectRes)) {
return;
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index 6979a93..cdea252 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -102,16 +102,12 @@ public class MasterExecThreadTest {
processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
- masterExecThread = PowerMockito.spy(new MasterExecThread(
- processInstance
- , processService
- , null, null, config));
+ masterExecThread = PowerMockito.spy(new MasterExecThread(processInstance, processService, null, null, config));
// prepareProcess init dag
Field dag = MasterExecThread.class.getDeclaredField("dag");
dag.setAccessible(true);
dag.set(masterExecThread, new DAG());
PowerMockito.doNothing().when(masterExecThread, "executeProcess");
- PowerMockito.doNothing().when(masterExecThread, "postHandle");
PowerMockito.doNothing().when(masterExecThread, "prepareProcess");
PowerMockito.doNothing().when(masterExecThread, "runProcess");
PowerMockito.doNothing().when(masterExecThread, "endProcess");