This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d56b1645b2 [Feature][Connector-V2][Hive] Support multiple Hive metastore URIs for automatic failover (#10253)
d56b1645b2 is described below

commit d56b1645b244d2aeb91bb482165da1d86d79f4c4
Author: yzeng1618 <[email protected]>
AuthorDate: Thu Jan 8 19:17:23 2026 +0800

    [Feature][Connector-V2][Hive] Support multiple Hive metastore URIs for automatic failover (#10253)
    
    Co-authored-by: zengyi <[email protected]>
---
 docs/en/connector-v2/sink/Hive.md                  |  11 +-
 docs/en/connector-v2/source/Hive.md                |  13 ++-
 docs/zh/connector-v2/sink/Hive.md                  |  11 +-
 docs/zh/connector-v2/source/Hive.md                |  13 ++-
 .../seatunnel/hive/utils/HiveMetaStoreCatalog.java | 117 ++++++++++++++++++--
 .../HiveMetaStoreCatalogMetastoreUrisTest.java     | 119 +++++++++++++++++++++
 .../seatunnel/e2e/connector/hive/HiveIT.java       |  17 +++
 .../fake_to_hive_metastore_uri_failover.conf       |  59 ++++++++++
 .../hive_to_assert_metastore_uri_failover.conf     |  74 +++++++++++++
 9 files changed, 417 insertions(+), 17 deletions(-)

diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 4c8625e69b..820464d11c 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -60,7 +60,7 @@ Target Hive table name eg: db1.table1, and if the source is multiple mode, you c
 
 ### metastore_uri [string]
 
-Hive metastore uri
+Hive metastore uri. Multiple comma-separated URIs are supported for HA/failover (whitespace around each URI is ignored). SeaTunnel passes the value to Hive's `hive.metastore.uris` and, when available, uses Hive's `RetryingMetaStoreClient` to retry and fail over between URIs. Note that this is client-side endpoint failover only; the metastores must share or replicate the same backend database to keep metadata consistent.
 
 ### hdfs_site_path [string]
 
@@ -159,6 +159,15 @@ Sink plugin common parameters, please refer to [Sink Common Options](../sink-com
 
 ```
 
+Metastore URI failover example (multiple URIs):
+
+```bash
+  Hive {
+    table_name = "default.seatunnel_orc"
+    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+  }
+```
+
 ### example 1
 
 We have a source table like this:
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index d5fb0da0a6..c51e82808f 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -83,7 +83,7 @@ Regex syntax notes:
 
 ### metastore_uri [string]
 
-Hive metastore uri
+Hive metastore uri. Multiple comma-separated URIs are supported for HA/failover (whitespace around each URI is ignored). SeaTunnel passes the value to Hive's `hive.metastore.uris` and, when available, uses Hive's `RetryingMetaStoreClient` to retry and fail over between URIs. Note that this is client-side endpoint failover only; the metastores must share or replicate the same backend database to keep metadata consistent.
 
 ### hdfs_site_path [string]
 
@@ -147,7 +147,16 @@ Source plugin common parameters, please refer to [Source Common Options](../sour
 
 ```
 
-### Example 2: Multiple tables
+### Example 2: Metastore URI failover
+
+```bash
+  Hive {
+    table_name = "default.seatunnel_orc"
+    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+  }
+```
+
+### Example 3: Multiple tables
 > Note: Hive is a structured data source and should use 'table_list', and 
 > 'tables_configs' will be removed in the future.
 > You can also set `use_regex = true` in each table config to match multiple 
 > tables.
 
diff --git a/docs/zh/connector-v2/sink/Hive.md b/docs/zh/connector-v2/sink/Hive.md
index 1ba385ba53..636c06fc2f 100644
--- a/docs/zh/connector-v2/sink/Hive.md
+++ b/docs/zh/connector-v2/sink/Hive.md
@@ -60,7 +60,7 @@ import ChangeLog from '../changelog/connector-hive.md';
 
 ### metastore_uri [string]
 
-Hive 元存储 URI
+Hive 元存储 URI。支持通过逗号分隔配置多个 URI 用于高可用/故障切换(会自动去除空格)。SeaTunnel 会将该值写入 Hive 的 `hive.metastore.uris`,并在运行时优先使用 Hive 的 `RetryingMetaStoreClient` 实现重试/切换。注意:该能力仅做客户端连接端点切换,元数据一致性需要由 metastore 部署保证。
 
 ### hdfs_site_path [string]
 
@@ -157,6 +157,15 @@ Sink 插件的通用参数,请参阅 [Sink Common Options](../sink-common-opti
   }
 ```
 
+metastore_uri 故障切换示例(多 URI):
+
+```bash
+  Hive {
+    table_name = "default.seatunnel_orc"
+    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+  }
+```
+
 ### 示例 1
 
 我们有一个源表如下:
diff --git a/docs/zh/connector-v2/source/Hive.md b/docs/zh/connector-v2/source/Hive.md
index 9183f622d8..7fc8aaa340 100644
--- a/docs/zh/connector-v2/source/Hive.md
+++ b/docs/zh/connector-v2/source/Hive.md
@@ -83,7 +83,7 @@ import ChangeLog from '../changelog/connector-hive.md';
 
 ### metastore_uri [string]
 
-Hive 元存储 URI
+Hive 元存储 URI。支持通过逗号分隔配置多个 URI 用于高可用/故障切换(会自动去除空格)。SeaTunnel 会将该值写入 Hive 的 `hive.metastore.uris`,并在运行时优先使用 Hive 的 `RetryingMetaStoreClient` 实现重试/切换。注意:该能力仅做客户端连接端点切换,元数据一致性需要由 metastore 部署保证。
 
 ### hdfs_site_path [string]
 
@@ -145,7 +145,16 @@ Kerberos 认证的 keytab 文件路径
   }
 ```
 
-### 示例 2:多表
+### 示例 2:metastore_uri 故障切换(多 URI)
+
+```bash
+  Hive {
+    table_name = "default.seatunnel_orc"
+    metastore_uri = "thrift://metastore-1:9083,thrift://metastore-2:9083"
+  }
+```
+
+### 示例 3:多表
 > 注意:Hive 是结构化数据源,应使用 `table_list`,`tables_configs` 将在未来移除。
 > 也支持在每个表配置中设置 `use_regex = true` 来按正则匹配多表。
 
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
index ab3cf36369..11a6d301d3 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalog.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorExc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -50,6 +51,7 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.lang.reflect.Method;
 import java.net.MalformedURLException;
 import java.net.URI;
 import java.nio.file.Files;
@@ -69,6 +71,10 @@ import java.util.Objects;
 @Slf4j
 public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
     private static final List<String> HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml");
+    private static final String RETRYING_METASTORE_CLIENT_CLASS_NAME =
+            "org.apache.hadoop.hive.metastore.RetryingMetaStoreClient";
+    private static final String RETRYING_METASTORE_CLIENT_NO_COMPATIBLE_GET_PROXY_MESSAGE =
+            "RetryingMetaStoreClient found but no compatible getProxy method, falling back to HiveMetaStoreClient";
 
     private final String metastoreUri;
     private final String hadoopConfDir;
@@ -81,7 +87,7 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
     private final String keytabPath;
     private final String remoteUser;
 
-    private transient HiveMetaStoreClient hiveClient;
+    private transient IMetaStoreClient hiveClient;
     private transient HiveConf hiveConf;
     private transient UserGroupInformation userGroupInformation;
 
@@ -105,7 +111,7 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
         return create(config);
     }
 
-    private synchronized HiveMetaStoreClient getClient() {
+    private synchronized IMetaStoreClient getClient() {
         if (hiveClient == null) {
             hiveClient = initializeClient();
         }
@@ -115,7 +121,7 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
         return hiveClient;
     }
 
-    private HiveMetaStoreClient initializeClient() {
+    private IMetaStoreClient initializeClient() {
         this.hiveConf = buildHiveConf();
         try {
             if (kerberosEnabled) {
@@ -124,7 +130,7 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
             if (remoteUserEnabled) {
                 return loginWithRemoteUser(hiveConf);
             }
-            return new HiveMetaStoreClient(hiveConf);
+            return createClient(hiveConf);
         } catch (Exception e) {
             String errMsg =
                     String.format(
@@ -135,6 +141,63 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
         }
     }
 
+    private IMetaStoreClient createClient(HiveConf hiveConf) throws Exception {
+        IMetaStoreClient retryingClient = tryCreateRetryingClient(hiveConf);
+        if (retryingClient != null) {
+            return retryingClient;
+        }
+        return new HiveMetaStoreClient(hiveConf);
+    }
+
+    private IMetaStoreClient tryCreateRetryingClient(HiveConf hiveConf) {
+        try {
+            Class<?> clazz = Class.forName(RETRYING_METASTORE_CLIENT_CLASS_NAME);
+            Method getProxyMethod = getProxyMethod(clazz);
+            if (getProxyMethod == null) {
+                log.warn(RETRYING_METASTORE_CLIENT_NO_COMPATIBLE_GET_PROXY_MESSAGE);
+                return null;
+            }
+
+            Object proxy = getProxyMethod.invoke(null, hiveConf, true);
+            if (proxy instanceof IMetaStoreClient) {
+                log.info(
+                        "Using RetryingMetaStoreClient for Hive metastore 
connection [uris={}]",
+                        hiveConf.get("hive.metastore.uris"));
+                return (IMetaStoreClient) proxy;
+            }
+            log.warn(RETRYING_METASTORE_CLIENT_NO_COMPATIBLE_GET_PROXY_MESSAGE);
+            return null;
+        } catch (ClassNotFoundException e) {
+            log.debug("RetryingMetaStoreClient not found, falling back to 
HiveMetaStoreClient", e);
+            return null;
+        } catch (Exception e) {
+            log.warn(
+                    "Failed to create RetryingMetaStoreClient proxy, falling 
back to HiveMetaStoreClient",
+                    e);
+            return null;
+        }
+    }
+
+    private static Method getProxyMethod(Class<?> clazz) {
+        // Hive 2.x: getProxy(HiveConf, boolean)
+        // Hive 3.x: getProxy(Configuration, boolean)
+        Method method = null;
+        try {
+            method = clazz.getDeclaredMethod("getProxy", HiveConf.class, boolean.class);
+        } catch (NoSuchMethodException ignored) {
+        }
+        if (method == null) {
+            try {
+                method = clazz.getDeclaredMethod("getProxy", Configuration.class, boolean.class);
+            } catch (NoSuchMethodException ignored) {
+            }
+        }
+        if (method != null) {
+            method.setAccessible(true);
+        }
+        return method;
+    }
+
     /**
      * Try to execute SQL via HiveServer2 JDBC. Returns true if successful, false if HiveServer2 is
      * not available or execution failed.
@@ -196,9 +259,13 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
         // Try to derive from metastore URI
         // metastore URI format: thrift://host:9083
         // HiveServer2 JDBC URL format: jdbc:hive2://host:10000/default
+        if (StringUtils.isBlank(metastoreUri)) {
+            return null;
+        }
         try {
-            if (metastoreUri != null && metastoreUri.startsWith("thrift://")) {
-                URI uri = new URI(metastoreUri);
+            String firstUri = getFirstMetastoreUri(metastoreUri);
+            if (firstUri.startsWith("thrift://")) {
+                URI uri = new URI(firstUri);
                 String host = uri.getHost();
                 if (host != null) {
                 return String.format("jdbc:hive2://%s:10000/default", host);
@@ -213,7 +280,12 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
 
     private HiveConf buildHiveConf() {
         HiveConf hiveConf = new HiveConf();
-        hiveConf.set("hive.metastore.uris", metastoreUri);
+        if (StringUtils.isNotBlank(metastoreUri)) {
+            String normalizedMetastoreUris = normalizeMetastoreUris(metastoreUri);
+            if (StringUtils.isNotBlank(normalizedMetastoreUris)) {
+                hiveConf.set("hive.metastore.uris", normalizedMetastoreUris);
+            }
+        }
         hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI, false);
         hiveConf.setBoolean("hive.metastore.client.capability.check", false);
         hiveConf.setBoolean("hive.metastore.client.filter.enabled", false);
@@ -244,7 +316,7 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
         return hiveConf;
     }
 
-    private HiveMetaStoreClient loginWithKerberos(HiveConf hiveConf) throws Exception {
+    private IMetaStoreClient loginWithKerberos(HiveConf hiveConf) throws Exception {
         Configuration authConf = new Configuration();
         authConf.set("hadoop.security.authentication", "kerberos");
         return HadoopLoginFactory.loginWithKerberos(
@@ -254,13 +326,36 @@ public class HiveMetaStoreCatalog implements Catalog, Closeable, Serializable {
                 keytabPath,
                 (conf, ugi) -> {
                     this.userGroupInformation = ugi;
-                    return new HiveMetaStoreClient(hiveConf);
+                    return createClient(hiveConf);
                 });
     }
 
-    private HiveMetaStoreClient loginWithRemoteUser(HiveConf hiveConf) throws Exception {
+    private IMetaStoreClient loginWithRemoteUser(HiveConf hiveConf) throws Exception {
         return HadoopLoginFactory.loginWithRemoteUser(
-                new Configuration(), remoteUser, (conf, ugi) -> new HiveMetaStoreClient(hiveConf));
+                new Configuration(), remoteUser, (conf, ugi) -> createClient(hiveConf));
+    }
+
+    private static String normalizeMetastoreUris(@NonNull String metastoreUri) {
+        String[] uris = metastoreUri.split(",");
+        List<String> cleaned = new ArrayList<>(uris.length);
+        for (String uri : uris) {
+            String trimmed = uri.trim();
+            if (!trimmed.isEmpty()) {
+                cleaned.add(trimmed);
+            }
+        }
+        return String.join(",", cleaned);
+    }
+
+    private static String getFirstMetastoreUri(@NonNull String metastoreUri) {
+        String[] uris = metastoreUri.split(",");
+        for (String uri : uris) {
+            String trimmed = uri.trim();
+            if (!trimmed.isEmpty()) {
+                return trimmed;
+            }
+        }
+        return "";
     }
 
     public Table getTable(@NonNull String dbName, @NonNull String tableName) {
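
A minimal standalone sketch of the reflective lookup above, for readers skimming the diff. The RetryingMetaStoreClient class name and the two getProxy signatures are taken from the patch itself; the demo class around them is illustrative only and assumes hive-metastore is on the classpath:

    import java.lang.reflect.Method;

    // Sketch only: resolve RetryingMetaStoreClient.getProxy reflectively,
    // probing the Hive 2.x (HiveConf) signature first, then the Hive 3.x
    // (Configuration) signature, in the same order the patch uses before
    // falling back to plain HiveMetaStoreClient.
    public class GetProxyLookupSketch {
        public static void main(String[] args) throws Exception {
            Class<?> clazz =
                    Class.forName("org.apache.hadoop.hive.metastore.RetryingMetaStoreClient");
            Method method = null;
            for (String confClass :
                    new String[] {
                        "org.apache.hadoop.hive.conf.HiveConf",
                        "org.apache.hadoop.conf.Configuration"
                    }) {
                try {
                    method = clazz.getDeclaredMethod("getProxy", Class.forName(confClass), boolean.class);
                    break;
                } catch (ClassNotFoundException | NoSuchMethodException ignored) {
                    // Signature not present in this Hive version; try the next one.
                }
            }
            System.out.println(method == null ? "no compatible getProxy" : "resolved: " + method);
        }
    }
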
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalogMetastoreUrisTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalogMetastoreUrisTest.java
new file mode 100644
index 0000000000..b9152a0ef4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreCatalogMetastoreUrisTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveOptions;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import static org.mockito.Mockito.when;
+
+class HiveMetaStoreCatalogMetastoreUrisTest {
+
+    private static Object invokeStatic(String method, Class<?>[] parameterTypes, Object... args)
+            throws Exception {
+        Method m = HiveMetaStoreCatalog.class.getDeclaredMethod(method, parameterTypes);
+        m.setAccessible(true);
+        return m.invoke(null, args);
+    }
+
+    private static Object invoke(Object target, String method) throws Exception {
+        Method m = HiveMetaStoreCatalog.class.getDeclaredMethod(method);
+        m.setAccessible(true);
+        return m.invoke(target);
+    }
+
+    private static void set(Object target, String field, Object value) throws Exception {
+        Field f = HiveMetaStoreCatalog.class.getDeclaredField(field);
+        f.setAccessible(true);
+        f.set(target, value);
+    }
+
+    @Test
+    void testNormalizeMetastoreUrisNullThrows() {
+        InvocationTargetException ex =
+                Assertions.assertThrows(
+                        InvocationTargetException.class,
+                        () ->
+                                invokeStatic(
+                                        "normalizeMetastoreUris",
+                                        new Class<?>[] {String.class},
+                                        (Object) null));
+        Assertions.assertInstanceOf(NullPointerException.class, ex.getCause());
+    }
+
+    @Test
+    void testNormalizeMetastoreUrisTrimsAndRemovesEmpty() throws Exception {
+        String in = " thrift://hms-1:9083, thrift://hms-2:9083 , ,";
+        String out =
+                (String) invokeStatic("normalizeMetastoreUris", new Class<?>[] 
{String.class}, in);
+        Assertions.assertEquals("thrift://hms-1:9083,thrift://hms-2:9083", 
out);
+    }
+
+    @Test
+    void testGetFirstMetastoreUriNullThrows() {
+        InvocationTargetException ex =
+                Assertions.assertThrows(
+                        InvocationTargetException.class,
+                        () ->
+                                invokeStatic(
+                                        "getFirstMetastoreUri",
+                                        new Class<?>[] {String.class},
+                                        (Object) null));
+        Assertions.assertInstanceOf(NullPointerException.class, ex.getCause());
+    }
+
+    @Test
+    void testGetFirstMetastoreUriReturnsTrimmedFirst() throws Exception {
+        String in = " thrift://hms-1:9083, thrift://hms-2:9083";
+        String out =
+                (String) invokeStatic("getFirstMetastoreUri", new Class<?>[] 
{String.class}, in);
+        Assertions.assertEquals("thrift://hms-1:9083", out);
+    }
+
+    @Test
+    void testGetFirstMetastoreUriSkipsBlankEntries() throws Exception {
+        String in = " , thrift://a:9083, thrift://b:9083";
+        String out =
+                (String) invokeStatic("getFirstMetastoreUri", new Class<?>[] 
{String.class}, in);
+        Assertions.assertEquals("thrift://a:9083", out);
+    }
+
+    @Test
+    void testGetHiveServer2JdbcUrlDerivesFromFirstMetastoreUri() throws Exception {
+        ReadonlyConfig cfg = Mockito.mock(ReadonlyConfig.class);
+        when(cfg.get(HiveOptions.METASTORE_URI))
+                .thenReturn(" thrift://namenode001:9084, 
thrift://namenode001:9083");
+        HiveMetaStoreCatalog catalog = new HiveMetaStoreCatalog(cfg);
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set("hive.server2.jdbc.url", "");
+        set(catalog, "hiveConf", hiveConf);
+
+        String jdbcUrl = (String) invoke(catalog, "getHiveServer2JdbcUrl");
+        Assertions.assertEquals("jdbc:hive2://namenode001:10000/default", 
jdbcUrl);
+    }
+}
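
For quick reference, the normalization semantics these tests pin down, restated directly without the reflection harness. This is an illustrative sketch of the behavior, not the production helper itself:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative re-statement of the URI normalization the unit tests verify:
    // split on commas, trim each segment, drop blanks, keep order.
    public class MetastoreUriNormalizeSketch {
        static String normalize(String metastoreUri) {
            List<String> cleaned = new ArrayList<>();
            for (String uri : metastoreUri.split(",")) {
                String trimmed = uri.trim();
                if (!trimmed.isEmpty()) {
                    cleaned.add(trimmed); // drop blank segments, preserve order
                }
            }
            return String.join(",", cleaned);
        }

        public static void main(String[] args) {
            // Prints: thrift://hms-1:9083,thrift://hms-2:9083
            System.out.println(normalize(" thrift://hms-1:9083, thrift://hms-2:9083 , ,"));
        }
    }
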
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
index e7419a146b..5baa66d486 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/java/org/apache/seatunnel/e2e/connector/hive/HiveIT.java
@@ -97,6 +97,14 @@ public class HiveIT extends TestSuiteBase implements TestResource {
                     + "    score  INT"
                     + ")";
 
+    private static final String CREATE_FAILOVER_SQL =
+            "CREATE TABLE test_hive_sink_on_hdfs_failover"
+                    + "("
+                    + "    pk_id  BIGINT,"
+                    + "    name   STRING,"
+                    + "    score  INT"
+                    + ")";
+
     private static final String HMS_HOST = "metastore";
     private static final String HIVE_SERVER_HOST = "hiveserver2";
 
@@ -249,6 +257,7 @@ public class HiveIT extends TestSuiteBase implements TestResource {
         // Avoid fragile HMS list calls; rely on default database existing in test images
         try (Statement statement = this.hiveConnection.createStatement()) {
             statement.execute(CREATE_SQL);
+            statement.execute(CREATE_FAILOVER_SQL);
             statement.execute(CREATE_REGEX_DB_A_SQL);
             statement.execute(CREATE_REGEX_DB_ABC_SQL);
             statement.execute(CREATE_REGEX_TABLE_1_SQL);
@@ -277,6 +286,14 @@ public class HiveIT extends TestSuiteBase implements TestResource {
         executeJob(container, "/fake_to_hive.conf", "/hive_to_assert.conf");
     }
 
+    @TestTemplate
+    public void testFakeSinkHiveWithMetastoreFailover(TestContainer container) throws Exception {
+        executeJob(
+                container,
+                "/fake_to_hive_metastore_uri_failover.conf",
+                "/hive_to_assert_metastore_uri_failover.conf");
+    }
+
     @TestTemplate
     public void testHiveSourceWholeDatabaseUseRegex(TestContainer container) throws Exception {
         Container.ExecResult exec1 = container.executeJob("/regex/fake_to_hive_regex_1.conf");
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_metastore_uri_failover.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_metastore_uri_failover.conf
new file mode 100644
index 0000000000..5a059bf07d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/fake_to_hive_metastore_uri_failover.conf
@@ -0,0 +1,59 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        pk_id = bigint
+        name = string
+        score = int
+      }
+      primaryKey {
+        name = "pk_id"
+        columnNames = [pk_id]
+      }
+    }
+    rows = [
+      {
+        kind = INSERT
+        fields = [1, "A", 100]
+      },
+      {
+        kind = INSERT
+        fields = [2, "B", 100]
+      },
+      {
+        kind = INSERT
+        fields = [3, "C", 100]
+      }
+    ]
+  }
+}
+
+sink {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_failover"
+    metastore_uri = " thrift://metastore:9084, thrift://metastore:9083 "
+  }
+}
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_to_assert_metastore_uri_failover.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_to_assert_metastore_uri_failover.conf
new file mode 100644
index 0000000000..5813c0dadc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-hive-e2e/src/test/resources/hive_to_assert_metastore_uri_failover.conf
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  Hive {
+    table_name = "default.test_hive_sink_on_hdfs_failover"
+    metastore_uri = " thrift://metastore:9084, thrift://metastore:9083 "
+    hive.hadoop.conf-path = "/tmp/hadoop"
+    plugin_output = hive_source
+  }
+}
+
+sink {
+  Assert {
+    plugin_input = hive_source
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 3
+        }
+      ],
+      field_rules = [
+        {
+          field_name = pk_id
+          field_type = bigint
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = score
+          field_type = int
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
+
