This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ef1b8b1bb [Improve] [Connector-V2] Change Connector Custom Config 
Prefix To Map (#3719)
ef1b8b1bb is described below

commit ef1b8b1bb502089f781b3a9dac4ec97e1e8af54b
Author: Hisoka <[email protected]>
AuthorDate: Tue Jan 3 09:18:48 2023 +0800

    [Improve] [Connector-V2] Change Connector Custom Config Prefix To Map 
(#3719)
    
    * [Improve] [Connector-V2] Change Connector Custom Config Prefix To Map
---
 docs/en/connector-v2/sink/Clickhouse.md            | 15 ++--
 docs/en/connector-v2/sink/Kafka.md                 | 44 +++++++-----
 docs/en/connector-v2/sink/Rabbitmq.md              | 38 ++++++----
 docs/en/connector-v2/sink/StarRocks.md             | 83 +++++++++++-----------
 docs/en/connector-v2/source/kafka.md               | 21 +++---
 .../common/config/TypesafeConfigUtils.java         | 13 +---
 .../common/config/TypesafeConfigUtilsTest.java     | 15 ----
 .../clickhouse/config/ClickhouseConfig.java        |  2 +-
 .../clickhouse/sink/ClickhouseSinkFactory.java     |  4 +-
 .../clickhouse/sink/client/ClickhouseSink.java     |  9 +--
 .../connectors/seatunnel/kafka/config/Config.java  | 25 +++----
 .../seatunnel/kafka/sink/KafkaSinkFactory.java     |  2 +-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      | 11 ++-
 .../seatunnel/kafka/source/KafkaSource.java        |  8 +--
 .../seatunnel/kafka/source/KafkaSourceFactory.java |  2 +-
 .../seatunnel/rabbitmq/config/RabbitmqConfig.java  | 33 +++++----
 .../rabbitmq/sink/RabbitmqSinkFactory.java         |  4 +-
 .../seatunnel/starrocks/config/SinkConfig.java     | 48 ++++++-------
 .../starrocks/sink/StarRocksSinkFactory.java       |  2 +-
 19 files changed, 190 insertions(+), 189 deletions(-)

diff --git a/docs/en/connector-v2/sink/Clickhouse.md 
b/docs/en/connector-v2/sink/Clickhouse.md
index 5ab4072f3..d82b0417a 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -30,7 +30,7 @@ Write data to Clickhouse can also be done using JDBC
 | username                              | string  | yes      | -             |
 | password                              | string  | yes      | -             |
 | fields                                | string  | yes      | -             |
-| clickhouse.*                          | string  | no       |               |
+| clickhouse.config                     | map     | no       |               |
 | bulk_size                             | string  | no       | 20000         |
 | split_mode                            | string  | no       | false         |
 | sharding_key                          | string  | no       | -             |
@@ -63,12 +63,10 @@ The table name
 
 The data field that needs to be output to `ClickHouse` , if not configured, it 
will be automatically adapted according to the sink table `schema` .
 
-### clickhouse [string]
+### clickhouse.config [map]
 
 In addition to the above mandatory parameters that must be specified by 
`clickhouse-jdbc` , users can also specify multiple optional parameters, which 
cover all the 
[parameters](https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-client#configuration)
 provided by `clickhouse-jdbc` .
 
-The way to specify the parameter is to add the prefix `clickhouse.` to the 
original parameter name. For example, the way to specify `socket_timeout` is: 
`clickhouse.socket_timeout = 50000` . If these non-essential parameters are not 
specified, they will use the default values given by `clickhouse-jdbc`.
-
 ### bulk_size [number]
 
 The number of rows written through 
[Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the 
`default is 20000` .
@@ -113,6 +111,10 @@ sink {
     table = "fake_all"
     username = "default"
     password = ""
+    clickhouse.confg = {
+      max_rows_to_read = "100"
+      read_overflow_mode = "throw"
+    }
   }
 }
 ```
@@ -184,7 +186,6 @@ sink {
 ### next version
 
 - [Improve] Clickhouse Sink support nest type and array 
type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
-
 - [Improve] Clickhouse Sink support geo 
type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
-
-- [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
\ No newline at end of file
+- [Feature] Support CDC write DELETE/UPDATE/INSERT events 
([3653](https://github.com/apache/incubator-seatunnel/pull/3653))
+- [Improve] Change Connector Custom Config Prefix To Map 
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Kafka.md 
b/docs/en/connector-v2/sink/Kafka.md
index e8860824b..6908597fc 100644
--- a/docs/en/connector-v2/sink/Kafka.md
+++ b/docs/en/connector-v2/sink/Kafka.md
@@ -13,19 +13,19 @@ By default, we will use 2pc to guarantee the message is 
sent to kafka exactly on
 
 ## Options
 
-| name                 | type                  | required | default value |
-|----------------------|-----------------------| -------- | ------------- |
-| topic                | string                | yes      | -             |
-| bootstrap.servers    | string                | yes      | -             |
-| kafka.*              | kafka producer config | no       | -             |
-| semantic             | string                | no       | NON           |
-| partition_key_fields | array                 | no       | -             |
-| partition            | int                   | no       | -             |
-| assign_partitions    | array                 | no       | -             |
-| transaction_prefix   | string                | no       | -             |
-| format               | String                | no       | json          |
-| field_delimiter      | String                | no       | ,             |
-| common-options       | config                | no       | -             |
+| name                 | type   | required | default value |
+|----------------------|--------|----------|---------------|
+| topic                | string | yes      | -             |
+| bootstrap.servers    | string | yes      | -             |
+| kafka.config         | map    | no       | -             |
+| semantic             | string | no       | NON           |
+| partition_key_fields | array  | no       | -             |
+| partition            | int    | no       | -             |
+| assign_partitions    | array  | no       | -             |
+| transaction_prefix   | string | no       | -             |
+| format               | String | no       | json          |
+| field_delimiter      | String | no       | ,             |
+| common-options       | config | no       | -             |
 
 ### topic [string]
 
@@ -59,10 +59,10 @@ For example, if you want to use value of fields from 
upstream data as key, you c
 
 Upstream data is the following:
 
-| name | age  | data          |
-| ---- | ---- | ------------- |
-| Jack | 16   | data-example1 |
-| Mary | 23   | data-example2 |
+| name | age | data          |
+|------|-----|---------------|
+| Jack | 16  | data-example1 |
+| Mary | 23  | data-example2 |
 
 If name is set as the key, then the hash value of the name column will 
determine which partition the message is sent to.
 
@@ -79,7 +79,7 @@ We can decide which partition to send based on the content of 
the message. The f
 For example, there are five partitions in total, and the assign_partitions 
field in config is as follows:
 assign_partitions = ["shoe", "clothing"]
 
-Then the message containing "shoe" will be sent to partition zero ,because 
"shoe" is subscripted as zero in assign_partitions, and the message containing 
"clothing" will be sent to partition one.For other messages, the hash algorithm 
will be used to divide them into the remaining partitions.
+Then the message containing "shoe" will be sent to partition zero ,because 
"shoe" is subscribed as zero in assign_partitions, and the message containing 
"clothing" will be sent to partition one.For other messages, the hash algorithm 
will be used to divide them into the remaining partitions.
 
 This function by `MessageContentPartitioner` class implements 
`org.apache.kafka.clients.producer.Partitioner` interface.If we need custom 
partitions, we need to implement this interface as well.
 
@@ -113,6 +113,11 @@ sink {
       format = json
       kafka.request.timeout.ms = 60000
       semantics = EXACTLY_ONCE
+      kafka.config = {
+        acks = "all"
+        request.timeout.ms = 60000
+        buffer.memory = 33554432
+      }
   }
   
 }
@@ -184,4 +189,5 @@ sink {
 ### next version
 
 - [Improve] Support to specify multiple partition keys 
[3230](https://github.com/apache/incubator-seatunnel/pull/3230)
-- [Improve] Add text format for kafka sink connector 
[3711](https://github.com/apache/incubator-seatunnel/pull/3711)
\ No newline at end of file
+- [Improve] Add text format for kafka sink connector 
[3711](https://github.com/apache/incubator-seatunnel/pull/3711)
+- [Improve] Change Connector Custom Config Prefix To Map 
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Rabbitmq.md 
b/docs/en/connector-v2/sink/Rabbitmq.md
index aa0aeb784..0e87b9a35 100644
--- a/docs/en/connector-v2/sink/Rabbitmq.md
+++ b/docs/en/connector-v2/sink/Rabbitmq.md
@@ -12,20 +12,21 @@ Used to write data to Rabbitmq.
 
 ##  Options
 
-| name                        | type    | required  | default value |
-|-----------------------------|---------|-----------|---------------|
-| host                        | string  | yes       | -             |
-| port                        | int     | yes       | -             |
-| virtual_host                | string  | yes       | -             |
-| username                    | string  | yes       | -             |
-| password                    | string  | yes       | -             |
-| queue_name                  | string  | yes       | -             |
-| url                         | string  | no        | -             |
-| network_recovery_interval   | int     | no        | -             |
-| topology_recovery_enabled   | boolean | no        | -             |
-| automatic_recovery_enabled  | boolean | no        | -             |
-| connection_timeout          | int     | no        | -             |
-| common-options              |         | no        | -             |
+| name                       | type    | required  | default value |
+|----------------------------|---------|-----------|---------------|
+| host                       | string  | yes       | -             |
+| port                       | int     | yes       | -             |
+| virtual_host               | string  | yes       | -             |
+| username                   | string  | yes       | -             |
+| password                   | string  | yes       | -             |
+| queue_name                 | string  | yes       | -             |
+| url                        | string  | no        | -             |
+| network_recovery_interval  | int     | no        | -             |
+| topology_recovery_enabled  | boolean | no        | -             |
+| automatic_recovery_enabled | boolean | no        | -             |
+| connection_timeout         | int     | no        | -             |
+| rabbitmq.config            | map     | no        | -             |
+| common-options             |         | no        | -             |
 
 ### host [string]
 
@@ -77,6 +78,10 @@ if true, enables connection recovery
 
 connection TCP establishment timeout in milliseconds; zero for infinite
 
+### rabbitmq.config [map]
+
+In addition to the above parameters that must be specified by the RabbitMQ 
client, the user can also specify multiple non-mandatory parameters for the 
client, covering [all the parameters specified in the official RabbitMQ 
document](https://www.rabbitmq.com/configure.html).
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common 
Options](common-options.md) for details
@@ -94,6 +99,10 @@ sink {
           username = "guest"
           password = "guest"
           queue_name = "test1"
+          rabbitmq.config = {
+            requested-heartbeat = 10
+            connection-timeout = 10
+          }
       }
 }
 ```
@@ -103,3 +112,4 @@ sink {
 ### next version
 
 - Add Rabbitmq Sink Connector
+- [Improve] Change Connector Custom Config Prefix To Map 
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
diff --git a/docs/en/connector-v2/sink/StarRocks.md 
b/docs/en/connector-v2/sink/StarRocks.md
index 8cab5aabe..02243f722 100644
--- a/docs/en/connector-v2/sink/StarRocks.md
+++ b/docs/en/connector-v2/sink/StarRocks.md
@@ -11,21 +11,21 @@ The internal implementation of StarRocks sink connector is 
cached and imported b
 
 ## Options
 
-| name                        | type                         | required | 
default value   |
-|-----------------------------|------------------------------|----------|-----------------|
-| node_urls                   | list                         | yes      | -    
           |
-| username                    | string                       | yes      | -    
           |
-| password                    | string                       | yes      | -    
           |
-| database                    | string                       | yes      | -    
           |
-| table                       | string                       | yes       | -   
            |
-| labelPrefix                 | string                       | no       | -    
           |
-| batch_max_rows              | long                         | no       | 1024 
           |
-| batch_max_bytes             | int                          | no       | 5 * 
1024 * 1024 |
-| batch_interval_ms           | int                          | no       | -    
           |
-| max_retries                 | int                          | no       | -    
           |
-| retry_backoff_multiplier_ms | int                          | no       | -    
           |
-| max_retry_backoff_ms        | int                          | no       | -    
           |
-| sink.properties.*           | starrocks stream load config | no       | -    
           |
+| name                        | type   | required | default value   |
+|-----------------------------|--------|----------|-----------------|
+| node_urls                   | list   | yes      | -               |
+| username                    | string | yes      | -               |
+| password                    | string | yes      | -               |
+| database                    | string | yes      | -               |
+| table                       | string | yes      | -               |
+| labelPrefix                 | string | no       | -               |
+| batch_max_rows              | long   | no       | 1024            |
+| batch_max_bytes             | int    | no       | 5 * 1024 * 1024 |
+| batch_interval_ms           | int    | no       | -               |
+| max_retries                 | int    | no       | -               |
+| retry_backoff_multiplier_ms | int    | no       | -               |
+| max_retry_backoff_ms        | int    | no       | -               |
+| starrocks.config            | map    | no       | -               |
 
 ### node_urls [list]
 
@@ -75,11 +75,9 @@ Using as a multiplier for generating the next delay for 
backoff
 
 The amount of time to wait before attempting to retry a request to `StarRocks`
 
-### sink.properties.*  [starrocks stream load config]
+### starrocks.config  [map]
 
 The parameter of the stream load `data_desc`
-The way to specify the parameter is to add the prefix `sink.properties.` to 
the original stream load parameter name 
-For example, the way to specify `strip_outer_array` is: 
`sink.properties.strip_outer_array`
 
 #### Supported import data formats
 
@@ -89,37 +87,41 @@ The supported formats include CSV and JSON. Default value: 
CSV
 
 Use JSON format to import data
 
-```
+```hocon
 sink {
-    StarRocks {
-        nodeUrls = ["e2e_starRocksdb:8030"]
-        username = root
-        password = ""
-        database = "test"
-        table = "e2e_table_sink"
-        batch_max_rows = 10
-        sink.properties.format = "JSON"
-        sink.properties.strip_outer_array = true
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    batch_max_rows = 10
+    starrocks.config = {
+      format = "JSON"
+      strip_outer_array = true
     }
+  }
 }
 
 ```
 
 Use CSV format to import data
 
-```
+```hocon
 sink {
-    StarRocks {
-        nodeUrls = ["e2e_starRocksdb:8030"]
-        username = root
-        password = ""
-        database = "test"
-        table = "e2e_table_sink"
-        batch_max_rows = 10
-        sink.properties.format = "CSV"
-        sink.properties.column_separator = "\x01"
-        sink.properties.row_delimiter = "\x02"
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    batch_max_rows = 10
+    starrocks.config = {
+      format = "CSV"
+      column_separator = "\\x01"
+      row_delimiter = "\\x02"
     }
+  }
 }
 ```
 
@@ -127,4 +129,5 @@ sink {
 
 ### next version
 
-- Add StarRocks Sink Connector
\ No newline at end of file
+- Add StarRocks Sink Connector
+- [Improve] Change Connector Custom Config Prefix To Map 
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index 801240879..449bf8a90 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -18,13 +18,13 @@ Source connector for Apache Kafka.
 ## Options
 
 | name                                | type    | required | default value     
       |
-|-------------------------------------|---------| -------- 
|--------------------------|
+|-------------------------------------|---------|----------|--------------------------|
 | topic                               | String  | yes      | -                 
       |
 | bootstrap.servers                   | String  | yes      | -                 
       |
 | pattern                             | Boolean | no       | false             
       |
 | consumer.group                      | String  | no       | 
SeaTunnel-Consumer-Group |
 | commit_on_checkpoint                | Boolean | no       | true              
       |
-| kafka.*                             | String  | no       | -                 
       |
+| kafka.config                        | Map     | no       | -                 
       |
 | common-options                      | config  | no       | -                 
       |
 | schema                              |         | no       | -                 
       |
 | format                              | String  | no       | json              
       |
@@ -58,12 +58,10 @@ If true the consumer's offset will be periodically 
committed in the background.
 
 The interval for dynamically discovering topics and partitions.
 
-### kafka.* [string]
+### kafka.config [map]
 
 In addition to the above necessary parameters that must be specified by the 
`Kafka consumer` client, users can also specify multiple `consumer` client 
non-mandatory parameters, covering [all consumer parameters specified in the 
official Kafka 
document](https://kafka.apache.org/documentation.html#consumerconfigs).
 
-The way to specify parameters is to add the prefix `kafka.` to the original 
parameter name. For example, the way to specify `auto.offset.reset` is: 
`kafka.auto.offset.reset = latest` . If these non-essential parameters are not 
specified, they will use the default values given in the official Kafka 
documentation.
-
 ### common-options [config]
 
 Source plugin common parameters, please refer to [Source Common 
Options](common-options.md) for details.
@@ -88,7 +86,7 @@ The initial consumption pattern of consumers,there are 
several types:
 
 ## start_mode.timestamp
 
-The time required for consumption mode to be timestamp.
+The time required for consumption mode to be "timestamp".
 
 ##  start_mode.offsets
 
@@ -120,11 +118,15 @@ source {
       }
     }
     format = text
-    field_delimiter = "#“
+    field_delimiter = "#"
     topic = "topic_1,topic_2,topic_3"
     bootstrap.servers = "localhost:9092"
-    kafka.max.poll.records = 500
-    kafka.client.id = client_1
+    kafka.config = {
+      client.id = client_1
+      max.poll.records = 500
+      auto.offset.reset = "earliest"
+      enable.auto.commit = "false"
+    }
   }
   
 }
@@ -210,4 +212,5 @@ source {
 
 - [Improve] Support setting read starting offset or time at startup config 
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
 - [Improve] Support for dynamic discover topic & partition in streaming mode 
([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
+- [Improve] Change Connector Custom Config Prefix To Map 
[3719](https://github.com/apache/incubator-seatunnel/pull/3719)
 - [Bug] Fixed the problem that parsing the offset format failed when the 
startup mode was 
offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810))
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
index 02cc58d07..7d215dc95 100644
--- 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/TypesafeConfigUtils.java
@@ -38,7 +38,9 @@ public final class TypesafeConfigUtils {
      * @param source     config source
      * @param prefix     config prefix
      * @param keepPrefix true if keep prefix
+     * @deprecated use org.apache.seatunnel.api.configuration.Option interface 
instead
      */
+    @Deprecated
     public static Config extractSubConfig(Config source, String prefix, 
boolean keepPrefix) {
 
         // use LinkedHashMap to keep insertion order
@@ -84,17 +86,6 @@ public final class TypesafeConfigUtils {
         return hasConfig;
     }
 
-    public static Config extractSubConfigThrowable(Config source, String 
prefix, boolean keepPrefix) {
-
-        Config config = extractSubConfig(source, prefix, keepPrefix);
-
-        if (config.isEmpty()) {
-            throw new ConfigRuntimeException("config is empty");
-        }
-
-        return config;
-    }
-
     @SuppressWarnings("unchecked")
     public static <T> T getConfig(final Config config, final String configKey, 
@NonNull final T defaultValue) {
         if (defaultValue.getClass().equals(Long.class)) {
diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
index 3b4964c57..6977bed12 100644
--- 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/config/TypesafeConfigUtilsTest.java
@@ -18,9 +18,7 @@
 package org.apache.seatunnel.common.config;
 
 import static 
org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfig;
-import static 
org.apache.seatunnel.common.config.TypesafeConfigUtils.extractSubConfigThrowable;
 import static 
org.apache.seatunnel.common.config.TypesafeConfigUtils.hasSubConfig;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -59,19 +57,6 @@ public class TypesafeConfigUtilsTest {
         Assertions.assertFalse(hasSubConfig);
     }
 
-    @Test
-    public void testExtractSubConfigThrowable() {
-        Config config = getConfig();
-
-        assertThrows(ConfigRuntimeException.class, () -> 
extractSubConfigThrowable(config, "test1.", false), "config is empty");
-
-        Config subConfig = extractSubConfigThrowable(config, "test.", false);
-        Map<String, String> configMap = new HashMap<>();
-        configMap.put("t0", "v0");
-        configMap.put("t1", "v1");
-        Assertions.assertEquals(ConfigFactory.parseMap(configMap), subConfig);
-    }
-
     public Config getConfig() {
         Map<String, Object> configMap = new HashMap<>();
         configMap.put("test.t0", "v0");
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
index 9f93eb36c..ecaef394a 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java
@@ -129,7 +129,7 @@ public class ClickhouseConfig {
     public static final Option<List<NodePassConfig>> NODE_PASS = 
Options.key("node_pass").listType(NodePassConfig.class)
         .noDefaultValue().withDescription("The password of Clickhouse server 
node");
 
-    public static final Option<Map<String, String>> CLICKHOUSE_PREFIX = 
Options.key("clickhouse").mapType()
+    public static final Option<Map<String, String>> CLICKHOUSE_CONFIG = 
Options.key("clickhouse.config").mapType()
         .defaultValue(Collections.emptyMap()).withDescription("Clickhouse 
custom config");
 
     public static final Option<String> FILE_FIELDS_DELIMITER = 
Options.key("file_fields_delimiter").stringType()
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
index 03d71f136..0e343a7d8 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/ClickhouseSinkFactory.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -48,7 +48,7 @@ public class ClickhouseSinkFactory implements 
TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
             .required(HOST, DATABASE, TABLE)
-            .optional(CLICKHOUSE_PREFIX,
+            .optional(CLICKHOUSE_CONFIG,
                 BULK_SIZE,
                 SPLIT_MODE,
                 FIELDS,
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 61149daad..13f0c49d1 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.BULK_SIZE;
-import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_PREFIX;
+import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.CLICKHOUSE_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.DATABASE;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.HOST;
@@ -42,7 +42,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
@@ -115,10 +114,8 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
         }
 
         Properties clickhouseProperties = new Properties();
-        if (TypesafeConfigUtils.hasSubConfig(config, CLICKHOUSE_PREFIX.key() + 
".")) {
-            TypesafeConfigUtils.extractSubConfig(config, 
CLICKHOUSE_PREFIX.key() + ".", false).entrySet().forEach(e -> {
-                clickhouseProperties.put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
-            });
+        if (CheckConfigUtil.isValidParam(config, CLICKHOUSE_CONFIG.key())) {
+            config.getObject(CLICKHOUSE_CONFIG.key()).forEach((key, value) -> 
clickhouseProperties.put(key, String.valueOf(value.unwrapped())));
         }
 
         if (isCredential) {
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index f8a69901a..2962339b7 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 
 import java.util.List;
+import java.util.Map;
 
 public class Config {
 
@@ -36,22 +37,22 @@ public class Config {
      */
     public static final String DEFAULT_FIELD_DELIMITER = ",";
 
-    public static final Option<String> KAFKA_CONFIG_PREFIX = 
Options.key("kafka.")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("In addition to the above parameters that must be 
specified by the Kafka producer or consumer client, " +
-                    "the user can also specify multiple non-mandatory 
parameters for the producer or consumer client, " +
-                    "covering all the producer parameters specified in the 
official Kafka document.");
+    public static final Option<Map<String, String>> KAFKA_CONFIG = 
Options.key("kafka.config")
+        .mapType()
+        .noDefaultValue()
+        .withDescription("In addition to the above parameters that must be 
specified by the Kafka producer or consumer client, " +
+            "the user can also specify multiple non-mandatory parameters for 
the producer or consumer client, " +
+            "covering all the producer parameters specified in the official 
Kafka document.");
 
     public static final Option<String> TOPIC = Options.key("topic")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("Kafka topic name. If there are multiple topics, 
use , to split, for example: \"tpc1,tpc2\".");
+        .stringType()
+        .noDefaultValue()
+        .withDescription("Kafka topic name. If there are multiple topics, use 
, to split, for example: \"tpc1,tpc2\".");
 
     public static final Option<Boolean> PATTERN = Options.key("pattern")
-            .booleanType()
-            .defaultValue(false)
-            .withDescription("If pattern is set to true,the regular expression 
for a pattern of topic names to read from." +
+        .booleanType()
+        .defaultValue(false)
+        .withDescription("If pattern is set to true,the regular expression for 
a pattern of topic names to read from." +
                     " All topics in clients with names that match the 
specified regular expression will be subscribed by the consumer.");
 
     public static final Option<String> BOOTSTRAP_SERVERS = 
Options.key("bootstrap.servers")
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index fef40310e..8f897aa20 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -35,7 +35,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
-                .optional(Config.KAFKA_CONFIG_PREFIX, 
Config.ASSIGN_PARTITIONS, Config.TRANSACTION_PREFIX)
+                .optional(Config.KAFKA_CONFIG, Config.ASSIGN_PARTITIONS, 
Config.TRANSACTION_PREFIX)
                 .exclusive(Config.PARTITION, Config.PARTITION_KEY_FIELDS)
                 .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 4cbdf10d5..94e4ec922 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -22,7 +22,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFA
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
-import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PARTITION_KEY_FIELDS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TOPIC;
@@ -31,7 +31,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRAN
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kafka.config.KafkaSemantics;
 import 
org.apache.seatunnel.connectors.seatunnel.kafka.exception.KafkaConnectorException;
@@ -136,11 +136,10 @@ public class KafkaSinkWriter implements 
SinkWriter<SeaTunnelRow, KafkaCommitInfo
     }
 
     private Properties getKafkaProperties(Config pluginConfig) {
-        Config kafkaConfig = 
TypesafeConfigUtils.extractSubConfig(pluginConfig, KAFKA_CONFIG_PREFIX.key(), 
false);
         Properties kafkaProperties = new Properties();
-        kafkaConfig.entrySet().forEach(entry -> {
-            kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
-        });
+        if (CheckConfigUtil.isValidParam(pluginConfig, KAFKA_CONFIG.key())) {
+            pluginConfig.getObject(KAFKA_CONFIG.key()).forEach((key, value) -> 
kafkaProperties.put(key, value.unwrapped()));
+        }
         if (pluginConfig.hasPath(ASSIGN_PARTITIONS.key())) {
             kafkaProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, 
"org.apache.seatunnel.connectors.seatunnel.kafka.sink.MessageContentPartitioner");
         }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 52d7a067e..d840311c1 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFA
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
@@ -46,7 +47,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
@@ -159,9 +159,9 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
             this.discoveryIntervalMillis = 
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
         }
 
-        TypesafeConfigUtils.extractSubConfig(config, "kafka.", 
false).entrySet().forEach(e -> {
-            this.metadata.getProperties().put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
-        });
+        if (CheckConfigUtil.isValidParam(config, KAFKA_CONFIG.key())) {
+            config.getObject(KAFKA_CONFIG.key()).forEach((key, value) -> 
this.metadata.getProperties().put(key, value.unwrapped()));
+        }
 
         setDeserialization(config);
     }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 8a4dce93f..3146b8e0f 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -38,7 +38,7 @@ public class KafkaSourceFactory implements TableSourceFactory 
{
         return OptionRule.builder()
             .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
             .optional(Config.START_MODE, Config.PATTERN, 
Config.CONSUMER_GROUP, Config.COMMIT_ON_CHECKPOINT,
-                Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA,
+                Config.KAFKA_CONFIG, Config.SCHEMA,
                 Config.FORMAT, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
             .conditional(Config.START_MODE, StartMode.TIMESTAMP, 
Config.START_MODE_TIMESTAMP)
             .conditional(Config.START_MODE, StartMode.SPECIFIC_OFFSETS, 
Config.START_MODE_OFFSETS)
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
index ff3051b0c..cf24a4043 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/config/RabbitmqConfig.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -29,6 +29,7 @@ import lombok.Getter;
 import lombok.Setter;
 
 import java.io.Serializable;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -58,8 +59,6 @@ public class RabbitmqConfig implements Serializable {
 
     private boolean forE2ETesting = false;
 
-    public static final String RABBITMQ_SINK_CONFIG_PREFIX = 
"rabbitmq.properties.";
-
     private final Map<String, Object> sinkOptionProps = new HashMap<>();
 
     public static final Option<String> HOST = Options.key("host")
@@ -150,22 +149,26 @@ public class RabbitmqConfig implements Serializable {
             .withDescription("the routing key to publish the message to");
 
     public static final Option<String> EXCHANGE = Options.key("exchange")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("the exchange to publish the message to");
+        .stringType()
+        .noDefaultValue()
+        .withDescription("the exchange to publish the message to");
 
     public static final Option<Boolean> FOR_E2E_TESTING = 
Options.key("for_e2e_testing")
-            .booleanType()
-            .noDefaultValue()
-            .withDescription("use to recognize E2E mode");
+        .booleanType()
+        .noDefaultValue()
+        .withDescription("use to recognize E2E mode");
+
+    public static final Option<Map<String, String>> RABBITMQ_CONFIG = 
Options.key("rabbitmq.config").mapType()
+        .defaultValue(Collections.emptyMap()).withDescription("In addition to 
the above parameters that must be specified by the RabbitMQ client, the user 
can also specify multiple non-mandatory parameters for the client, " +
+            "covering [all the parameters specified in the official RabbitMQ 
document](https://www.rabbitmq.com/configure.html).");
 
     private void parseSinkOptionProperties(Config pluginConfig) {
-        Config sinkOptionConfig = 
TypesafeConfigUtils.extractSubConfig(pluginConfig,
-                RABBITMQ_SINK_CONFIG_PREFIX, false);
-        sinkOptionConfig.entrySet().forEach(entry -> {
-            final String configKey = entry.getKey().toLowerCase();
-            this.sinkOptionProps.put(configKey, entry.getValue().unwrapped());
-        });
+        if (CheckConfigUtil.isValidParam(pluginConfig, RABBITMQ_CONFIG.key())) 
{
+            pluginConfig.getObject(RABBITMQ_CONFIG.key()).forEach((key, value) 
-> {
+                final String configKey = key.toLowerCase();
+                this.sinkOptionProps.put(configKey, value.unwrapped());
+            });
+        }
     }
 
     public RabbitmqConfig(Config config) {
diff --git 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
index 12ee197c2..449badcaa 100644
--- 
a/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-rabbitmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rabbitmq/sink/RabbitmqSinkFactory.java
@@ -25,6 +25,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.Rabbitmq
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PASSWORD;
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.PORT;
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.QUEUE_NAME;
+import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.RABBITMQ_CONFIG;
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.ROUTING_KEY;
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.TOPOLOGY_RECOVERY_ENABLED;
 import static 
org.apache.seatunnel.connectors.seatunnel.rabbitmq.config.RabbitmqConfig.URL;
@@ -62,7 +63,8 @@ public class RabbitmqSinkFactory implements 
TableSourceFactory {
                 NETWORK_RECOVERY_INTERVAL,
                 TOPOLOGY_RECOVERY_ENABLED,
                 AUTOMATIC_RECOVERY_ENABLED,
-                CONNECTION_TIMEOUT
+                CONNECTION_TIMEOUT,
+                RABBITMQ_CONFIG
             )
             .build();
     }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
index dc8f1bd60..16da9f983 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java
@@ -19,7 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.starrocks.config;
 
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -64,30 +64,30 @@ public class SinkConfig {
             .withDescription("The prefix of StarRocks stream load label");
 
     public static final Option<String> DATABASE = Options.key("database")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("The name of StarRocks database");
+        .stringType()
+        .noDefaultValue()
+        .withDescription("The name of StarRocks database");
 
     public static final Option<String> TABLE = Options.key("table")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("The name of StarRocks table");
+        .stringType()
+        .noDefaultValue()
+        .withDescription("The name of StarRocks table");
 
-    public static final Option<String> STARROCKS_SINK_CONFIG_PREFIX = 
Options.key("sink.properties.")
-            .stringType()
-            .noDefaultValue()
-            .withDescription("The parameter of the stream load data_desc. " +
-                    "The way to specify the parameter is to add the prefix 
`sink.properties.` to the original stream load parameter name ");
+    public static final Option<Map<String, String>> STARROCKS_CONFIG = 
Options.key("starrocks.config")
+        .mapType()
+        .noDefaultValue()
+        .withDescription("The parameter of the stream load data_desc. " +
+            "The way to specify the parameter is to add the original stream 
load parameter into map");
 
     public static final Option<Integer> BATCH_MAX_SIZE = 
Options.key("batch_max_rows")
-            .intType()
-            .defaultValue(DEFAULT_BATCH_MAX_SIZE)
-            .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+        .intType()
+        .defaultValue(DEFAULT_BATCH_MAX_SIZE)
+        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
 
     public static final Option<Long> BATCH_MAX_BYTES = 
Options.key("batch_max_bytes")
-            .longType()
-            .defaultValue(DEFAULT_BATCH_BYTES)
-            .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
+        .longType()
+        .defaultValue(DEFAULT_BATCH_BYTES)
+        .withDescription("For batch writing, when the number of buffers 
reaches the number of batch_max_rows or the byte size of batch_max_bytes or the 
time reaches batch_interval_ms, the data will be flushed into the StarRocks");
 
     public static final Option<Integer> BATCH_INTERVAL_MS = 
Options.key("batch_interval_ms")
             .intType()
@@ -182,11 +182,11 @@ public class SinkConfig {
     }
 
     private static void parseSinkStreamLoadProperties(Config pluginConfig, 
SinkConfig sinkConfig) {
-        Config starRocksConfig = 
TypesafeConfigUtils.extractSubConfig(pluginConfig,
-                STARROCKS_SINK_CONFIG_PREFIX.key(), false);
-        starRocksConfig.entrySet().forEach(entry -> {
-            final String configKey = entry.getKey().toLowerCase();
-            sinkConfig.streamLoadProps.put(configKey, 
entry.getValue().unwrapped());
-        });
+        if (CheckConfigUtil.isValidParam(pluginConfig, 
STARROCKS_CONFIG.key())) {
+            pluginConfig.getObject(STARROCKS_CONFIG.key()).forEach((key, 
value) -> {
+                final String configKey = key.toLowerCase();
+                sinkConfig.streamLoadProps.put(configKey, value.unwrapped());
+            });
+        }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index 146cec807..579fa6ad5 100644
--- 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -37,7 +37,7 @@ public class StarRocksSinkFactory implements TableSinkFactory 
{
                 .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, 
SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
                 .optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, 
SinkConfig.BATCH_MAX_BYTES,
                         SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, 
SinkConfig.MAX_RETRY_BACKOFF_MS,
-                        SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, 
SinkConfig.STARROCKS_SINK_CONFIG_PREFIX)
+                        SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, 
SinkConfig.STARROCKS_CONFIG)
                 .build();
     }
 }

Reply via email to