This is an automated email from the ASF dual-hosted git repository.
shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e9b6071e4d [Feature][connector-kudu] Support regex and whole-database
table_name for source (#10180)
e9b6071e4d is described below
commit e9b6071e4dbedd1ed22eb9596311078e4c8e0d31
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Dec 11 22:46:57 2025 +0800
[Feature][connector-kudu] Support regex and whole-database table_name for
source (#10180)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/source/Kudu.md | 73 ++++++++++++++++-
docs/zh/connector-v2/source/Kudu.md | 73 ++++++++++++++++-
.../seatunnel/kudu/config/KuduSourceOptions.java | 10 +++
.../kudu/config/KuduSourceTableConfig.java | 57 ++++++++++---
.../seatunnel/kudu/source/KuduSourceFactory.java | 18 +++--
.../kudu/config/KuduSourceTableConfigTest.java | 93 ++++++++++++++++++++++
.../seatunnel/e2e/connector/kudu/KuduIT.java | 45 +++++++++++
.../resources/kudu_to_assert_with_all_tables.conf | 49 ++++++++++++
.../kudu_to_assert_with_pattern_tables.conf | 47 +++++++++++
.../kudu_to_assert_with_table_list_pattern.conf | 58 ++++++++++++++
10 files changed, 504 insertions(+), 19 deletions(-)
diff --git a/docs/en/connector-v2/source/Kudu.md
b/docs/en/connector-v2/source/Kudu.md
index 22e1442d56..d6d0ffb510 100644
--- a/docs/en/connector-v2/source/Kudu.md
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -57,9 +57,10 @@ The tested kudu version is 1.11.1.
| kerberos_krb5conf | String | No | -
| Kerberos krb5 conf. Note that all zeta
nodes require have this file.
|
| scan_token_query_timeout | Long | No | 30000
| The timeout for connecting scan token. If
not set, it will be the same as operationTimeout.
|
| scan_token_batch_size_bytes | Int | No | 1024 * 1024
| Kudu scan bytes. The maximum number of
bytes read at a time, the default is 1MB.
|
+| use_regex | Bool | No | false
| Control regular expression matching for
`table_name`. When set to `true`, the `table_name` will be treated as a regular
expression pattern and can match multiple tables. When set to `false` or not
specified, the `table_name` will be treated as an exact table name (no regex
matching). |
| filter | String | No | -
| Kudu scan filter expressions,example id >
100 AND id < 200.
|
| schema | Map | No | 1024 * 1024
| SeaTunnel Schema.
|
-| table_list | Array | No | -
| The list of tables to be read. you can use
this configuration instead of `table_path` example: ```table_list = [{
table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
+| table_list | Array | No | -
| The list of tables to be read. you can use
this configuration instead of `table_name`, for example: ```table_list = [{
table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```.
You can also configure `use_regex = true` inside each entry to enable regex
matching for `table_name`. |
| common-options | | No | -
| Source plugin common parameters, please
refer to [Source Common Options](../source-common-options.md) for details.
|
## Task Example
@@ -143,6 +144,76 @@ sink {
}
```
+### Table Matching With Regex
+
+The Kudu Source supports using regular expressions on `table_name` to match
multiple tables (including whole-database style synchronization, since Kudu
tables are in a single logical database).
+
+#### Exact Table Name
+
+Use `table_name` to specify a single Kudu table with an exact name:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_source_table_1"
+ }
+}
+```
+
+#### Regex Matching
+
+Use `table_name` as a regex pattern and enable `use_regex` to read multiple
tables with one configuration:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ # Match tables like kudu_source_table_1, kudu_source_table_2, etc.
+ table_name = "kudu_source_table_\\d+"
+ use_regex = true
+ }
+}
+```
+
+You can also combine regex entries in `table_list`:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ table_list = [
+ {
+ table_name = "kudu_source_table_1"
+ },
+ {
+ table_name = "kudu_source_table_2"
+ },
+ {
+ # Regex matching - any table whose name starts with prefix_ and ends
with digits
+ table_name = "prefix_\\d+"
+ use_regex = true
+ }
+ ]
+ }
+}
+```
+
+#### Whole-Database Matching
+
+You can also synchronize all tables in the current Kudu cluster (or all
business tables in the current instance, if there are no system tables) by
using a catch-all regex:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ # Match all tables in the current Kudu cluster
+ table_name = ".*"
+ use_regex = true
+ }
+}
+```
+
## Changelog
<ChangeLog />
diff --git a/docs/zh/connector-v2/source/Kudu.md
b/docs/zh/connector-v2/source/Kudu.md
index e68816a6c3..e93f8c9bf2 100644
--- a/docs/zh/connector-v2/source/Kudu.md
+++ b/docs/zh/connector-v2/source/Kudu.md
@@ -57,9 +57,10 @@ import ChangeLog from '../changelog/connector-kudu.md';
| kerberos_krb5conf | String | 否 | - | Kerberos krb5 conf。注意所有 zeta 节点都需要有此文件。
|
| scan_token_query_timeout | Long | 否 | 30000 | 连接扫描令牌的超时时间。如果未设置,将与
operationTimeout 相同。 |
| scan_token_batch_size_bytes | Int | 否 | 1024 * 1024 | Kudu
扫描字节数。一次读取的最大字节数,默认为 1MB。 |
+| use_regex | Bool | 否 | false | 控制 `table_name` 的正则匹配。当设置为 `true`
时,`table_name` 将被视为正则表达式模式,可以匹配多张表。当设置为 `false` 或未指定时,`table_name`
将被视为精确表名(不进行正则匹配)。 |
| filter | String | 否 | - | Kudu 扫描过滤表达式,例如 id > 100 AND id < 200。 |
| schema | Map | 否 | 1024 * 1024 | SeaTunnel Schema。 |
-| table_list | Array | 否 | - | 要读取的表列表。您可以使用此配置代替
`table_path`,例如:```table_list = [{ table_name = "kudu_source_table_1"},{
table_name = "kudu_source_table_2"}] ``` |
+| table_list | Array | 否 | - | 要读取的表列表。您可以使用此配置代替
`table_name`,例如:```table_list = [{ table_name = "kudu_source_table_1"},{
table_name = "kudu_source_table_2"}] ```。也可以在每个 entry 中配置 `use_regex = true` 来对
`table_name` 启用正则匹配。 |
| common-options | | 否 | - | 源插件通用参数,请参考 [源通用选项](../source-common-options.md)
详见。 |
## 任务示例
@@ -143,6 +144,76 @@ sink {
}
```
+### 使用正则表达式匹配表
+
+Kudu Source 支持在 `table_name` 上使用正则表达式来匹配多张表(由于 Kudu 逻辑上只有一个
database,因此也可以用来实现“整库表”同步)。
+
+#### 精确表名
+
+使用 `table_name` 指定单个 Kudu 表的精确名称:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_source_table_1"
+ }
+}
+```
+
+#### 正则匹配
+
+将 `table_name` 视为正则表达式,并开启 `use_regex`,即可用一条配置匹配多张表:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ # 匹配 kudu_source_table_1、kudu_source_table_2 等
+ table_name = "kudu_source_table_\\d+"
+ use_regex = true
+ }
+}
+```
+
+也可以在 `table_list` 中组合精确表和正则表:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ table_list = [
+ {
+ table_name = "kudu_source_table_1"
+ },
+ {
+ table_name = "kudu_source_table_2"
+ },
+ {
+ # 使用正则匹配,以 prefix_ 开头、以数字结尾的所有表
+ table_name = "prefix_\\d+"
+ use_regex = true
+ }
+ ]
+ }
+}
+```
+
+#### 整库匹配
+
+如果当前 Kudu 实例中只有业务表,或者你希望“一次性同步所有表”,可以使用一个全匹配的正则:
+
+```hocon
+source {
+ kudu {
+ kudu_masters = "kudu-master:7051"
+ # 匹配当前 Kudu 实例中的所有表
+ table_name = ".*"
+ use_regex = true
+ }
+}
+```
+
## 变更日志
<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
index 2411c20f17..c296c4a00d 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
@@ -38,6 +38,16 @@ public class KuduSourceOptions extends KuduBaseOptions {
.withDescription(
"Kudu scan bytes. The maximum number of bytes read
at a time, the default is 1MB");
+ public static final Option<Boolean> USE_REGEX =
+ Options.key("use_regex")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Control regular expression matching for
table_name. When set to true, "
+ + "the table_name will be treated as a
regular expression pattern. "
+ + "When set to false or not specified, the
table_name will be treated "
+ + "as an exact table name (no regex
matching).");
+
public static final Option<String> FILTER =
Options.key("filter")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
index 742c33a46d..e97d83d386 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -32,8 +30,10 @@ import
org.apache.seatunnel.connectors.seatunnel.kudu.catalog.KuduCatalogFactory
import lombok.Getter;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
@Getter
@@ -61,15 +61,28 @@ public class KuduSourceTableConfig implements Serializable {
try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) {
kuduCatalog.open();
+
+ List<ReadonlyConfig> tableConfigs = new ArrayList<>();
if
(config.getOptional(ConnectorCommonOptions.TABLE_LIST).isPresent()) {
- return config.get(ConnectorCommonOptions.TABLE_LIST).stream()
- .map(ReadonlyConfig::fromMap)
- .map(readonlyConfig ->
parseKuduSourceConfig(readonlyConfig, kuduCatalog))
- .collect(Collectors.toList());
+ tableConfigs =
+ config.get(ConnectorCommonOptions.TABLE_LIST).stream()
+ .map(ReadonlyConfig::fromMap)
+ .collect(Collectors.toList());
+ } else {
+ tableConfigs.add(config);
+ }
+
+ List<KuduSourceTableConfig> result = new ArrayList<>();
+ for (ReadonlyConfig tableConfig : tableConfigs) {
+ Boolean useRegex =
tableConfig.get(KuduSourceOptions.USE_REGEX);
+ if (useRegex != null && useRegex) {
+ result.addAll(parseKuduSourceConfigWithRegex(tableConfig,
kuduCatalog));
+ } else {
+ result.add(parseKuduSourceConfig(tableConfig,
kuduCatalog));
+ }
}
- KuduSourceTableConfig kuduSourceTableConfig =
- parseKuduSourceConfig(config, kuduCatalog);
- return Lists.newArrayList(kuduSourceTableConfig);
+
+ return result;
}
}
@@ -86,4 +99,30 @@ public class KuduSourceTableConfig implements Serializable {
return new KuduSourceTableConfig(
tableName, catalogTable, config.get(KuduSourceOptions.FILTER));
}
+
+ static List<KuduSourceTableConfig> parseKuduSourceConfigWithRegex(
+ ReadonlyConfig config, KuduCatalog kuduCatalog) {
+ String patternString = config.get(KuduBaseOptions.TABLE_NAME);
+ if (patternString == null) {
+ throw new IllegalArgumentException(
+ "When `use_regex` is enabled, `table_name` must be
configured");
+ }
+
+ Pattern pattern = Pattern.compile(patternString);
+
+ List<String> allTables =
+
kuduCatalog.listTables(kuduCatalog.getDefaultDatabase()).stream()
+ .filter(tableName ->
pattern.matcher(tableName).matches())
+ .collect(Collectors.toList());
+
+ List<KuduSourceTableConfig> result = new ArrayList<>();
+ for (String tableName : allTables) {
+ CatalogTable catalogTable =
kuduCatalog.getTable(TablePath.of(tableName));
+ result.add(
+ new KuduSourceTableConfig(
+ tableName, catalogTable,
config.get(KuduSourceOptions.FILTER)));
+ }
+
+ return result;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
index c6f6ba4c69..6db0b56867 100644
---
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
@@ -46,14 +46,16 @@ public class KuduSourceFactory implements
TableSourceFactory {
return OptionRule.builder()
.required(KuduSourceOptions.MASTER)
.optional(KuduSourceOptions.SCHEMA)
- .optional(KuduSourceOptions.WORKER_COUNT)
- .optional(KuduSourceOptions.OPERATION_TIMEOUT)
- .optional(KuduSourceOptions.ADMIN_OPERATION_TIMEOUT)
- .optional(KuduSourceOptions.QUERY_TIMEOUT)
- .optional(KuduSourceOptions.SCAN_BATCH_SIZE_BYTES)
- .optional(KuduSourceOptions.FILTER)
- .optional(KuduSourceOptions.ENABLE_KERBEROS)
- .optional(KuduSourceOptions.KERBEROS_KRB5_CONF)
+ .optional(
+ KuduSourceOptions.WORKER_COUNT,
+ KuduSourceOptions.OPERATION_TIMEOUT,
+ KuduSourceOptions.ADMIN_OPERATION_TIMEOUT,
+ KuduSourceOptions.QUERY_TIMEOUT,
+ KuduSourceOptions.SCAN_BATCH_SIZE_BYTES,
+ KuduSourceOptions.FILTER,
+ KuduSourceOptions.USE_REGEX,
+ KuduSourceOptions.ENABLE_KERBEROS,
+ KuduSourceOptions.KERBEROS_KRB5_CONF)
.exclusive(KuduSourceOptions.TABLE_NAME,
ConnectorCommonOptions.TABLE_LIST)
.conditional(
KuduSourceOptions.ENABLE_KERBEROS,
diff --git
a/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfigTest.java
b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfigTest.java
new file mode 100644
index 0000000000..a03d5358ed
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfigTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.kudu.catalog.KuduCatalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class KuduSourceTableConfigTest {
+
+ @Test
+ void testParseKuduSourceConfigWithRegex() {
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(KuduBaseOptions.TABLE_NAME.key(),
"kudu_source_table_\\d+");
+ configMap.put(KuduSourceOptions.FILTER.key(), "id > 10");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<String> tables =
+ Arrays.asList("kudu_source_table_1", "kudu_source_table_2",
"other_table");
+ KuduCatalog kuduCatalog = new FakeKuduCatalog(tables);
+
+ List<KuduSourceTableConfig> result =
+ KuduSourceTableConfig.parseKuduSourceConfigWithRegex(config,
kuduCatalog);
+
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertEquals("kudu_source_table_1",
result.get(0).getTablePath().getTableName());
+ Assertions.assertEquals("kudu_source_table_2",
result.get(1).getTablePath().getTableName());
+ Assertions.assertEquals("id > 10", result.get(0).getFilter());
+ Assertions.assertEquals("id > 10", result.get(1).getFilter());
+ }
+
+ private static class FakeKuduCatalog extends KuduCatalog {
+
+ private final List<String> tables;
+
+ FakeKuduCatalog(List<String> tables) {
+ super("test_catalog", createCommonConfig());
+ this.tables = tables;
+ }
+
+ @Override
+ public String getDefaultDatabase() {
+ return "default_database";
+ }
+
+ @Override
+ public List<String> listTables(String databaseName) {
+ return tables;
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath) {
+ TableIdentifier identifier = TableIdentifier.of(name(), tablePath);
+ TableSchema schema = TableSchema.builder().build();
+ return CatalogTable.of(
+ identifier, schema, Collections.emptyMap(),
Collections.emptyList(), null);
+ }
+
+ private static CommonConfig createCommonConfig() {
+ Map<String, Object> map = new HashMap<>();
+ map.put(KuduBaseOptions.MASTER.key(), "dummy:7051");
+ ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+ return new CommonConfig(readonlyConfig);
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index 047e5bc07f..38c4ec352c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -414,6 +414,51 @@ public class KuduIT extends TestSuiteBase implements
TestResource {
kuduClient.deleteTable("kudu_source_table_2");
}
+ @TestTemplate
+ public void testKuduMultipleReadWithRegex(TestContainer container)
+ throws IOException, InterruptedException {
+ initializeKuduTable("kudu_source_table_1");
+ initializeKuduTable("kudu_source_table_2");
+ batchInsertData("kudu_source_table_1");
+ batchInsertData("kudu_source_table_2");
+ Container.ExecResult execResult =
+
container.executeJob("/kudu_to_assert_with_pattern_tables.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ kuduClient.deleteTable("kudu_source_table_1");
+ kuduClient.deleteTable("kudu_source_table_2");
+ }
+
+ @TestTemplate
+ public void testKuduWholeDatabaseRead(TestContainer container)
+ throws IOException, InterruptedException {
+ initializeKuduTable("kudu_source_table_1");
+ initializeKuduTable("kudu_source_table_2");
+ batchInsertData("kudu_source_table_1");
+ batchInsertData("kudu_source_table_2");
+ Container.ExecResult execResult =
+ container.executeJob("/kudu_to_assert_with_all_tables.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ kuduClient.deleteTable("kudu_source_table_1");
+ kuduClient.deleteTable("kudu_source_table_2");
+ }
+
+ @TestTemplate
+ public void testKuduTableListWithRegex(TestContainer container)
+ throws IOException, InterruptedException {
+ initializeKuduTable("kudu_source_table_1");
+ initializeKuduTable("kudu_source_table_2");
+ initializeKuduTable("kudu_extra_1");
+ batchInsertData("kudu_source_table_1");
+ batchInsertData("kudu_source_table_2");
+ batchInsertData("kudu_extra_1");
+ Container.ExecResult execResult =
+
container.executeJob("/kudu_to_assert_with_table_list_pattern.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ kuduClient.deleteTable("kudu_source_table_1");
+ kuduClient.deleteTable("kudu_source_table_2");
+ kuduClient.deleteTable("kudu_extra_1");
+ }
+
@DisabledOnContainer(
value = {},
type = {EngineType.FLINK},
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_all_tables.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_all_tables.conf
new file mode 100644
index 0000000000..7d1fb3a9cf
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_all_tables.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with
whole-database style table matching in Kudu source
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # Kudu source with full-database regex table_name
+ kudu{
+ kudu_masters = "kudu-master:7051"
+ # Match all user tables in the current Kudu cluster.
+ table_name = ".*"
+ use_regex = true
+ plugin_output = "kudu"
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ rules {
+ # Only the two tables created in the test case should appear
+ table-names = ["kudu_source_table_1", "kudu_source_table_2"]
+ }
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_pattern_tables.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_pattern_tables.conf
new file mode 100644
index 0000000000..ec68ad14f9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_pattern_tables.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with regex
table matching in Kudu source
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # Kudu source with regex table_name
+ kudu{
+ kudu_masters = "kudu-master:7051"
+ table_name = "kudu_source_table_\\d+"
+ use_regex = true
+ plugin_output = "kudu"
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ rules {
+ table-names = ["kudu_source_table_1", "kudu_source_table_2"]
+ }
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_table_list_pattern.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_table_list_pattern.conf
new file mode 100644
index 0000000000..f79c065a63
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_table_list_pattern.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with mixed
table_list and regex entries in Kudu source
+######
+
+env {
+ # You can set engine configuration here
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ # Kudu source with table_list that combines exact and regex table_name
entries
+ kudu{
+ kudu_masters = "kudu-master:7051"
+ table_list = [
+ {
+ table_name = "kudu_source_table_1"
+ },
+ {
+ table_name = "kudu_source_table_2"
+ },
+ {
+ # Regex entry - matches additional tables, e.g. kudu_extra_1
+ table_name = "kudu_extra_.*"
+ use_regex = true
+ }
+ ]
+ plugin_output = "kudu"
+ }
+}
+
+transform {
+}
+
+sink {
+ Assert {
+ rules {
+ table-names = ["kudu_source_table_1", "kudu_source_table_2",
"kudu_extra_1"]
+ }
+ }
+}
+