This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d56b1645b2 [Feature][Connector-V2][Hive] Support multiple Hive
metastore URIs for automatic failover (#10253)
d56b1645b2 is described below
commit d56b1645b244d2aeb91bb482165da1d86d79f4c4
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 8 19:17:23 2026 +0800
[Feature][Connector-V2][Hive] Support multiple Hive metastore URIs for
automatic failover (#10253)
Co-authored-by: zengyi <[email protected]>
---
docs/en/connector-v2/sink/Hive.md | 11 +-
docs/en/connector-v2/source/Hive.md | 13 ++-
docs/zh/connector-v2/sink/Hive.md | 11 +-
docs/zh/connector-v2/source/Hive.md | 13 ++-
.../seatunnel/hive/utils/HiveMetaStoreCatalog.java | 117 ++++++++++++++++++--
.../HiveMetaStoreCatalogMetastoreUrisTest.java | 119 +++++++++++++++++++++
.../seatunnel/e2e/connector/hive/HiveIT.java | 17 +++
.../fake_to_hive_metastore_uri_failover.conf | 59 ++++++++++
.../hive_to_assert_metastore_uri_failover.conf | 74 +++++++++++++
9 files changed, 417 insertions(+), 17 deletions(-)
diff --git a/docs/en/connector-v2/sink/Hive.md
b/docs/en/connector-v2/sink/Hive.md
index 4c8625e69b..820464d11c 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -60,7 +60,7 @@ Target Hive table name eg: db1.table1, and if the source is
multiple mode, you c
### metastore_uri [string]
-Hive metastore uri
+Hive metastore uri. Supports comma-separated multiple URIs for HA/failover
(whitespace is ignored). SeaTunnel passes this value to Hive
`hive.metastore.uris` and uses Hive `RetryingMetaStoreClient` (if available) to
retry/failover between URIs. This is client-side endpoint failover; make sure
your metastores share/replicate the same backend to keep metadata consistent.
### hdfs_site_path [string]
@@ -159,6 +159,15 @@ Sink plugin common parameters, please refer to [Sink
Common Options](../sink-com
```
+Metastore URI failover example (multiple URIs):
+
+```bash
+ Hive {
+ table_name = "default.seatunnel_orc"
+ metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+ }
+```
+
### example 1
We have a source table like this:
diff --git a/docs/en/connector-v2/source/Hive.md
b/docs/en/connector-v2/source/Hive.md
index d5fb0da0a6..c51e82808f 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -83,7 +83,7 @@ Regex syntax notes:
### metastore_uri [string]
-Hive metastore uri
+Hive metastore uri. Supports comma-separated multiple URIs for HA/failover
(whitespace is ignored). SeaTunnel passes this value to Hive
`hive.metastore.uris` and uses Hive `RetryingMetaStoreClient` (if available) to
retry/failover between URIs. This is client-side endpoint failover; make sure
your metastores share/replicate the same backend to keep metadata consistent.
### hdfs_site_path [string]
@@ -147,7 +147,16 @@ Source plugin common parameters, please refer to [Source
Common Options](../sour
```
-### Example 2: Multiple tables
+### Example 2: Metastore URI failover
+
+```bash
+ Hive {
+ table_name = "default.seatunnel_orc"
+ metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+ }
+```
+
+### Example 3: Multiple tables
> Note: Hive is a structured data source and should be use 'table_list', and
> 'tables_configs' will be removed in the future.
> You can also set `use_regex = true` in each table config to match multiple
> tables.
diff --git a/docs/zh/connector-v2/sink/Hive.md
b/docs/zh/connector-v2/sink/Hive.md
index 1ba385ba53..636c06fc2f 100644
--- a/docs/zh/connector-v2/sink/Hive.md
+++ b/docs/zh/connector-v2/sink/Hive.md
@@ -60,7 +60,7 @@ import ChangeLog from '../changelog/connector-hive.md';
### metastore_uri [string]
-Hive 元存储 URI
+Hive 元存储 URI。支持通过逗号分隔配置多个 URI 用于高可用/故障切换(会自动去除空格)。SeaTunnel 会将该值写入 Hive 的
`hive.metastore.uris`,并在运行时优先使用 Hive 的 `RetryingMetaStoreClient`
实现重试/切换。注意:该能力仅做客户端连接端点切换,元数据一致性需要由 metastore 部署保证。
### hdfs_site_path [string]
@@ -157,6 +157,15 @@ Sink 插件的通用参数,请参阅 [Sink Common Options](../sink-common-opti
}
```
+metastore_uri 故障切换示例(多 URI):
+
+```bash
+ Hive {
+ table_name = "default.seatunnel_orc"
+ metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+ }
+```
+
### 示例 1
我们有一个源表如下:
diff --git a/docs/zh/connector-v2/source/Hive.md
b/docs/zh/connector-v2/source/Hive.md
index 9183f622d8..7fc8aaa340 100644
--- a/docs/zh/connector-v2/source/Hive.md
+++ b/docs/zh/connector-v2/source/Hive.md
@@ -83,7 +83,7 @@ import ChangeLog from '../changelog/connector-hive.md';
### metastore_uri [string]
-Hive 元存储 URI
+Hive 元存储 URI。支持通过逗号分隔配置多个 URI 用于高可用/故障切换(会自动去除空格)。SeaTunnel 会将该值写入 Hive 的
`hive.metastore.uris`,并在运行时优先使用 Hive 的 `RetryingMetaStoreClient`
实现重试/切换。注意:该能力仅做客户端连接端点切换,元数据一致性需要由 metastore 部署保证。
### hdfs_site_path [string]
@@ -145,7 +145,16 @@ Kerberos 认证的 keytab 文件路径
}
```
-### 示例 2:多表
+### 示例 2:metastore_uri 故障切换(多 URI)
+
+```bash
+ Hive {
+ table_name = "default.seatunnel_orc"
+ metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+ }
+```
+
+### 示例 3:多表
> 注意:Hive 是结构化数据源,应使用 `table_list`,`tables_configs` 将在未来移除。
> 也支持在每个表配置中设置 `use_regex = true` 来按正则匹配多表。
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
index ab3cf36369..11a6d301d3 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
@@ -37,6 +37,7 @@ import
org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorExc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
import org.apache.hadoop.hive.metastore.api.Database;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -50,6 +51,7 @@ import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.lang.reflect.Method;
import java.net.MalformedURLException;
import java.net.URI;
import java.nio.file.Files;
@@ -69,6 +71,10 @@ import java.util.Objects;
@Slf4j
public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
private static final List<String> HADOOP_CONF_FILES =
ImmutableList.of("hive-site.xml");
+ private static final String RETRYING_METASTORE_CLIENT_CLASS_NAME =
+ "org.apache.hadoop.hive.metastore.RetryingMetaStoreClient";
+ private static final String
RETRYING_METASTORE_CLIENT_NO_COMPATIBLE_GET_PROXY_MESSAGE =
+ "RetryingMetaStoreClient found but no compatible getProxy method,
falling back to HiveMetaStoreClient";
private final String metastoreUri;
private final String hadoopConfDir;
@@ -81,7 +87,7 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
private final String keytabPath;
private final String remoteUser;
- private transient HiveMetaStoreClient hiveClient;
+ private transient IMetaStoreClient hiveClient;
private transient HiveConf hiveConf;
private transient UserGroupInformation userGroupInformation;
@@ -105,7 +111,7 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
return create(config);
}
- private synchronized HiveMetaStoreClient getClient() {
+ private synchronized IMetaStoreClient getClient() {
if (hiveClient == null) {
hiveClient = initializeClient();
}
@@ -115,7 +121,7 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
return hiveClient;
}
- private HiveMetaStoreClient initializeClient() {
+ private IMetaStoreClient initializeClient() {
this.hiveConf = buildHiveConf();
try {
if (kerberosEnabled) {
@@ -124,7 +130,7 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
if (remoteUserEnabled) {
return loginWithRemoteUser(hiveConf);
}
- return new HiveMetaStoreClient(hiveConf);
+ return createClient(hiveConf);
} catch (Exception e) {
String errMsg =
String.format(
@@ -135,6 +141,63 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
}
}
+ private IMetaStoreClient createClient(HiveConf hiveConf) throws Exception {
+ IMetaStoreClient retryingClient = tryCreateRetryingClient(hiveConf);
+ if (retryingClient != null) {
+ return retryingClient;
+ }
+ return new HiveMetaStoreClient(hiveConf);
+ }
+
+ private IMetaStoreClient tryCreateRetryingClient(HiveConf hiveConf) {
+ try {
+ Class<?> clazz =
Class.forName(RETRYING_METASTORE_CLIENT_CLASS_NAME);
+ Method getProxyMethod = getProxyMethod(clazz);
+ if (getProxyMethod == null) {
+
log.warn(RETRYING_METASTORE_CLIENT_NO_COMPATIBLE_GET_PROXY_MESSAGE);
+ return null;
+ }
+
+ Object proxy = getProxyMethod.invoke(null, hiveConf, true);
+ if (proxy instanceof IMetaStoreClient) {
+ log.info(
+ "Using RetryingMetaStoreClient for Hive metastore
connection [uris={}]",
+ hiveConf.get("hive.metastore.uris"));
+ return (IMetaStoreClient) proxy;
+ }
+
log.warn(RETRYING_METASTORE_CLIENT_NO_COMPATIBLE_GET_PROXY_MESSAGE);
+ return null;
+ } catch (ClassNotFoundException e) {
+ log.debug("RetryingMetaStoreClient not found, falling back to
HiveMetaStoreClient", e);
+ return null;
+ } catch (Exception e) {
+ log.warn(
+ "Failed to create RetryingMetaStoreClient proxy, falling
back to HiveMetaStoreClient",
+ e);
+ return null;
+ }
+ }
+
+ private static Method getProxyMethod(Class<?> clazz) {
+ // Hive 2.x: getProxy(HiveConf, boolean)
+ // Hive 3.x: getProxy(Configuration, boolean)
+ Method method = null;
+ try {
+ method = clazz.getDeclaredMethod("getProxy", HiveConf.class,
boolean.class);
+ } catch (NoSuchMethodException ignored) {
+ }
+ if (method == null) {
+ try {
+ method = clazz.getDeclaredMethod("getProxy",
Configuration.class, boolean.class);
+ } catch (NoSuchMethodException ignored) {
+ }
+ }
+ if (method != null) {
+ method.setAccessible(true);
+ }
+ return method;
+ }
+
/**
* Try to execute SQL via HiveServer2 JDBC. Returns true if successful,
false if HiveServer2 is
* not available or execution failed.
@@ -196,9 +259,13 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
// Try to derive from metastore URI
// metastore URI format: thrift://host:9083
// HiveServer2 JDBC URL format: jdbc:hive2://host:10000/default
+ if (StringUtils.isBlank(metastoreUri)) {
+ return null;
+ }
try {
- if (metastoreUri != null && metastoreUri.startsWith("thrift://")) {
- URI uri = new URI(metastoreUri);
+ String firstUri = getFirstMetastoreUri(metastoreUri);
+ if (firstUri.startsWith("thrift://")) {
+ URI uri = new URI(firstUri);
String host = uri.getHost();
if (host != null) {
return String.format("jdbc:hive2://%s:10000/default",
host);
@@ -213,7 +280,12 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
private HiveConf buildHiveConf() {
HiveConf hiveConf = new HiveConf();
- hiveConf.set("hive.metastore.uris", metastoreUri);
+ if (StringUtils.isNotBlank(metastoreUri)) {
+ String normalizedMetastoreUris =
normalizeMetastoreUris(metastoreUri);
+ if (StringUtils.isNotBlank(normalizedMetastoreUris)) {
+ hiveConf.set("hive.metastore.uris", normalizedMetastoreUris);
+ }
+ }
hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI,
false);
hiveConf.setBoolean("hive.metastore.client.capability.check", false);
hiveConf.setBoolean("hive.metastore.client.filter.enabled", false);
@@ -244,7 +316,7 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
return hiveConf;
}
- private HiveMetaStoreClient loginWithKerberos(HiveConf hiveConf) throws
Exception {
+ private IMetaStoreClient loginWithKerberos(HiveConf hiveConf) throws
Exception {
Configuration authConf = new Configuration();
authConf.set("hadoop.security.authentication", "kerberos");
return HadoopLoginFactory.loginWithKerberos(
@@ -254,13 +326,36 @@ public class HiveMetaStoreCatalog implements Catalog,
Closeable, Serializable {
keytabPath,
(conf, ugi) -> {
this.userGroupInformation = ugi;
- return new HiveMetaStoreClient(hiveConf);
+ return createClient(hiveConf);
});
}
- private HiveMetaStoreClient loginWithRemoteUser(HiveConf hiveConf) throws
Exception {
+ private IMetaStoreClient loginWithRemoteUser(HiveConf hiveConf) throws
Exception {
return HadoopLoginFactory.loginWithRemoteUser(
- new Configuration(), remoteUser, (conf, ugi) -> new
HiveMetaStoreClient(hiveConf));
+ new Configuration(), remoteUser, (conf, ugi) ->
createClient(hiveConf));
+ }
+
+ private static String normalizeMetastoreUris(@NonNull String metastoreUri)
{
+ String[] uris = metastoreUri.split(",");
+ List<String> cleaned = new ArrayList<>(uris.length);
+ for (String uri : uris) {
+ String trimmed = uri.trim();
+ if (!trimmed.isEmpty()) {
+ cleaned.add(trimmed);
+ }
+ }
+ return String.join(",", cleaned);
+ }
+
+ private static String getFirstMetastoreUri(@NonNull String metastoreUri) {
+ String[] uris = metastoreUri.split(",");
+ for (String uri : uris) {
+ String trimmed = uri.trim();
+ if (!trimmed.isEmpty()) {
+ return trimmed;
+ }
+ }
+ return "";
}
public Table getTable(@NonNull String dbName, @NonNull String tableName) {
diff --git
a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalogMetastoreUrisTest.java
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalogMetastoreUrisTest.java
new file mode 100644
index 0000000000..b9152a0ef4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalogMetastoreUrisTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.mockito.Mockito.when;
+
+class HiveMetaStoreCatalogMetastoreUrisTest {
+
+ private static Object invokeStatic(String method, Class<?>[]
parameterTypes, Object... args)
+ throws Exception {
+ Method m = HiveMetaStoreCatalog.class.getDeclaredMethod(method,
parameterTypes);
+ m.setAccessible(true);
+ return m.invoke(null, args);
+ }
+
+ private static Object invoke(Object target, String method) throws
Exception {
+ Method m = HiveMetaStoreCatalog.class.getDeclaredMethod(method);
+ m.setAccessible(true);
+ return m.invoke(target);
+ }
+
+ private static void set(Object target, String field, Object value) throws
Exception {
+ Field f = HiveMetaStoreCatalog.class.getDeclaredField(field);
+ f.setAccessible(true);
+ f.set(target, value);
+ }
+
+ @Test
+ void testNormalizeMetastoreUrisNullThrows() {
+ InvocationTargetException ex =
+ Assertions.assertThrows(
+ InvocationTargetException.class,
+ () ->
+ invokeStatic(
+ "normalizeMetastoreUris",
+ new Class<?>[] {String.class},
+ (Object) null));
+ Assertions.assertInstanceOf(NullPointerException.class, ex.getCause());
+ }
+
+ @Test
+ void testNormalizeMetastoreUrisTrimsAndRemovesEmpty() throws Exception {
+ String in = " thrift://hms-1:9083, thrift://hms-2:9083 , ,";
+ String out =
+ (String) invokeStatic("normalizeMetastoreUris", new Class<?>[]
{String.class}, in);
+ Assertions.assertEquals("thrift://hms-1:9083,thrift://hms-2:9083",
out);
+ }
+
+ @Test
+ void testGetFirstMetastoreUriNullThrows() {
+ InvocationTargetException ex =
+ Assertions.assertThrows(
+ InvocationTargetException.class,
+ () ->
+ invokeStatic(
+ "getFirstMetastoreUri",
+ new Class<?>[] {String.class},
+ (Object) null));
+ Assertions.assertInstanceOf(NullPointerException.class, ex.getCause());
+ }
+
+ @Test
+ void testGetFirstMetastoreUriReturnsTrimmedFirst() throws Exception {
+ String in = " thrift://hms-1:9083, thrift://hms-2:9083";
+ String out =
+ (String) invokeStatic("getFirstMetastoreUri", new Class<?>[]
{String.class}, in);
+ Assertions.assertEquals("thrift://hms-1:9083", out);
+ }
+
+ @Test
+ void testGetFirstMetastoreUriSkipsBlankEntries() throws Exception {
+ String in = " , thrift://a:9083, thrift://b:9083";
+ String out =
+ (String) invokeStatic("getFirstMetastoreUri", new Class<?>[]
{String.class}, in);
+ Assertions.assertEquals("thrift://a:9083", out);
+ }
+
+ @Test
+ void testGetHiveServer2JdbcUrlDerivesFromFirstMetastoreUri() throws
Exception {
+ ReadonlyConfig cfg = Mockito.mock(ReadonlyConfig.class);
+ when(cfg.get(HiveOptions.METASTORE_URI))
+ .thenReturn(" thrift://namenode001:9084,
thrift://namenode001:9083");
+ HiveMetaStoreCatalog catalog = new HiveMetaStoreCatalog(cfg);
+ HiveConf hiveConf = new HiveConf();
+ hiveConf.set("hive.server2.jdbc.url", "");
+ set(catalog, "hiveConf", hiveConf);
+
+ String jdbcUrl = (String) invoke(catalog, "getHiveServer2JdbcUrl");
+ Assertions.assertEquals("jdbc:hive2://namenode001:10000/default",
jdbcUrl);
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
index e7419a146b..5baa66d486 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -97,6 +97,14 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
+ " score INT"
+ ")";
+ private static final String CREATE_FAILOVER_SQL =
+ "CREATE TABLE test_hive_sink_on_hdfs_failover"
+ + "("
+ + " pk_id BIGINT,"
+ + " name STRING,"
+ + " score INT"
+ + ")";
+
private static final String HMS_HOST = "metastore";
private static final String HIVE_SERVER_HOST = "hiveserver2";
@@ -249,6 +257,7 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
// Avoid fragile HMS list calls; rely on default database existing in
test images
try (Statement statement = this.hiveConnection.createStatement()) {
statement.execute(CREATE_SQL);
+ statement.execute(CREATE_FAILOVER_SQL);
statement.execute(CREATE_REGEX_DB_A_SQL);
statement.execute(CREATE_REGEX_DB_ABC_SQL);
statement.execute(CREATE_REGEX_TABLE_1_SQL);
@@ -277,6 +286,14 @@ public class HiveIT extends TestSuiteBase implements
TestResource {
executeJob(container, "/fake_to_hive.conf", "/hive_to_assert.conf");
}
+ @TestTemplate
+ public void testFakeSinkHiveWithMetastoreFailover(TestContainer container)
throws Exception {
+ executeJob(
+ container,
+ "/fake_to_hive_metastore_uri_failover.conf",
+ "/hive_to_assert_metastore_uri_failover.conf");
+ }
+
@TestTemplate
public void testHiveSourceWholeDatabaseUseRegex(TestContainer container)
throws Exception {
Container.ExecResult exec1 =
container.executeJob("/regex/fake_to_hive_regex_1.conf");
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_metastore_uri_failover.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_metastore_uri_failover.conf
new file mode 100644
index 0000000000..5a059bf07d
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_metastore_uri_failover.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ FakeSource {
+ schema = {
+ fields {
+ pk_id = bigint
+ name = string
+ score = int
+ }
+ primaryKey {
+ name = "pk_id"
+ columnNames = [pk_id]
+ }
+ }
+ rows = [
+ {
+ kind = INSERT
+ fields = [1, "A", 100]
+ },
+ {
+ kind = INSERT
+ fields = [2, "B", 100]
+ },
+ {
+ kind = INSERT
+ fields = [3, "C", 100]
+ }
+ ]
+ }
+}
+
+sink {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_failover"
+ metastore_uri = " thrift://metastore:9084, thrift://metastore:9083 "
+ }
+}
+
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_to_assert_metastore_uri_failover.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_to_assert_metastore_uri_failover.conf
new file mode 100644
index 0000000000..5813c0dadc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_to_assert_metastore_uri_failover.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+}
+
+source {
+ Hive {
+ table_name = "default.test_hive_sink_on_hdfs_failover"
+ metastore_uri = " thrift://metastore:9084, thrift://metastore:9083 "
+ hive.hadoop.conf-path = "/tmp/hadoop"
+ plugin_output = hive_source
+ }
+}
+
+sink {
+ Assert {
+ plugin_input = hive_source
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 3
+ }
+ ],
+ field_rules = [
+ {
+ field_name = pk_id
+ field_type = bigint
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = name
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = score
+ field_type = int
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
+