This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 775dbea475 [Feature] [Zeta] Optimize CoordinatorService ThreadPool Configuration to Prevent Potential OOM (#8241)
775dbea475 is described below

commit 775dbea4754bccfb22203a48bca4ec96608fa7a3
Author: xiaochen <[email protected]>
AuthorDate: Thu Dec 19 23:52:54 2024 +0800

    [Feature] [Zeta] Optimize CoordinatorService ThreadPool Configuration to Prevent Potential OOM (#8241)
---
 .../seatunnel-engine/hybrid-cluster-deployment.md  | 21 +++++++++++
 .../separated-cluster-deployment.md                | 23 ++++++++++++
 .../seatunnel-engine/hybrid-cluster-deployment.md  | 22 ++++++++++++
 .../separated-cluster-deployment.md                | 20 +++++++++++
 .../engine/common/config/EngineConfig.java         |  4 +++
 .../config/YamlSeaTunnelDomConfigProcessor.java    | 22 ++++++++++++
 .../config/server/CoordinatorServiceConfig.java    | 42 ++++++++++++++++++++++
 .../common/config/server/ServerConfigOptions.java  | 19 ++++++++++
 .../config/YamlSeaTunnelConfigParserTest.java      |  4 +++
 .../src/test/resources/seatunnel.yaml              |  3 ++
 .../engine/server/CoordinatorService.java          |  6 ++--
 11 files changed, 183 insertions(+), 3 deletions(-)
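
The `coordinator-service` options are bound to the `CoordinatorServiceConfig` class added in this commit. Its defaults are `10` and `Integer.MAX_VALUE`, and each setter is guarded by Hazelcast's `checkPositive`. The plain-Java sketch below mirrors that behavior without Lombok or Hazelcast so that it runs on its own. The class name `CoordinatorPoolSettings` and the method `checkPoolBounds()` are hypothetical.

`checkPoolBounds()` is an extra check that this commit does not perform. `ThreadPoolExecutor`'s constructor throws `IllegalArgumentException` when `maximumPoolSize < corePoolSize`, so a setting such as `core-thread-num: 30` with `max-thread-num: 20` would only fail once `CoordinatorService` builds its pool. A positivity check also rejects `0`, which is the core size the pool was hard-coded with before this change.

```java
// Hypothetical stand-in for CoordinatorServiceConfig (no Lombok, no Hazelcast):
// mirrors the ServerConfigOptions defaults and the checkPositive() setters.
public class CoordinatorPoolSettings {

    // Defaults of core-thread-num and max-thread-num in ServerConfigOptions.
    static final int DEFAULT_CORE_THREAD_NUM = 10;
    static final int DEFAULT_MAX_THREAD_NUM = Integer.MAX_VALUE;

    private int coreThreadNum = DEFAULT_CORE_THREAD_NUM;
    private int maxThreadNum = DEFAULT_MAX_THREAD_NUM;

    // Local replacement for Hazelcast's Preconditions.checkPositive: rejects values <= 0.
    private static void checkPositive(int value, String message) {
        if (value <= 0) {
            throw new IllegalArgumentException(message);
        }
    }

    public void setCoreThreadNum(int coreThreadNum) {
        checkPositive(coreThreadNum, "core-thread-num must be > 0");
        this.coreThreadNum = coreThreadNum;
    }

    public void setMaxThreadNum(int maxThreadNum) {
        checkPositive(maxThreadNum, "max-thread-num must be > 0");
        this.maxThreadNum = maxThreadNum;
    }

    public int getCoreThreadNum() {
        return coreThreadNum;
    }

    public int getMaxThreadNum() {
        return maxThreadNum;
    }

    // Extra cross-field check that this commit does NOT perform: ThreadPoolExecutor's
    // constructor throws IllegalArgumentException when maximumPoolSize < corePoolSize.
    public void checkPoolBounds() {
        if (maxThreadNum < coreThreadNum) {
            throw new IllegalArgumentException(
                    "max-thread-num (" + maxThreadNum + ") must be >= core-thread-num ("
                            + coreThreadNum + ")");
        }
    }

    public static void main(String[] args) {
        CoordinatorPoolSettings settings = new CoordinatorPoolSettings();
        settings.setCoreThreadNum(30); // values from the documentation example
        settings.setMaxThreadNum(1000);
        settings.checkPoolBounds(); // passes: 30 <= 1000
        System.out.println(settings.getCoreThreadNum() + " / " + settings.getMaxThreadNum()); // prints "30 / 1000"

        try {
            settings.setCoreThreadNum(0); // zero is rejected by the positivity check
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // prints "core-thread-num must be > 0"
        }
    }
}
```

A rejected value leaves the previous setting in place, because the check runs before the field is assigned.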

diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
index ac072c494d..6aad26dccb 100644
--- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -154,6 +154,27 @@ seatunnel:
 
 When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots.
 
+### 4.7 Coordinator Service
+
+CoordinatorService is responsible for converting each job from a LogicalDag to an ExecutionDag,
+and then to a PhysicalDag. Finally, it creates the job's JobMaster, which handles the job's scheduling, execution, and state monitoring.
+
+**core-thread-num**
+
+The core pool size (`corePoolSize`) of the cached thread pool that the SeaTunnel coordinator service uses to execute jobs. Default: `10`.
+
+**max-thread-num**
+
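+The maximum number of jobs that can be executed at the same time, i.e. the maximum pool size of the coordinator service thread pool. Default: `Integer.MAX_VALUE`.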
+
+Example
+
+```yaml
+coordinator-service:
+  core-thread-num: 30
+  max-thread-num: 1000
+```
+
 ## 5. Configure The SeaTunnel Engine Network Service
 
 All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
diff --git a/docs/en/seatunnel-engine/separated-cluster-deployment.md b/docs/en/seatunnel-engine/separated-cluster-deployment.md
index 91215eb459..1244042dbb 100644
--- a/docs/en/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -297,6 +297,29 @@ seatunnel:
 ```
 When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT` configuration will become invalid and will be forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless in dynamic slots.
 
+
+### 4.8 Coordinator Service
+
+CoordinatorService is responsible for converting each job from a LogicalDag to an ExecutionDag,
+and then to a PhysicalDag. Finally, it creates the job's JobMaster, which handles the job's scheduling, execution, and state monitoring.
+
+**core-thread-num**
+
+The core pool size (`corePoolSize`) of the cached thread pool that the SeaTunnel coordinator service uses to execute jobs. Default: `10`.
+
+**max-thread-num**
+
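+The maximum number of jobs that can be executed at the same time, i.e. the maximum pool size of the coordinator service thread pool. Default: `Integer.MAX_VALUE`.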
+
+Example
+
+```yaml
+coordinator-service:
+  core-thread-num: 30
+  max-thread-num: 1000
+```
+
+
 ## 5. Configuring SeaTunnel Engine Network Services
 
 All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
index 7780527345..084bf980f7 100644
--- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -153,6 +153,28 @@ seatunnel:
 
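 When `dynamic-slot: true` is set, the `job-schedule-strategy: WAIT` configuration becomes invalid and is forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless with dynamic slots, and jobs can be submitted directly.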
 
+### 4.7 Coordinator Service
+
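+CoordinatorService handles converting each job from a LogicalDag to an ExecutionDag and then to a PhysicalDag, and finally creates the job's JobMaster, which schedules and executes the job and monitors its state.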
+
+**core-thread-num**
+
+The number of core threads in the CoordinatorService thread pool. Default: `10`.
+
+**max-thread-num**
+
+The maximum number of jobs that can be executed at the same time. Default: `Integer.MAX_VALUE`.
+
+Example
+
+```yaml
+coordinator-service:
+  core-thread-num: 30
+  max-thread-num: 1000
+```
+
+
+
 ## 5. Configure The SeaTunnel Engine Network Service
 
 All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
diff --git a/docs/zh/seatunnel-engine/separated-cluster-deployment.md b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
index bdc369ff8c..b0dd827789 100644
--- a/docs/zh/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -301,6 +301,26 @@ seatunnel:
 
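 When `dynamic-slot: true` is set, the `job-schedule-strategy: WAIT` configuration becomes invalid and is forcibly changed to `job-schedule-strategy: REJECT`, because this parameter is meaningless with dynamic slots, and jobs can be submitted directly.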
 
+### 4.8 Coordinator Service
+
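+CoordinatorService handles converting each job from a LogicalDag to an ExecutionDag and then to a PhysicalDag, and finally creates the job's JobMaster, which schedules and executes the job and monitors its state.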
+
+**core-thread-num**
+
+The number of core threads in the CoordinatorService thread pool. Default: `10`.
+
+**max-thread-num**
+
+The maximum number of jobs that can be executed at the same time. Default: `Integer.MAX_VALUE`.
+
+Example
+
+```yaml
+coordinator-service:
+  core-thread-num: 30
+  max-thread-num: 1000
+```
+
 ## 5. Configuring SeaTunnel Engine Network Services
 
 All network-related configurations of the SeaTunnel Engine are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 99a721109b..dd043aba55 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.common.config;
 
 import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
 import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
 import org.apache.seatunnel.engine.common.config.server.HttpConfig;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -57,6 +58,9 @@ public class EngineConfig {
 
     private CheckpointConfig checkpointConfig = ServerConfigOptions.CHECKPOINT.defaultValue();
 
+    private CoordinatorServiceConfig coordinatorServiceConfig =
+            ServerConfigOptions.COORDINATOR_SERVICE.defaultValue();
+
     private ConnectorJarStorageConfig connectorJarStorageConfig =
             ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue();
 
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index c0f6b6ad6c..310485a524 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
 import org.apache.seatunnel.engine.common.config.server.ConnectorJarHAStorageConfig;
 import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
 import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
+import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
 import org.apache.seatunnel.engine.common.config.server.HttpConfig;
 import org.apache.seatunnel.engine.common.config.server.QueueType;
 import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -106,6 +107,25 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor
         return slotServiceConfig;
     }
 
+    private CoordinatorServiceConfig parseCoordinatorServiceConfig(Node coordinatorServiceNode) {
+        CoordinatorServiceConfig coordinatorServiceConfig = new CoordinatorServiceConfig();
+        for (Node node : childElements(coordinatorServiceNode)) {
+            String name = cleanNodeName(node);
+            if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) {
+                coordinatorServiceConfig.setMaxThreadNum(
+                        getIntegerValue(
+                                ServerConfigOptions.MAX_THREAD_NUM.key(), getTextContent(node)));
+            } else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name)) {
+                coordinatorServiceConfig.setCoreThreadNum(
+                        getIntegerValue(
+                                ServerConfigOptions.CORE_THREAD_NUM.key(), getTextContent(node)));
+            } else {
+                LOGGER.warning("Unrecognized element: " + name);
+            }
+        }
+        return coordinatorServiceConfig;
+    }
+
     private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
         final EngineConfig engineConfig = config.getEngineConfig();
         for (Node node : childElements(engineNode)) {
@@ -177,6 +197,8 @@ public class YamlSeaTunnelDomConfigProcessor extends AbstractDomConfigProcessor
                         ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
             } else if (ServerConfigOptions.HTTP.key().equals(name)) {
                 engineConfig.setHttpConfig(parseHttpConfig(node));
+            } else if (ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) {
+                engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node));
             } else {
                 LOGGER.warning("Unrecognized element: " + name);
             }
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
new file mode 100644
index 0000000000..3ed455e373
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+import static com.hazelcast.internal.util.Preconditions.checkPositive;
+
+@Data
+public class CoordinatorServiceConfig implements Serializable {
+
+    private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue();
+
+    private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue();
+
+    public void setCoreThreadNum(int coreThreadNum) {
+        checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + " must be > 0");
+        this.coreThreadNum = coreThreadNum;
+    }
+
+    public void setMaxThreadNum(int maxThreadNum) {
+        checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + " must be > 0");
+        this.maxThreadNum = maxThreadNum;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 7153cccd0f..e0de761742 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -132,6 +132,25 @@ public class ServerConfigOptions {
                     .type(new TypeReference<Map<String, String>>() {})
                     .noDefaultValue()
                     .withDescription("The checkpoint storage instance configuration.");
+
+    public static final Option<Integer> CORE_THREAD_NUM =
+            Options.key("core-thread-num")
+                    .intType()
+                    .defaultValue(10)
+                    .withDescription("The core thread num of coordinator service");
+
+    public static final Option<Integer> MAX_THREAD_NUM =
+            Options.key("max-thread-num")
+                    .intType()
+                    .defaultValue(Integer.MAX_VALUE)
+                    .withDescription("The max thread num of coordinator service");
+
+    public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE =
+            Options.key("coordinator-service")
+                    .type(new TypeReference<CoordinatorServiceConfig>() {})
+                    .defaultValue(new CoordinatorServiceConfig())
+                    .withDescription("The coordinator service configuration.");
+
     public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
             Options.key("history-job-expire-minutes")
                     .intType()
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index 42e6aa681b..bae761237d 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -76,6 +76,10 @@ public class YamlSeaTunnelConfigParserTest {
         Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort());
         Assertions.assertEquals(8080, config.getEngineConfig().getHttpConfig().getPort());
         Assertions.assertEquals(200, config.getEngineConfig().getHttpConfig().getPortRange());
+        Assertions.assertEquals(
+                30, config.getEngineConfig().getCoordinatorServiceConfig().getCoreThreadNum());
+        Assertions.assertEquals(
+                1000, config.getEngineConfig().getCoordinatorServiceConfig().getMaxThreadNum());
     }
 
     @Test
diff --git a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 88f8c3f9bb..da6b831f38 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -22,6 +22,9 @@ seatunnel:
         slot-service:
             dynamic-slot: false
             slot-num: 5
+        coordinator-service:
+            core-thread-num: 30
+            max-thread-num: 1000
         checkpoint:
             interval: 6000
             timeout: 7000
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index bfa4379ad5..5b74a1f181 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -199,11 +199,12 @@ public class CoordinatorService {
             @NonNull SeaTunnelServer seaTunnelServer,
             EngineConfig engineConfig) {
         this.nodeEngine = nodeEngine;
+        this.engineConfig = engineConfig;
         this.logger = nodeEngine.getLogger(getClass());
         this.executorService =
                 new ThreadPoolExecutor(
-                        0,
-                        Integer.MAX_VALUE,
+                        engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(),
+                        engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(),
                         60L,
                         TimeUnit.SECONDS,
                         new SynchronousQueue<>(),
@@ -212,7 +213,6 @@ public class CoordinatorService {
                                 .build(),
                         new ThreadPoolStatus.RejectionCountingHandler());
         this.seaTunnelServer = seaTunnelServer;
-        this.engineConfig = engineConfig;
         masterActiveListener = Executors.newSingleThreadScheduledExecutor();
         masterActiveListener.scheduleAtFixedRate(
                 this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
