This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 775dbea475 [Feature] [Zeta] Optimize CoordinatorService ThreadPool
Configuration to Prevent Potential OOM (#8241)
775dbea475 is described below
commit 775dbea4754bccfb22203a48bca4ec96608fa7a3
Author: xiaochen <[email protected]>
AuthorDate: Thu Dec 19 23:52:54 2024 +0800
[Feature] [Zeta] Optimize CoordinatorService ThreadPool Configuration to
Prevent Potential OOM (#8241)
---
.../seatunnel-engine/hybrid-cluster-deployment.md | 21 +++++++++++
.../separated-cluster-deployment.md | 23 ++++++++++++
.../seatunnel-engine/hybrid-cluster-deployment.md | 22 ++++++++++++
.../separated-cluster-deployment.md | 20 +++++++++++
.../engine/common/config/EngineConfig.java | 4 +++
.../config/YamlSeaTunnelDomConfigProcessor.java | 22 ++++++++++++
.../config/server/CoordinatorServiceConfig.java | 42 ++++++++++++++++++++++
.../common/config/server/ServerConfigOptions.java | 19 ++++++++++
.../config/YamlSeaTunnelConfigParserTest.java | 4 +++
.../src/test/resources/seatunnel.yaml | 3 ++
.../engine/server/CoordinatorService.java | 6 ++--
11 files changed, 183 insertions(+), 3 deletions(-)
diff --git a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
index ac072c494d..6aad26dccb 100644
--- a/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/hybrid-cluster-deployment.md
@@ -154,6 +154,27 @@ seatunnel:
When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT`
configuration will become invalid and will be forcibly changed to
`job-schedule-strategy: REJECT`, because this parameter is meaningless in
dynamic slots.
+### 4.7 Coordinator Service
+
+CoordinatorService is responsible for turning each job's LogicalDag into an ExecutionDag,
+and then into a PhysicalDag. It finally creates the job's JobMaster, which handles scheduling, execution, and state monitoring.
+
+**core-thread-num**
+
+The core pool size (`corePoolSize`) of the thread pool that CoordinatorService uses to execute jobs.
+
+**max-thread-num**
+
+The maximum number of jobs that can be executed at the same time.
+
+Example
+
+```yaml
+coordinator-service:
+ core-thread-num: 30
+ max-thread-num: 1000
+```
+
## 5. Configure The SeaTunnel Engine Network Service
All SeaTunnel Engine network-related configurations are in the
`hazelcast.yaml` file.
diff --git a/docs/en/seatunnel-engine/separated-cluster-deployment.md
b/docs/en/seatunnel-engine/separated-cluster-deployment.md
index 91215eb459..1244042dbb 100644
--- a/docs/en/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/en/seatunnel-engine/separated-cluster-deployment.md
@@ -297,6 +297,29 @@ seatunnel:
```
When `dynamic-slot: true` is used, the `job-schedule-strategy: WAIT`
configuration will become invalid and will be forcibly changed to
`job-schedule-strategy: REJECT`, because this parameter is meaningless in
dynamic slots.
+
+### 4.8 Coordinator Service
+
+CoordinatorService is responsible for turning each job's LogicalDag into an ExecutionDag,
+and then into a PhysicalDag. It finally creates the job's JobMaster, which handles scheduling, execution, and state monitoring.
+
+**core-thread-num**
+
+The core pool size (`corePoolSize`) of the thread pool that CoordinatorService uses to execute jobs.
+
+**max-thread-num**
+
+The maximum number of jobs that can be executed at the same time.
+
+Example
+
+```yaml
+coordinator-service:
+ core-thread-num: 30
+ max-thread-num: 1000
+```
+
+
## 5. Configuring SeaTunnel Engine Network Services
All network-related configurations of the SeaTunnel Engine are in the
`hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
diff --git a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
index 7780527345..084bf980f7 100644
--- a/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/hybrid-cluster-deployment.md
@@ -153,6 +153,28 @@ seatunnel:
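When `dynamic-slot: true` is set, the `job-schedule-strategy: WAIT` configuration becomes invalid and is forcibly changed to `job-schedule-strategy: REJECT`, because the parameter is meaningless with dynamic slots and jobs can be submitted directly.
+### 4.7 Coordinator Service
+
+CoordinatorService drives the generation of each job from a LogicalDag to an ExecutionDag and then to a PhysicalDag, and finally creates the job's JobMaster to handle scheduling, execution, and state monitoring.
+
+**core-thread-num**
+
+The number of core threads in the CoordinatorService thread pool.
+
+**max-thread-num**
+
+The maximum number of jobs that can be executed at the same time.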
+
+Example
+
+```yaml
+coordinator-service:
+ core-thread-num: 30
+ max-thread-num: 1000
+```
+
+
+
## 5. Configure the SeaTunnel Engine Network Service
All SeaTunnel Engine network-related configurations are in the `hazelcast.yaml` file.
diff --git a/docs/zh/seatunnel-engine/separated-cluster-deployment.md
b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
index bdc369ff8c..b0dd827789 100644
--- a/docs/zh/seatunnel-engine/separated-cluster-deployment.md
+++ b/docs/zh/seatunnel-engine/separated-cluster-deployment.md
@@ -301,6 +301,26 @@ seatunnel:
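When `dynamic-slot: true` is set, the `job-schedule-strategy: WAIT` configuration becomes invalid and is forcibly changed to `job-schedule-strategy: REJECT`, because the parameter is meaningless with dynamic slots and jobs can be submitted directly.
+### 4.8 Coordinator Service
+
+CoordinatorService drives the generation of each job from a LogicalDag to an ExecutionDag and then to a PhysicalDag, and finally creates the job's JobMaster to handle scheduling, execution, and state monitoring.
+
+**core-thread-num**
+
+The number of core threads in the CoordinatorService thread pool.
+
+**max-thread-num**
+
+The maximum number of jobs that can be executed at the same time.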
+
+Example
+
+```yaml
+coordinator-service:
+ core-thread-num: 30
+ max-thread-num: 1000
+```
+
## 5. Configure the SeaTunnel Engine Network Service
All SeaTunnel Engine network-related configurations are in the `hazelcast-master.yaml` and `hazelcast-worker.yaml` files.
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 99a721109b..dd043aba55 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.common.config;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
+import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -57,6 +58,9 @@ public class EngineConfig {
private CheckpointConfig checkpointConfig =
ServerConfigOptions.CHECKPOINT.defaultValue();
+ private CoordinatorServiceConfig coordinatorServiceConfig =
+ ServerConfigOptions.COORDINATOR_SERVICE.defaultValue();
+
private ConnectorJarStorageConfig connectorJarStorageConfig =
ServerConfigOptions.CONNECTOR_JAR_STORAGE_CONFIG.defaultValue();
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index c0f6b6ad6c..310485a524 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarHAStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageConfig;
import org.apache.seatunnel.engine.common.config.server.ConnectorJarStorageMode;
+import org.apache.seatunnel.engine.common.config.server.CoordinatorServiceConfig;
import org.apache.seatunnel.engine.common.config.server.HttpConfig;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
@@ -106,6 +107,25 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
return slotServiceConfig;
}
+ private CoordinatorServiceConfig parseCoordinatorServiceConfig(Node coordinatorServiceNode) {
+ CoordinatorServiceConfig coordinatorServiceConfig = new CoordinatorServiceConfig();
+ for (Node node : childElements(coordinatorServiceNode)) {
+ String name = cleanNodeName(node);
+ if (ServerConfigOptions.MAX_THREAD_NUM.key().equals(name)) {
+ coordinatorServiceConfig.setMaxThreadNum(
+ getIntegerValue(
+ ServerConfigOptions.MAX_THREAD_NUM.key(), getTextContent(node)));
+ } else if (ServerConfigOptions.CORE_THREAD_NUM.key().equals(name)) {
+ coordinatorServiceConfig.setCoreThreadNum(
+ getIntegerValue(
+ ServerConfigOptions.CORE_THREAD_NUM.key(), getTextContent(node)));
+ } else {
+ LOGGER.warning("Unrecognized element: " + name);
+ }
+ }
+ return coordinatorServiceConfig;
+ }
+
private void parseEngineConfig(Node engineNode, SeaTunnelConfig config) {
final EngineConfig engineConfig = config.getEngineConfig();
for (Node node : childElements(engineNode)) {
@@ -177,6 +197,8 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
ScheduleStrategy.valueOf(getTextContent(node).toUpperCase(Locale.ROOT)));
} else if (ServerConfigOptions.HTTP.key().equals(name)) {
engineConfig.setHttpConfig(parseHttpConfig(node));
+ } else if (ServerConfigOptions.COORDINATOR_SERVICE.key().equals(name)) {
+ engineConfig.setCoordinatorServiceConfig(parseCoordinatorServiceConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
new file mode 100644
index 0000000000..3ed455e373
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CoordinatorServiceConfig.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.config.server;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+import static com.hazelcast.internal.util.Preconditions.checkPositive;
+
+@Data
+public class CoordinatorServiceConfig implements Serializable {
+
+ private int coreThreadNum = ServerConfigOptions.CORE_THREAD_NUM.defaultValue();
+
+ private int maxThreadNum = ServerConfigOptions.MAX_THREAD_NUM.defaultValue();
+
+ public void setCoreThreadNum(int coreThreadNum) {
+ checkPositive(coreThreadNum, ServerConfigOptions.CORE_THREAD_NUM + " must be > 0");
+ this.coreThreadNum = coreThreadNum;
+ }
+
+ public void setMaxThreadNum(int maxThreadNum) {
+ checkPositive(maxThreadNum, ServerConfigOptions.MAX_THREAD_NUM + " must be > 0");
+ this.maxThreadNum = maxThreadNum;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 7153cccd0f..e0de761742 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -132,6 +132,25 @@ public class ServerConfigOptions {
.type(new TypeReference<Map<String, String>>() {})
.noDefaultValue()
.withDescription("The checkpoint storage instance
configuration.");
+
+ public static final Option<Integer> CORE_THREAD_NUM =
+ Options.key("core-thread-num")
+ .intType()
+ .defaultValue(10)
+ .withDescription("The core thread num of coordinator service");
+
+ public static final Option<Integer> MAX_THREAD_NUM =
+ Options.key("max-thread-num")
+ .intType()
+ .defaultValue(Integer.MAX_VALUE)
+ .withDescription("The max thread num of coordinator service");
+
+ public static final Option<CoordinatorServiceConfig> COORDINATOR_SERVICE =
+ Options.key("coordinator-service")
+ .type(new TypeReference<CoordinatorServiceConfig>() {})
+ .defaultValue(new CoordinatorServiceConfig())
+ .withDescription("The coordinator service configuration.");
+
public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
Options.key("history-job-expire-minutes")
.intType()
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index 42e6aa681b..bae761237d 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -76,6 +76,10 @@ public class YamlSeaTunnelConfigParserTest {
Assertions.assertTrue(config.getEngineConfig().getHttpConfig().isEnableDynamicPort());
Assertions.assertEquals(8080,
config.getEngineConfig().getHttpConfig().getPort());
Assertions.assertEquals(200,
config.getEngineConfig().getHttpConfig().getPortRange());
+ Assertions.assertEquals(
+ 30, config.getEngineConfig().getCoordinatorServiceConfig().getCoreThreadNum());
+ Assertions.assertEquals(
+ 1000, config.getEngineConfig().getCoordinatorServiceConfig().getMaxThreadNum());
}
@Test
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 88f8c3f9bb..da6b831f38 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -22,6 +22,9 @@ seatunnel:
slot-service:
dynamic-slot: false
slot-num: 5
+ coordinator-service:
+ core-thread-num: 30
+ max-thread-num: 1000
checkpoint:
interval: 6000
timeout: 7000
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index bfa4379ad5..5b74a1f181 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -199,11 +199,12 @@ public class CoordinatorService {
@NonNull SeaTunnelServer seaTunnelServer,
EngineConfig engineConfig) {
this.nodeEngine = nodeEngine;
+ this.engineConfig = engineConfig;
this.logger = nodeEngine.getLogger(getClass());
this.executorService =
new ThreadPoolExecutor(
- 0,
- Integer.MAX_VALUE,
+ engineConfig.getCoordinatorServiceConfig().getCoreThreadNum(),
+ engineConfig.getCoordinatorServiceConfig().getMaxThreadNum(),
60L,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
@@ -212,7 +213,6 @@ public class CoordinatorService {
.build(),
new ThreadPoolStatus.RejectionCountingHandler());
this.seaTunnelServer = seaTunnelServer;
- this.engineConfig = engineConfig;
masterActiveListener = Executors.newSingleThreadScheduledExecutor();
masterActiveListener.scheduleAtFixedRate(
this::checkNewActiveMaster, 0, 100, TimeUnit.MILLISECONDS);
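
Note on the behavior change: the following is a minimal, standalone sketch of what a bounded maximum pool size means for a ThreadPoolExecutor backed by a SynchronousQueue, as in the CoordinatorService hunk above. It is not SeaTunnel code. The class name BoundedCoordinatorPoolSketch and its methods are hypothetical, and the JDK default rejection handler (AbortPolicy) stands in for ThreadPoolStatus.RejectionCountingHandler, whose behavior is not shown in this diff.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch class; not part of SeaTunnel.
final class BoundedCoordinatorPoolSketch {

    /** Builds a pool shaped like the one in the diff: SynchronousQueue, 60s keep-alive. */
    static ThreadPoolExecutor newPool(int coreThreadNum, int maxThreadNum) {
        // The JDK constructor throws IllegalArgumentException when
        // coreThreadNum > maxThreadNum, so the two settings must be consistent.
        return new ThreadPoolExecutor(
                coreThreadNum, maxThreadNum, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    /** Submits `tasks` tasks that all block, and returns how many were rejected. */
    static int countRejected(int coreThreadNum, int maxThreadNum, int tasks) {
        ThreadPoolExecutor pool = newPool(coreThreadNum, maxThreadNum);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // Each task holds its thread until the latch is released,
                    // so no worker is ever idle and able to take a handoff.
                    pool.execute(
                            () -> {
                                try {
                                    release.await();
                                } catch (InterruptedException e) {
                                    Thread.currentThread().interrupt();
                                }
                            });
                } catch (RejectedExecutionException e) {
                    // Assumption: default AbortPolicy; the real service uses its own handler.
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 10 long-running tasks, at most 4 threads: 4 run, 6 are rejected.
        System.out.println(countRejected(2, 4, 10)); // prints 6
    }
}
```

With the previous hard-coded maximum of Integer.MAX_VALUE, the same loop would create one thread per task and never reject, which is the unbounded growth this commit makes configurable.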