This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 124eb29248 [Feature][Hive] Kerberos renew for >24h Hive and HDFS writes (#9989)
124eb29248 is described below

commit 124eb292480b81bc4bbe511bedec43c571cc78a5
Author: Adam Wang <[email protected]>
AuthorDate: Fri Oct 31 18:07:42 2025 +0800

    [Feature][Hive] Kerberos renew for >24h Hive and HDFS writes (#9989)
---
 .../file/hadoop/HadoopFileSystemProxy.java         |  18 ++++
 .../HadoopFileSystemProxyKerberosRenewTest.java    | 109 ++++++++++++++++++++
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   |  23 ++++-
 .../utils/HiveMetaStoreProxyKerberosRenewTest.java | 112 +++++++++++++++++++++
 4 files changed, 261 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
index da61cc0cb1..0c62d26bd4 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxy.java
@@ -340,10 +340,28 @@ public class HadoopFileSystemProxy implements Serializable, Closeable {
         }
 
         try {
+            // Ensure Kerberos ticket is valid for long-running jobs
+            maybeRelogin();
             return userGroupInformation.doAs(action);
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
             throw new IOException(e);
         }
     }
+
+    private void maybeRelogin() {
+        if (!isAuthTypeKerberos) {
+            return;
+        }
+        if (userGroupInformation == null) {
+            return;
+        }
+        try {
+            if (userGroupInformation.isFromKeytab()) {
+                userGroupInformation.checkTGTAndReloginFromKeytab();
+            }
+        } catch (IOException e) {
+            log.warn("Kerberos re-login from keytab failed: {}", 
e.getMessage());
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxyKerberosRenewTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxyKerberosRenewTest.java
new file mode 100644
index 0000000000..d777470e03
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/hadoop/HadoopFileSystemProxyKerberosRenewTest.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.connectors.seatunnel.file.hadoop;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class HadoopFileSystemProxyKerberosRenewTest {
+
+    private static void set(Object target, String field, Object value) throws 
Exception {
+        Field f = target.getClass().getDeclaredField(field);
+        f.setAccessible(true);
+        f.set(target, value);
+    }
+
+    private static Object invoke(Object target, String method) throws 
Exception {
+        Method m = target.getClass().getDeclaredMethod(method);
+        m.setAccessible(true);
+        return m.invoke(target);
+    }
+
+    @Test
+    void testMaybeReloginFromKeytabCallsCheck() throws Exception {
+        HadoopConf conf = new HadoopConf("file:///");
+        HadoopFileSystemProxy proxy = new HadoopFileSystemProxy(conf);
+
+        UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class);
+        when(ugi.isFromKeytab()).thenReturn(true);
+
+        set(proxy, "isAuthTypeKerberos", true);
+        set(proxy, "userGroupInformation", ugi);
+
+        // invoke private maybeRelogin()
+        invoke(proxy, "maybeRelogin");
+
+        verify(ugi, times(1)).checkTGTAndReloginFromKeytab();
+    }
+
+    @Test
+    void testMaybeReloginNotFromKeytabNoCheck() throws Exception {
+        HadoopConf conf = new HadoopConf("file:///");
+        HadoopFileSystemProxy proxy = new HadoopFileSystemProxy(conf);
+
+        UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class);
+        when(ugi.isFromKeytab()).thenReturn(false);
+
+        set(proxy, "isAuthTypeKerberos", true);
+        set(proxy, "userGroupInformation", ugi);
+
+        invoke(proxy, "maybeRelogin");
+
+        verify(ugi, never()).checkTGTAndReloginFromKeytab();
+    }
+
+    @Test
+    void testMaybeReloginCheckThrowsSwallowed() throws Exception {
+        HadoopConf conf = new HadoopConf("file:///");
+        HadoopFileSystemProxy proxy = new HadoopFileSystemProxy(conf);
+
+        UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class);
+        when(ugi.isFromKeytab()).thenReturn(true);
+        doThrow(new 
IOException("test")).when(ugi).checkTGTAndReloginFromKeytab();
+
+        set(proxy, "isAuthTypeKerberos", true);
+        set(proxy, "userGroupInformation", ugi);
+
+        // should not throw out
+        Assertions.assertDoesNotThrow(
+                () -> {
+                    try {
+                        invoke(proxy, "maybeRelogin");
+                    } catch (Exception e) {
+                        // unwrap reflection InvocationTargetException if any
+                        throw new RuntimeException(e);
+                    }
+                });
+
+        verify(ugi, times(1)).checkTGTAndReloginFromKeytab();
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index df397f6f40..5120c1d0bc 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 
 import lombok.NonNull;
@@ -65,6 +66,7 @@ public class HiveMetaStoreProxy implements Closeable, Serializable {
     private final String remoteUser;
 
     private transient HiveMetaStoreClient hiveClient;
+    private transient UserGroupInformation userGroupInformation;
 
     public HiveMetaStoreProxy(ReadonlyConfig config) {
         this.metastoreUri = config.get(HiveOptions.METASTORE_URI);
@@ -82,6 +84,9 @@ public class HiveMetaStoreProxy implements Closeable, Serializable {
         if (hiveClient == null) {
             hiveClient = initializeClient();
         }
+        if (kerberosEnabled) {
+            maybeRelogin();
+        }
         return hiveClient;
     }
 
@@ -140,7 +145,10 @@ public class HiveMetaStoreProxy implements Closeable, Serializable {
                 krb5Path,
                 principal,
                 keytabPath,
-                (conf, ugi) -> new HiveMetaStoreClient(hiveConf));
+                (conf, ugi) -> {
+                    this.userGroupInformation = ugi;
+                    return new HiveMetaStoreClient(hiveConf);
+                });
     }
 
     private HiveMetaStoreClient loginWithRemoteUser(HiveConf hiveConf) throws 
Exception {
@@ -184,4 +192,17 @@ public class HiveMetaStoreProxy implements Closeable, Serializable {
             hiveClient.close();
         }
     }
+
+    private void maybeRelogin() {
+        if (userGroupInformation == null) {
+            return;
+        }
+        try {
+            if (userGroupInformation.isFromKeytab()) {
+                userGroupInformation.checkTGTAndReloginFromKeytab();
+            }
+        } catch (Exception e) {
+            log.warn("Kerberos re-login for HiveMetaStore failed: {}", 
e.getMessage());
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyKerberosRenewTest.java b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyKerberosRenewTest.java
new file mode 100644
index 0000000000..a054339add
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-hive/src/test/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxyKerberosRenewTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class HiveMetaStoreProxyKerberosRenewTest {
+
+    private static void set(Object target, String field, Object value) throws 
Exception {
+        Field f = target.getClass().getDeclaredField(field);
+        f.setAccessible(true);
+        f.set(target, value);
+    }
+
+    private static Object invoke(Object target, String method) throws 
Exception {
+        Method m = target.getClass().getDeclaredMethod(method);
+        m.setAccessible(true);
+        return m.invoke(target);
+    }
+
+    @Test
+    void testGetClientTriggersMaybeReloginFromKeytab() throws Exception {
+        ReadonlyConfig cfg = Mockito.mock(ReadonlyConfig.class);
+        HiveMetaStoreProxy proxy = new HiveMetaStoreProxy(cfg);
+
+        HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
+        UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class);
+        when(ugi.isFromKeytab()).thenReturn(true);
+
+        set(proxy, "hiveClient", client);
+        set(proxy, "userGroupInformation", ugi);
+        set(proxy, "kerberosEnabled", true);
+
+        HiveMetaStoreClient out = (HiveMetaStoreClient) invoke(proxy, 
"getClient");
+        Assertions.assertNotNull(out);
+        verify(ugi, times(1)).checkTGTAndReloginFromKeytab();
+    }
+
+    @Test
+    void testGetClientTriggersMaybeReloginNotFromKeytab() throws Exception {
+        ReadonlyConfig cfg = Mockito.mock(ReadonlyConfig.class);
+        HiveMetaStoreProxy proxy = new HiveMetaStoreProxy(cfg);
+
+        HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
+        UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class);
+        when(ugi.isFromKeytab()).thenReturn(false);
+
+        set(proxy, "hiveClient", client);
+        set(proxy, "userGroupInformation", ugi);
+        set(proxy, "kerberosEnabled", true);
+
+        HiveMetaStoreClient out = (HiveMetaStoreClient) invoke(proxy, 
"getClient");
+        Assertions.assertNotNull(out);
+        verify(ugi, never()).checkTGTAndReloginFromKeytab();
+    }
+
+    @Test
+    void testGetClientReloginThrowsSwallowed() throws Exception {
+        ReadonlyConfig cfg = Mockito.mock(ReadonlyConfig.class);
+        HiveMetaStoreProxy proxy = new HiveMetaStoreProxy(cfg);
+
+        HiveMetaStoreClient client = Mockito.mock(HiveMetaStoreClient.class);
+        UserGroupInformation ugi = Mockito.mock(UserGroupInformation.class);
+        when(ugi.isFromKeytab()).thenReturn(true);
+        doThrow(new 
RuntimeException("test")).when(ugi).checkTGTAndReloginFromKeytab();
+
+        set(proxy, "hiveClient", client);
+        set(proxy, "userGroupInformation", ugi);
+        set(proxy, "kerberosEnabled", true);
+
+        Assertions.assertDoesNotThrow(
+                () -> {
+                    try {
+                        invoke(proxy, "getClient");
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+        verify(ugi, times(1)).checkTGTAndReloginFromKeytab();
+    }
+}

Reply via email to