This is an automated email from the ASF dual-hosted git repository.
zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 26c4bb5300 [Feature][connector-hive] Support regex and whole-database
table_name for source (#10250)
26c4bb5300 is described below
commit 26c4bb53007a70b32c113948e9240a1ba6ed167a
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 1 08:32:34 2026 +0800
[Feature][connector-hive] Support regex and whole-database table_name for
source (#10250)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/source/Hive.md | 51 +++-
docs/zh/connector-v2/source/Hive.md | 53 ++++-
.../seatunnel/hive/config/HiveOptions.java | 8 +
.../seatunnel/hive/source/HiveSourceFactory.java | 2 +
.../hive/source/config/HiveSourceConfig.java | 7 +-
.../source/config/HiveSourceTableDiscovery.java | 129 ++++++++++
.../hive/source/config/HiveTableNamePattern.java | 90 +++++++
.../config/MultipleTableHiveSourceConfig.java | 91 +++++++-
.../reader/MultipleTableHiveSourceReader.java | 6 +-
.../config/HiveSourceTableDiscoveryTest.java | 260 +++++++++++++++++++++
.../seatunnel/e2e/connector/hive/HiveIT.java | 84 ++++++-
.../test/resources/regex/fake_to_hive_regex_1.conf | 59 +++++
.../test/resources/regex/fake_to_hive_regex_2.conf | 59 +++++
.../resources/regex/fake_to_hive_regex_ignore.conf | 59 +++++
.../regex/fake_to_hive_regex_no_match.conf | 59 +++++
.../resources/regex/fake_to_hive_regex_other.conf | 59 +++++
.../resources/regex/hive_regex_db_to_assert.conf | 88 +++++++
.../regex/hive_regex_db_to_assert_root.conf | 84 +++++++
.../regex/hive_regex_table_pattern_to_assert.conf | 83 +++++++
.../regex/hive_regex_table_prefix_to_assert.conf | 85 +++++++
20 files changed, 1403 insertions(+), 13 deletions(-)
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index 7d99051e73..d5fb0da0a6 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -51,6 +51,7 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
| name | type | required | default value |
|-----------------------|--------|----------|----------------|
| table_name | string | yes | - |
+| use_regex | boolean| no | false |
| metastore_uri | string | yes | - |
| krb5_path | string | no | /etc/krb5.conf |
| kerberos_principal | string | no | - |
@@ -66,7 +67,19 @@ Read all the data in a split in a pollNext call. What splits
are read will be sa
### table_name [string]
-Target Hive table name eg: db1.table1
+Target Hive table name eg: `db1.table1`. When `use_regex = true`, this field
uses `databasePattern.tablePattern` (Hive has no schema) to match multiple
tables from Hive metastore.
+
+### use_regex [boolean]
+
+Whether to treat `table_name` as a regular expression pattern for matching
multiple tables (whole database / subset). This also works inside each entry of
`table_list` / `tables_configs`.
+
+Regex syntax notes:
+- The dot (`.`) is treated as the separator between database and table
patterns (Hive only supports `database.table`).
+- Only one unescaped dot is allowed (as the database/table separator). If you
need to use dot (`.`) in a regular expression (e.g. `.*`), you must escape it
as `\.` (in a HOCON string, write `\\.`).
+- Examples: `db0.\.*`, `db1.user_table_[0-9]+`, `db[1-2].(app|web)order_\.*`.
+- In SeaTunnel job config (HOCON string), backslashes need escaping. For
example, the regex `db0.\.*` should be configured as `db0.\\.*`.
+- `db0.\.*` matches all tables in database `db0` (whole database
synchronization).
+- `\.*.\.*` matches all tables in all databases (whole Hive synchronization).
### metastore_uri [string]
@@ -136,6 +149,7 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
### Example 2: Multiple tables
> Note: Hive is a structured data source and should use 'table_list', and
> 'tables_configs' will be removed in the future.
+> You can also set `use_regex = true` in each table config to match multiple
tables.
```bash
@@ -171,7 +185,40 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
```
-### Example3 : Kerberos
+### Example 3: Regex matching (whole database / subset)
+
+```bash
+ Hive {
+ metastore_uri = "thrift://namenode001:9083"
+
+ # 1) Whole database: all tables in database `a`
+ table_name = "a.\\.*"
+ use_regex = true
+ }
+```
+
+```bash
+ Hive {
+ metastore_uri = "thrift://namenode001:9083"
+
+ # 2) Whole Hive: all tables in all databases
+ table_name = "\\.*.\\.*"
+ use_regex = true
+ }
+```
+
+```bash
+ Hive {
+ metastore_uri = "thrift://namenode001:9083"
+
+ # 3) Subset: tables matching `tmp_.*` in database `a`
+ # Note: escape the dot wildcard as `\.` (in HOCON string, write `\\.`)
because unescaped dots are treated as separators
+ table_name = "a.tmp_\\.*"
+ use_regex = true
+ }
+```
+
+### Example 4: Kerberos
```bash
source {
diff --git a/docs/zh/connector-v2/source/Hive.md
b/docs/zh/connector-v2/source/Hive.md
index 396585b545..9183f622d8 100644
--- a/docs/zh/connector-v2/source/Hive.md
+++ b/docs/zh/connector-v2/source/Hive.md
@@ -51,6 +51,7 @@ import ChangeLog from '../changelog/connector-hive.md';
| 名称 | 类型 | 必需 | 默认值 |
|-----------------------|--------|------|---------|
| table_name | string | 是 | - |
+| use_regex | boolean| 否 | false |
| metastore_uri | string | 是 | - |
| krb5_path | string | 否 | /etc/krb5.conf |
| kerberos_principal | string | 否 | - |
@@ -66,7 +67,19 @@ import ChangeLog from '../changelog/connector-hive.md';
### table_name [string]
-目标 Hive 表名,例如:`db1.table1`
+目标 Hive 表名,例如:`db1.table1`。当 `use_regex = true` 时,该字段支持 `数据库正则.表正则`(Hive 没有
schema)来匹配 Hive 元存储中的多张表。
+
+### use_regex [boolean]
+
+是否将 `table_name` 视为正则表达式进行匹配。开启后,`table_name` 可用于整库/多表同步;同样也支持在 `table_list` /
`tables_configs` 的每个表配置里单独开启。
+
+语法说明:
+- 点号(`.`)被视为数据库与表之间的分隔符(Hive 仅支持 `database.table`)。
+- 只允许出现 1 个未转义的点号(`.`)(作为数据库/表分隔符)。如果需要在正则表达式中使用点号(`.`)(例如 `.*`),必须写成
`\.`(HOCON 字符串里需要写成 `\\.`)。
+- 例如:`db0.\.*`、`db1.user_table_[0-9]+`、`db[1-2].(app|web)order_\.*`。
+- 在 SeaTunnel 作业配置(HOCON 字符串)中,反斜杠需要再次转义。例如正则 `db0.\.*` 在配置中应写成 `db0.\\.*`。
+- `db0.\.*` 表示同步 `db0` 库下的所有表(整库同步)。
+- `\.*.\.*` 表示同步所有库下的所有表(整 Hive 同步)。
### metastore_uri [string]
@@ -134,6 +147,7 @@ Kerberos 认证的 keytab 文件路径
### 示例 2:多表
> 注意:Hive 是结构化数据源,应使用 `table_list`,`tables_configs` 将在未来移除。
+> 也支持在每个表配置中设置 `use_regex = true` 来按正则匹配多表。
```bash
Hive {
@@ -165,7 +179,40 @@ Kerberos 认证的 keytab 文件路径
}
```
-### 示例 3:Kerberos
+### 示例 3:正则匹配多表(整库/整库子集)
+
+```bash
+ Hive {
+ metastore_uri = "thrift://namenode001:9083"
+
+ # 1) 整库同步:同步 `a` 库下的所有表
+ table_name = "a.\\.*"
+ use_regex = true
+ }
+```
+
+```bash
+ Hive {
+ metastore_uri = "thrift://namenode001:9083"
+
+ # 2) 整 Hive 同步:同步所有库下的所有表
+ table_name = "\\.*.\\.*"
+ use_regex = true
+ }
+```
+
+```bash
+ Hive {
+ metastore_uri = "thrift://namenode001:9083"
+
+ # 3) 整库子集:同步 `a` 库下,表名匹配 `tmp_.*` 的表
+ # 注意:`.*` 里的点号(`.`)必须写成 `\.`(HOCON 字符串里写 `\\.`),因为未转义的点号会被当作分隔符
+ table_name = "a.tmp_\\.*"
+ use_regex = true
+ }
+```
+
+### 示例 4:Kerberos
```bash
source {
@@ -384,4 +431,4 @@ sink {
## 变更日志
-<ChangeLog />
\ No newline at end of file
+<ChangeLog />
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
index 63b90e4537..66027da7a6 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/config/HiveOptions.java
@@ -29,6 +29,14 @@ public class HiveOptions extends FileBaseSourceOptions {
.noDefaultValue()
.withDescription("Hive table name");
+ public static final Option<Boolean> USE_REGEX =
+ Options.key("use_regex")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "Use regular expression for `table_name` matching.
"
+ + "When set to true, the `table_name` will
be treated as a regex pattern and can match multiple tables.");
+
public static final Option<String> METASTORE_URI =
Options.key("metastore_uri")
.stringType()
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
index c37362500b..848a75e3f1 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSourceFactory.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
import com.google.auto.service.AutoService;
@@ -52,6 +53,7 @@ public class HiveSourceFactory implements TableSourceFactory {
.optional(HiveConfig.TABLE_NAME)
.optional(HiveConfig.METASTORE_URI)
.optional(ConnectorCommonOptions.TABLE_CONFIGS,
ConnectorCommonOptions.TABLE_LIST)
+ .optional(HiveOptions.USE_REGEX)
.optional(FileBaseSourceOptions.READ_PARTITIONS)
.optional(FileBaseSourceOptions.READ_COLUMNS)
.optional(FileBaseSourceOptions.KERBEROS_PRINCIPAL)
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index e878a23167..3fe619bfa9 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -40,6 +40,7 @@ import
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSou
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConstants;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hive.storage.StorageFactory;
@@ -86,9 +87,13 @@ public class HiveSourceConfig implements Serializable {
try {
table = HiveTableUtils.getTableInfo(readonlyConfig);
} catch (Exception e) {
+ String tableName =
+
readonlyConfig.getOptional(HiveOptions.TABLE_NAME).orElse("<missing>");
throw new HiveConnectorException(
HiveConnectorErrorCode.GET_HIVE_TABLE_INFORMATION_FAILED,
- "Hive metastore not reachable while initializing
HiveSource. Please ensure metastore is available or provide explicit file
path-based config.",
+ "Failed to get Hive table information for table_name='"
+ + tableName
+ + "'. Please ensure metastore is reachable and the
table exists.",
e);
}
this.hadoopConf = parseHiveHadoopConfig(readonlyConfig, table);
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceTableDiscovery.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceTableDiscovery.java
new file mode 100644
index 0000000000..36da54fd66
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceTableDiscovery.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+
+import lombok.experimental.UtilityClass;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.regex.PatternSyntaxException;
+
+@UtilityClass
+public class HiveSourceTableDiscovery {
+
+ public static boolean isEnabled(ReadonlyConfig config) {
+ return config != null && config.get(HiveOptions.USE_REGEX);
+ }
+
+ public static List<TablePath> discoverTablePaths(ReadonlyConfig config,
Catalog catalog) {
+ if (config == null || catalog == null) {
+ return Collections.emptyList();
+ }
+
+ if (!config.get(HiveOptions.USE_REGEX)) {
+ return Collections.emptyList();
+ }
+
+ String patternStr =
config.getOptional(HiveOptions.TABLE_NAME).orElse(null);
+ if (patternStr == null || patternStr.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "When `use_regex` is enabled, `table_name` must be
configured");
+ }
+
+ HiveTableNamePattern tableNamePattern =
HiveTableNamePattern.parse(patternStr);
+ Pattern databasePattern =
compilePattern(tableNamePattern.getDatabasePattern(), patternStr);
+ Pattern tablePattern =
compilePattern(tableNamePattern.getTablePattern(), patternStr);
+
+ List<TablePath> tablePaths = new ArrayList<>();
+ String databasePatternStr = tableNamePattern.getDatabasePattern();
+ if (isExactDatabaseName(databasePatternStr)) {
+ String databaseName = databasePatternStr;
+ for (String tableName : catalog.listTables(databaseName)) {
+ if (tablePattern.matcher(tableName).matches()) {
+ tablePaths.add(TablePath.of(databaseName, tableName));
+ }
+ }
+ } else {
+ for (String databaseName : catalog.listDatabases()) {
+ if (!databasePattern.matcher(databaseName).matches()) {
+ continue;
+ }
+ List<String> tables = catalog.listTables(databaseName);
+ for (String tableName : tables) {
+ if (tablePattern.matcher(tableName).matches()) {
+ tablePaths.add(TablePath.of(databaseName, tableName));
+ }
+ }
+ }
+ }
+
+ tablePaths.sort(Comparator.comparing(TablePath::getFullName));
+ return tablePaths;
+ }
+
+ private static Pattern compilePattern(String pattern, String rawTableName)
{
+ try {
+ return Pattern.compile(pattern);
+ } catch (PatternSyntaxException exception) {
+ throw new IllegalArgumentException(
+ "Invalid regex pattern in `table_name`: "
+ + rawTableName
+ + ", resolved pattern: "
+ + pattern,
+ exception);
+ }
+ }
+
+ /**
+ * Treat databasePattern as an exact database name only when it doesn't
contain obvious regex
+ * meta characters.
+ */
+ private static boolean isExactDatabaseName(String databasePattern) {
+ if (databasePattern == null || databasePattern.isEmpty()) {
+ return false;
+ }
+ for (int i = 0; i < databasePattern.length(); i++) {
+ char ch = databasePattern.charAt(i);
+ if (ch == '\\'
+ || ch == '.'
+ || ch == '*'
+ || ch == '+'
+ || ch == '?'
+ || ch == '|'
+ || ch == '['
+ || ch == ']'
+ || ch == '('
+ || ch == ')'
+ || ch == '{'
+ || ch == '}'
+ || ch == '^'
+ || ch == '$') {
+ return false;
+ }
+ }
+ return true;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveTableNamePattern.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveTableNamePattern.java
new file mode 100644
index 0000000000..7da5b4bd85
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveTableNamePattern.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import java.util.Optional;
+
+final class HiveTableNamePattern {
+
+ private static final String DOT_PLACEHOLDER = "__$DOT$__";
+
+ private final String databasePattern;
+ private final String tablePattern;
+
+ private HiveTableNamePattern(String databasePattern, String tablePattern) {
+ this.databasePattern = databasePattern;
+ this.tablePattern = tablePattern;
+ }
+
+ static HiveTableNamePattern parse(String rawPattern) {
+ if (rawPattern == null || rawPattern.trim().isEmpty()) {
+ throw new IllegalArgumentException(
+ "`table_name` must not be blank when `use_regex` is
enabled");
+ }
+
+ String processed = rawPattern.trim().replace("\\.", DOT_PLACEHOLDER);
+ Optional<Integer> separatorIndex = findTableSeparator(processed);
+ if (!separatorIndex.isPresent()) {
+ throw new IllegalArgumentException(
+ "Hive `table_name` must use `databasePattern.tablePattern`
when `use_regex` is enabled. "
+ + "Invalid `table_name`: "
+ + processed.replace(DOT_PLACEHOLDER, "."));
+ }
+
+ int index = separatorIndex.get();
+ String databasePattern = processed.substring(0, index).trim();
+ String tablePattern = processed.substring(index + 1).trim();
+
+ if (databasePattern.isEmpty() || tablePattern.isEmpty()) {
+ throw new IllegalArgumentException(
+ "Hive `table_name` must use `databasePattern.tablePattern`
when `use_regex` is enabled. "
+ + "Invalid `table_name`: "
+ + processed.replace(DOT_PLACEHOLDER, "."));
+ }
+
+ databasePattern = databasePattern.replace(DOT_PLACEHOLDER, ".");
+ tablePattern = tablePattern.replace(DOT_PLACEHOLDER, ".");
+ return new HiveTableNamePattern(databasePattern, tablePattern);
+ }
+
+ private static Optional<Integer> findTableSeparator(String
processedPattern) {
+ int firstDot = processedPattern.indexOf('.');
+ if (firstDot < 0) {
+ return Optional.empty();
+ }
+ int lastDot = processedPattern.lastIndexOf('.');
+ if (firstDot != lastDot) {
+ throw new IllegalArgumentException(
+ "Hive does not support schema in `table_name` when
`use_regex` is enabled. "
+ + "Please use `databasePattern.tablePattern` (only
one unescaped '.') and escape dots in regex as '\\.' "
+ + "(in HOCON string, write '\\\\.' instead). "
+                            + "Examples: `db0.\\.*`, `db1.user_table_[0-9]+`,
`db[1-2].(app|web)order_\\.*`. "
+ + "Invalid `table_name`: "
+ + processedPattern.replace(DOT_PLACEHOLDER, "."));
+ }
+ return Optional.of(firstDot);
+ }
+
+ String getDatabasePattern() {
+ return databasePattern;
+ }
+
+ String getTablePattern() {
+ return tablePattern;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
index def16ce787..42e3402a72 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/MultipleTableHiveSourceConfig.java
@@ -18,16 +18,24 @@
package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
import org.apache.seatunnel.shade.com.google.common.collect.Lists;
+import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+import
org.apache.seatunnel.connectors.seatunnel.hive.utils.HiveMetaStoreCatalog;
import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.stream.Collectors;
+@Slf4j
public class MultipleTableHiveSourceConfig implements Serializable {
private static final long serialVersionUID = 1L;
@@ -39,26 +47,101 @@ public class MultipleTableHiveSourceConfig implements
Serializable {
parseFromLocalFileSourceByTableList(readonlyConfig);
} else if
(readonlyConfig.getOptional(ConnectorCommonOptions.TABLE_CONFIGS).isPresent()) {
parseFromLocalFileSourceByTableConfigs(readonlyConfig);
+ } else if (HiveSourceTableDiscovery.isEnabled(readonlyConfig)) {
+ parseFromLocalFileSourceByDiscovery(readonlyConfig);
} else {
parseFromLocalFileSourceConfig(readonlyConfig);
}
}
private void parseFromLocalFileSourceByTableList(ReadonlyConfig
readonlyConfig) {
- this.hiveSourceConfigs =
+ List<ReadonlyConfig> expanded =
readonlyConfig.get(ConnectorCommonOptions.TABLE_LIST).stream()
.map(ReadonlyConfig::fromMap)
- .map(HiveSourceConfig::new)
+ .flatMap(tableConfig ->
expandTableConfigIfNeeded(tableConfig).stream())
.collect(Collectors.toList());
+ this.hiveSourceConfigs = buildHiveSourceConfigs(expanded);
}
+
// hive is structured, should use table_list
@Deprecated
private void parseFromLocalFileSourceByTableConfigs(ReadonlyConfig
readonlyConfig) {
- this.hiveSourceConfigs =
+ List<ReadonlyConfig> expanded =
readonlyConfig.get(ConnectorCommonOptions.TABLE_CONFIGS).stream()
.map(ReadonlyConfig::fromMap)
- .map(HiveSourceConfig::new)
+ .flatMap(tableConfig ->
expandTableConfigIfNeeded(tableConfig).stream())
.collect(Collectors.toList());
+ this.hiveSourceConfigs = buildHiveSourceConfigs(expanded);
+ }
+
+ private void parseFromLocalFileSourceByDiscovery(ReadonlyConfig
readonlyConfig) {
+ List<ReadonlyConfig> expanded =
expandTableConfigIfNeeded(readonlyConfig);
+ this.hiveSourceConfigs = buildHiveSourceConfigs(expanded);
+ }
+
+ private List<ReadonlyConfig> expandTableConfigIfNeeded(ReadonlyConfig
tableConfig) {
+ if (!HiveSourceTableDiscovery.isEnabled(tableConfig)) {
+ return Lists.newArrayList(tableConfig);
+ }
+
+ String tableNamePattern =
+
tableConfig.getOptional(HiveOptions.TABLE_NAME).orElse("<missing table_name>");
+ if (!tableConfig.getOptional(HiveOptions.METASTORE_URI).isPresent()
+ ||
StringUtils.isBlank(tableConfig.get(HiveOptions.METASTORE_URI))) {
+ throw new IllegalArgumentException(
+ "Hive metastore_uri is required for regex table discovery
(use_regex). table_name="
+ + tableNamePattern);
+ }
+
+ try (HiveMetaStoreCatalog catalog =
HiveMetaStoreCatalog.create(tableConfig)) {
+ catalog.open();
+ List<TablePath> tablePaths =
+ HiveSourceTableDiscovery.discoverTablePaths(tableConfig,
catalog);
+ if (tablePaths.isEmpty()) {
+ throw new IllegalArgumentException(
+ "No hive tables matched the regex pattern. Please
check `table_name` and `use_regex`. table_name="
+ + tableNamePattern);
+ }
+ logMatchedTables(tableNamePattern, tablePaths);
+ return tablePaths.stream()
+ .map(path -> overrideTableName(tableConfig,
path.getFullName()))
+ .collect(Collectors.toList());
+ }
+ }
+
+ private void logMatchedTables(String tableNamePattern, List<TablePath>
tablePaths) {
+ String matchedTables =
+
tablePaths.stream().map(TablePath::getFullName).collect(Collectors.joining(",
"));
+ log.info(
+ "Hive regex discovery matched {} table(s) for table_name='{}':
{}",
+ tablePaths.size(),
+ tableNamePattern,
+ matchedTables);
+ }
+
+ private ReadonlyConfig overrideTableName(ReadonlyConfig baseConfig, String
tableName) {
+ LinkedHashMap<String, Object> map = new
LinkedHashMap<>(baseConfig.getSourceMap());
+ map.put(HiveOptions.TABLE_NAME.key(), tableName);
+ return ReadonlyConfig.fromMap(map);
+ }
+
+ private List<HiveSourceConfig> buildHiveSourceConfigs(List<ReadonlyConfig>
tableConfigs) {
+ List<HiveSourceConfig> configs = new ArrayList<>(tableConfigs.size());
+ for (ReadonlyConfig tableConfig : tableConfigs) {
+ String tableName =
+
tableConfig.getOptional(HiveOptions.TABLE_NAME).orElse("<missing table_name>");
+ try {
+ configs.add(new HiveSourceConfig(tableConfig));
+ } catch (Exception exception) {
+ log.error(
+ "Failed to initialize Hive source config for
table_name='{}'. "
+ + "Please check table existence/permissions
and metastore connectivity.",
+ tableName,
+ exception);
+ throw exception;
+ }
+ }
+ return configs;
}
private void parseFromLocalFileSourceConfig(ReadonlyConfig
localFileSourceRootConfig) {
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
index bb5ba33ffd..cc29e0b325 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/reader/MultipleTableHiveSourceReader.java
@@ -80,10 +80,12 @@ public class MultipleTableHiveSourceReader implements
SourceReader<SeaTunnelRow,
+ "]");
}
try {
- readStrategy.read(split, output);
+ readStrategy.read(split.getFilePath(), split.getTableId(),
output);
} catch (Exception e) {
String errorMsg =
- String.format("Read data from this file [%s]
failed", split.splitId());
+ String.format(
+ "Read data failed, tableId=[%s],
file=[%s], splitId=[%s]",
+ split.getTableId(), split.getFilePath(),
split.splitId());
throw new FileConnectorException(FILE_READ_FAILED,
errorMsg, e);
}
} else if (noMoreSplit && sourceSplits.isEmpty()) {
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceTableDiscoveryTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceTableDiscoveryTest.java
new file mode 100644
index 0000000000..ae362f9b8c
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceTableDiscoveryTest.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.source.config;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
+import
org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
+import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class HiveSourceTableDiscoveryTest {
+
+ @Test
+ void testDiscoverByUseRegexWithTableName() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("ods", "tmp_1");
+ catalog.addTable("ods", "tmp_2");
+ catalog.addTable("ods", "t1");
+ catalog.addTable("dw", "tmp_1");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "ods.tmp_\\d+");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<TablePath> result =
HiveSourceTableDiscovery.discoverTablePaths(config, catalog);
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertTrue(result.contains(TablePath.of("ods.tmp_1")));
+ Assertions.assertTrue(result.contains(TablePath.of("ods.tmp_2")));
+ }
+
+ @Test
+ void testDiscoverWholeDatabaseByDatabasePattern() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("ods", "t1");
+ catalog.addTable("ods", "t2");
+ catalog.addTable("dw", "t1");
+ catalog.addTable("ods_backup", "t3");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "ods.\\.*");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<TablePath> result =
HiveSourceTableDiscovery.discoverTablePaths(config, catalog);
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertTrue(result.contains(TablePath.of("ods.t1")));
+ Assertions.assertTrue(result.contains(TablePath.of("ods.t2")));
+ }
+
+ @Test
+ void
testDiscoverWholeDatabaseByExactDatabaseNameDoesNotMatchPrefixDatabases() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("a", "t1");
+ catalog.addTable("a", "t2");
+ catalog.addTable("abc", "t3");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "a.\\.*");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<TablePath> result =
HiveSourceTableDiscovery.discoverTablePaths(config, catalog);
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertTrue(result.contains(TablePath.of("a.t1")));
+ Assertions.assertTrue(result.contains(TablePath.of("a.t2")));
+ }
+
+ @Test
+ void testDiscoverAllDatabasesAllTables() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("a", "t1");
+ catalog.addTable("a", "t2");
+ catalog.addTable("b", "t3");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "\\.*.\\.*");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<TablePath> result =
HiveSourceTableDiscovery.discoverTablePaths(config, catalog);
+ Assertions.assertEquals(3, result.size());
+ Assertions.assertTrue(result.contains(TablePath.of("a.t1")));
+ Assertions.assertTrue(result.contains(TablePath.of("a.t2")));
+ Assertions.assertTrue(result.contains(TablePath.of("b.t3")));
+ }
+
+ @Test
+ void testUseRegexRequiresEscapingDotsInsideTablePattern() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("ods", "tmp_1");
+ catalog.addTable("ods", "tmp_2");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "ods.tmp_.*");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> HiveSourceTableDiscovery.discoverTablePaths(config,
catalog));
+ }
+
+ @Test
+ void testUseRegexAllowsEscapedDotsInsideTablePattern() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("ods", "tmp_1");
+ catalog.addTable("ods", "tmp_2");
+ catalog.addTable("ods", "t1");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "ods.tmp_\\.*");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ List<TablePath> result =
HiveSourceTableDiscovery.discoverTablePaths(config, catalog);
+ Assertions.assertEquals(2, result.size());
+ Assertions.assertTrue(result.contains(TablePath.of("ods.tmp_1")));
+ Assertions.assertTrue(result.contains(TablePath.of("ods.tmp_2")));
+ }
+
+ @Test
+ void testUseRegexRequiresTableName() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("ods", "t1");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> HiveSourceTableDiscovery.discoverTablePaths(config,
catalog));
+ }
+
+ @Test
+ void testUseRegexRequiresDatabaseAndTableSeparator() {
+ FakeCatalog catalog = new FakeCatalog();
+ catalog.addTable("ods", "tmp_1");
+
+ Map<String, Object> configMap = new HashMap<>();
+ configMap.put(HiveOptions.USE_REGEX.key(), true);
+ configMap.put(HiveOptions.TABLE_NAME.key(), "tmp_\\d+");
+ ReadonlyConfig config = ReadonlyConfig.fromMap(configMap);
+
+ Assertions.assertThrows(
+ IllegalArgumentException.class,
+ () -> HiveSourceTableDiscovery.discoverTablePaths(config,
catalog));
+ }
+
+ private static class FakeCatalog implements Catalog {
+
+ private final Map<String, List<String>> databaseTables = new
HashMap<>();
+
+ void addTable(String database, String table) {
+ databaseTables.computeIfAbsent(database, ignored -> new
ArrayList<>()).add(table);
+ }
+
+ @Override
+ public void open() throws CatalogException {}
+
+ @Override
+ public void close() throws CatalogException {}
+
+ @Override
+ public String name() {
+ return "fake_hive_catalog";
+ }
+
+ @Override
+ public String getDefaultDatabase() throws CatalogException {
+ return "default";
+ }
+
+ @Override
+ public boolean databaseExists(String databaseName) throws
CatalogException {
+ return databaseTables.containsKey(databaseName);
+ }
+
+ @Override
+ public List<String> listDatabases() throws CatalogException {
+ return new ArrayList<>(databaseTables.keySet());
+ }
+
+ @Override
+ public List<String> listTables(String databaseName)
+ throws CatalogException, DatabaseNotExistException {
+ return databaseTables.getOrDefault(databaseName,
Collections.emptyList());
+ }
+
+ @Override
+ public boolean tableExists(TablePath tablePath) throws
CatalogException {
+ if (tablePath == null || tablePath.getDatabaseName() == null) {
+ return false;
+ }
+ return databaseTables
+ .getOrDefault(tablePath.getDatabaseName(),
Collections.emptyList())
+ .contains(tablePath.getTableName());
+ }
+
+ @Override
+ public CatalogTable getTable(TablePath tablePath)
+ throws CatalogException, TableNotExistException {
+ throw new UnsupportedOperationException("not needed for discovery
test");
+ }
+
+ @Override
+ public void createTable(TablePath tablePath, CatalogTable table,
boolean ignoreIfExists)
+ throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ throw new UnsupportedOperationException("not needed for discovery
test");
+ }
+
+ @Override
+ public void dropTable(TablePath tablePath, boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ throw new UnsupportedOperationException("not needed for discovery
test");
+ }
+
+ @Override
+ public void createDatabase(TablePath tablePath, boolean ignoreIfExists)
+ throws DatabaseAlreadyExistException, CatalogException {
+ throw new UnsupportedOperationException("not needed for discovery
test");
+ }
+
+ @Override
+ public void dropDatabase(TablePath tablePath, boolean
ignoreIfNotExists)
+ throws DatabaseNotExistException, CatalogException {
+ throw new UnsupportedOperationException("not needed for discovery
test");
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
index 913b7e597d..e7419a146b 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -59,6 +59,43 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
+ " name STRING,"
+ " score INT"
+ ")";
+ private static final String CREATE_REGEX_DB_A_SQL = "CREATE DATABASE IF
NOT EXISTS a";
+ private static final String CREATE_REGEX_DB_ABC_SQL = "CREATE DATABASE IF
NOT EXISTS abc";
+ private static final String CREATE_REGEX_TABLE_1_SQL =
+ "CREATE TABLE IF NOT EXISTS a.test_hive_regex_1"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+ private static final String CREATE_REGEX_TABLE_2_SQL =
+ "CREATE TABLE IF NOT EXISTS a.test_hive_regex_2"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+ private static final String CREATE_REGEX_TABLE_OTHER_SQL =
+ "CREATE TABLE IF NOT EXISTS a.test_hive_regex_other"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+ private static final String CREATE_REGEX_TABLE_NO_MATCH_SQL =
+ "CREATE TABLE IF NOT EXISTS a.test_hive_no_match"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+ private static final String CREATE_REGEX_TABLE_IGNORE_SQL =
+ "CREATE TABLE IF NOT EXISTS abc.test_hive_regex_ignore"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
private static final String HMS_HOST = "metastore";
private static final String HIVE_SERVER_HOST = "hiveserver2";
@@ -169,7 +206,11 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
.withCreateContainerCmdModifier(cmd ->
cmd.withName(HIVE_SERVER_HOST))
.withNetworkAliases(HIVE_SERVER_HOST)
.withFileSystemBind("/tmp/data", "/opt/hive/data")
- .withEnv("SERVICE_OPTS",
"-Dhive.metastore.uris=thrift://metastore:9083")
+ .withEnv(
+ "SERVICE_OPTS",
+ "-Dhive.metastore.uris=thrift://metastore:9083"
+ + "
-Dhive.metastore.warehouse.dir=/opt/hive/data/warehouse"
+ + "
-Dmetastore.warehouse.dir=/opt/hive/data/warehouse")
.withEnv("IS_RESUME", "true")
.dependsOn(hmsContainer);
hiveServerContainer.setPortBindings(Collections.singletonList("10000:10000"));
@@ -208,6 +249,13 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
// Avoid fragile HMS list calls; rely on default database existing in
test images
try (Statement statement = this.hiveConnection.createStatement()) {
statement.execute(CREATE_SQL);
+ statement.execute(CREATE_REGEX_DB_A_SQL);
+ statement.execute(CREATE_REGEX_DB_ABC_SQL);
+ statement.execute(CREATE_REGEX_TABLE_1_SQL);
+ statement.execute(CREATE_REGEX_TABLE_2_SQL);
+ statement.execute(CREATE_REGEX_TABLE_OTHER_SQL);
+ statement.execute(CREATE_REGEX_TABLE_NO_MATCH_SQL);
+ statement.execute(CREATE_REGEX_TABLE_IGNORE_SQL);
} catch (Exception exception) {
log.error(ExceptionUtils.getMessage(exception));
throw exception;
@@ -229,6 +277,40 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
executeJob(container, "/fake_to_hive.conf", "/hive_to_assert.conf");
}
+ @TestTemplate
+ public void testHiveSourceWholeDatabaseUseRegex(TestContainer container)
throws Exception {
+ Container.ExecResult exec1 =
container.executeJob("/regex/fake_to_hive_regex_1.conf");
+ Assertions.assertEquals(0, exec1.getExitCode());
+ Container.ExecResult exec2 =
container.executeJob("/regex/fake_to_hive_regex_2.conf");
+ Assertions.assertEquals(0, exec2.getExitCode());
+ Container.ExecResult execOther =
+ container.executeJob("/regex/fake_to_hive_regex_other.conf");
+ Assertions.assertEquals(0, execOther.getExitCode());
+ Container.ExecResult execNoMatch =
+
container.executeJob("/regex/fake_to_hive_regex_no_match.conf");
+ Assertions.assertEquals(0, execNoMatch.getExitCode());
+ Container.ExecResult exec3 =
container.executeJob("/regex/fake_to_hive_regex_ignore.conf");
+ Assertions.assertEquals(0, exec3.getExitCode());
+
+ Container.ExecResult readResult =
+ container.executeJob("/regex/hive_regex_db_to_assert.conf");
+ Assertions.assertEquals(0, readResult.getExitCode());
+ // Verify root-level regex discovery also works
+ Container.ExecResult readResultRoot =
+
container.executeJob("/regex/hive_regex_db_to_assert_root.conf");
+ Assertions.assertEquals(0, readResultRoot.getExitCode());
+
+ // Verify regex pattern matching a subset of tables in the same
database
+ Container.ExecResult readResultPattern =
+
container.executeJob("/regex/hive_regex_table_pattern_to_assert.conf");
+ Assertions.assertEquals(0, readResultPattern.getExitCode());
+
+        // Verify regex matching with escaped dot wildcard (e.g.
"test_hive_regex_\\.*")
+ Container.ExecResult readResultPrefix =
+
container.executeJob("/regex/hive_regex_table_prefix_to_assert.conf");
+ Assertions.assertEquals(0, readResultPrefix.getExitCode());
+ }
+
@TestTemplate
@Disabled(
"[HDFS/COS/OSS/S3] is not available in CI, if you want to run this
test, please set up your own environment in the test case file,
hadoop_hive_conf_path_local and ip below}")
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_1.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_1.conf
new file mode 100644
index 0000000000..15b57f238c
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_1.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A1", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B1", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C1", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "a.test_hive_regex_1"
+ metastore_uri = "thrift://metastore:9083"
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_2.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_2.conf
new file mode 100644
index 0000000000..419f2a1b9e
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_2.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [4, "A2", 200]
+ },
+ {
+ kind = INSERT
+ fields = [5, "B2", 200]
+ },
+ {
+ kind = INSERT
+ fields = [6, "C2", 200]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "a.test_hive_regex_2"
+ metastore_uri = "thrift://metastore:9083"
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_ignore.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_ignore.conf
new file mode 100644
index 0000000000..0ee3e7b531
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_ignore.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [7, "X", 1]
+ },
+ {
+ kind = INSERT
+ fields = [8, "Y", 1]
+ },
+ {
+ kind = INSERT
+ fields = [9, "Z", 1]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "abc.test_hive_regex_ignore"
+ metastore_uri = "thrift://metastore:9083"
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_no_match.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_no_match.conf
new file mode 100644
index 0000000000..0a8886b51b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_no_match.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [20, "N1", 400]
+ },
+ {
+ kind = INSERT
+ fields = [21, "N2", 400]
+ },
+ {
+ kind = INSERT
+ fields = [22, "N3", 400]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "a.test_hive_no_match"
+ metastore_uri = "thrift://metastore:9083"
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_other.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_other.conf
new file mode 100644
index 0000000000..eb9b2ad250
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/fake_to_hive_regex_other.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [10, "O1", 300]
+ },
+ {
+ kind = INSERT
+ fields = [11, "O2", 300]
+ },
+ {
+ kind = INSERT
+ fields = [12, "O3", 300]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "a.test_hive_regex_other"
+ metastore_uri = "thrift://metastore:9083"
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_db_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_db_to_assert.conf
new file mode 100644
index 0000000000..8f1d42ca4a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_db_to_assert.conf
@@ -0,0 +1,88 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_list = [
+ {
+ table_name = "a.\\.*"
+ use_regex = true
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ }
+ ]
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+ table-names = [
+ "a.test_hive_no_match",
+ "a.test_hive_regex_1",
+ "a.test_hive_regex_2",
+ "a.test_hive_regex_other"
+ ]
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_db_to_assert_root.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_db_to_assert_root.conf
new file mode 100644
index 0000000000..6358e5e8b8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_db_to_assert_root.conf
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "a.\\.*"
+ use_regex = true
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+ table-names = [
+ "a.test_hive_no_match",
+ "a.test_hive_regex_1",
+ "a.test_hive_regex_2",
+ "a.test_hive_regex_other"
+ ]
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_table_pattern_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_table_pattern_to_assert.conf
new file mode 100644
index 0000000000..328cb25f98
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_table_pattern_to_assert.conf
@@ -0,0 +1,83 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+    # Regex table pattern: matches only test_hive_regex_1 and test_hive_regex_2
+ table_name = "a.test_hive_regex_\\d+"
+ use_regex = true
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+ table-names = [
+ "a.test_hive_regex_1",
+ "a.test_hive_regex_2"
+ ]
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_table_prefix_to_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_table_prefix_to_assert.conf
new file mode 100644
index 0000000000..cf7a628e12
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/regex/hive_regex_table_prefix_to_assert.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ # Regex prefix matching: tables like test_hive_regex_1 / test_hive_regex_2
/ test_hive_regex_other
+ # Note: escape the dot wildcard as `\.` (in HOCON string, write `\\.`)
+ table_name = "a.test_hive_regex_\\.*"
+ use_regex = true
+ metastore_uri = "thrift://metastore:9083"
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+ table-names = [
+ "a.test_hive_regex_1",
+ "a.test_hive_regex_2",
+ "a.test_hive_regex_other"
+ ]
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}