This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new b184364561 Fix WorkerTaskExecutorThreadPool#isOverload is incorrect (#16027)
b184364561 is described below
commit b18436456115b09235af262012e537bae0f5200c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Mon May 20 19:13:53 2024 +0800
Fix WorkerTaskExecutorThreadPool#isOverload is incorrect (#16027)
---
.../worker/runner/WorkerTaskExecutorHolder.java | 4 +
.../runner/WorkerTaskExecutorThreadPool.java | 11 +-
.../TaskInstanceKillOperationFunction.java | 4 +-
.../runner/WorkerTaskExecutorThreadPoolTest.java | 252 ++++++++++-----------
4 files changed, 131 insertions(+), 140 deletions(-)
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
index fa07dbfde6..e44b23152c 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorHolder.java
@@ -48,6 +48,10 @@ public class WorkerTaskExecutorHolder {
workerTaskExecutorMap.clear();
}
+ public static int size() {
+ return workerTaskExecutorMap.size();
+ }
+
public static Collection<WorkerTaskExecutor> getAllTaskExecutor() {
return workerTaskExecutorMap.values();
}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
index 4606d9f7e9..99645f89a4 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPool.java
@@ -39,6 +39,7 @@ public class WorkerTaskExecutorThreadPool {
public WorkerTaskExecutorThreadPool(WorkerConfig workerConfig) {
this.threadPoolExecutor =
ThreadUtils.newDaemonFixedThreadExecutor("WorkerTaskExecutorThreadPool",
workerConfig.getExecThreads());
+ threadPoolExecutor.prestartAllCoreThreads();
this.workerConfig = workerConfig;
WorkerServerMetrics.registerWorkerExecuteQueueSizeGauge(this::getWaitingTaskExecutorSize);
@@ -64,15 +65,19 @@ public class WorkerTaskExecutorThreadPool {
}
public boolean isOverload() {
- return threadPoolExecutor.getQueue().size() > 0;
+ return WorkerTaskExecutorHolder.size() >=
workerConfig.getExecThreads();
}
public int getWaitingTaskExecutorSize() {
- return threadPoolExecutor.getQueue().size();
+ if (WorkerTaskExecutorHolder.size() <= workerConfig.getExecThreads()) {
+ return 0;
+ } else {
+ return WorkerTaskExecutorHolder.size() -
workerConfig.getExecThreads();
+ }
}
public int getRunningTaskExecutorSize() {
- return threadPoolExecutor.getActiveCount();
+ return Math.min(WorkerTaskExecutorHolder.size(),
workerConfig.getExecThreads());
}
/**
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
index d55765d23f..dc1ea8b2e5 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/operator/TaskInstanceKillOperationFunction.java
@@ -89,8 +89,8 @@ public class TaskInstanceKillOperationFunction
taskExecutionContext
.setCurrentExecutionStatus(result ?
TaskExecutionStatus.SUCCESS : TaskExecutionStatus.FAILURE);
-
WorkerTaskExecutorHolder.remove(taskExecutionContext.getTaskInstanceId());
-
messageRetryRunner.removeRetryMessages(taskExecutionContext.getTaskInstanceId());
+ WorkerTaskExecutorHolder.remove(taskInstanceId);
+ messageRetryRunner.removeRetryMessages(taskInstanceId);
return TaskInstanceKillResponse.success(taskExecutionContext);
} finally {
LogUtils.removeTaskInstanceIdMDC();
diff --git a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
index 182ac6a1c2..9f27ac4309 100644
--- a/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
+++ b/dolphinscheduler-worker/src/test/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecutorThreadPoolTest.java
@@ -17,48 +17,127 @@
package org.apache.dolphinscheduler.server.worker.runner;
-import org.apache.dolphinscheduler.common.enums.ResUploadType;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageEntity;
-import org.apache.dolphinscheduler.plugin.storage.api.StorageOperate;
+import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import
org.apache.dolphinscheduler.server.worker.config.TaskExecuteThreadsFullPolicy;
import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
import org.apache.dolphinscheduler.server.worker.registry.WorkerRegistryClient;
import org.apache.dolphinscheduler.server.worker.rpc.WorkerMessageSender;
-import org.apache.dolphinscheduler.spi.enums.ResourceType;
-import java.io.IOException;
-import java.util.List;
+import org.apache.commons.lang3.RandomUtils;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import com.google.common.truth.Truth;
+
class WorkerTaskExecutorThreadPoolTest {
+ @BeforeEach
+ public void setUp() {
+ WorkerTaskExecutorHolder.clear();
+ }
+
@Test
public void testIsOverload() {
- WorkerConfig workerConfig = new WorkerConfig();
- workerConfig.setExecThreads(1);
-
workerConfig.setTaskExecuteThreadsFullPolicy(TaskExecuteThreadsFullPolicy.CONTINUE);
+ final int execThreadCount = RandomUtils.nextInt(1, 100);
+ final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+ final WorkerConfig workerConfig = createWorkerConfig(execThreadCount,
TaskExecuteThreadsFullPolicy.CONTINUE);
+ final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+ new WorkerTaskExecutorThreadPool(workerConfig);
+ // submit totalTaskCount task, the thread pool size is
execThreadCount, reject policy is CONTINUE
+ // after submit execThreadCount task, the thread pool is overload
+ for (int i = 1; i <= totalTaskCount; i++) {
+ MockWorkerTaskExecutor mockWorkerTaskExecutor =
+ new MockWorkerTaskExecutor(() ->
ThreadUtils.sleep(10_000L));
+
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+ if (i >= execThreadCount) {
+
Truth.assertThat(workerTaskExecutorThreadPool.isOverload()).isTrue();
+ } else {
+
Truth.assertThat(workerTaskExecutorThreadPool.isOverload()).isFalse();
+ }
+ }
+ }
+
+ @Test
+ public void testSubmitWorkerTaskExecutorWithContinuePolicy() {
+ final int execThreadCount = RandomUtils.nextInt(1, 100);
+ final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+ final WorkerConfig workerConfig = createWorkerConfig(execThreadCount,
TaskExecuteThreadsFullPolicy.CONTINUE);
+ final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+ new WorkerTaskExecutorThreadPool(workerConfig);
+ // submit totalTaskCount task, the thread pool size is
execThreadCount, reject policy is CONTINUE
+ // all task will be submitted success
+ for (int i = 1; i <= totalTaskCount; i++) {
+ MockWorkerTaskExecutor mockWorkerTaskExecutor =
+ new MockWorkerTaskExecutor(() ->
ThreadUtils.sleep(10_000L));
+
Truth.assertThat(workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor)).isTrue();
+ }
+ }
+
+ @Test
+ public void testSubmitWorkerTaskExecutorWithRejectPolicy() {
+ final int execThreadCount = RandomUtils.nextInt(1, 100);
+ final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+ final WorkerConfig workerConfig = createWorkerConfig(execThreadCount,
TaskExecuteThreadsFullPolicy.REJECT);
+ final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+ new WorkerTaskExecutorThreadPool(workerConfig);
+ // submit totalTaskCount task, the thread pool size is
execThreadCount, reject policy is REJECT
+ // only the front execThreadCount task will be submitted success
+ for (int i = 1; i <= totalTaskCount; i++) {
+ MockWorkerTaskExecutor mockWorkerTaskExecutor =
+ new MockWorkerTaskExecutor(() ->
ThreadUtils.sleep(10_000L));
+ boolean submitResult =
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+ if (i <= execThreadCount) {
+ Assertions.assertTrue(submitResult, "The " + i + " task should
submit success");
+ } else {
+ Assertions.assertFalse(submitResult, "The " + i + " task
should submit failed");
+ }
+ }
+ }
+
+ @Test
+ public void testGetWaitingTaskExecutorSize() {
+ final int execThreadCount = RandomUtils.nextInt(1, 100);
+ final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+ final WorkerConfig workerConfig = createWorkerConfig(execThreadCount,
TaskExecuteThreadsFullPolicy.CONTINUE);
+ final WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool =
+ new WorkerTaskExecutorThreadPool(workerConfig);
+
+
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()).isEqualTo(0);
+ for (int i = 1; i <= totalTaskCount; i++) {
+ MockWorkerTaskExecutor mockWorkerTaskExecutor =
+ new MockWorkerTaskExecutor(() ->
ThreadUtils.sleep(10_000L));
+
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+ if (i <= execThreadCount) {
+
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize()).isEqualTo(0);
+ } else {
+
Truth.assertThat(workerTaskExecutorThreadPool.getWaitingTaskExecutorSize())
+ .isEqualTo(i - execThreadCount);
+ }
+ }
+ }
+
+ @Test
+ public void testGetRunningTaskExecutorSize() {
+ final int execThreadCount = RandomUtils.nextInt(1, 100);
+ final int totalTaskCount = RandomUtils.nextInt(1, 10000);
+ WorkerConfig workerConfig = createWorkerConfig(execThreadCount,
TaskExecuteThreadsFullPolicy.CONTINUE);
WorkerTaskExecutorThreadPool workerTaskExecutorThreadPool = new
WorkerTaskExecutorThreadPool(workerConfig);
- // submit 100 task, the thread pool size is 1
- // assert the overload should be true
- // assert the submitQueue should be 99
- for (int i = 0; i < 100; i++) {
- boolean submitResult =
- workerTaskExecutorThreadPool.submitWorkerTaskExecutor(new
MockWorkerTaskExecutor(() -> {
- try {
- Thread.sleep(10_000L);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }));
- Assertions.assertTrue(submitResult);
+
+
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(0);
+ for (int i = 1; i <= totalTaskCount; i++) {
+ MockWorkerTaskExecutor mockWorkerTaskExecutor =
+ new MockWorkerTaskExecutor(() ->
ThreadUtils.sleep(10_000L));
+
workerTaskExecutorThreadPool.submitWorkerTaskExecutor(mockWorkerTaskExecutor);
+ if (i <= execThreadCount) {
+
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(i);
+ } else {
+
Truth.assertThat(workerTaskExecutorThreadPool.getRunningTaskExecutorSize()).isEqualTo(execThreadCount);
+ }
}
- Assertions.assertTrue(workerTaskExecutorThreadPool.isOverload());
- Assertions.assertEquals(99,
workerTaskExecutorThreadPool.getWaitingTaskExecutorSize());
- Assertions.assertEquals(1,
workerTaskExecutorThreadPool.getRunningTaskExecutorSize());
}
static class MockWorkerTaskExecutor extends WorkerTaskExecutor {
@@ -66,116 +145,11 @@ class WorkerTaskExecutorThreadPoolTest {
private final Runnable runnable;
protected MockWorkerTaskExecutor(Runnable runnable) {
- super(TaskExecutionContext.builder().taskInstanceId((int)
System.nanoTime()).build(), new WorkerConfig(),
- new WorkerMessageSender(), new StorageOperate() {
-
- @Override
- public void createTenantDirIfNotExists(String
tenantCode) {
-
- }
-
- @Override
- public String getResDir(String tenantCode) {
- return null;
- }
-
- @Override
- public String getUdfDir(String tenantCode) {
- return null;
- }
-
- @Override
- public boolean mkdir(String tenantCode, String path)
throws IOException {
- return false;
- }
-
- @Override
- public String getResourceFullName(String tenantCode,
String fileName) {
- return null;
- }
-
- @Override
- public String getResourceFileName(String tenantCode,
String fullName) {
- return null;
- }
-
- @Override
- public String getFileName(ResourceType resourceType,
String tenantCode, String fileName) {
- return null;
- }
-
- @Override
- public boolean exists(String fullName) {
- return false;
- }
-
- @Override
- public boolean delete(String filePath, boolean
recursive) {
- return false;
- }
-
- @Override
- public boolean delete(String filePath, List<String>
childrenPathArray,
- boolean recursive) {
- return false;
- }
-
- @Override
- public boolean copy(String srcPath, String dstPath,
boolean deleteSource,
- boolean overwrite) {
- return false;
- }
-
- @Override
- public String getDir(ResourceType resourceType, String
tenantCode) {
- return null;
- }
-
- @Override
- public boolean upload(String tenantCode, String
srcFile, String dstPath, boolean deleteSource,
- boolean overwrite) {
- return false;
- }
-
- @Override
- public void download(String srcFilePath, String
dstFile, boolean overwrite) {
-
- }
-
- @Override
- public List<String> vimFile(String tenantCode, String
filePath, int skipLineNums,
- int limit) {
- return null;
- }
-
- @Override
- public void deleteTenant(String tenantCode) {
-
- }
-
- @Override
- public ResUploadType returnStorageType() {
- return null;
- }
-
- @Override
- public List<StorageEntity>
listFilesStatusRecursively(String path, String defaultPath,
-
String tenantCode, ResourceType type) {
- return null;
- }
-
- @Override
- public List<StorageEntity> listFilesStatus(String
path, String defaultPath, String tenantCode,
-
ResourceType type) throws Exception {
- return null;
- }
-
- @Override
- public StorageEntity getFileStatus(String path, String
defaultPath, String tenantCode,
- ResourceType type)
throws Exception {
- return null;
- }
- }, new WorkerRegistryClient());
+ super(TaskExecutionContext.builder().taskInstanceId((int)
System.nanoTime()).build(),
+ new WorkerConfig(),
+ new WorkerMessageSender(),
+ null,
+ new WorkerRegistryClient());
this.runnable = runnable;
}
@@ -190,4 +164,12 @@ class WorkerTaskExecutorThreadPoolTest {
}
}
+ private WorkerConfig createWorkerConfig(int execThreads,
+ TaskExecuteThreadsFullPolicy
taskExecuteThreadsFullPolicy) {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setExecThreads(execThreads);
+
workerConfig.setTaskExecuteThreadsFullPolicy(taskExecuteThreadsFullPolicy);
+ return workerConfig;
+ }
+
}