This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new b184364561 Fix WorkerTaskExecutorThreadPool#isOverload is incorrect 
(#16027)
b184364561 is described below

commit b18436456115b09235af262012e537bae0f5200c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 20 19:13:53 2024 +0800

    Fix WorkerTaskExecutorThreadPool#isOverload is incorrect (#16027)
---
 .../worker/runner/WorkerTaskExecutorHolder.java    |   4 +
 .../runner/WorkerTaskExecutorThreadPool.java       |  11 +-
 .../TaskInstanceKillOperationFunction.java         |   4 +-
 .../runner/WorkerTaskExecutorThreadPoolTest.java   | 252 ++++++++++-----------
 4 files changed, 131 insertions(+), 140 deletions(-)

diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
index fa07dbfde6..e44b23152c 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
@@ -48,6 +48,10 @@ public class WorkerTaskExecutorHolder {
         workerTaskExecutorMap.clear();
     }
 
+    public static int size() {
+        return workerTaskExecutorMap.size();
+    }
+
     public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
         return workerTaskExecutorMap.values();
     }
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
index 4606d9f7e9..99645f89a4 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
@@ -39,6 +39,7 @@ public class WorkerTaskExecutorThreadPool {
     public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
         this.threadPoolExecutor =
                 
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool", 
workerConfig.getExecThreads());
+        threadPoolExecutor.prestartAllCoreThreads();
         this.workerConfig = workerConfig;
 
         
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize);
@@ -64,15 +65,19 @@ public class WorkerTaskExecutorThreadPool {
     }
 
     public boolean isOverload() {
-        return threadPoolExecutor.getQueue().size() > 0;
+        return WorkerTaskExecutorHolder.size() >= 
workerConfig.getExecThreads();
     }
 
     public int getWaitingTaskExecutorSize() {
-        return threadPoolExecutor.getQueue().size();
+        if (WorkerTaskExecutorHolder.size() <= workerConfig.getExecThreads()) {
+            return 0;
+        } else {
+            return WorkerTaskExecutorHolder.size() - 
workerConfig.getExecThreads();
+        }
     }
 
     public int getRunningTaskExecutorSize() {
-        return threadPoolExecutor.getActiveCount();
+        return Math.min(WorkerTaskExecutorHolder.size(), 
workerConfig.getExecThreads());
     }
 
     /**
diff --git 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index d55765d23f..dc1ea8b2e5 100644
--- 
a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++ 
b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -89,8 +89,8 @@ public class TaskInstanceKillOperationFunction
             taskExecutionContext
                     .setCurrentExecutionStatus(result ? 
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
 
-            
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
-            
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
+            WorkerTaskExecutorHolder.remove(taskInstanceId);
+            messageRetryRunner.removeRetryMessages(taskInstanceId);
             return TaskInstanceKillResponse.success(taskExecutionContext);
         } finally {
             LogUtils.removeTaskInstanceIdMDC();
diff --git 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
index 182ac6a1c2..9f27ac4309 100644
--- 
a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
+++ 
b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
@@ -17,48 +17,127 @@
 
 package org.apache.dolphinscheduler.server.worker.runner;
 
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import 
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
 import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
 import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
 import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.spi.enums.ResourceType;
 
-import java.io.IOException;
-import java.util.List;
+import org.apache.commons.lang3.RandomUtils;
 
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import com.google.common.truth.Truth;
+
 class WorkerTaskExecutorThreadPoolTest {
 
+    @BeforeEach
+    public void setUp() {
+        WorkerTaskExecutorHolder.clear();
+    }
+
     @Test
     public void testIsOverload() {
-        WorkerConfig workerConfig = new WorkerConfig();
-        workerConfig.setExecThreads(1);
-        
workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
+        final int execThreadCount = RandomUtils.nextInt(1, 100);
+        final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+        final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, 
TaskExecuteThreadsFullPolicy.CONTINUE);
+        final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+                new WorkerTaskExecutorThreadPool(workerConfig);
+        // submit totalTaskCount task, the thread pool size is 
execThreadCount, reject policy is CONTINUE
+        // after submit execThreadCount task, the thread pool is overload
+        for (int i = 1; i <= totalTaskCount; i++) {
+            MockWorkerTaskExecutor mockWorkerTaskExecutor =
+                    new MockWorkerTaskExecutor(() -> 
ThreadUtils.sleep(10_000L));
+            
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+            if (i >= execThreadCount) {
+                
Truth.assertThat(workerTaskExecutorThreadPool.isOverload()).isTrue();
+            } else {
+                
Truth.assertThat(workerTaskExecutorThreadPool.isOverload()).isFalse();
+            }
+        }
+    }
+
+    @Test
+    public void testSubmitWorkerTaskExecutorWithContinuePolicy() {
+        final int execThreadCount = RandomUtils.nextInt(1, 100);
+        final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+        final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, 
TaskExecuteThreadsFullPolicy.CONTINUE);
+        final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+                new WorkerTaskExecutorThreadPool(workerConfig);
+        // submit totalTaskCount task, the thread pool size is 
execThreadCount, reject policy is CONTINUE
+        // all task will be submitted success
+        for (int i = 1; i <= totalTaskCount; i++) {
+            MockWorkerTaskExecutor mockWorkerTaskExecutor =
+                    new MockWorkerTaskExecutor(() -> 
ThreadUtils.sleep(10_000L));
+            
Truth.assertThat(workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor)).isTrue();
+        }
+    }
+
+    @Test
+    public void testSubmitWorkerTaskExecutorWithRejectPolicy() {
+        final int execThreadCount = RandomUtils.nextInt(1, 100);
+        final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+        final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, 
TaskExecuteThreadsFullPolicy.REJECT);
+        final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+                new WorkerTaskExecutorThreadPool(workerConfig);
+        // submit totalTaskCount task, the thread pool size is 
execThreadCount, reject policy is REJECT
+        // only the front execThreadCount task will be submitted success
+        for (int i = 1; i <= totalTaskCount; i++) {
+            MockWorkerTaskExecutor mockWorkerTaskExecutor =
+                    new MockWorkerTaskExecutor(() -> 
ThreadUtils.sleep(10_000L));
+            boolean submitResult = 
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+            if (i <= execThreadCount) {
+                Assertions.assertTrue(submitResult, "The " + i + " task should 
submit success");
+            } else {
+                Assertions.assertFalse(submitResult, "The " + i + " task 
should submit failed");
+            }
+        }
+    }
+
+    @Test
+    public void testGetWaitingTaskExecutorSize() {
+        final int execThreadCount = RandomUtils.nextInt(1, 100);
+        final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+        final WorkerConfig workerConfig = createWorkerConfig(execThreadCount, 
TaskExecuteThreadsFullPolicy.CONTINUE);
+        final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+                new WorkerTaskExecutorThreadPool(workerConfig);
+
+        
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()).isEqualTo(0);
+        for (int i = 1; i <= totalTaskCount; i++) {
+            MockWorkerTaskExecutor mockWorkerTaskExecutor =
+                    new MockWorkerTaskExecutor(() -> 
ThreadUtils.sleep(10_000L));
+            
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+            if (i <= execThreadCount) {
+                
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()).isEqualTo(0);
+            } else {
+                
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize())
+                        .isEqualTo(i - execThreadCount);
+            }
+        }
+    }
+
+    @Test
+    public void testGetRunningTaskExecutorSize() {
+        final int execThreadCount = RandomUtils.nextInt(1, 100);
+        final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+        WorkerConfig workerConfig = createWorkerConfig(execThreadCount, 
TaskExecuteThreadsFullPolicy.CONTINUE);
         WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new 
WorkerTaskExecutorThreadPool(workerConfig);
-        // submit 100 task, the thread pool size is 1
-        // assert the overload should be true
-        // assert the submitQueue should be 99
-        for (int i = 0; i < 100; i++) {
-            boolean submitResult =
-                    workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new 
MockWorkerTaskExecutor(() -> {
-                        try {
-                            Thread.sleep(10_000L);
-                        } catch (InterruptedException e) {
-                            throw new RuntimeException(e);
-                        }
-                    }));
-            Assertions.assertTrue(submitResult);
+
+        
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(0);
+        for (int i = 1; i <= totalTaskCount; i++) {
+            MockWorkerTaskExecutor mockWorkerTaskExecutor =
+                    new MockWorkerTaskExecutor(() -> 
ThreadUtils.sleep(10_000L));
+            
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+            if (i <= execThreadCount) {
+                
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(i);
+            } else {
+                
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(execThreadCount);
+            }
         }
-        Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
-        Assertions.assertEquals(99, 
workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
-        Assertions.assertEquals(1, 
workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
     }
 
     static class MockWorkerTaskExecutor extends WorkerTaskExecutor {
@@ -66,116 +145,11 @@ class WorkerTaskExecutorThreadPoolTest {
         private final Runnable runnable;
 
         protected MockWorkerTaskExecutor(Runnable runnable) {
-            super(TaskExecutionContext.builder().taskInstanceId((int) 
System.nanoTime()).build(), new WorkerConfig(),
-                    new WorkerMessageSender(), new StorageOperate() {
-
-                        @Override
-                        public void createTenantDirIfNotExists(String 
tenantCode) {
-
-                        }
-
-                        @Override
-                        public String getResDir(String tenantCode) {
-                            return null;
-                        }
-
-                        @Override
-                        public String getUdfDir(String tenantCode) {
-                            return null;
-                        }
-
-                        @Override
-                        public boolean mkdir(String tenantCode, String path) 
throws IOException {
-                            return false;
-                        }
-
-                        @Override
-                        public String getResourceFullName(String tenantCode, 
String fileName) {
-                            return null;
-                        }
-
-                        @Override
-                        public String getResourceFileName(String tenantCode, 
String fullName) {
-                            return null;
-                        }
-
-                        @Override
-                        public String getFileName(ResourceType resourceType, 
String tenantCode, String fileName) {
-                            return null;
-                        }
-
-                        @Override
-                        public boolean exists(String fullName) {
-                            return false;
-                        }
-
-                        @Override
-                        public boolean delete(String filePath, boolean 
recursive) {
-                            return false;
-                        }
-
-                        @Override
-                        public boolean delete(String filePath, List<String> 
childrenPathArray,
-                                              boolean recursive) {
-                            return false;
-                        }
-
-                        @Override
-                        public boolean copy(String srcPath, String dstPath, 
boolean deleteSource,
-                                            boolean overwrite) {
-                            return false;
-                        }
-
-                        @Override
-                        public String getDir(ResourceType resourceType, String 
tenantCode) {
-                            return null;
-                        }
-
-                        @Override
-                        public boolean upload(String tenantCode, String 
srcFile, String dstPath, boolean deleteSource,
-                                              boolean overwrite) {
-                            return false;
-                        }
-
-                        @Override
-                        public void download(String srcFilePath, String 
dstFile, boolean overwrite) {
-
-                        }
-
-                        @Override
-                        public List<String> vimFile(String tenantCode, String 
filePath, int skipLineNums,
-                                                    int limit) {
-                            return null;
-                        }
-
-                        @Override
-                        public void deleteTenant(String tenantCode) {
-
-                        }
-
-                        @Override
-                        public ResUploadType returnStorageType() {
-                            return null;
-                        }
-
-                        @Override
-                        public List<StorageEntity> 
listFilesStatusRecursively(String path, String defaultPath,
-                                                                              
String tenantCode, ResourceType type) {
-                            return null;
-                        }
-
-                        @Override
-                        public List<StorageEntity> listFilesStatus(String 
path, String defaultPath, String tenantCode,
-                                                                   
ResourceType type) throws Exception {
-                            return null;
-                        }
-
-                        @Override
-                        public StorageEntity getFileStatus(String path, String 
defaultPath, String tenantCode,
-                                                           ResourceType type) 
throws Exception {
-                            return null;
-                        }
-                    }, new WorkerRegistryClient());
+            super(TaskExecutionContext.builder().taskInstanceId((int) 
System.nanoTime()).build(),
+                    new WorkerConfig(),
+                    new WorkerMessageSender(),
+                    null,
+                    new WorkerRegistryClient());
             this.runnable = runnable;
         }
 
@@ -190,4 +164,12 @@ class WorkerTaskExecutorThreadPoolTest {
         }
     }
 
+    private WorkerConfig createWorkerConfig(int execThreads,
+                                            TaskExecuteThreadsFullPolicy 
taskExecuteThreadsFullPolicy) {
+        WorkerConfig workerConfig = new WorkerConfig();
+        workerConfig.setExecThreads(execThreads);
+        
workerConfig.setTaskExecuteThreadsFullPolicy(taskExecuteThreadsFullPolicy);
+        return workerConfig;
+    }
+
 }

Reply via email to