This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ef1b8b1bb [Improve] [Connector-V2] Change Connector Custom Config Prefix To Map (#3719)
ef1b8b1bb is described below
commit ef1b8b1bb502089f781b3a9dac4ec97e1e8af54b
Author: Hisoka <[email protected]>
AuthorDate: Tue Jan 3 09:18:48 2023 +0800
[Improve] [Connector-V2] Change Connector Custom Config Prefix To Map (#3719)
* [Improve] [Connector-V2] Change Connector Custom Config Prefix To Map
---
docs/en/connector-v2/sink/Clickhouse.md | 15 ++--
docs/en/connector-v2/sink/Kafka.md | 44 +++++++-----
docs/en/connector-v2/sink/Rabbitmq.md | 38 ++++++----
docs/en/connector-v2/sink/StarRocks.md | 83 +++++++++++-----------
docs/en/connector-v2/source/kafka.md | 21 +++---
.../common/config/TypesafeConfigUtils.java | 13 +---
.../common/config/TypesafeConfigUtilsTest.java | 15 ----
.../clickhouse/config/ClickhouseConfig.java | 2 +-
.../clickhouse/sink/ClickhouseSinkFactory.java | 4 +-
.../clickhouse/sink/client/ClickhouseSink.java | 9 +--
.../connectors/seatunnel/kafka/config/Config.java | 25 +++----
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 2 +-
.../seatunnel/kafka/sink/KafkaSinkWriter.java | 11 ++-
.../seatunnel/kafka/source/KafkaSource.java | 8 +--
.../seatunnel/kafka/source/KafkaSourceFactory.java | 2 +-
.../seatunnel/rabbitmq/config/RabbitmqConfig.java | 33 +++++----
.../rabbitmq/sink/RabbitmqSinkFactory.java | 4 +-
.../seatunnel/starrocks/config/SinkConfig.java | 48 ++++++-------
.../starrocks/sink/StarRocksSinkFactory.java | 2 +-
19 files changed, 190 insertions(+), 189 deletions(-)
diff --git a/docs/en/connector-v2/sink/Clickhouse.md
b/docs/en/connector-v2/sink/Clickhouse.md
index 5ab4072f3..d82b0417a 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -30,7 +30,7 @@ Write data to Clickhouse can also be done using JDBC
| username | string | yes | - |
| password | string | yes | - |
| fields | string | yes | - |
-| clickhouse.* | string | no | |
+| clickhouse.config | map | no | |
| bulk_size | string | no | 20000 |
| split_mode | string | no | false |
| sharding_key | string | no | - |
@@ -63,12 +63,10 @@ The table name
The data field that needs to be output to `ClickHouse`, if not configured, it will be automatically adapted according to the sink table `schema`.
-### clickhouse [string]
+### clickhouse.config [map]
In addition to the above mandatory parameters that must be specified by `clickhouse-jdbc`, users can also specify multiple optional parameters, which cover all the [parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration) provided by `clickhouse-jdbc`.
-The way to specify the parameter is to add the prefix `clickhouse.` to the original parameter name. For example, the way to specify `socket_timeout` is: `clickhouse.socket_timeout = 50000` . If these non-essential parameters are not specified, they will use the default values given by `clickhouse-jdbc`.
-
### bulk_size [number]
The number of rows written through [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time; the default is `20000`.
@@ -113,6 +111,10 @@ sink {
table = "fake_all"
username = "default"
password = ""
+ clickhouse.config = {
+ max_rows_to_read = "100"
+ read_overflow_mode = "throw"
+ }
}
}
```
@@ -184,7 +186,6 @@ sink {
### next version
- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
-
- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
-
-- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
\ No newline at end of file
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
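For jobs written against the old docs, a minimal before/after sketch of the Clickhouse sink change above (the `socket_timeout` parameter comes from the removed prefix-style docs; host, table and credentials are illustrative placeholders):

```hocon
# Before #3719: clickhouse-jdbc parameters were flat keys carrying the "clickhouse." prefix
sink {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "fake_all"
    clickhouse.socket_timeout = 50000
  }
}

# After #3719: the same parameters live in a single "clickhouse.config" map
sink {
  Clickhouse {
    host = "localhost:8123"
    database = "default"
    table = "fake_all"
    clickhouse.config = {
      socket_timeout = "50000"
    }
  }
}
```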
diff --git a/docs/en/connector-v2/sink/Kafka.md
b/docs/en/connector-v2/sink/Kafka.md
index e8860824b..6908597fc 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -13,19 +13,19 @@ By default, we will use 2pc to guarantee the message is sent to kafka exactly on
## Options
-| name | type | required | default value |
-|----------------------|-----------------------| -------- | ------------- |
-| topic | string | yes | - |
-| bootstrap.servers | string | yes | - |
-| kafka.* | kafka producer config | no | - |
-| semantic | string | no | NON |
-| partition_key_fields | array | no | - |
-| partition | int | no | - |
-| assign_partitions | array | no | - |
-| transaction_prefix | string | no | - |
-| format | String | no | json |
-| field_delimiter | String | no | , |
-| common-options | config | no | - |
+| name | type | required | default value |
+|----------------------|--------|----------|---------------|
+| topic | string | yes | - |
+| bootstrap.servers | string | yes | - |
+| kafka.config | map | no | - |
+| semantic | string | no | NON |
+| partition_key_fields | array | no | - |
+| partition | int | no | - |
+| assign_partitions | array | no | - |
+| transaction_prefix | string | no | - |
+| format | String | no | json |
+| field_delimiter | String | no | , |
+| common-options | config | no | - |
### topic [string]
@@ -59,10 +59,10 @@ For example, if you want to use value of fields from upstream data as key, you c
Upstream data is the following:
-| name | age | data |
-| ---- | ---- | ------------- |
-| Jack | 16 | data-example1 |
-| Mary | 23 | data-example2 |
+| name | age | data |
+|------|-----|---------------|
+| Jack | 16 | data-example1 |
+| Mary | 23 | data-example2 |
If name is set as the key, then the hash value of the name column will
determine which partition the message is sent to.
@@ -79,7 +79,7 @@ We can decide which partition to send based on the content of the message. The f
For example, there are five partitions in total, and the assign_partitions field in config is as follows:
assign_partitions = ["shoe", "clothing"]
-Then the message containing "shoe" will be sent to partition zero ,because "shoe" is subscripted as zero in assign_partitions, and the message containing "clothing" will be sent to partition one.For other messages, the hash algorithm will be used to divide them into the remaining partitions.
+Then the message containing "shoe" will be sent to partition zero, because "shoe" is at index zero in assign_partitions, and the message containing "clothing" will be sent to partition one. For other messages, the hash algorithm will be used to divide them into the remaining partitions.
This functionality is implemented by the `MessageContentPartitioner` class, which implements the `org.apache.kafka.clients.producer.Partitioner` interface. If we need custom partitioning, we need to implement this interface as well.
@@ -113,6 +113,11 @@ sink {
format = json
kafka.request.timeout.ms = 60000
semantics = EXACTLY_ONCE
+ kafka.config = {
+ acks = "all"
+ request.timeout.ms = 60000
+ buffer.memory = 33554432
+ }
}
}
@@ -184,4 +189,5 @@ sink {
### next version
- [Improve] Support to specify multiple partition keys [3230](https://github.com/apache/incubator-seatunnel/pull/3230)
-- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
\ No newline at end of file
+- [Improve] Add text format for kafka sink connector [3711](https://github.com/apache/incubator-seatunnel/pull/3711)
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md
b/docs/en/connector-v2/sink/Rabbitmq.md
index aa0aeb784..0e87b9a35 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -12,20 +12,21 @@ Used to write data to Rabbitmq.
## Options
-| name | type | required | default value |
-|-----------------------------|---------|-----------|---------------|
-| host | string | yes | - |
-| port | int | yes | - |
-| virtual_host | string | yes | - |
-| username | string | yes | - |
-| password | string | yes | - |
-| queue_name | string | yes | - |
-| url | string | no | - |
-| network_recovery_interval | int | no | - |
-| topology_recovery_enabled | boolean | no | - |
-| automatic_recovery_enabled | boolean | no | - |
-| connection_timeout | int | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------------------|---------|-----------|---------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| virtual_host | string | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| queue_name | string | yes | - |
+| url | string | no | - |
+| network_recovery_interval | int | no | - |
+| topology_recovery_enabled | boolean | no | - |
+| automatic_recovery_enabled | boolean | no | - |
+| connection_timeout | int | no | - |
+| rabbitmq.config | map | no | - |
+| common-options | | no | - |
### host [string]
@@ -77,6 +78,10 @@ if true, enables connection recovery
connection TCP establishment timeout in milliseconds; zero for infinite
+### rabbitmq.config [map]
+
+In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -94,6 +99,10 @@ sink {
username = "guest"
password = "guest"
queue_name = "test1"
+ rabbitmq.config = {
+ requested-heartbeat = 10
+ connection-timeout = 10
+ }
}
}
```
@@ -103,3 +112,4 @@ sink {
### next version
- Add Rabbitmq Sink Connector
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
diff --git a/docs/en/connector-v2/sink/StarRocks.md
b/docs/en/connector-v2/sink/StarRocks.md
index 8cab5aabe..02243f722 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -11,21 +11,21 @@ The internal implementation of StarRocks sink connector is cached and imported b
## Options
-| name                        | type                         | required | default value   |
-|-----------------------------|------------------------------|----------|-----------------|
-| node_urls                   | list                         | yes      | -               |
-| username                    | string                       | yes      | -               |
-| password                    | string                       | yes      | -               |
-| database                    | string                       | yes      | -               |
-| table                       | string                       | yes      | -               |
-| labelPrefix                 | string                       | no       | -               |
-| batch_max_rows              | long                         | no       | 1024            |
-| batch_max_bytes             | int                          | no       | 5 * 1024 * 1024 |
-| batch_interval_ms           | int                          | no       | -               |
-| max_retries                 | int                          | no       | -               |
-| retry_backoff_multiplier_ms | int                          | no       | -               |
-| max_retry_backoff_ms        | int                          | no       | -               |
-| sink.properties.*           | starrocks stream load config | no       | -               |
+| name | type | required | default value |
+|-----------------------------|--------|----------|-----------------|
+| node_urls | list | yes | - |
+| username | string | yes | - |
+| password | string | yes | - |
+| database | string | yes | - |
+| table | string | yes | - |
+| labelPrefix | string | no | - |
+| batch_max_rows | long | no | 1024 |
+| batch_max_bytes | int | no | 5 * 1024 * 1024 |
+| batch_interval_ms | int | no | - |
+| max_retries | int | no | - |
+| retry_backoff_multiplier_ms | int | no | - |
+| max_retry_backoff_ms | int | no | - |
+| starrocks.config | map | no | - |
### node_urls [list]
@@ -75,11 +75,9 @@ Using as a multiplier for generating the next delay for backoff
The amount of time to wait before attempting to retry a request to `StarRocks`
-### sink.properties.* [starrocks stream load config]
+### starrocks.config [map]
The parameter of the stream load `data_desc`
-The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name
-For example, the way to specify `strip_outer_array` is: `sink.properties.strip_outer_array`
#### Supported import data formats
@@ -89,37 +87,41 @@ The supported formats include CSV and JSON. Default value: CSV
Use JSON format to import data
-```
+```hocon
sink {
- StarRocks {
- nodeUrls = ["e2e_starRocksdb:8030"]
- username = root
- password = ""
- database = "test"
- table = "e2e_table_sink"
- batch_max_rows = 10
- sink.properties.format = "JSON"
- sink.properties.strip_outer_array = true
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "e2e_table_sink"
+ batch_max_rows = 10
+ starrocks.config = {
+ format = "JSON"
+ strip_outer_array = true
}
+ }
}
```
Use CSV format to import data
-```
+```hocon
sink {
- StarRocks {
- nodeUrls = ["e2e_starRocksdb:8030"]
- username = root
- password = ""
- database = "test"
- table = "e2e_table_sink"
- batch_max_rows = 10
- sink.properties.format = "CSV"
- sink.properties.column_separator = "\x01"
- sink.properties.row_delimiter = "\x02"
+ StarRocks {
+ nodeUrls = ["e2e_starRocksdb:8030"]
+ username = root
+ password = ""
+ database = "test"
+ table = "e2e_table_sink"
+ batch_max_rows = 10
+ starrocks.config = {
+ format = "CSV"
+ column_separator = "\\x01"
+ row_delimiter = "\\x02"
}
+ }
}
```
@@ -127,4 +129,5 @@ sink {
### next version
-- Add StarRocks Sink Connector
\ No newline at end of file
+- Add StarRocks Sink Connector
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/kafka.md
b/docs/en/connector-v2/source/kafka.md
index 801240879..449bf8a90 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -18,13 +18,13 @@ Source connector for Apache Kafka.
## Options
| name                                | type    | required | default value            |
-|-------------------------------------|---------| -------- |--------------------------|
+|-------------------------------------|---------|----------|--------------------------|
| topic                               | String  | yes      | -                        |
| bootstrap.servers                   | String  | yes      | -                        |
| pattern                             | Boolean | no       | false                    |
| consumer.group                      | String  | no       | SeaTunnel-Consumer-Group |
| commit_on_checkpoint                | Boolean | no       | true                     |
-| kafka.*                             | String  | no       | -                        |
+| kafka.config                        | Map     | no       | -                        |
| common-options                      | config  | no       | -                        |
| schema                              |         | no       | -                        |
| format                              | String  | no       | json                     |
@@ -58,12 +58,10 @@ If true the consumer's offset will be periodically committed in the background.
The interval for dynamically discovering topics and partitions.
-### kafka.* [string]
+### kafka.config [map]
In addition to the above necessary parameters that must be specified by the `Kafka consumer` client, users can also specify multiple `consumer` client non-mandatory parameters, covering [all consumer parameters specified in the official Kafka document](https://kafka.apache.org/documentation.html#consumerconfigs).
-The way to specify parameters is to add the prefix `kafka.` to the original parameter name. For example, the way to specify `auto.offset.reset` is: `kafka.auto.offset.reset = latest` . If these non-essential parameters are not specified, they will use the default values given in the official Kafka documentation.
-
### common-options [config]
Source plugin common parameters, please refer to [Source Common
Options](common-options.md) for details.
@@ -88,7 +86,7 @@ The initial consumption pattern of consumers,there are several types:
## start_mode.timestamp
-The time required for consumption mode to be timestamp.
+The time required for consumption mode to be "timestamp".
## start_mode.offsets
@@ -120,11 +118,15 @@ source {
}
}
format = text
- field_delimiter = "#“
+ field_delimiter = "#"
topic = "topic_1,topic_2,topic_3"
bootstrap.servers = "localhost:9092"
- kafka.max.poll.records = 500
- kafka.client.id = client_1
+ kafka.config = {
+ client.id = client_1
+ max.poll.records = 500
+ auto.offset.reset = "earliest"
+ enable.auto.commit = "false"
+ }
}
}
@@ -210,4 +212,5 @@ source {
- [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
- [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
+- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
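A similar before/after sketch for the Kafka source options documented above (the `auto.offset.reset` example comes from the removed prefix-style docs; topic and broker address are illustrative):

```hocon
# Before #3719: consumer properties used the "kafka." prefix
source {
  Kafka {
    topic = "topic_1"
    bootstrap.servers = "localhost:9092"
    kafka.auto.offset.reset = latest
  }
}

# After #3719: consumer properties are grouped under the "kafka.config" map
source {
  Kafka {
    topic = "topic_1"
    bootstrap.servers = "localhost:9092"
    kafka.config = {
      auto.offset.reset = "latest"
    }
  }
}
```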
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index 02cc58d07..7d215dc95 100644
---
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -38,7 +38,9 @@ public final class TypesafeConfigUtils {
* @param source config source
* @param prefix config prefix
* @param keepPrefix true if keep prefix
+ * @deprecated use org.apache.seatunnel.api.configuration.Option interface instead
*/
+ @Deprecated
public static Config extractSubConfig(Config source, String prefix, boolean keepPrefix) {
// use LinkedHashMap to keep insertion order
@@ -84,17 +86,6 @@ public final class TypesafeConfigUtils {
return hasConfig;
}
- public static Config extractSubConfigThrowable(Config source, String prefix, boolean keepPrefix) {
-
- Config config = extractSubConfig(source, prefix, keepPrefix);
-
- if (config.isEmpty()) {
- throw new ConfigRuntimeException("config is empty");
- }
-
- return config;
- }
-
@SuppressWarnings("unchecked")
public static <T> T getConfig(final Config config, final String configKey,
@NonNull final T defaultValue) {
if (defaultValue.getClass().equals(Long.class)) {
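To see why `extractSubConfig` is being deprecated in favour of map-typed options, here is a minimal standalone sketch (assuming seatunnel-common and the shaded Typesafe Config on the classpath; the class name is hypothetical and the `socket_timeout` key mirrors the Clickhouse docs above):

```java
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

public class PrefixVsMapOptionDemo {
    public static void main(String[] args) {
        // Old style: custom parameters are flat keys sharing a prefix, so every
        // connector had to scan the whole config and strip the prefix itself.
        Config legacy = ConfigFactory.parseString("clickhouse.socket_timeout = \"50000\"");
        Config extracted = TypesafeConfigUtils.extractSubConfig(legacy, "clickhouse.", false);
        System.out.println(extracted.getString("socket_timeout")); // 50000

        // New style: custom parameters form one map-typed option, read in a single call.
        Config current = ConfigFactory.parseString("clickhouse.config = { socket_timeout = \"50000\" }");
        current.getObject("clickhouse.config")
                .forEach((key, value) -> System.out.println(key + " = " + value.unwrapped()));
        // prints: socket_timeout = 50000
    }
}
```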
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
index 3b4964c57..6977bed12 100644
---
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
@@ -18,9 +18,7 @@
package org.apache.seatunnel.common.config;
import static org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfig;
-import static org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable;
import static org.apache.seatunnel.common.config.TypesafeConfigUtils.hasSubConfig;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -59,19 +57,6 @@ public class TypesafeConfigUtilsTest {
Assertions.assertFalse(hasSubConfig);
}
- @Test
- public void testExtractSubConfigThrowable() {
- Config config = getConfig();
-
- assertThrows(ConfigRuntimeException.class, () -> extractSubConfigThrowable(config, "test1.", false), "config is empty");
-
- Config subConfig = extractSubConfigThrowable(config, "test.", false);
- Map<String, String> configMap = new HashMap<>();
- configMap.put("t0", "v0");
- configMap.put("t1", "v1");
- Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
- }
-
public Config getConfig() {
Map<String, Object> configMap = new HashMap<>();
configMap.put("test.t0", "v0");
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 9f93eb36c..ecaef394a 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -129,7 +129,7 @@ public class ClickhouseConfig {
public static final Option<List<NodePassConfig>> NODE_PASS =
Options.key("node_pass").listType(NodePassConfig.class)
.noDefaultValue().withDescription("The password of Clickhouse server
node");
- public static final Option<Map<String, String>> CLICKHOUSE_PREFIX = Options.key("clickhouse").mapType()
+ public static final Option<Map<String, String>> CLICKHOUSE_CONFIG = Options.key("clickhouse.config").mapType()
.defaultValue(Collections.emptyMap()).withDescription("Clickhouse custom config");
public static final Option<String> FILE_FIELDS_DELIMITER = Options.key("file_fields_delimiter").stringType()
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
index 03d71f136..0e343a7d8 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -48,7 +48,7 @@ public class ClickhouseSinkFactory implements
TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(HOST, DATABASE, TABLE)
- .optional(CLICKHOUSE_PREFIX,
+ .optional(CLICKHOUSE_CONFIG,
BULK_SIZE,
SPLIT_MODE,
FIELDS,
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 61149daad..13f0c49d1 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -42,7 +42,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
@@ -115,10 +114,8 @@ public class ClickhouseSink implements
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
}
Properties clickhouseProperties = new Properties();
- if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX.key() + ".")) {
- TypesafeConfigUtils.extractSubConfig(config, CLICKHOUSE_PREFIX.key() + ".", false).entrySet().forEach(e -> {
- clickhouseProperties.put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
- });
+ if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
+ config.getObject(CLICKHOUSE_CONFIG.key()).forEach((key, value) -> clickhouseProperties.put(key, String.valueOf(value.unwrapped())));
}
if (isCredential) {
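As a companion to the change above, a compact standalone sketch of the new extraction path that feeds clickhouse-jdbc (the connector guards with `CheckConfigUtil.isValidParam`; plain `hasPath` below is a simplification, and the sample keys are the ones from the Clickhouse docs):

```java
import java.util.Properties;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;

public class ClickhouseConfigMapDemo {
    public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
                "clickhouse.config = { max_rows_to_read = \"100\", read_overflow_mode = \"throw\" }");

        // Copy every entry of the map option into the Properties handed to clickhouse-jdbc.
        Properties clickhouseProperties = new Properties();
        if (config.hasPath("clickhouse.config")) {
            config.getObject("clickhouse.config").forEach((key, value) ->
                    clickhouseProperties.put(key, String.valueOf(value.unwrapped())));
        }
        System.out.println(clickhouseProperties);
        // e.g. {max_rows_to_read=100, read_overflow_mode=throw} (order not guaranteed)
    }
}
```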
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index f8a69901a..2962339b7 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import java.util.List;
+import java.util.Map;
public class Config {
@@ -36,22 +37,22 @@ public class Config {
*/
public static final String DEFAULT_FIELD_DELIMITER = ",";
- public static final Option<String> KAFKA_CONFIG_PREFIX = Options.key("kafka.")
- .stringType()
- .noDefaultValue()
- .withDescription("In addition to the above parameters that must be specified by the Kafka producer or consumer client, " +
- "the user can also specify multiple non-mandatory parameters for the producer or consumer client, " +
- "covering all the producer parameters specified in the official Kafka document.");
+ public static final Option<Map<String, String>> KAFKA_CONFIG = Options.key("kafka.config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("In addition to the above parameters that must be specified by the Kafka producer or consumer client, " +
+ "the user can also specify multiple non-mandatory parameters for the producer or consumer client, " +
+ "covering all the producer parameters specified in the official Kafka document.");
public static final Option<String> TOPIC = Options.key("topic")
- .stringType()
- .noDefaultValue()
- .withDescription("Kafka topic name. If there are multiple topics,
use , to split, for example: \"tpc1,tpc2\".");
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kafka topic name. If there are multiple topics, use
, to split, for example: \"tpc1,tpc2\".");
public static final Option<Boolean> PATTERN = Options.key("pattern")
- .booleanType()
- .defaultValue(false)
- .withDescription("If pattern is set to true,the regular expression
for a pattern of topic names to read from." +
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("If pattern is set to true,the regular expression for
a pattern of topic names to read from." +
" All topics in clients with names that match the
specified regular expression will be subscribed by the consumer.");
public static final Option<String> BOOTSTRAP_SERVERS =
Options.key("bootstrap.servers")
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index fef40310e..8f897aa20 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -35,7 +35,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
- .optional(Config.KAFKA_CONFIG_PREFIX, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
+ .optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
.exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 4cbdf10d5..94e4ec922 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -22,7 +22,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFA
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
@@ -31,7 +31,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRAN
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
import
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
@@ -136,11 +136,10 @@ public class KafkaSinkWriter implements
SinkWriter<SeaTunnelRow, KafkaCommitInfo
}
private Properties getKafkaProperties(Config pluginConfig) {
- Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig, KAFKA_CONFIG_PREFIX.key(), false);
Properties kafkaProperties = new Properties();
- kafkaConfig.entrySet().forEach(entry -> {
- kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
- });
+ if (CheckConfigUtil.isValidParam(pluginConfig, KAFKA_CONFIG.key())) {
+ pluginConfig.getObject(KAFKA_CONFIG.key()).forEach((key, value) -> kafkaProperties.put(key, value.unwrapped()));
+ }
if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
kafkaProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 52d7a067e..d840311c1 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFA
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
+import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
import static
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
@@ -46,7 +47,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
@@ -159,9 +159,9 @@ public class KafkaSource implements
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
this.discoveryIntervalMillis =
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
}
- TypesafeConfigUtils.extractSubConfig(config, "kafka.", false).entrySet().forEach(e -> {
- this.metadata.getProperties().put(e.getKey(), String.valueOf(e.getValue().unwrapped()));
- });
+ if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
+ config.getObject(KAFKA_CONFIG.key()).forEach((key, value) -> this.metadata.getProperties().put(key, value.unwrapped()));
+ }
setDeserialization(config);
}
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 8a4dce93f..3146b8e0f 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -38,7 +38,7 @@ public class KafkaSourceFactory implements TableSourceFactory
{
return OptionRule.builder()
.required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
.optional(Config.START_MODE, Config.PATTERN, Config.CONSUMER_GROUP, Config.COMMIT_ON_CHECKPOINT,
- Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA,
+ Config.KAFKA_CONFIG, Config.SCHEMA,
Config.FORMAT, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
.conditional(Config.START_MODE, StartMode.TIMESTAMP,
Config.START_MODE_TIMESTAMP)
.conditional(Config.START_MODE, StartMode.SPECIFIC_OFFSETS,
Config.START_MODE_OFFSETS)
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index ff3051b0c..cf24a4043 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -29,6 +29,7 @@ import lombok.Getter;
import lombok.Setter;
import java.io.Serializable;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -58,8 +59,6 @@ public class RabbitmqConfig implements Serializable {
private boolean forE2ETesting = false;
- public static final String RABBITMQ_SINK_CONFIG_PREFIX =
"rabbitmq.properties.";
-
private final Map<String, Object> sinkOptionProps = new HashMap<>();
public static final Option<String> HOST = Options.key("host")
@@ -150,22 +149,26 @@ public class RabbitmqConfig implements Serializable {
.withDescription("the routing key to publish the message to");
public static final Option<String> EXCHANGE = Options.key("exchange")
- .stringType()
- .noDefaultValue()
- .withDescription("the exchange to publish the message to");
+ .stringType()
+ .noDefaultValue()
+ .withDescription("the exchange to publish the message to");
public static final Option<Boolean> FOR_E2E_TESTING =
Options.key("for_e2e_testing")
- .booleanType()
- .noDefaultValue()
- .withDescription("use to recognize E2E mode");
+ .booleanType()
+ .noDefaultValue()
+ .withDescription("use to recognize E2E mode");
+
+ public static final Option<Map<String, String>> RABBITMQ_CONFIG = Options.key("rabbitmq.config").mapType()
+ .defaultValue(Collections.emptyMap()).withDescription("In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, " +
+ "covering [all the parameters specified in the official RabbitMQ document](https://www.rabbitmq.com/configure.html).");
private void parseSinkOptionProperties(Config pluginConfig) {
- Config sinkOptionConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
- RABBITMQ_SINK_CONFIG_PREFIX, false);
- sinkOptionConfig.entrySet().forEach(entry -> {
- final String configKey = entry.getKey().toLowerCase();
- this.sinkOptionProps.put(configKey, entry.getValue().unwrapped());
- });
+ if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) {
+ pluginConfig.getObject(RABBITMQ_CONFIG.key()).forEach((key, value) -> {
+ final String configKey = key.toLowerCase();
+ this.sinkOptionProps.put(configKey, value.unwrapped());
+ });
+ }
}
public RabbitmqConfig(Config config) {
diff --git
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
index 12ee197c2..449badcaa 100644
---
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
@@ -25,6 +25,7 @@ import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.Rabbitmq
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
+import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
import static
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
@@ -62,7 +63,8 @@ public class RabbitmqSinkFactory implements
TableSourceFactory {
NETWORK_RECOVERY_INTERVAL,
TOPOLOGY_RECOVERY_ENABLED,
AUTOMATIC_RECOVERY_ENABLED,
- CONNECTION_TIMEOUT
+ CONNECTION_TIMEOUT,
+ RABBITMQ_CONFIG
)
.build();
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index dc8f1bd60..16da9f983 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -19,7 +19,7 @@ package
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -64,30 +64,30 @@ public class SinkConfig {
.withDescription("The prefix of StarRocks stream load label");
public static final Option<String> DATABASE = Options.key("database")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of StarRocks database");
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of StarRocks database");
public static final Option<String> TABLE = Options.key("table")
- .stringType()
- .noDefaultValue()
- .withDescription("The name of StarRocks table");
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The name of StarRocks table");
- public static final Option<String> STARROCKS_SINK_CONFIG_PREFIX = Options.key("sink.properties.")
- .stringType()
- .noDefaultValue()
- .withDescription("The parameter of the stream load data_desc. " +
- "The way to specify the parameter is to add the prefix `sink.properties.` to the original stream load parameter name ");
+ public static final Option<Map<String, String>> STARROCKS_CONFIG = Options.key("starrocks.config")
+ .mapType()
+ .noDefaultValue()
+ .withDescription("The parameter of the stream load data_desc. " +
+ "The way to specify the parameter is to add the original stream load parameter into map");
public static final Option<Integer> BATCH_MAX_SIZE =
Options.key("batch_max_rows")
- .intType()
- .defaultValue(DEFAULT_BATCH_MAX_SIZE)
- .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+ .intType()
+ .defaultValue(DEFAULT_BATCH_MAX_SIZE)
+ .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
public static final Option<Long> BATCH_MAX_BYTES =
Options.key("batch_max_bytes")
- .longType()
- .defaultValue(DEFAULT_BATCH_BYTES)
- .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+ .longType()
+ .defaultValue(DEFAULT_BATCH_BYTES)
+ .withDescription("For batch writing, when the number of buffers
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
public static final Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.intType()
@@ -182,11 +182,11 @@ public class SinkConfig {
}
private static void parseSinkStreamLoadProperties(Config pluginConfig, SinkConfig sinkConfig) {
- Config starRocksConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
- STARROCKS_SINK_CONFIG_PREFIX.key(), false);
- starRocksConfig.entrySet().forEach(entry -> {
- final String configKey = entry.getKey().toLowerCase();
- sinkConfig.streamLoadProps.put(configKey, entry.getValue().unwrapped());
- });
+ if (CheckConfigUtil.isValidParam(pluginConfig, STARROCKS_CONFIG.key())) {
+ pluginConfig.getObject(STARROCKS_CONFIG.key()).forEach((key, value) -> {
+ final String configKey = key.toLowerCase();
+ sinkConfig.streamLoadProps.put(configKey, value.unwrapped());
+ });
+ }
}
}
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 146cec807..579fa6ad5 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -37,7 +37,7 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
.required(SinkConfig.NODE_URLS, SinkConfig.USERNAME,
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
.optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE,
SinkConfig.BATCH_MAX_BYTES,
SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES,
SinkConfig.MAX_RETRY_BACKOFF_MS,
- SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.STARROCKS_SINK_CONFIG_PREFIX)
+ SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.STARROCKS_CONFIG)
.build();
}
}