This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new 492b318  [Fix][Server] Fix clear task execute path is related to master (#5123)
492b318 is described below

commit 492b318bd321d35247488e1f181e3ea9d1259963
Author: Shiwen Cheng <[email protected]>
AuthorDate: Fri Mar 26 10:11:56 2021 +0800

    [Fix][Server] Fix clear task execute path is related to master (#5123)
---
 .../dolphinscheduler/common/utils/FileUtils.java   | 23 +----------
 .../common/utils/FileUtilsTest.java                |  4 +-
 .../server/master/runner/MasterExecThread.java     | 30 ---------------
 .../worker/processor/TaskExecuteProcessor.java     |  2 +-
 .../server/worker/runner/TaskExecuteThread.java    | 45 ++++++++++++++++++----
 .../server/master/MasterExecThreadTest.java        |  6 +--
 6 files changed, 43 insertions(+), 67 deletions(-)

diff --git 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
index 0dcfbdd..ae6291a 100644
--- 
a/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
+++ 
b/dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/utils/FileUtils.java
@@ -121,27 +121,8 @@ public class FileUtils {
      * @return directory of process execution
      */
     public static String getProcessExecDir(int projectId, int processDefineId, 
int processInstanceId, int taskInstanceId) {
-        String fileName = String.format("%s/exec/process/%s/%s/%s/%s", 
DATA_BASEDIR, Integer.toString(projectId),
-                Integer.toString(processDefineId), 
Integer.toString(processInstanceId), Integer.toString(taskInstanceId));
-        File file = new File(fileName);
-        if (!file.getParentFile().exists()) {
-            file.getParentFile().mkdirs();
-        }
-
-        return fileName;
-    }
-
-    /**
-     * directory of process instances
-     *
-     * @param projectId project id
-     * @param processDefineId process definition id
-     * @param processInstanceId process instance id
-     * @return directory of process instances
-     */
-    public static String getProcessExecDir(int projectId, int processDefineId, 
int processInstanceId) {
-        String fileName = String.format("%s/exec/process/%s/%s/%s", 
DATA_BASEDIR, Integer.toString(projectId),
-                Integer.toString(processDefineId), 
Integer.toString(processInstanceId));
+        String fileName = String.format("%s/exec/process/%d/%d/%d/%d", 
DATA_BASEDIR,
+                projectId, processDefineId, processInstanceId, taskInstanceId);
         File file = new File(fileName);
         if (!file.getParentFile().exists()) {
             file.getParentFile().mkdirs();
diff --git 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
index a4a39ae..a1ddef1 100644
--- 
a/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
+++ 
b/dolphinscheduler-common/src/test/java/org/apache/dolphinscheduler/common/utils/FileUtilsTest.java
@@ -60,10 +60,8 @@ public class FileUtilsTest {
 
     @Test
     public void testGetProcessExecDir() {
-        String dir = FileUtils.getProcessExecDir(1,2,3, 4);
+        String dir = FileUtils.getProcessExecDir(1, 2, 3, 4);
         Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3/4", dir);
-        dir = FileUtils.getProcessExecDir(1,2,3);
-        Assert.assertEquals("/tmp/dolphinscheduler/exec/process/1/2/3", dir);
     }
 
     @Test
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
index b7a4d00..b9ad8f3 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/master/runner/MasterExecThread.java
@@ -42,7 +42,6 @@ import org.apache.dolphinscheduler.common.process.Property;
 import org.apache.dolphinscheduler.common.thread.Stopper;
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.common.utils.CollectionUtils;
-import org.apache.dolphinscheduler.common.utils.CommonUtils;
 import org.apache.dolphinscheduler.common.utils.DateUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
@@ -61,16 +60,11 @@ import 
org.apache.dolphinscheduler.service.process.ProcessService;
 import org.apache.dolphinscheduler.service.quartz.cron.CronUtils;
 import org.apache.dolphinscheduler.service.queue.PeerTaskInstancePriorityQueue;
 
-import org.apache.commons.io.FileUtils;
-
-import java.io.File;
-import java.io.IOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +72,6 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -236,8 +229,6 @@ public class MasterExecThread implements Runnable {
             processService.updateProcessInstance(processInstance);
         } finally {
             taskExecService.shutdown();
-            // post handle
-            postHandle();
         }
     }
 
@@ -428,27 +419,6 @@ public class MasterExecThread implements Runnable {
     }
 
     /**
-     * process post handle
-     */
-    private void postHandle() {
-        logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
-
-        if (!CommonUtils.isDevelopMode()) {
-            // get exec dir
-            String execLocalPath = 
org.apache.dolphinscheduler.common.utils.FileUtils
-                    
.getProcessExecDir(processInstance.getProcessDefinition().getProjectId(),
-                            processInstance.getProcessDefinitionId(),
-                            processInstance.getId());
-
-            try {
-                FileUtils.deleteDirectory(new File(execLocalPath));
-            } catch (IOException e) {
-                logger.error("delete exec dir failed ", e);
-            }
-        }
-    }
-
-    /**
      * submit task to execute
      *
      * @param taskInstance task instance
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
index cfd2c3f..f03d86b 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteProcessor.java
@@ -141,7 +141,7 @@ public class TaskExecuteProcessor implements 
NettyRequestProcessor {
 
         // local execute path
         String execLocalPath = getExecLocalPath(taskExecutionContext);
-        logger.info("task instance  local execute path : {} ", execLocalPath);
+        logger.info("task instance local execute path : {}", execLocalPath);
         taskExecutionContext.setExecutePath(execLocalPath);
 
         FileUtils.taskLoggerThreadLocal.set(taskLogger);
diff --git 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
index c036ac9..409c2b7 100644
--- 
a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
+++ 
b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/runner/TaskExecuteThread.java
@@ -29,6 +29,7 @@ import org.apache.dolphinscheduler.common.utils.HadoopUtils;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
 import org.apache.dolphinscheduler.common.utils.OSUtils;
 import org.apache.dolphinscheduler.common.utils.RetryerUtils;
+import org.apache.dolphinscheduler.common.utils.StringUtils;
 import org.apache.dolphinscheduler.remote.command.Command;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteAckCommand;
 import org.apache.dolphinscheduler.remote.command.TaskExecuteResponseCommand;
@@ -45,6 +46,7 @@ import 
org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.commons.collections.MapUtils;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -71,17 +73,17 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
     private final Logger logger = 
LoggerFactory.getLogger(TaskExecuteThread.class);
 
     /**
-     *  task instance
+     * task instance
      */
     private TaskExecutionContext taskExecutionContext;
 
     /**
-     *  abstract task
+     * abstract task
      */
     private AbstractTask task;
 
     /**
-     *  task callback service
+     * task callback service
      */
     private TaskCallbackService taskCallbackService;
 
@@ -185,9 +187,38 @@ public class TaskExecuteThread implements Runnable, 
Delayed {
             responseCommand.setAppIds(task.getAppIds());
         } finally {
             
taskExecutionContextCacheManager.removeByTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-            
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(),responseCommand.convert2Command(),Event.RESULT);
+            
ResponceCache.get().cache(taskExecutionContext.getTaskInstanceId(), 
responseCommand.convert2Command(), Event.RESULT);
             
taskCallbackService.sendResult(taskExecutionContext.getTaskInstanceId(), 
responseCommand.convert2Command());
+            clearTaskExecPath();
+        }
+    }
+
+    /**
+     * when task finish, clear execute path.
+     */
+    private void clearTaskExecPath() {
+        logger.info("develop mode is: {}", CommonUtils.isDevelopMode());
+
+        if (!CommonUtils.isDevelopMode()) {
+            // get exec dir
+            String execLocalPath = taskExecutionContext.getExecutePath();
+
+            if (StringUtils.isEmpty(execLocalPath)) {
+                logger.warn("task: {} exec local path is empty.", 
taskExecutionContext.getTaskName());
+                return;
+            }
 
+            if ("/".equals(execLocalPath)) {
+                logger.warn("task: {} exec local path is '/', direct deletion 
is not allowed", taskExecutionContext.getTaskName());
+                return;
+            }
+
+            try {
+                org.apache.commons.io.FileUtils.deleteDirectory(new 
File(execLocalPath));
+                logger.info("exec local path: {} cleared.", execLocalPath);
+            } catch (IOException e) {
+                logger.error("delete exec dir failed : {}", e.getMessage(), e);
+            }
         }
     }
 
@@ -196,7 +227,7 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
      * @return
      */
     private Map<String, String> getGlobalParamsMap() {
-        Map<String,String> globalParamsMap = new HashMap<>(16);
+        Map<String, String> globalParamsMap = new HashMap<>(16);
 
         // global params string
         String globalParamsStr = taskExecutionContext.getGlobalParams();
@@ -241,7 +272,7 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
     }
 
     /**
-     *  kill task
+     * kill task
      */
     public void kill() {
         if (task != null) {
@@ -261,7 +292,7 @@ public class TaskExecuteThread implements Runnable, Delayed 
{
      * @param logger
      */
     private void downloadResource(String execLocalPath,
-                                  Map<String,String> projectRes,
+                                  Map<String, String> projectRes,
                                   Logger logger) throws Exception {
         if (MapUtils.isEmpty(projectRes)) {
             return;
diff --git 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
index 6979a93..cdea252 100644
--- 
a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
+++ 
b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/master/MasterExecThreadTest.java
@@ -102,16 +102,12 @@ public class MasterExecThreadTest {
         processDefinition.setGlobalParamList(Collections.EMPTY_LIST);
         
Mockito.when(processInstance.getProcessDefinition()).thenReturn(processDefinition);
 
-        masterExecThread = PowerMockito.spy(new MasterExecThread(
-                processInstance
-                , processService
-                , null, null, config));
+        masterExecThread = PowerMockito.spy(new 
MasterExecThread(processInstance, processService, null, null, config));
         // prepareProcess init dag
         Field dag = MasterExecThread.class.getDeclaredField("dag");
         dag.setAccessible(true);
         dag.set(masterExecThread, new DAG());
         PowerMockito.doNothing().when(masterExecThread, "executeProcess");
-        PowerMockito.doNothing().when(masterExecThread, "postHandle");
         PowerMockito.doNothing().when(masterExecThread, "prepareProcess");
         PowerMockito.doNothing().when(masterExecThread, "runProcess");
         PowerMockito.doNothing().when(masterExecThread, "endProcess");

Reply via email to