This is an automated email from the ASF dual-hosted git repository.

shenghang pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e9b6071e4d [Feature][connector-kudu] Support regex and whole-database table_name for source (#10180)
e9b6071e4d is described below

commit e9b6071e4dbedd1ed22eb9596311078e4c8e0d31
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Dec 11 22:46:57 2025 +0800

    [Feature][connector-kudu] Support regex and whole-database table_name for source (#10180)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connector-v2/source/Kudu.md                | 73 ++++++++++++++++-
 docs/zh/connector-v2/source/Kudu.md                | 73 ++++++++++++++++-
 .../seatunnel/kudu/config/KuduSourceOptions.java   | 10 +++
 .../kudu/config/KuduSourceTableConfig.java         | 57 ++++++++++---
 .../seatunnel/kudu/source/KuduSourceFactory.java   | 18 +++--
 .../kudu/config/KuduSourceTableConfigTest.java     | 93 ++++++++++++++++++++++
 .../seatunnel/e2e/connector/kudu/KuduIT.java       | 45 +++++++++++
 .../resources/kudu_to_assert_with_all_tables.conf  | 49 ++++++++++++
 .../kudu_to_assert_with_pattern_tables.conf        | 47 +++++++++++
 .../kudu_to_assert_with_table_list_pattern.conf    | 58 ++++++++++++++
 10 files changed, 504 insertions(+), 19 deletions(-)
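
Note on matching semantics: the patch below filters table names with `pattern.matcher(tableName).matches()`, which in `java.util.regex` requires the whole name to match rather than a substring. The following is a minimal, self-contained sketch of that behaviour only. The class and method names (`RegexTableMatchSketch`, `matchTables`) and the table `my_kudu_source_table_3` are hypothetical and not part of the commit, and the sketch filters a plain list instead of calling the Kudu catalog.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration class; not part of the SeaTunnel code base.
class RegexTableMatchSketch {

    // Keeps only the names that the pattern matches in full,
    // mirroring the filter used in parseKuduSourceConfigWithRegex.
    static List<String> matchTables(String patternString, List<String> allTables) {
        Pattern pattern = Pattern.compile(patternString);
        return allTables.stream()
                .filter(name -> pattern.matcher(name).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> tables =
                Arrays.asList(
                        "kudu_source_table_1",
                        "kudu_source_table_2",
                        "my_kudu_source_table_3", // assumed name, only a substring match
                        "other_table");

        // Full-match semantics: the prefixed table is not selected.
        System.out.println(matchTables("kudu_source_table_\\d+", tables));

        // Catch-all pattern used for whole-database style reads.
        System.out.println(matchTables(".*", tables));
    }
}
```

In a HOCON job file the same pattern is written as `"kudu_source_table_\\d+"`, since the backslash has to be escaped inside a quoted string. A pattern meant to match a substring would need explicit `.*` on either side.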

diff --git a/docs/en/connector-v2/source/Kudu.md b/docs/en/connector-v2/source/Kudu.md
index 22e1442d56..d6d0ffb510 100644
--- a/docs/en/connector-v2/source/Kudu.md
+++ b/docs/en/connector-v2/source/Kudu.md
@@ -57,9 +57,10 @@ The tested kudu version is 1.11.1.
 | kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes require have this file. |
 | scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
 | scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB. |
+| use_regex | Bool | No | false | Control regular expression matching for `table_name`. When set to `true`, the `table_name` will be treated as a regular expression pattern and can match multiple tables. When set to `false` or not specified, the `table_name` will be treated as an exact table name (no regex matching). |
 | filter | String | No | - | Kudu scan filter expressions,example id > 100 AND id < 200. |
 | schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
-| table_list | Array | No | - | The list of tables to be read. you can use this configuration instead of `table_path` example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
+| table_list | Array | No | - | The list of tables to be read. you can use this configuration instead of `table_name`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```. You can also configure `use_regex = true` inside each entry to enable regex matching for `table_name`. |
 | common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
 
 ## Task Example
@@ -143,6 +144,76 @@ sink {
 }
 ```
 
+### Table Matching With Regex
+
+The Kudu Source supports using regular expressions on `table_name` to match multiple tables (including whole-database style synchronization, since Kudu tables are in a single logical database).
+
+#### Exact Table Name
+
+Use `table_name` to specify a single Kudu table with an exact name:
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    table_name = "kudu_source_table_1"
+  }
+}
+```
+
+#### Regex Matching
+
+Use `table_name` as a regex pattern and enable `use_regex` to read multiple tables with one configuration:
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    # Match tables like kudu_source_table_1, kudu_source_table_2, etc.
+    table_name = "kudu_source_table_\\d+"
+    use_regex = true
+  }
+}
+```
+
+You can also combine regex entries in `table_list`:
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    table_list = [
+      {
+        table_name = "kudu_source_table_1"
+      },
+      {
+        table_name = "kudu_source_table_2"
+      },
+      {
+        # Regex matching - any table whose name starts with prefix_ and ends with digits
+        table_name = "prefix_\\d+"
+        use_regex = true
+      }
+    ]
+  }
+}
+```
+
+#### Whole-Database Matching
+
+You can also synchronize all tables in the current Kudu cluster (or all business tables in the current instance, if there are no system tables) by using a catch-all regex:
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    # Match all tables in the current Kudu cluster
+    table_name = ".*"
+    use_regex = true
+  }
+}
+```
+
 ## Changelog
 
 <ChangeLog />
diff --git a/docs/zh/connector-v2/source/Kudu.md b/docs/zh/connector-v2/source/Kudu.md
index e68816a6c3..e93f8c9bf2 100644
--- a/docs/zh/connector-v2/source/Kudu.md
+++ b/docs/zh/connector-v2/source/Kudu.md
@@ -57,9 +57,10 @@ import ChangeLog from '../changelog/connector-kudu.md';
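 | kerberos_krb5conf | String | No | - | Kerberos krb5 conf. Note that all zeta nodes need to have this file. |
 | scan_token_query_timeout | Long | No | 30000 | The timeout for connecting scan token. If not set, it will be the same as operationTimeout. |
 | scan_token_batch_size_bytes | Int | No | 1024 * 1024 | Kudu scan bytes. The maximum number of bytes read at a time; the default is 1MB. |
+| use_regex | Bool | No | false | Controls regex matching for `table_name`. When set to `true`, `table_name` is treated as a regular expression pattern and can match multiple tables. When set to `false` or not specified, `table_name` is treated as an exact table name (no regex matching). |
 | filter | String | No | - | Kudu scan filter expressions, for example id > 100 AND id < 200. |
 | schema | Map | No | 1024 * 1024 | SeaTunnel Schema. |
-| table_list | Array | No | - | The list of tables to read. You can use this configuration instead of `table_path`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ``` |
+| table_list | Array | No | - | The list of tables to read. You can use this configuration instead of `table_name`, for example: ```table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}] ```. You can also configure `use_regex = true` in each entry to enable regex matching for `table_name`. |
 | common-options | | No | - | Source plugin common parameters; see [Source Common Options](../source-common-options.md) for details. |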
 
 ## Task Example
@@ -143,6 +144,76 @@ sink {
 }
 ```
 
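+### Matching Tables With Regular Expressions
+
+The Kudu Source supports regular expressions in `table_name` to match multiple tables (since Kudu logically has only one database, this can also be used for "whole-database" synchronization).
+
+#### Exact Table Name
+
+Use `table_name` to specify the exact name of a single Kudu table: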
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    table_name = "kudu_source_table_1"
+  }
+}
+```
+
+#### Regex Matching
+
+Treat `table_name` as a regular expression and enable `use_regex` to match multiple tables with a single configuration:
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    # Matches kudu_source_table_1, kudu_source_table_2, etc.
+    table_name = "kudu_source_table_\\d+"
+    use_regex = true
+  }
+}
+```
+
+You can also combine exact and regex table entries in `table_list`:
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    table_list = [
+      {
+        table_name = "kudu_source_table_1"
+      },
+      {
+        table_name = "kudu_source_table_2"
+      },
+      {
+        # Regex matching: all tables that start with prefix_ and end with digits
+        table_name = "prefix_\\d+"
+        use_regex = true
+      }
+    ]
+  }
+}
+```
+
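+#### Whole-Database Matching
+
+If the current Kudu instance contains only business tables, or you want to synchronize all tables at once, you can use a catch-all regex: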
+
+```hocon
+source {
+  kudu {
+    kudu_masters = "kudu-master:7051"
+    # Match all tables in the current Kudu instance
+    table_name = ".*"
+    use_regex = true
+  }
+}
+```
+
 ## Changelog
 
 <ChangeLog />
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
index 2411c20f17..c296c4a00d 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceOptions.java
@@ -38,6 +38,16 @@ public class KuduSourceOptions extends KuduBaseOptions {
                     .withDescription(
                             "Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB");
 
+    public static final Option<Boolean> USE_REGEX =
+            Options.key("use_regex")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Control regular expression matching for table_name. When set to true, "
+                                    + "the table_name will be treated as a regular expression pattern. "
+                                    + "When set to false or not specified, the table_name will be treated "
+                                    + "as an exact table name (no regex matching).");
+
     public static final Option<String> FILTER =
             Options.key("filter")
                     .stringType()
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
index 742c33a46d..e97d83d386 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfig.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.config;
 
-import org.apache.seatunnel.shade.com.google.common.collect.Lists;
-
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.options.ConnectorCommonOptions;
 import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -32,8 +30,10 @@ import org.apache.seatunnel.connectors.seatunnel.kudu.catalog.KuduCatalogFactory
 import lombok.Getter;
 
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 @Getter
@@ -61,15 +61,28 @@ public class KuduSourceTableConfig implements Serializable {
 
         try (KuduCatalog kuduCatalog = (KuduCatalog) optionalCatalog.get()) {
             kuduCatalog.open();
+
+            List<ReadonlyConfig> tableConfigs = new ArrayList<>();
             if (config.getOptional(ConnectorCommonOptions.TABLE_LIST).isPresent()) {
-                return config.get(ConnectorCommonOptions.TABLE_LIST).stream()
-                        .map(ReadonlyConfig::fromMap)
-                        .map(readonlyConfig -> parseKuduSourceConfig(readonlyConfig, kuduCatalog))
-                        .collect(Collectors.toList());
+                tableConfigs =
+                        config.get(ConnectorCommonOptions.TABLE_LIST).stream()
+                                .map(ReadonlyConfig::fromMap)
+                                .collect(Collectors.toList());
+            } else {
+                tableConfigs.add(config);
+            }
+
+            List<KuduSourceTableConfig> result = new ArrayList<>();
+            for (ReadonlyConfig tableConfig : tableConfigs) {
+                Boolean useRegex = tableConfig.get(KuduSourceOptions.USE_REGEX);
+                if (useRegex != null && useRegex) {
+                    result.addAll(parseKuduSourceConfigWithRegex(tableConfig, kuduCatalog));
+                } else {
+                    result.add(parseKuduSourceConfig(tableConfig, kuduCatalog));
+                }
             }
-            KuduSourceTableConfig kuduSourceTableConfig =
-                    parseKuduSourceConfig(config, kuduCatalog);
-            return Lists.newArrayList(kuduSourceTableConfig);
+
+            return result;
         }
     }
 
@@ -86,4 +99,30 @@ public class KuduSourceTableConfig implements Serializable {
         return new KuduSourceTableConfig(
                 tableName, catalogTable, config.get(KuduSourceOptions.FILTER));
     }
+
+    static List<KuduSourceTableConfig> parseKuduSourceConfigWithRegex(
+            ReadonlyConfig config, KuduCatalog kuduCatalog) {
+        String patternString = config.get(KuduBaseOptions.TABLE_NAME);
+        if (patternString == null) {
+            throw new IllegalArgumentException(
+                    "When `use_regex` is enabled, `table_name` must be configured");
+        }
+
+        Pattern pattern = Pattern.compile(patternString);
+
+        List<String> allTables =
+                kuduCatalog.listTables(kuduCatalog.getDefaultDatabase()).stream()
+                        .filter(tableName -> pattern.matcher(tableName).matches())
+                        .collect(Collectors.toList());
+
+        List<KuduSourceTableConfig> result = new ArrayList<>();
+        for (String tableName : allTables) {
+            CatalogTable catalogTable = kuduCatalog.getTable(TablePath.of(tableName));
+            result.add(
+                    new KuduSourceTableConfig(
+                            tableName, catalogTable, config.get(KuduSourceOptions.FILTER)));
+        }
+
+        return result;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
index c6f6ba4c69..6db0b56867 100644
--- a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSourceFactory.java
@@ -46,14 +46,16 @@ public class KuduSourceFactory implements TableSourceFactory {
         return OptionRule.builder()
                 .required(KuduSourceOptions.MASTER)
                 .optional(KuduSourceOptions.SCHEMA)
-                .optional(KuduSourceOptions.WORKER_COUNT)
-                .optional(KuduSourceOptions.OPERATION_TIMEOUT)
-                .optional(KuduSourceOptions.ADMIN_OPERATION_TIMEOUT)
-                .optional(KuduSourceOptions.QUERY_TIMEOUT)
-                .optional(KuduSourceOptions.SCAN_BATCH_SIZE_BYTES)
-                .optional(KuduSourceOptions.FILTER)
-                .optional(KuduSourceOptions.ENABLE_KERBEROS)
-                .optional(KuduSourceOptions.KERBEROS_KRB5_CONF)
+                .optional(
+                        KuduSourceOptions.WORKER_COUNT,
+                        KuduSourceOptions.OPERATION_TIMEOUT,
+                        KuduSourceOptions.ADMIN_OPERATION_TIMEOUT,
+                        KuduSourceOptions.QUERY_TIMEOUT,
+                        KuduSourceOptions.SCAN_BATCH_SIZE_BYTES,
+                        KuduSourceOptions.FILTER,
+                        KuduSourceOptions.USE_REGEX,
+                        KuduSourceOptions.ENABLE_KERBEROS,
+                        KuduSourceOptions.KERBEROS_KRB5_CONF)
                 .exclusive(KuduSourceOptions.TABLE_NAME, ConnectorCommonOptions.TABLE_LIST)
                 .conditional(
                         KuduSourceOptions.ENABLE_KERBEROS,
diff --git a/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfigTest.java b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfigTest.java
new file mode 100644
index 0000000000..a03d5358ed
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-kudu/src/test/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSourceTableConfigTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
+import org.apache.seatunnel.connectors.seatunnel.kudu.catalog.KuduCatalog;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class KuduSourceTableConfigTest {
+
+    @Test
+    void testParseKuduSourceConfigWithRegex() {
+        Map<String, Object> configMap = new HashMap<>();
+        configMap.put(KuduBaseOptions.TABLE_NAME.key(), "kudu_source_table_\\d+");
+        configMap.put(KuduSourceOptions.FILTER.key(), "id > 10");
+        ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+        List<String> tables =
+                Arrays.asList("kudu_source_table_1", "kudu_source_table_2", "other_table");
+        KuduCatalog kuduCatalog = new FakeKuduCatalog(tables);
+
+        List<KuduSourceTableConfig> result =
+                KuduSourceTableConfig.parseKuduSourceConfigWithRegex(config, kuduCatalog);
+
+        Assertions.assertEquals(2, result.size());
+        Assertions.assertEquals("kudu_source_table_1", result.get(0).getTablePath().getTableName());
+        Assertions.assertEquals("kudu_source_table_2", result.get(1).getTablePath().getTableName());
+        Assertions.assertEquals("id > 10", result.get(0).getFilter());
+        Assertions.assertEquals("id > 10", result.get(1).getFilter());
+    }
+
+    private static class FakeKuduCatalog extends KuduCatalog {
+
+        private final List<String> tables;
+
+        FakeKuduCatalog(List<String> tables) {
+            super("test_catalog", createCommonConfig());
+            this.tables = tables;
+        }
+
+        @Override
+        public String getDefaultDatabase() {
+            return "default_database";
+        }
+
+        @Override
+        public List<String> listTables(String databaseName) {
+            return tables;
+        }
+
+        @Override
+        public CatalogTable getTable(TablePath tablePath) {
+            TableIdentifier identifier = TableIdentifier.of(name(), tablePath);
+            TableSchema schema = TableSchema.builder().build();
+            return CatalogTable.of(
+                    identifier, schema, Collections.emptyMap(), Collections.emptyList(), null);
+        }
+
+        private static CommonConfig createCommonConfig() {
+            Map<String, Object> map = new HashMap<>();
+            map.put(KuduBaseOptions.MASTER.key(), "dummy:7051");
+            ReadonlyConfig readonlyConfig = ReadonlyConfig.fromMap(map);
+            return new CommonConfig(readonlyConfig);
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
index 047e5bc07f..38c4ec352c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kudu/KuduIT.java
@@ -414,6 +414,51 @@ public class KuduIT extends TestSuiteBase implements TestResource {
         kuduClient.deleteTable("kudu_source_table_2");
     }
 
+    @TestTemplate
+    public void testKuduMultipleReadWithRegex(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_source_table_1");
+        initializeKuduTable("kudu_source_table_2");
+        batchInsertData("kudu_source_table_1");
+        batchInsertData("kudu_source_table_2");
+        Container.ExecResult execResult =
+                container.executeJob("/kudu_to_assert_with_pattern_tables.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        kuduClient.deleteTable("kudu_source_table_1");
+        kuduClient.deleteTable("kudu_source_table_2");
+    }
+
+    @TestTemplate
+    public void testKuduWholeDatabaseRead(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_source_table_1");
+        initializeKuduTable("kudu_source_table_2");
+        batchInsertData("kudu_source_table_1");
+        batchInsertData("kudu_source_table_2");
+        Container.ExecResult execResult =
+                container.executeJob("/kudu_to_assert_with_all_tables.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        kuduClient.deleteTable("kudu_source_table_1");
+        kuduClient.deleteTable("kudu_source_table_2");
+    }
+
+    @TestTemplate
+    public void testKuduTableListWithRegex(TestContainer container)
+            throws IOException, InterruptedException {
+        initializeKuduTable("kudu_source_table_1");
+        initializeKuduTable("kudu_source_table_2");
+        initializeKuduTable("kudu_extra_1");
+        batchInsertData("kudu_source_table_1");
+        batchInsertData("kudu_source_table_2");
+        batchInsertData("kudu_extra_1");
+        Container.ExecResult execResult =
+                container.executeJob("/kudu_to_assert_with_table_list_pattern.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+        kuduClient.deleteTable("kudu_source_table_1");
+        kuduClient.deleteTable("kudu_source_table_2");
+        kuduClient.deleteTable("kudu_extra_1");
+    }
+
     @DisabledOnContainer(
             value = {},
             type = {EngineType.FLINK},
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_all_tables.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_all_tables.conf
new file mode 100644
index 0000000000..7d1fb3a9cf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_all_tables.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with whole-database style table matching in Kudu source
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # Kudu source with full-database regex table_name
+  kudu{
+    kudu_masters = "kudu-master:7051"
+    # Match all user tables in the current Kudu cluster.
+    table_name = ".*"
+    use_regex = true
+    plugin_output = "kudu"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      # Only the two tables created in the test case should appear
+      table-names = ["kudu_source_table_1", "kudu_source_table_2"]
+    }
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_pattern_tables.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_pattern_tables.conf
new file mode 100644
index 0000000000..ec68ad14f9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_pattern_tables.conf
@@ -0,0 +1,47 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with regex table matching in Kudu source
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # Kudu source with regex table_name
+  kudu{
+    kudu_masters = "kudu-master:7051"
+    table_name = "kudu_source_table_\\d+"
+    use_regex = true
+    plugin_output = "kudu"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      table-names = ["kudu_source_table_1", "kudu_source_table_2"]
+    }
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_table_list_pattern.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_table_list_pattern.conf
new file mode 100644
index 0000000000..f79c065a63
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kudu-e2e/src/test/resources/kudu_to_assert_with_table_list_pattern.conf
@@ -0,0 +1,58 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing with mixed table_list and regex entries in Kudu source
+######
+
+env {
+  # You can set engine configuration here
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # Kudu source with table_list that combines exact and regex table_name entries
+  kudu{
+    kudu_masters = "kudu-master:7051"
+    table_list = [
+      {
+        table_name = "kudu_source_table_1"
+      },
+      {
+        table_name = "kudu_source_table_2"
+      },
+      {
+        # Regex entry - matches additional tables, e.g. kudu_extra_1
+        table_name = "kudu_extra_.*"
+        use_regex = true
+      }
+    ]
+    plugin_output = "kudu"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      table-names = ["kudu_source_table_1", "kudu_source_table_2", "kudu_extra_1"]
+    }
+  }
+}
+
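
Note on mixed `table_list` entries: in the patch above, each entry is resolved on its own. An entry with `use_regex = true` expands to every matching table, while any other entry contributes exactly one table. The sketch below models only that expansion over plain strings, under stated assumptions: `TableListExpansionSketch`, `TableEntry` and `resolve` are hypothetical names, the cluster's table list is passed in directly instead of coming from `KuduCatalog`, and exact entries are not checked for existence. As far as the diff shows, results are appended in entry order without deduplication, so the sketch does the same.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration class; not part of the SeaTunnel code base.
class TableListExpansionSketch {

    // Simplified stand-in for one table_list entry (table_name plus use_regex).
    static final class TableEntry {
        final String tableName;
        final boolean useRegex;

        TableEntry(String tableName, boolean useRegex) {
            this.tableName = tableName;
            this.useRegex = useRegex;
        }
    }

    // Expands entries in order: regex entries to all fully matching tables,
    // exact entries to the configured name itself.
    static List<String> resolve(List<TableEntry> entries, List<String> clusterTables) {
        List<String> result = new ArrayList<>();
        for (TableEntry entry : entries) {
            if (entry.useRegex) {
                Pattern pattern = Pattern.compile(entry.tableName);
                for (String table : clusterTables) {
                    if (pattern.matcher(table).matches()) {
                        result.add(table);
                    }
                }
            } else {
                result.add(entry.tableName);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> clusterTables =
                Arrays.asList("kudu_source_table_1", "kudu_source_table_2", "kudu_extra_1");

        // Same shape as kudu_to_assert_with_table_list_pattern.conf.
        List<TableEntry> mixed =
                Arrays.asList(
                        new TableEntry("kudu_source_table_1", false),
                        new TableEntry("kudu_source_table_2", false),
                        new TableEntry("kudu_extra_.*", true));
        System.out.println(resolve(mixed, clusterTables));

        // An exact entry that a regex entry also matches appears twice in this sketch.
        List<TableEntry> overlapping =
                Arrays.asList(
                        new TableEntry("kudu_source_table_1", false),
                        new TableEntry("kudu_source_table_\\d+", true));
        System.out.println(resolve(overlapping, clusterTables));
    }
}
```

Whether a repeated table causes problems later in the real connector is not something this diff answers. Keeping exact names and regex patterns disjoint avoids the question.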
