This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7704bb7af0 [Feature] [Zeta] Add /option-rules REST API for runtime
connector metadata (#10585)
7704bb7af0 is described below
commit 7704bb7af0efa909504b3b0cc4bd1eaba3ea2c86
Author: yzeng1618 <[email protected]>
AuthorDate: Tue Mar 31 13:45:30 2026 +0800
[Feature] [Zeta] Add /option-rules REST API for runtime connector metadata
(#10585)
---
docs/en/engines/zeta/rest-api-v1.md | 94 +++++++
docs/en/engines/zeta/rest-api-v2.md | 94 +++++++
docs/zh/engines/zeta/rest-api-v1.md | 94 +++++++
docs/zh/engines/zeta/rest-api-v2.md | 94 +++++++
seatunnel-engine/seatunnel-engine-server/pom.xml | 5 +
.../seatunnel/engine/server/JettyService.java | 4 +
.../seatunnel/engine/server/rest/RestConstant.java | 1 +
.../server/rest/RestHttpGetCommandProcessor.java | 24 ++
.../server/rest/response/OptionRuleResponse.java | 167 +++++++++++
.../server/rest/service/OptionRulesService.java | 310 +++++++++++++++++++++
.../server/rest/servlet/OptionRulesServlet.java | 78 ++++++
.../engine/server/rest/OptionRulesApiTest.java | 177 ++++++++++++
.../rest/service/OptionRulesServiceTest.java | 303 ++++++++++++++++++++
13 files changed, 1445 insertions(+)
diff --git a/docs/en/engines/zeta/rest-api-v1.md
b/docs/en/engines/zeta/rest-api-v1.md
index f4d750fc65..60b508bf8b 100644
--- a/docs/en/engines/zeta/rest-api-v1.md
+++ b/docs/en/engines/zeta/rest-api-v1.md
@@ -37,6 +37,100 @@ network:
## API reference
+### Get Connector Option Rules
+
+<details>
+ <summary><code>GET</code>
<code><b>/hazelcast/rest/maps/option-rules?type=source&plugin=FakeSource</b></code>
<code>(Returns the full runtime OptionRule metadata of a
connector.)</code></summary>
+
+#### Parameters
+
+> | name | type | data type | description
|
+>
|--------|----------|-----------|--------------------------------------------------------------------|
+> | type | required | string | plugin type, currently supports `source`
and `sink` |
+> | plugin | required | string | connector factory identifier, for example
`FakeSource` or `Console` |
+
+#### Responses
+
+```json
+{
+ "engineType": "seatunnel",
+ "pluginType": "source",
+ "pluginName": "FakeSource",
+ "optionRule": {
+ "optionalOptions": [
+ {
+ "key": "row.num",
+ "type": "java.lang.Integer",
+ "defaultValue": 5,
+ "description": "The total number of data generated per degree of
parallelism",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "requiredOptions": [
+ {
+ "ruleType": "EXCLUSIVE",
+ "options": [
+ {
+ "key": "schema",
+ "type": "org.apache.seatunnel.api.table.catalog.TableSchema",
+ "defaultValue": null,
+ "description": "The schema of the upstream table",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ]
+ },
+ {
+ "ruleType": "CONDITIONAL",
+ "options": [
+ {
+ "key": "string.template",
+ "type": "java.util.List<java.lang.String>",
+ "defaultValue": null,
+ "description": "The template list of string type that connector
generated, if user configured it, connector will randomly select an item from
the template list",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "expression": "'string.fake.mode' == TEMPLATE",
+ "expressionTree": {
+ "condition": {
+ "option": {
+ "key": "string.fake.mode",
+ "type":
"org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions$FakeMode",
+ "defaultValue": "RANDOM",
+ "description": "The fake mode of generating string data",
+ "fallbackKeys": [],
+ "optionValues": [
+ "RANDOM",
+ "TEMPLATE"
+ ]
+ },
+ "expectValue": "TEMPLATE",
+ "operator": null,
+ "next": null
+ },
+ "operator": null,
+ "next": null
+ }
+ }
+ ],
+ "conditionRules": []
+ }
+}
+```
+
+**Notes:**
+- The response is resolved from runtime plugin discovery, so it follows the
connector version installed on the server.
+- `requiredOptions[].ruleType` can be `ABSOLUTELY_REQUIRED`, `EXCLUSIVE`,
`BUNDLED`, or `CONDITIONAL`.
+- `optionRule.conditionRules` recursively exposes nested conditional option
rules and is an empty array when the connector does not define nested rules.
+- For conditional rules, both `expression` and `expressionTree` are returned
for dynamic form rendering.
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### Returns an overview over the Zeta engine cluster.
<details>
diff --git a/docs/en/engines/zeta/rest-api-v2.md
b/docs/en/engines/zeta/rest-api-v2.md
index 2badb19237..8717b8973d 100644
--- a/docs/en/engines/zeta/rest-api-v2.md
+++ b/docs/en/engines/zeta/rest-api-v2.md
@@ -39,6 +39,100 @@ Please refer [security](security.md)
## API reference
+### Get Connector Option Rules
+
+<details>
+ <summary><code>GET</code>
<code><b>/option-rules?type=source&plugin=FakeSource</b></code> <code>(Returns
the full runtime OptionRule metadata of a connector.)</code></summary>
+
+#### Parameters
+
+> | name | type | data type | description
|
+>
|--------|----------|-----------|--------------------------------------------------------------------|
+> | type | required | string | plugin type, currently supports `source`
and `sink` |
+> | plugin | required | string | connector factory identifier, for example
`FakeSource` or `Console` |
+
+#### Responses
+
+```json
+{
+ "engineType": "seatunnel",
+ "pluginType": "source",
+ "pluginName": "FakeSource",
+ "optionRule": {
+ "optionalOptions": [
+ {
+ "key": "row.num",
+ "type": "java.lang.Integer",
+ "defaultValue": 5,
+ "description": "The total number of data generated per degree of
parallelism",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "requiredOptions": [
+ {
+ "ruleType": "EXCLUSIVE",
+ "options": [
+ {
+ "key": "schema",
+ "type": "org.apache.seatunnel.api.table.catalog.TableSchema",
+ "defaultValue": null,
+ "description": "The schema of the upstream table",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ]
+ },
+ {
+ "ruleType": "CONDITIONAL",
+ "options": [
+ {
+ "key": "string.template",
+ "type": "java.util.List<java.lang.String>",
+ "defaultValue": null,
+ "description": "The template list of string type that connector
generated, if user configured it, connector will randomly select an item from
the template list",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "expression": "'string.fake.mode' == TEMPLATE",
+ "expressionTree": {
+ "condition": {
+ "option": {
+ "key": "string.fake.mode",
+ "type":
"org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions$FakeMode",
+ "defaultValue": "RANDOM",
+ "description": "The fake mode of generating string data",
+ "fallbackKeys": [],
+ "optionValues": [
+ "RANDOM",
+ "TEMPLATE"
+ ]
+ },
+ "expectValue": "TEMPLATE",
+ "operator": null,
+ "next": null
+ },
+ "operator": null,
+ "next": null
+ }
+ }
+ ],
+ "conditionRules": []
+ }
+}
+```
+
+**Notes:**
+- The response is resolved from runtime plugin discovery, so it follows the
connector version installed on the server.
+- `requiredOptions[].ruleType` can be `ABSOLUTELY_REQUIRED`, `EXCLUSIVE`,
`BUNDLED`, or `CONDITIONAL`.
+- `optionRule.conditionRules` recursively exposes nested conditional option
rules and is an empty array when the connector does not define nested rules.
+- For conditional rules, both `expression` and `expressionTree` are returned
for dynamic form rendering.
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### Returns an overview over the Zeta engine cluster.
<details>
diff --git a/docs/zh/engines/zeta/rest-api-v1.md
b/docs/zh/engines/zeta/rest-api-v1.md
index 5a08e80c49..0b448615d3 100644
--- a/docs/zh/engines/zeta/rest-api-v1.md
+++ b/docs/zh/engines/zeta/rest-api-v1.md
@@ -36,6 +36,100 @@ network:
## API参考
+### 获取 Connector 的 OptionRule
+
+<details>
+ <summary><code>GET</code>
<code><b>/hazelcast/rest/maps/option-rules?type=source&plugin=FakeSource</b></code>
<code>(返回 Connector 运行时完整的 OptionRule 元数据。)</code></summary>
+
+#### 参数
+
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |--------|------|------|-----------------------------------------|
+> | type | 是 | string | 插件类型,当前支持 `source` 和 `sink` |
+> | plugin | 是 | string | connector 的 factory identifier,例如 `FakeSource` 或
`Console` |
+
+#### 响应
+
+```json
+{
+ "engineType": "seatunnel",
+ "pluginType": "source",
+ "pluginName": "FakeSource",
+ "optionRule": {
+ "optionalOptions": [
+ {
+ "key": "row.num",
+ "type": "java.lang.Integer",
+ "defaultValue": 5,
+ "description": "The total number of data generated per degree of
parallelism",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "requiredOptions": [
+ {
+ "ruleType": "EXCLUSIVE",
+ "options": [
+ {
+ "key": "schema",
+ "type": "org.apache.seatunnel.api.table.catalog.TableSchema",
+ "defaultValue": null,
+ "description": "The schema of the upstream table",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ]
+ },
+ {
+ "ruleType": "CONDITIONAL",
+ "options": [
+ {
+ "key": "string.template",
+ "type": "java.util.List<java.lang.String>",
+ "defaultValue": null,
+ "description": "The template list of string type that connector
generated, if user configured it, connector will randomly select an item from
the template list",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "expression": "'string.fake.mode' == TEMPLATE",
+ "expressionTree": {
+ "condition": {
+ "option": {
+ "key": "string.fake.mode",
+ "type":
"org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions$FakeMode",
+ "defaultValue": "RANDOM",
+ "description": "The fake mode of generating string data",
+ "fallbackKeys": [],
+ "optionValues": [
+ "RANDOM",
+ "TEMPLATE"
+ ]
+ },
+ "expectValue": "TEMPLATE",
+ "operator": null,
+ "next": null
+ },
+ "operator": null,
+ "next": null
+ }
+ }
+ ],
+ "conditionRules": []
+ }
+}
+```
+
+**说明:**
+- 响应结果来自运行时 plugin discovery,会跟随服务端实际安装的 connector 版本。
+- `requiredOptions[].ruleType` 可能是 `ABSOLUTELY_REQUIRED`、`EXCLUSIVE`、`BUNDLED`
或 `CONDITIONAL`。
+- `optionRule.conditionRules` 会递归返回嵌套条件规则;当 connector 未定义嵌套规则时,该字段返回空数组。
+- 对于条件规则,会同时返回 `expression` 和 `expressionTree`,便于 Web 做动态表单渲染。
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### 返回Zeta集群的概览
<details>
diff --git a/docs/zh/engines/zeta/rest-api-v2.md
b/docs/zh/engines/zeta/rest-api-v2.md
index f840be1c3b..d91dee0754 100644
--- a/docs/zh/engines/zeta/rest-api-v2.md
+++ b/docs/zh/engines/zeta/rest-api-v2.md
@@ -37,6 +37,100 @@ seatunnel:
## API参考
+### 获取 Connector 的 OptionRule
+
+<details>
+ <summary><code>GET</code>
<code><b>/option-rules?type=source&plugin=FakeSource</b></code> <code>(返回
Connector 运行时完整的 OptionRule 元数据。)</code></summary>
+
+#### 参数
+
+> | 参数名称 | 是否必传 | 参数类型 | 参数描述 |
+> |--------|------|------|-----------------------------------------|
+> | type | 是 | string | 插件类型,当前支持 `source` 和 `sink` |
+> | plugin | 是 | string | connector 的 factory identifier,例如 `FakeSource` 或
`Console` |
+
+#### 响应
+
+```json
+{
+ "engineType": "seatunnel",
+ "pluginType": "source",
+ "pluginName": "FakeSource",
+ "optionRule": {
+ "optionalOptions": [
+ {
+ "key": "row.num",
+ "type": "java.lang.Integer",
+ "defaultValue": 5,
+ "description": "The total number of data generated per degree of
parallelism",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "requiredOptions": [
+ {
+ "ruleType": "EXCLUSIVE",
+ "options": [
+ {
+ "key": "schema",
+ "type": "org.apache.seatunnel.api.table.catalog.TableSchema",
+ "defaultValue": null,
+ "description": "The schema of the upstream table",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ]
+ },
+ {
+ "ruleType": "CONDITIONAL",
+ "options": [
+ {
+ "key": "string.template",
+ "type": "java.util.List<java.lang.String>",
+ "defaultValue": null,
+ "description": "The template list of string type that connector
generated, if user configured it, connector will randomly select an item from
the template list",
+ "fallbackKeys": [],
+ "optionValues": null
+ }
+ ],
+ "expression": "'string.fake.mode' == TEMPLATE",
+ "expressionTree": {
+ "condition": {
+ "option": {
+ "key": "string.fake.mode",
+ "type":
"org.apache.seatunnel.connectors.seatunnel.fake.config.FakeSourceOptions$FakeMode",
+ "defaultValue": "RANDOM",
+ "description": "The fake mode of generating string data",
+ "fallbackKeys": [],
+ "optionValues": [
+ "RANDOM",
+ "TEMPLATE"
+ ]
+ },
+ "expectValue": "TEMPLATE",
+ "operator": null,
+ "next": null
+ },
+ "operator": null,
+ "next": null
+ }
+ }
+ ],
+ "conditionRules": []
+ }
+}
+```
+
+**说明:**
+- 响应结果来自运行时 plugin discovery,会跟随服务端实际安装的 connector 版本。
+- `requiredOptions[].ruleType` 可能是 `ABSOLUTELY_REQUIRED`、`EXCLUSIVE`、`BUNDLED`
或 `CONDITIONAL`。
+- `optionRule.conditionRules` 会递归返回嵌套条件规则;当 connector 未定义嵌套规则时,该字段返回空数组。
+- 对于条件规则,会同时返回 `expression` 和 `expressionTree`,便于 Web 做动态表单渲染。
+
+</details>
+
+------------------------------------------------------------------------------------------
+
### 返回Zeta集群的概览
<details>
diff --git a/seatunnel-engine/seatunnel-engine-server/pom.xml
b/seatunnel-engine/seatunnel-engine-server/pom.xml
index d9e1c860c5..8260fcbff2 100644
--- a/seatunnel-engine/seatunnel-engine-server/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-server/pom.xml
@@ -71,6 +71,11 @@
<artifactId>seatunnel-config-sql</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-plugin-discovery</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
index 52cad3b2d8..ae1cf3e2fc 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/JettyService.java
@@ -38,6 +38,7 @@ import
org.apache.seatunnel.engine.server.rest.servlet.EncryptConfigServlet;
import org.apache.seatunnel.engine.server.rest.servlet.FinishedJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.JobInfoServlet;
import org.apache.seatunnel.engine.server.rest.servlet.MetricsServlet;
+import org.apache.seatunnel.engine.server.rest.servlet.OptionRulesServlet;
import org.apache.seatunnel.engine.server.rest.servlet.OverviewServlet;
import org.apache.seatunnel.engine.server.rest.servlet.PendingJobsServlet;
import org.apache.seatunnel.engine.server.rest.servlet.RunningJobsServlet;
@@ -74,6 +75,7 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOG;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_METRICS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OPEN_METRICS;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OPTION_RULES;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_PENDING_JOBS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOB;
@@ -207,6 +209,7 @@ public class JettyService {
ServletHolder allLogNameServlet = new ServletHolder(new
AllLogNameServlet(nodeEngine));
ServletHolder metricsServlet = new ServletHolder(new
MetricsServlet(nodeEngine));
+ ServletHolder optionRulesHolder = new ServletHolder(new
OptionRulesServlet(nodeEngine));
ServletHolder checkpointOverviewHolder =
new ServletHolder(new CheckpointOverviewServlet(nodeEngine));
ServletHolder checkpointHistoryHolder =
@@ -239,6 +242,7 @@ public class JettyService {
context.addServlet(allLogNameServlet,
convertUrlToPath(REST_URL_GET_ALL_LOG_NAME));
context.addServlet(metricsServlet, convertUrlToPath(REST_URL_METRICS));
context.addServlet(metricsServlet,
convertUrlToPath(REST_URL_OPEN_METRICS));
+ context.addServlet(optionRulesHolder,
convertUrlToPath(REST_URL_OPTION_RULES));
context.addServlet(
checkpointOverviewHolder,
convertUrlToPath(REST_URL_CHECKPOINT_OVERVIEW));
context.addServlet(checkpointHistoryHolder,
convertUrlToPath(REST_URL_CHECKPOINT_HISTORY));
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
index 9cdc746a5a..6766753524 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestConstant.java
@@ -100,6 +100,7 @@ public class RestConstant {
public static final String REST_URL_GET_ALL_LOG_NAME = "/get-all-log-name";
public static final String REST_URL_METRICS = "/metrics";
public static final String REST_URL_OPEN_METRICS = "/openmetrics";
+ public static final String REST_URL_OPTION_RULES = "/option-rules";
public static final String REST_URL_CHECKPOINT_OVERVIEW =
"/jobs/checkpoints";
public static final String REST_URL_CHECKPOINT_HISTORY =
"/jobs/checkpoints/history";
// api path end
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index 2b5426821b..b5212a9b3e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -27,15 +27,18 @@ import org.apache.seatunnel.engine.server.log.FormatType;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.rest.service.JobInfoService;
import org.apache.seatunnel.engine.server.rest.service.LogService;
+import org.apache.seatunnel.engine.server.rest.service.OptionRulesService;
import org.apache.seatunnel.engine.server.rest.service.OverviewService;
import org.apache.seatunnel.engine.server.rest.service.RunningThreadService;
import org.apache.seatunnel.engine.server.rest.service.SystemMonitoringService;
import org.apache.seatunnel.engine.server.rest.service.ThreadDumpService;
+import com.google.gson.Gson;
import com.hazelcast.internal.ascii.TextCommandService;
import com.hazelcast.internal.ascii.rest.HttpCommandProcessor;
import com.hazelcast.internal.ascii.rest.HttpGetCommand;
import com.hazelcast.internal.ascii.rest.RestValue;
+import com.hazelcast.internal.json.Json;
import com.hazelcast.internal.util.JsonUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -51,6 +54,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_400;
+import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_404;
import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.CONTEXT_PATH;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.INSTANCE_CONTEXT_PATH;
@@ -61,6 +65,7 @@ import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOG;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_LOGS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_METRICS;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OPEN_METRICS;
+import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OPTION_RULES;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_OVERVIEW;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOB;
import static
org.apache.seatunnel.engine.server.rest.RestConstant.REST_URL_RUNNING_JOBS;
@@ -79,6 +84,7 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
private ThreadDumpService threadDumpService;
private RunningThreadService runningThreadService;
private LogService logService;
+ private OptionRulesService optionRulesService;
public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
@@ -90,6 +96,7 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
this.threadDumpService = new ThreadDumpService(nodeEngine);
this.runningThreadService = new RunningThreadService(nodeEngine);
this.logService = new LogService(nodeEngine);
+ this.optionRulesService = new OptionRulesService(nodeEngine);
}
public RestHttpGetCommandProcessor(
@@ -106,6 +113,7 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
this.threadDumpService = new ThreadDumpService(nodeEngine);
this.runningThreadService = new RunningThreadService(nodeEngine);
this.logService = new LogService(nodeEngine);
+ this.optionRulesService = new OptionRulesService(nodeEngine);
}
@Override
@@ -126,6 +134,8 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
getRunningThread(httpGetCommand);
} else if (uri.startsWith(CONTEXT_PATH + REST_URL_OVERVIEW)) {
overView(httpGetCommand, uri);
+ } else if (uri.startsWith(CONTEXT_PATH + REST_URL_OPTION_RULES)) {
+ getOptionRules(httpGetCommand, uri);
} else if (uri.equals(INSTANCE_CONTEXT_PATH + REST_URL_METRICS)) {
handleMetrics(httpGetCommand, TextFormat.CONTENT_TYPE_004);
} else if (uri.equals(INSTANCE_CONTEXT_PATH +
REST_URL_OPEN_METRICS)) {
@@ -180,6 +190,20 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
JsonUtils.toJsonString(overviewService.getOverviewInfo(tags)))));
}
+ private void getOptionRules(HttpGetCommand command, String uri) {
+ try {
+ Map<String, String> params = getUriParam(uri);
+ String response =
+ new Gson()
+ .toJson(
+ optionRulesService.getOptionRules(
+ params.get("type"),
params.get("plugin")));
+ this.prepareResponse(command, Json.parse(response).asObject());
+ } catch (java.util.NoSuchElementException e) {
+ prepareResponse(SC_404, command, exceptionResponse(e));
+ }
+ }
+
public void getThreadDump(HttpGetCommand command) {
this.prepareResponse(command, threadDumpService.getThreadDump());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/response/OptionRuleResponse.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/response/OptionRuleResponse.java
new file mode 100644
index 0000000000..ada70fa721
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/response/OptionRuleResponse.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.response;
+
+import lombok.Getter;
+
+import java.util.List;
+
+@Getter
+public class OptionRuleResponse {
+
+ private final String engineType;
+ private final String pluginType;
+ private final String pluginName;
+ private final OptionRuleMetadata optionRule;
+
+ public OptionRuleResponse(
+ String engineType,
+ String pluginType,
+ String pluginName,
+ OptionRuleMetadata optionRule) {
+ this.engineType = engineType;
+ this.pluginType = pluginType;
+ this.pluginName = pluginName;
+ this.optionRule = optionRule;
+ }
+
+ @Getter
+ public static class OptionRuleMetadata {
+
+ private final List<OptionMetadata> optionalOptions;
+ private final List<RequiredOptionMetadata> requiredOptions;
+ private final List<ConditionRuleMetadata> conditionRules;
+
+ public OptionRuleMetadata(
+ List<OptionMetadata> optionalOptions,
+ List<RequiredOptionMetadata> requiredOptions,
+ List<ConditionRuleMetadata> conditionRules) {
+ this.optionalOptions = optionalOptions;
+ this.requiredOptions = requiredOptions;
+ this.conditionRules = conditionRules;
+ }
+ }
+
+ @Getter
+ public static class ConditionRuleMetadata {
+
+ private final String expression;
+ private final ExpressionNode expressionTree;
+ private final OptionRuleMetadata optionRule;
+
+ public ConditionRuleMetadata(
+ String expression, ExpressionNode expressionTree,
OptionRuleMetadata optionRule) {
+ this.expression = expression;
+ this.expressionTree = expressionTree;
+ this.optionRule = optionRule;
+ }
+ }
+
+ @Getter
+ public static class RequiredOptionMetadata {
+
+ private final RuleType ruleType;
+ private final List<OptionMetadata> options;
+ private final String expression;
+ private final ExpressionNode expressionTree;
+
+ public RequiredOptionMetadata(
+ RuleType ruleType,
+ List<OptionMetadata> options,
+ String expression,
+ ExpressionNode expressionTree) {
+ this.ruleType = ruleType;
+ this.options = options;
+ this.expression = expression;
+ this.expressionTree = expressionTree;
+ }
+ }
+
+ @Getter
+ public static class OptionMetadata {
+
+ private final String key;
+ private final String type;
+ private final Object defaultValue;
+ private final String description;
+ private final List<String> fallbackKeys;
+ private final List<Object> optionValues;
+
+ public OptionMetadata(
+ String key,
+ String type,
+ Object defaultValue,
+ String description,
+ List<String> fallbackKeys,
+ List<Object> optionValues) {
+ this.key = key;
+ this.type = type;
+ this.defaultValue = defaultValue;
+ this.description = description;
+ this.fallbackKeys = fallbackKeys;
+ this.optionValues = optionValues;
+ }
+ }
+
+ @Getter
+ public static class ExpressionNode {
+
+ private final ConditionNode condition;
+ private final LogicalOperator operator;
+ private final ExpressionNode next;
+
+ public ExpressionNode(
+ ConditionNode condition, LogicalOperator operator,
ExpressionNode next) {
+ this.condition = condition;
+ this.operator = operator;
+ this.next = next;
+ }
+ }
+
+ @Getter
+ public static class ConditionNode {
+
+ private final OptionMetadata option;
+ private final Object expectValue;
+ private final LogicalOperator operator;
+ private final ConditionNode next;
+
+ public ConditionNode(
+ OptionMetadata option,
+ Object expectValue,
+ LogicalOperator operator,
+ ConditionNode next) {
+ this.option = option;
+ this.expectValue = expectValue;
+ this.operator = operator;
+ this.next = next;
+ }
+ }
+
+ public enum RuleType {
+ ABSOLUTELY_REQUIRED,
+ EXCLUSIVE,
+ BUNDLED,
+ CONDITIONAL
+ }
+
+ public enum LogicalOperator {
+ AND,
+ OR
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/OptionRulesService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/OptionRulesService.java
new file mode 100644
index 0000000000..b904f3dffa
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/OptionRulesService.java
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
+
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.configuration.util.Condition;
+import org.apache.seatunnel.api.configuration.util.ConditionRule;
+import org.apache.seatunnel.api.configuration.util.Expression;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.configuration.util.RequiredOption;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.engine.server.rest.response.OptionRuleResponse;
+import org.apache.seatunnel.plugin.discovery.PluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.stream.Collectors;
+
+/**
+ * Service for exposing runtime connector option rules via REST APIs.
+ *
+ * <p>The returned metadata is loaded from runtime plugin discovery and cached
for the lifetime of
+ * the service instance. The cache can be cleared explicitly if plugin
metadata needs to be
+ * refreshed.
+ */
+@Slf4j
+public class OptionRulesService extends BaseService {
+
+ private static final String PARAM_TYPE = "type";
+ private static final String PARAM_PLUGIN = "plugin";
+ private static final Map<Class<? extends RequiredOption>,
OptionRuleResponse.RuleType>
+ REQUIRED_OPTION_RULE_TYPES = createRequiredOptionRuleTypes();
+
+ private final Map<PluginType, PluginDiscovery<?>> pluginDiscoveries;
+ private final ConcurrentMap<PluginType, LinkedHashMap<PluginIdentifier,
OptionRule>>
+ discoveredPluginsCache;
+ private final ConcurrentMap<PluginType, ConcurrentMap<String,
OptionRuleResponse>>
+ responseCache;
+
+ public OptionRulesService(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ Map<PluginType, PluginDiscovery<?>> discoveries = new
EnumMap<>(PluginType.class);
+ discoveries.put(PluginType.SOURCE, new
SeaTunnelSourcePluginDiscovery());
+ discoveries.put(PluginType.SINK, new SeaTunnelSinkPluginDiscovery());
+ this.pluginDiscoveries = Collections.unmodifiableMap(discoveries);
+ this.discoveredPluginsCache = new ConcurrentHashMap<>();
+ this.responseCache = new ConcurrentHashMap<>();
+ }
+
+ /**
+ * Returns the full option rule metadata of a runtime connector.
+ *
+ * @param pluginTypeText connector type text, currently supports {@code
source} and {@code sink}
+ * @param pluginName connector factory identifier
+ * @return connector option rule metadata
+ * @throws IllegalArgumentException if the request parameters are blank or
unsupported
+ * @throws NoSuchElementException if the target plugin cannot be found
from runtime discovery
+ */
+ public OptionRuleResponse getOptionRules(String pluginTypeText, String
pluginName) {
+ PluginType pluginType = parseSupportedPluginType(pluginTypeText);
+ String normalizedPluginName = normalizePluginName(pluginName);
+ String displayPluginName = pluginName.trim();
+ ConcurrentMap<String, OptionRuleResponse> pluginTypeCache =
getPluginTypeCache(pluginType);
+ return pluginTypeCache.computeIfAbsent(
+ normalizedPluginName,
+ key ->
+ buildOptionRuleResponse(
+ pluginType, normalizedPluginName,
displayPluginName));
+ }
+
+ /**
+ * Clears all cached discovery and response metadata.
+ *
+ * <p>This is primarily intended for future plugin reload scenarios.
Current Zeta deployments
+ * still expect service restart after plugin installation or upgrade.
+ */
+ public void clearCache() {
+ discoveredPluginsCache.clear();
+ responseCache.clear();
+ log.info("Cleared option rules cache");
+ }
+
+ OptionRuleResponse buildResponse(PluginIdentifier pluginIdentifier,
OptionRule optionRule) {
+ return new OptionRuleResponse(
+ pluginIdentifier.getEngineType(),
+ pluginIdentifier.getPluginType(),
+ pluginIdentifier.getPluginName(),
+ toOptionRuleMetadata(optionRule));
+ }
+
+ private OptionRuleResponse.OptionRuleMetadata
toOptionRuleMetadata(OptionRule optionRule) {
+ List<OptionRuleResponse.OptionMetadata> optionalOptions =
+ optionRule.getOptionalOptions().stream()
+ .map(this::toOptionMetadata)
+ .collect(Collectors.toList());
+ List<OptionRuleResponse.RequiredOptionMetadata> requiredOptions =
+ optionRule.getRequiredOptions().stream()
+ .map(this::toRequiredOptionMetadata)
+ .collect(Collectors.toList());
+ List<OptionRuleResponse.ConditionRuleMetadata> conditionRules =
+ optionRule.getConditionRules().stream()
+ .map(this::toConditionRuleMetadata)
+ .collect(Collectors.toList());
+ return new OptionRuleResponse.OptionRuleMetadata(
+ optionalOptions, requiredOptions, conditionRules);
+ }
+
+ private ConcurrentMap<String, OptionRuleResponse>
getPluginTypeCache(PluginType pluginType) {
+ return responseCache.computeIfAbsent(pluginType, key -> new
ConcurrentHashMap<>());
+ }
+
+ private OptionRuleResponse buildOptionRuleResponse(
+ PluginType pluginType, String normalizedPluginName, String
displayPluginName) {
+ Map.Entry<PluginIdentifier, OptionRule> pluginEntry =
+ findPlugin(pluginType, normalizedPluginName,
displayPluginName);
+ return buildResponse(pluginEntry.getKey(), pluginEntry.getValue());
+ }
+
+ private Map.Entry<PluginIdentifier, OptionRule> findPlugin(
+ PluginType pluginType, String normalizedPluginName, String
displayPluginName) {
+ return getDiscoveredPlugins(pluginType).entrySet().stream()
+ .filter(
+ pluginEntry ->
+ pluginEntry
+ .getKey()
+ .getPluginName()
+
.equalsIgnoreCase(normalizedPluginName))
+ .findFirst()
+ .orElseThrow(
+ () ->
+ new NoSuchElementException(
+ String.format(
+ "Plugin '%s' not found for
type '%s'.",
+ displayPluginName,
pluginType.getType())));
+ }
+
+ private LinkedHashMap<PluginIdentifier, OptionRule> getDiscoveredPlugins(
+ PluginType pluginType) {
+ return discoveredPluginsCache.computeIfAbsent(
+ pluginType,
+ key -> {
+ PluginDiscovery<?> pluginDiscovery =
pluginDiscoveries.get(key);
+ if (pluginDiscovery == null) {
+ throw new IllegalArgumentException(
+ String.format("Unsupported plugin type: %s",
pluginType.getType()));
+ }
+ return pluginDiscovery.getPlugins();
+ });
+ }
+
+ private PluginType parseSupportedPluginType(String pluginTypeText) {
+ if (StringUtils.isBlank(pluginTypeText)) {
+ throw new IllegalArgumentException(
+ String.format("Parameter '%s' cannot be empty.",
PARAM_TYPE));
+ }
+ String normalizedPluginType = pluginTypeText.trim();
+ if (StringUtils.equalsIgnoreCase(normalizedPluginType,
PluginType.SOURCE.getType())) {
+ return PluginType.SOURCE;
+ }
+ if (StringUtils.equalsIgnoreCase(normalizedPluginType,
PluginType.SINK.getType())) {
+ return PluginType.SINK;
+ }
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported plugin type '%s'. Only '%s' and '%s' are
supported.",
+ normalizedPluginType,
+ PluginType.SOURCE.getType(),
+ PluginType.SINK.getType()));
+ }
+
+ private String normalizePluginName(String pluginName) {
+ if (StringUtils.isBlank(pluginName)) {
+ throw new IllegalArgumentException(
+ String.format("Parameter '%s' cannot be empty.",
PARAM_PLUGIN));
+ }
+ return pluginName.trim().toLowerCase(Locale.ROOT);
+ }
+
+ private OptionRuleResponse.RequiredOptionMetadata toRequiredOptionMetadata(
+ RequiredOption requiredOption) {
+ List<OptionRuleResponse.OptionMetadata> options =
+ requiredOption.getOptions().stream()
+ .map(this::toOptionMetadata)
+ .collect(Collectors.toList());
+ OptionRuleResponse.RuleType ruleType = resolveRuleType(requiredOption);
+ if (ruleType == null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported required option type: %s",
+ requiredOption.getClass().getName()));
+ }
+ if (requiredOption instanceof
RequiredOption.ConditionalRequiredOptions) {
+ Expression expression =
+ ((RequiredOption.ConditionalRequiredOptions)
requiredOption).getExpression();
+ return new OptionRuleResponse.RequiredOptionMetadata(
+ ruleType, options, expression.toString(),
toExpressionNode(expression));
+ }
+ return new OptionRuleResponse.RequiredOptionMetadata(ruleType,
options, null, null);
+ }
+
+ private OptionRuleResponse.ConditionRuleMetadata toConditionRuleMetadata(
+ ConditionRule conditionRule) {
+ Expression expression = conditionRule.getExpression();
+ return new OptionRuleResponse.ConditionRuleMetadata(
+ expression.toString(),
+ toExpressionNode(expression),
+ toOptionRuleMetadata(conditionRule.getOptionRule()));
+ }
+
+ private OptionRuleResponse.RuleType resolveRuleType(RequiredOption
requiredOption) {
+ return REQUIRED_OPTION_RULE_TYPES.entrySet().stream()
+ .filter(entry -> entry.getKey().isInstance(requiredOption))
+ .map(Map.Entry::getValue)
+ .findFirst()
+ .orElse(null);
+ }
+
+ private OptionRuleResponse.ExpressionNode toExpressionNode(Expression
expression) {
+ if (expression == null) {
+ return null;
+ }
+ return new OptionRuleResponse.ExpressionNode(
+ toConditionNode(expression.getCondition()),
+ toLogicalOperator(expression.and()),
+ toExpressionNode(expression.getNext()));
+ }
+
+ private OptionRuleResponse.ConditionNode toConditionNode(Condition<?>
condition) {
+ if (condition == null) {
+ return null;
+ }
+ return new OptionRuleResponse.ConditionNode(
+ toOptionMetadata(condition.getOption()),
+ condition.getExpectValue(),
+ toLogicalOperator(condition.and()),
+ toConditionNode(condition.getNext()));
+ }
+
+ private OptionRuleResponse.LogicalOperator toLogicalOperator(Boolean and) {
+ if (and == null) {
+ return null;
+ }
+ return and ? OptionRuleResponse.LogicalOperator.AND :
OptionRuleResponse.LogicalOperator.OR;
+ }
+
+ private OptionRuleResponse.OptionMetadata toOptionMetadata(Option<?>
option) {
+ List<Object> optionValues = null;
+ if (option instanceof SingleChoiceOption) {
+ optionValues = new ArrayList<>(((SingleChoiceOption<?>)
option).getOptionValues());
+ }
+ return new OptionRuleResponse.OptionMetadata(
+ option.key(),
+ option.typeReference().getType().getTypeName(),
+ option.defaultValue(),
+ option.getDescription(),
+ new ArrayList<>(option.getFallbackKeys()),
+ optionValues);
+ }
+
+ private static Map<Class<? extends RequiredOption>,
OptionRuleResponse.RuleType>
+ createRequiredOptionRuleTypes() {
+ Map<Class<? extends RequiredOption>, OptionRuleResponse.RuleType>
ruleTypes =
+ new HashMap<>();
+ ruleTypes.put(
+ RequiredOption.AbsolutelyRequiredOptions.class,
+ OptionRuleResponse.RuleType.ABSOLUTELY_REQUIRED);
+ ruleTypes.put(
+ RequiredOption.ExclusiveRequiredOptions.class,
+ OptionRuleResponse.RuleType.EXCLUSIVE);
+ ruleTypes.put(
+ RequiredOption.BundledRequiredOptions.class,
OptionRuleResponse.RuleType.BUNDLED);
+ ruleTypes.put(
+ RequiredOption.ConditionalRequiredOptions.class,
+ OptionRuleResponse.RuleType.CONDITIONAL);
+ return Collections.unmodifiableMap(ruleTypes);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/OptionRulesServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/OptionRulesServlet.java
new file mode 100644
index 0000000000..5628775182
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/OptionRulesServlet.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.servlet;
+
+import org.apache.seatunnel.engine.server.rest.ErrResponse;
+import org.apache.seatunnel.engine.server.rest.service.OptionRulesService;
+
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+@Slf4j
+public class OptionRulesServlet extends BaseServlet {
+
+ private final OptionRulesService optionRulesService;
+
+ public OptionRulesServlet(NodeEngineImpl nodeEngine) {
+ super(nodeEngine);
+ this.optionRulesService = new OptionRulesService(nodeEngine);
+ }
+
+ @Override
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ Map<String, String> params = getParameterMap(req);
+ String pluginType = params.get("type");
+ String pluginName = params.get("plugin");
+ try {
+ writeJson(resp, optionRulesService.getOptionRules(pluginType,
pluginName));
+ } catch (IllegalArgumentException e) {
+ log.warn(
+ "Invalid option rules request, type: {}, plugin: {},
error: {}",
+ pluginType,
+ pluginName,
+ e.getMessage());
+ writeJson(resp, errorResponse(e.getMessage()),
HttpServletResponse.SC_BAD_REQUEST);
+ } catch (NoSuchElementException e) {
+ log.info("Option rules plugin not found, type: {}, plugin: {}",
pluginType, pluginName);
+ writeJson(resp, errorResponse(e.getMessage()),
HttpServletResponse.SC_NOT_FOUND);
+ } catch (RuntimeException e) {
+ log.error(
+ "Failed to load option rules, type: {}, plugin: {}",
pluginType, pluginName, e);
+ writeJson(
+ resp,
+ errorResponse("Failed to load option rules."),
+ HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+ }
+ }
+
+ private ErrResponse errorResponse(String message) {
+ ErrResponse errResponse = new ErrResponse();
+ errResponse.setStatus("FAIL");
+ errResponse.setMessage(message);
+ return errResponse;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/OptionRulesApiTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/OptionRulesApiTest.java
new file mode 100644
index 0000000000..84a6b2ce3f
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/OptionRulesApiTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest;
+
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.HttpConfig;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.engine.server.TestUtils;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.config.Config;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+
+import static io.restassured.RestAssured.given;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasItem;
+
+class OptionRulesApiTest {
+
+ private static final int HTTP_PORT = 18082;
+
+ private static HazelcastInstanceImpl instance;
+ private static Config originalHazelcastConfig;
+ private static boolean originalHttpEnabled;
+ private static int originalHttpPort;
+ private static boolean originalEnableDynamicPort;
+ private static ExecutionMode originalExecutionMode;
+
+ @BeforeAll
+ static void before() {
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ HttpConfig httpConfig =
seaTunnelConfig.getEngineConfig().getHttpConfig();
+ originalHazelcastConfig = seaTunnelConfig.getHazelcastConfig();
+ originalHttpEnabled = httpConfig.isEnabled();
+ originalHttpPort = httpConfig.getPort();
+ originalEnableDynamicPort = httpConfig.isEnableDynamicPort();
+ originalExecutionMode = seaTunnelConfig.getEngineConfig().getMode();
+
+ Config hazelcastConfig = Config.loadFromString(getHazelcastConfig());
+
hazelcastConfig.setClusterName(TestUtils.getClusterName("OptionRulesApiTest"));
+ seaTunnelConfig.setHazelcastConfig(hazelcastConfig);
+ seaTunnelConfig.getEngineConfig().setMode(ExecutionMode.LOCAL);
+
+ httpConfig.setEnabled(true);
+ httpConfig.setPort(HTTP_PORT);
+ httpConfig.setEnableDynamicPort(false);
+
+ instance =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ }
+
+ @Test
+ void shouldReturnOptionRulesViaJettyRest() {
+ given().queryParam("type", "source")
+ .queryParam("plugin", "FakeSource")
+ .when()
+ .get("http://localhost:" + HTTP_PORT +
RestConstant.REST_URL_OPTION_RULES)
+ .then()
+ .statusCode(200)
+ .body("engineType", equalTo("seatunnel"))
+ .body("pluginType", equalTo("source"))
+ .body("pluginName", equalTo("FakeSource"))
+ .body("optionRule.optionalOptions.key", hasItem("row.num"))
+ .body("optionRule.requiredOptions.ruleType",
hasItem("EXCLUSIVE"))
+ .body("optionRule.requiredOptions.ruleType",
hasItem("CONDITIONAL"))
+ .body("optionRule.conditionRules.size()", equalTo(0));
+ }
+
+ @Test
+ void shouldReturnOptionRulesViaHazelcastRest() {
+ int hazelcastPort =
instance.getCluster().getLocalMember().getAddress().getPort();
+ given().queryParam("type", "source")
+ .queryParam("plugin", "FakeSource")
+ .when()
+ .get(
+ "http://localhost:"
+ + hazelcastPort
+ + RestConstant.CONTEXT_PATH
+ + RestConstant.REST_URL_OPTION_RULES)
+ .then()
+ .statusCode(200)
+ .body("pluginName", equalTo("FakeSource"))
+ .body("optionRule.requiredOptions.ruleType",
hasItem("EXCLUSIVE"))
+ .body("optionRule.conditionRules.size()", equalTo(0));
+ }
+
+ @Test
+ void shouldRejectMissingTypeParameter() {
+ given().queryParam("plugin", "FakeSource")
+ .when()
+ .get("http://localhost:" + HTTP_PORT +
RestConstant.REST_URL_OPTION_RULES)
+ .then()
+ .statusCode(400)
+ .body("message", containsString("Parameter 'type' cannot be
empty"));
+ }
+
+ @Test
+ void shouldReturnNotFoundForUnknownPlugin() {
+ given().queryParam("type", "source")
+ .queryParam("plugin", "MissingPlugin")
+ .when()
+ .get("http://localhost:" + HTTP_PORT +
RestConstant.REST_URL_OPTION_RULES)
+ .then()
+ .statusCode(404)
+ .body("message", containsString("MissingPlugin"));
+ }
+
+ @AfterAll
+ static void after() {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.setHazelcastConfig(originalHazelcastConfig);
+ seaTunnelConfig.getEngineConfig().setMode(originalExecutionMode);
+ HttpConfig httpConfig =
seaTunnelConfig.getEngineConfig().getHttpConfig();
+ httpConfig.setEnabled(originalHttpEnabled);
+ httpConfig.setPort(originalHttpPort);
+ httpConfig.setEnableDynamicPort(originalEnableDynamicPort);
+
+ LoggerContext context = (LoggerContext) LogManager.getContext(false);
+ context.close();
+ }
+
+ private static String getHazelcastConfig() {
+ return "hazelcast:\n"
+ + " cluster-name: seatunnel\n"
+ + " network:\n"
+ + " rest-api:\n"
+ + " enabled: true\n"
+ + " endpoint-groups:\n"
+ + " CLUSTER_WRITE:\n"
+ + " enabled: true\n"
+ + " DATA:\n"
+ + " enabled: true\n"
+ + " join:\n"
+ + " tcp-ip:\n"
+ + " enabled: true\n"
+ + " member-list:\n"
+ + " - localhost\n"
+ + " port:\n"
+ + " auto-increment: true\n"
+ + " port-count: 100\n"
+ + " port: 5801\n"
+ + "\n"
+ + " properties:\n"
+ + " hazelcast.invocation.max.retry.count: 200\n"
+ + " hazelcast.tcp.join.port.try.count: 30\n"
+ + " hazelcast.invocation.retry.pause.millis: 2000\n"
+ + "
hazelcast.slow.operation.detector.stacktrace.logging.enabled: true\n"
+ + " hazelcast.logging.type: log4j2\n"
+ + " hazelcast.operation.generic.thread.count: 200\n";
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/OptionRulesServiceTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/OptionRulesServiceTest.java
new file mode 100644
index 0000000000..8a8a53a991
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/rest/service/OptionRulesServiceTest.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.rest.service;
+
+import org.apache.seatunnel.api.common.PluginIdentifier;
+import org.apache.seatunnel.api.configuration.Option;
+import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.api.configuration.SingleChoiceOption;
+import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.engine.server.rest.response.OptionRuleResponse;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class OptionRulesServiceTest {
+
+ private final OptionRulesService service = new OptionRulesService(null);
+
+ @Test
+ void shouldReturnRuntimeSourceOptionRules() {
+ OptionRuleResponse response = service.getOptionRules("source",
"FakeSource");
+
+ assertEquals("seatunnel", response.getEngineType());
+ assertEquals("source", response.getPluginType());
+ assertEquals("FakeSource", response.getPluginName());
+ assertFalse(response.getOptionRule().getOptionalOptions().isEmpty());
+ assertTrue(
+ response.getOptionRule().getOptionalOptions().stream()
+ .anyMatch(option ->
"row.num".equals(option.getKey())));
+ assertTrue(
+ response.getOptionRule().getRequiredOptions().stream()
+ .anyMatch(
+ option ->
+ option.getRuleType()
+ ==
OptionRuleResponse.RuleType.EXCLUSIVE));
+
+ OptionRuleResponse.RequiredOptionMetadata conditionalRule =
+ response.getOptionRule().getRequiredOptions().stream()
+ .filter(
+ option ->
+ option.getRuleType()
+ ==
OptionRuleResponse.RuleType.CONDITIONAL)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ assertNotNull(conditionalRule.getExpression());
+ assertNotNull(conditionalRule.getExpressionTree());
+ assertTrue(response.getOptionRule().getConditionRules().isEmpty());
+ }
+
+ @Test
+ void shouldReturnRuntimeSinkOptionRules() {
+ OptionRuleResponse response = service.getOptionRules("sink",
"Console");
+
+ assertEquals("seatunnel", response.getEngineType());
+ assertEquals("sink", response.getPluginType());
+ assertEquals("Console", response.getPluginName());
+ assertTrue(
+ response.getOptionRule().getOptionalOptions().stream()
+ .anyMatch(option ->
"log.print.data".equals(option.getKey())));
+ assertTrue(response.getOptionRule().getConditionRules().isEmpty());
+ }
+
+ @Test
+ void shouldTrimPluginTypeAndPluginName() {
+ OptionRuleResponse response = service.getOptionRules(" source ", "
FakeSource ");
+
+ assertEquals("source", response.getPluginType());
+ assertEquals("FakeSource", response.getPluginName());
+ }
+
+ @Test
+ void shouldPreserveBundledConditionalAndChoiceMetadata() {
+ SingleChoiceOption<AuthMode> authMode =
+ Options.key("auth.mode")
+ .singleChoice(
+ AuthMode.class,
Arrays.asList(AuthMode.PASSWORD, AuthMode.TOKEN))
+ .defaultValue(AuthMode.PASSWORD)
+ .withDescription("Authentication mode");
+ Option<String> username =
+ Options.key("username")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Username")
+ .withFallbackKeys("user");
+ Option<String> password =
+
Options.key("password").stringType().noDefaultValue().withDescription("Password");
+ Option<String> token =
+
Options.key("token").stringType().noDefaultValue().withDescription("Access
token");
+
+ OptionRule optionRule =
+ OptionRule.builder()
+ .optional(authMode)
+ .bundled(username, password)
+ .conditional(
+ authMode, Arrays.asList(AuthMode.PASSWORD,
AuthMode.TOKEN), token)
+ .build();
+
+ OptionRuleResponse response =
+ service.buildResponse(
+ PluginIdentifier.of("seatunnel", "source",
"CustomSource"), optionRule);
+
+ OptionRuleResponse.OptionMetadata authModeMetadata =
+ response.getOptionRule().getOptionalOptions().stream()
+ .filter(option -> "auth.mode".equals(option.getKey()))
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ assertEquals(2, authModeMetadata.getOptionValues().size());
+
+ OptionRuleResponse.RequiredOptionMetadata bundledRule =
+ response.getOptionRule().getRequiredOptions().stream()
+ .filter(
+ option ->
+ option.getRuleType() ==
OptionRuleResponse.RuleType.BUNDLED)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ assertTrue(
+ bundledRule.getOptions().stream()
+ .anyMatch(option ->
option.getFallbackKeys().contains("user")));
+
+ OptionRuleResponse.RequiredOptionMetadata conditionalRule =
+ response.getOptionRule().getRequiredOptions().stream()
+ .filter(
+ option ->
+ option.getRuleType()
+ ==
OptionRuleResponse.RuleType.CONDITIONAL)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ assertEquals(
+ OptionRuleResponse.LogicalOperator.OR,
+ conditionalRule.getExpressionTree().getOperator());
+ assertNotNull(conditionalRule.getExpressionTree().getNext());
+ assertTrue(response.getOptionRule().getConditionRules().isEmpty());
+ }
+
+ @Test
+ void shouldPreserveNestedConditionRuleMetadata() {
+ SingleChoiceOption<AuthMode> authMode =
+ Options.key("auth.mode")
+ .singleChoice(
+ AuthMode.class,
Arrays.asList(AuthMode.PASSWORD, AuthMode.TOKEN))
+ .defaultValue(AuthMode.PASSWORD)
+ .withDescription("Authentication mode");
+ Option<String> username =
+
Options.key("username").stringType().noDefaultValue().withDescription("Username");
+ Option<String> password =
+
Options.key("password").stringType().noDefaultValue().withDescription("Password");
+
+ OptionRule adminRule = OptionRule.builder().required(password).build();
+ OptionRule passwordAuthRule =
+ OptionRule.builder()
+ .required(username)
+ .conditionalRule(username, "admin", adminRule)
+ .build();
+ OptionRule optionRule =
+ OptionRule.builder()
+ .optional(authMode)
+ .conditionalRule(authMode, AuthMode.PASSWORD,
passwordAuthRule)
+ .build();
+
+ OptionRuleResponse response =
+ service.buildResponse(
+ PluginIdentifier.of("seatunnel", "source",
"NestedSource"), optionRule);
+
+ assertTrue(response.getOptionRule().getRequiredOptions().isEmpty());
+ assertEquals(1, response.getOptionRule().getConditionRules().size());
+
+ OptionRuleResponse.ConditionRuleMetadata rootConditionRule =
+ response.getOptionRule().getConditionRules().get(0);
+ assertEquals("'auth.mode' == PASSWORD",
rootConditionRule.getExpression());
+ assertNotNull(rootConditionRule.getExpressionTree());
+ assertEquals(
+ "username",
+ rootConditionRule
+ .getOptionRule()
+ .getRequiredOptions()
+ .get(0)
+ .getOptions()
+ .get(0)
+ .getKey());
+
+ assertEquals(1,
rootConditionRule.getOptionRule().getConditionRules().size());
+ OptionRuleResponse.ConditionRuleMetadata nestedConditionRule =
+ rootConditionRule.getOptionRule().getConditionRules().get(0);
+ assertEquals("'username' == admin",
nestedConditionRule.getExpression());
+ assertNotNull(nestedConditionRule.getExpressionTree());
+ assertEquals(
+ "password",
+ nestedConditionRule
+ .getOptionRule()
+ .getRequiredOptions()
+ .get(0)
+ .getOptions()
+ .get(0)
+ .getKey());
+
assertTrue(nestedConditionRule.getOptionRule().getConditionRules().isEmpty());
+ }
+
+ @Test
+ void shouldClearCachedOptionRules() {
+ OptionRuleResponse cachedResponse = service.getOptionRules("source",
"FakeSource");
+ OptionRuleResponse sameCachedResponse =
service.getOptionRules("source", "FakeSource");
+
+ assertSame(cachedResponse, sameCachedResponse);
+
+ service.clearCache();
+
+ OptionRuleResponse refreshedResponse =
service.getOptionRules("source", "FakeSource");
+ assertNotSame(cachedResponse, refreshedResponse);
+ assertEquals("FakeSource", refreshedResponse.getPluginName());
+ }
+
+ @Test
+ void shouldReturnConsistentResponsesForConcurrentRequests() throws
Exception {
+ int threadCount = 8;
+ ExecutorService executorService =
Executors.newFixedThreadPool(threadCount);
+ CountDownLatch ready = new CountDownLatch(threadCount);
+ CountDownLatch start = new CountDownLatch(1);
+ List<Future<OptionRuleResponse>> futures = new ArrayList<>();
+ try {
+ for (int i = 0; i < threadCount; i++) {
+ futures.add(
+ executorService.submit(
+ () -> {
+ ready.countDown();
+ assertTrue(start.await(10,
TimeUnit.SECONDS));
+ return service.getOptionRules("source",
"FakeSource");
+ }));
+ }
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+ start.countDown();
+
+ OptionRuleResponse firstResponse = futures.get(0).get(30,
TimeUnit.SECONDS);
+ for (Future<OptionRuleResponse> future : futures) {
+ assertSame(firstResponse, future.get(30, TimeUnit.SECONDS));
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
+
+ @Test
+ void shouldRejectInvalidType() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> service.getOptionRules("transform", "Replace"));
+ assertTrue(error.getMessage().contains("Unsupported plugin type"));
+ }
+
+ @Test
+ void shouldRejectBlankPluginName() {
+ IllegalArgumentException error =
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> service.getOptionRules("source", " "));
+ assertTrue(error.getMessage().contains("Parameter 'plugin' cannot be
empty"));
+ }
+
+ @Test
+ void shouldThrowWhenPluginDoesNotExist() {
+ assertThrows(
+ NoSuchElementException.class,
+ () -> service.getOptionRules("source", "MissingPlugin"));
+ }
+
+ private enum AuthMode {
+ PASSWORD,
+ TOKEN
+ }
+}