This is an automated email from the ASF dual-hosted git repository.

fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 60de61930b [#9564] feat(flink): Flink connector supports user 
authentication (#9565)
60de61930b is described below

commit 60de61930b1761f01a8aa98e65554d06f761147a
Author: roryqi <[email protected]>
AuthorDate: Sun Jan 4 19:30:45 2026 +0800

    [#9564] feat(flink): Flink connector supports user authentication (#9565)
    
    ### What changes were proposed in this pull request?
    
    Flink connector supports user authentication.
    
    ### Why are the changes needed?
    
    Fix: #9564
    
    ### Does this PR introduce _any_ user-facing change?
    
    Add the documents.
    
    ### How was this patch tested?
    
    Add integration tests.
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../gravitino/client/KerberosTokenProvider.java    | 205 +++++++++++++----
 .../flink-authentication-with-gravitino.md         |  50 +++++
 flink-connector/flink/build.gradle.kts             |   1 +
 .../connector/catalog/GravitinoCatalogManager.java |  97 +++++++-
 .../store/GravitinoCatalogStoreFactoryOptions.java |   8 +
 .../connector/integration/test/FlinkEnvIT.java     |   6 +-
 .../integration/test/hive/FlinkHiveCatalogIT.java  |  24 ++
 .../test/hive/FlinkHiveKerberosClientIT.java       | 243 +++++++++++++++++++++
 .../gravitino/integration/test/util/BaseIT.java    |  31 ++-
 9 files changed, 622 insertions(+), 43 deletions(-)

diff --git 
a/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
 
b/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
index 849c13b282..f3688700fa 100644
--- 
a/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
+++ 
b/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
@@ -24,10 +24,14 @@ import com.google.common.base.Splitter;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.time.Instant;
 import java.util.Base64;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosKey;
+import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.kerberos.KerberosTicket;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
@@ -49,7 +53,7 @@ public final class KerberosTokenProvider implements 
AuthDataProvider {
   private String clientPrincipal;
   private String keytabFile;
   private String host = "localhost";
-  private LoginContext loginContext;
+  private SubjectProvider subjectProvider;
 
   private KerberosTokenProvider() {}
 
@@ -82,22 +86,11 @@ public final class KerberosTokenProvider implements 
AuthDataProvider {
   private byte[] getTokenInternal() throws Exception {
     @SuppressWarnings("null")
     List<String> principalComponents = 
Splitter.on('@').splitToList(clientPrincipal);
-    // Gravitino server's principal must start with HTTP. This restriction 
follows
-    // the style of Apache Hadoop.
     String serverPrincipal = "HTTP/" + host + "@" + principalComponents.get(1);
-
-    synchronized (this) {
-      if (loginContext == null) {
-        loginContext = KerberosUtils.login(clientPrincipal, keytabFile);
-      } else if (isLoginTicketExpired() && keytabFile != null) {
-        // We only support use keytab to re-login context
-        loginContext.logout();
-        loginContext = KerberosUtils.login(clientPrincipal, keytabFile);
-      }
-    }
+    Subject currentSubject = subjectProvider.get();
 
     return KerberosUtils.doAs(
-        loginContext.getSubject(),
+        currentSubject,
         new Callable<byte[]>() {
           @Override
           public byte[] call() throws Exception {
@@ -127,24 +120,11 @@ public final class KerberosTokenProvider implements 
AuthDataProvider {
         });
   }
 
-  @SuppressWarnings("JavaUtilDate")
-  private boolean isLoginTicketExpired() {
-    Set<KerberosTicket> tickets =
-        loginContext.getSubject().getPrivateCredentials(KerberosTicket.class);
-
-    if (tickets.isEmpty()) {
-      return false;
-    }
-
-    return tickets.iterator().next().getEndTime().getTime() < 
System.currentTimeMillis();
-  }
-
-  /** Closes the KerberosTokenProvider and releases any underlying resources. 
*/
   @Override
   public void close() throws IOException {
     try {
-      if (loginContext != null) {
-        loginContext.logout();
+      if (subjectProvider != null) {
+        subjectProvider.close();
       }
     } catch (LoginException le) {
       throw new IOException("Fail to close login context", le);
@@ -155,6 +135,113 @@ public final class KerberosTokenProvider implements 
AuthDataProvider {
     this.host = host;
   }
 
+  /**
+   * Strategy interface for providing Kerberos Subject credentials.
+   *
+   * <p>There are two strategies:
+   *
+   * <ul>
+   *   <li>{@link ExistingSubjectProvider} - Reuses credentials maintained by 
the framework (e.g.,
+   *       Flink)
+   *   <li>{@link LoginSubjectProvider} - Creates and manages new credentials 
using keytab and
+   *       principal
+   * </ul>
+   */
+  private interface SubjectProvider {
+    Subject get() throws LoginException;
+
+    void close() throws LoginException;
+  }
+
+  /**
+   * SubjectProvider that reuses existing Kerberos credentials from the 
framework.
+   *
+   * <p>When Flink (or another framework) has already logged in using keytab 
and principal, this
+   * provider reuses those credentials instead of creating new ones. This 
approach:
+   *
+   * <ul>
+   *   <li>Avoids redundant Kerberos logins
+   *   <li>Lets the framework manage credential lifecycle (renewal, expiration)
+   *   <li>Reduces complexity - the client doesn't need to maintain credentials
+   * </ul>
+   *
+   * <p>The Subject is obtained from the current AccessControlContext and 
contains KerberosKey or
+   * KerberosTicket credentials managed by the framework.
+   */
+  private static final class ExistingSubjectProvider implements 
SubjectProvider {
+    private final Subject subject;
+
+    ExistingSubjectProvider(Subject subject) {
+      this.subject = subject;
+    }
+
+    @Override
+    public Subject get() {
+      return subject;
+    }
+
+    @Override
+    public void close() {
+      // no-op: The framework owns the Subject and is responsible for its 
lifecycle
+    }
+  }
+
+  /**
+   * SubjectProvider that performs Kerberos login using keytab and principal.
+   *
+   * <p>This provider is used when no existing Kerberos credentials are 
available from the
+   * framework. It:
+   *
+   * <ul>
+   *   <li>Performs initial login using the provided keytab file and principal
+   *   <li>Manages the LoginContext lifecycle
+   *   <li>Automatically re-authenticates when the TGT (Ticket Granting 
Ticket) expires
+   * </ul>
+   *
+   * <p>This provider is responsible for credential lifecycle management 
including renewal and
+   * cleanup.
+   */
+  private static final class LoginSubjectProvider implements SubjectProvider {
+    private final String principal;
+    private final String keytabFile;
+    private LoginContext loginContext;
+
+    LoginSubjectProvider(String principal, String keytabFile) {
+      this.principal = principal;
+      this.keytabFile = keytabFile;
+    }
+
+    @Override
+    public synchronized Subject get() throws LoginException {
+      // Perform initial login if not already logged in
+      if (loginContext == null) {
+        loginContext = KerberosUtils.login(principal, keytabFile);
+      } else if (keytabFile != null && isLoginTicketExpired(loginContext)) {
+        // If the TGT (Ticket Granting Ticket) has expired, logout and 
re-authenticate
+        // This ensures we always have valid credentials for GSS context 
negotiation
+        loginContext.logout();
+        loginContext = KerberosUtils.login(principal, keytabFile);
+      }
+      return loginContext.getSubject();
+    }
+
+    @Override
+    public void close() throws LoginException {
+      if (loginContext != null) {
+        loginContext.logout();
+      }
+    }
+
+    private boolean isLoginTicketExpired(LoginContext ctx) {
+      Set<KerberosTicket> tickets = 
ctx.getSubject().getPrivateCredentials(KerberosTicket.class);
+      if (tickets.isEmpty()) {
+        return false;
+      }
+      // For one principal, there should be only one TGT ticket
+      return 
tickets.iterator().next().getEndTime().toInstant().isBefore(Instant.now());
+    }
+  }
+
   /**
    * Creates a new instance of the KerberosTokenProvider.Builder
    *
@@ -194,19 +281,41 @@ public final class KerberosTokenProvider implements 
AuthDataProvider {
     /**
      * Builds the instance of the KerberosTokenProvider.
      *
+     * <p>This method determines the authentication strategy:
+     *
+     * <ul>
+     *   <li>If Kerberos credentials already exist in the current context 
(e.g., Flink has logged
+     *       in), use {@link ExistingSubjectProvider} to reuse those 
credentials
+     *   <li>Otherwise, use {@link LoginSubjectProvider} to perform a new 
Kerberos login using the
+     *       provided keytab and principal
+     * </ul>
+     *
      * @return The built KerberosTokenProvider instance.
      */
-    @SuppressWarnings("null")
+    @SuppressWarnings("removal")
     public KerberosTokenProvider build() {
       KerberosTokenProvider provider = new KerberosTokenProvider();
 
-      Preconditions.checkArgument(
-          StringUtils.isNotBlank(clientPrincipal),
-          "KerberosTokenProvider must set clientPrincipal");
-      Preconditions.checkArgument(
-          Splitter.on('@').splitToList(clientPrincipal).size() == 2,
-          "Principal has the wrong format");
-      provider.clientPrincipal = clientPrincipal;
+      // Check if the framework (e.g., Flink) has already established Kerberos 
credentials
+      java.security.AccessControlContext context = 
java.security.AccessController.getContext();
+      Subject subject = Subject.getSubject(context);
+
+      // If credentials exist (KerberosKey or KerberosTicket), reuse them
+      // This avoids redundant logins when Flink has already authenticated 
with Kerberos
+      if (subject != null
+          && (!subject.getPrivateCredentials(KerberosKey.class).isEmpty()
+              || 
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty())) {
+        // Use ExistingSubjectProvider: framework manages the credentials
+        provider.subjectProvider = new ExistingSubjectProvider(subject);
+
+        extractPrincipalFromSubject(subject);
+        setProviderClientPrincipal(provider);
+
+        return provider;
+      }
+
+      // No existing credentials found - we need to login ourselves
+      setProviderClientPrincipal(provider);
 
       if (keyTabFile != null) {
         Preconditions.checkArgument(
@@ -216,7 +325,29 @@ public final class KerberosTokenProvider implements 
AuthDataProvider {
         provider.keytabFile = keyTabFile.getAbsolutePath();
       }
 
+      // Use LoginSubjectProvider: client manages the credentials
+      // This provider will login using keytab/principal and handle ticket 
renewal
+      provider.subjectProvider =
+          new LoginSubjectProvider(provider.clientPrincipal, 
provider.keytabFile);
       return provider;
     }
+
+    private void setProviderClientPrincipal(KerberosTokenProvider provider) {
+      Preconditions.checkArgument(
+          StringUtils.isNotBlank(clientPrincipal),
+          "KerberosTokenProvider must set clientPrincipal");
+      Preconditions.checkArgument(
+          Splitter.on('@').splitToList(clientPrincipal).size() == 2,
+          "Principal has the wrong format");
+      provider.clientPrincipal = clientPrincipal;
+    }
+
+    private void extractPrincipalFromSubject(Subject subject) {
+      clientPrincipal =
+          subject.getPrincipals(KerberosPrincipal.class).stream()
+              .findFirst()
+              .map(Object::toString)
+              .orElse(null);
+    }
   }
 }
diff --git a/docs/flink-connector/flink-authentication-with-gravitino.md 
b/docs/flink-connector/flink-authentication-with-gravitino.md
new file mode 100644
index 0000000000..da5c099fd3
--- /dev/null
+++ b/docs/flink-connector/flink-authentication-with-gravitino.md
@@ -0,0 +1,50 @@
+---
+title: "Flink authentication with Gravitino server"
+slug: /flink-connector/flink-authentication
+keyword: flink connector authentication oauth2 kerberos
+license: "This software is licensed under the Apache License version 2."
+---
+
+## Overview
+
+Flink connector supports `simple`, `oauth2`, and `kerberos` authentication 
when accessing the Gravitino server.
+
+| Property                                                  | Type   | Default 
Value | Description                                                             
                                                                   | Required | 
Since Version |
+|-----------------------------------------------------------|--------|---------------|--------------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
+| table.catalog-store.gravitino.gravitino.client.auth.type  | string | (none)  
      | When explicitly set, only `oauth` is supported. If unset, Flink selects 
Kerberos or simple authentication based on its security settings.  | No       | 
1.2.0         |
+
+## Simple mode
+
+In simple mode, the username originates from Flink. The resolution order is:
+1. `HADOOP_USER_NAME` environment variable
+2. The logged-in OS user
+
+## OAuth2 mode
+
+In OAuth2 mode, configure the following settings to fetch an OAuth2 token to 
access the Gravitino server:
+
+| Property                                                              | Type 
  | Default Value | Description                                      | Required 
                      | Since Version |
+|-----------------------------------------------------------------------|--------|---------------|--------------------------------------------------|--------------------------------|---------------|
+| table.catalog-store.gravitino.gravitino.client.oauth2.serverUri       | 
string | (none)        | The OAuth2 server URI.                           | 
Yes, for OAuth2 mode           | 1.2.0         |
+| table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath       | 
string | (none)        | The token endpoint path on the OAuth2 server.    | 
Yes, for OAuth2 mode           | 1.2.0         |
+| table.catalog-store.gravitino.gravitino.client.oauth2.credential      | 
string | (none)        | The credential used to request the OAuth2 token. | 
Yes, for OAuth2 mode           | 1.2.0         |
+| table.catalog-store.gravitino.gravitino.client.oauth2.scope           | 
string | (none)        | The scope used to request the OAuth2 token.      | 
Yes, for OAuth2 mode           | 1.2.0         |
+
+### OAuth2 Configuration Example
+
+```yaml
+table.catalog-store.kind: gravitino
+table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
+table.catalog-store.gravitino.gravitino.metalake: my_metalake
+table.catalog-store.gravitino.gravitino.client.auth.type: oauth2
+table.catalog-store.gravitino.gravitino.client.oauth2.serverUri: 
https://oauth-server.example.com
+table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath: /oauth/token
+table.catalog-store.gravitino.gravitino.client.oauth2.credential: 
your-client-credentials
+table.catalog-store.gravitino.gravitino.client.oauth2.scope: your-scope
+```
+
+## Kerberos mode
+
+In Kerberos mode, use Flink security configurations to obtain a Kerberos 
ticket for accessing the Gravitino server. Configure 
`security.kerberos.login.principal` and `security.kerberos.login.keytab` for 
the Kerberos principal and keytab.
+
+The Gravitino server principal follows the pattern `HTTP/$host@$realm`; ensure 
`$host` matches the host specified in the Gravitino server URI. Ensure 
`krb5.conf` is available to Flink, for example via 
`-Djava.security.krb5.conf=/path/to/krb5.conf` in Flink JVM options.
diff --git a/flink-connector/flink/build.gradle.kts 
b/flink-connector/flink/build.gradle.kts
index 9e5659fb0c..b653814cba 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -96,6 +96,7 @@ dependencies {
   testImplementation(libs.testcontainers.mysql)
   testImplementation(libs.metrics.core)
   testImplementation(libs.flinkjdbc)
+  testImplementation(libs.minikdc)
 
   testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
   
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index 0735be2519..bd355ab6f5 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -21,12 +21,19 @@ package org.apache.gravitino.flink.connector.catalog;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.Sets;
+import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Catalog;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
 import org.apache.gravitino.client.GravitinoAdminClient;
 import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.KerberosTokenProvider;
+import 
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,8 +61,35 @@ public class GravitinoCatalogManager {
     this.gravitinoUri = gravitinoUri;
     this.metalakeName = metalakeName;
     this.gravitinoClientConfig = gravitinoClientConfig;
-    this.gravitinoClient =
-        
GravitinoAdminClient.builder(gravitinoUri).withClientConfig(gravitinoClientConfig).build();
+
+    String authType = 
gravitinoClientConfig.get(GravitinoCatalogStoreFactoryOptions.AUTH_TYPE);
+
+    // Only OAuth is explicitly configured; otherwise follow Flink security 
(Kerberos if enabled,
+    // simple auth otherwise).
+    if (AuthenticatorType.OAUTH.name().equalsIgnoreCase(authType)) {
+      this.gravitinoClient = buildOAuthClient(gravitinoUri, 
gravitinoClientConfig);
+    } else {
+      if (authType != null) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Unsupported auth type '%s'. Only OAUTH is supported; leave %s 
unset to use Flink Kerberos settings (or simple auth if security is disabled).",
+                authType, GravitinoCatalogStoreFactoryOptions.AUTH_TYPE));
+      }
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        if (getUgi().getAuthenticationMethod()
+            != UserGroupInformation.AuthenticationMethod.KERBEROS) {
+          throw new IllegalStateException(
+              String.format(
+                  "Flink security is enabled, but current user authentication 
method is %s rather than KERBEROS",
+                  getUgi().getAuthenticationMethod()));
+        }
+        this.gravitinoClient = buildKerberosClient(gravitinoUri, 
gravitinoClientConfig);
+      } else {
+        this.gravitinoClient = buildSimpleClient(gravitinoUri, 
gravitinoClientConfig);
+      }
+    }
+
     this.metalake = gravitinoClient.loadMetalake(metalakeName);
   }
 
@@ -206,4 +240,63 @@ public class GravitinoCatalogManager {
         && gravitinoCatalogManager.metalakeName.equals(metalakeName)
         && 
gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
   }
+
+  private static GravitinoAdminClient buildOAuthClient(
+      String gravitinoUri, Map<String, String> config) {
+    String serverUri = 
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI);
+    String credential = 
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
+    String path = 
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH);
+    String scope = 
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+    Preconditions.checkArgument(
+        StringUtils.isNoneBlank(serverUri, credential, path, scope),
+        String.format(
+            "OAuth2 authentication requires: %s, %s, %s, and %s",
+            GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI,
+            GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL,
+            GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH,
+            GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE));
+
+    DefaultOAuth2TokenProvider provider =
+        DefaultOAuth2TokenProvider.builder()
+            .withUri(serverUri)
+            .withCredential(credential)
+            .withPath(path)
+            .withScope(scope)
+            .build();
+
+    return GravitinoAdminClient.builder(gravitinoUri)
+        .withOAuth(provider)
+        .withClientConfig(config)
+        .build();
+  }
+
+  private static GravitinoAdminClient buildKerberosClient(
+      String gravitinoUri, Map<String, String> config) {
+
+    return getUgi()
+        .doAs(
+            (PrivilegedAction<GravitinoAdminClient>)
+                () ->
+                    GravitinoAdminClient.builder(gravitinoUri)
+                        
.withKerberosAuth(KerberosTokenProvider.builder().build())
+                        .withClientConfig(config)
+                        .build());
+  }
+
+  private static GravitinoAdminClient buildSimpleClient(
+      String gravitinoUri, Map<String, String> config) {
+    String userName = getUgi().getUserName();
+    return GravitinoAdminClient.builder(gravitinoUri)
+        .withSimpleAuth(userName)
+        .withClientConfig(config)
+        .build();
+  }
+
+  private static UserGroupInformation getUgi() {
+    try {
+      return UserGroupInformation.getCurrentUser();
+    } catch (Exception e) {
+      throw new IllegalStateException("Unable to get current user group 
information", e);
+    }
+  }
 }
diff --git 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
index fb304c4ab3..f989c3d138 100644
--- 
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
+++ 
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
@@ -46,4 +46,12 @@ public class GravitinoCatalogStoreFactoryOptions {
           .mapType()
           .defaultValue(ImmutableMap.of())
           .withDescription("The config of Gravitino client");
+
+  public static final String AUTH_TYPE = "gravitino.client.auth.type";
+
+  // OAuth2 config keys
+  public static final String OAUTH2_SERVER_URI = 
"gravitino.client.oauth2.serverUri";
+  public static final String OAUTH2_CREDENTIAL = 
"gravitino.client.oauth2.credential";
+  public static final String OAUTH2_TOKEN_PATH = 
"gravitino.client.oauth2.tokenPath";
+  public static final String OAUTH2_SCOPE = "gravitino.client.oauth2.scope";
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 404d220aea..b275d81a78 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -68,7 +68,7 @@ public abstract class FlinkEnvIT extends BaseIT {
 
   protected static FileSystem hdfs;
 
-  private static String gravitinoUri = "http://127.0.0.1:8090";
+  protected static String gravitinoUri = "http://localhost:8090";
 
   private final String lakeHouseIcebergProvider = "lakehouse-iceberg";
 
@@ -143,7 +143,7 @@ public abstract class FlinkEnvIT extends BaseIT {
   private void initGravitinoEnv() {
     // Gravitino server is already started by AbstractIT, just construct 
gravitinoUrl
     int gravitinoPort = getGravitinoServerPort();
-    gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
+    gravitinoUri = String.format("http://localhost:%d", gravitinoPort);
     if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
       this.icebergRestServiceUri = getIcebergRestServiceUri();
     }
@@ -183,7 +183,7 @@ public abstract class FlinkEnvIT extends BaseIT {
     }
   }
 
-  private static void initFlinkEnv() {
+  protected void initFlinkEnv() {
     final Configuration configuration = new Configuration();
     configuration.setString(
         "table.catalog-store.kind", 
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index 8274d166eb..f64170f811 100644
--- 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.Optional;
@@ -58,6 +59,7 @@ import 
org.apache.gravitino.rel.expressions.transforms.Transforms;
 import org.apache.gravitino.rel.types.Types;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
@@ -67,6 +69,7 @@ import org.junit.jupiter.api.Test;
 @Tag("gravitino-docker-test")
 public class FlinkHiveCatalogIT extends FlinkCommonIT {
   private static final String DEFAULT_HIVE_CATALOG = 
"test_flink_hive_schema_catalog";
+  private static final String FLINK_USER_NAME = "gravitino";
 
   private static org.apache.gravitino.Catalog hiveCatalog;
 
@@ -103,7 +106,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
     int numCatalogs = tableEnv.listCatalogs().length;
 
     // Create a new catalog.
+
     String catalogName = "gravitino_hive";
+
     Configuration configuration = new Configuration();
     configuration.set(
         CommonCatalogOptions.CATALOG_TYPE, 
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
@@ -115,6 +120,8 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
 
     // Check the catalog properties.
     org.apache.gravitino.Catalog gravitinoCatalog = 
metalake.loadCatalog(catalogName);
+    Assertions.assertEquals(FLINK_USER_NAME, 
gravitinoCatalog.auditInfo().creator());
+
     Map<String, String> properties = gravitinoCatalog.properties();
     Assertions.assertEquals(hiveMetastoreUri, 
properties.get(HiveConstants.METASTORE_URIS));
     Map<String, String> flinkProperties =
@@ -605,4 +612,21 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
   protected boolean supportDropCascade() {
     return true;
   }
+
+  @Override
+  protected void initFlinkEnv() {
+    try {
+      UserGroupInformation proxyUser =
+          UserGroupInformation.createProxyUser(
+              FLINK_USER_NAME, UserGroupInformation.getCurrentUser());
+      proxyUser.doAs(
+          (PrivilegedAction<Void>)
+              () -> {
+                super.initFlinkEnv();
+                return null;
+              });
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to obtain UGI for Flink user", e);
+    }
+  }
 }
diff --git 
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
new file mode 100644
index 0000000000..e87f60af1b
--- /dev/null
+++ 
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.hive;
+
+import static org.apache.gravitino.server.authentication.KerberosConfig.KEYTAB;
+import static 
org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPAL;
+import static org.apache.hadoop.minikdc.MiniKdc.MAX_TICKET_LIFETIME;
+
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
+import 
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT;
+import 
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for creating Gravitino Hive catalogs with Kerberos 
authentication. This test
+ * verifies that catalog creation works correctly when the Gravitino server is 
configured with
+ * Kerberos authentication. The test extends FlinkEnvIT directly to keep the 
test scope focused on
+ * Kerberos-specific scenarios.
+ */
+@Tag("gravitino-docker-test")
+public class FlinkHiveKerberosClientIT extends FlinkEnvIT {
+
  // Embedded MiniKdc used to issue Kerberos tickets for both the Gravitino server and the
  // test client; started in startIntegrationTest() and stopped in stopIntegrationTest().
  private static final KerberosSecurityTestcase kdc =
      new KerberosSecurityTestcase() {
        @Override
        public void createMiniKdcConf() {
          super.createMiniKdcConf();
          // Use a very short ticket lifetime to speed up Kerberos expiration in tests.
          // The test operations are executed immediately after the MiniKdc is started and
          // are not long-running, so a 5-second lifetime is sufficient and keeps the test
          // fast. If this test ever becomes flaky on slower environments or CI systems,
          // consider increasing this value to a more forgiving lifetime (for example, 60s).
          getConf().setProperty(MAX_TICKET_LIFETIME, "5");
        }
      };
+
+  private static final String keytabFile =
+      new File(System.getProperty("test.dir", "target"), 
UUID.randomUUID().toString())
+          .getAbsolutePath();
+
+  // Server principals for Gravitino server
+  private static final String serverPrincipal = "HTTP/[email protected]";
+  private static final String serverPrincipalWithAll = 
"HTTP/[email protected]";
+
+  // Client principal for authentication
+  private static final String clientPrincipal = "[email protected]";
+
  /** All catalogs created by this test use the Hive catalog provider. */
  @Override
  protected String getProvider() {
    return "hive";
  }
+
+  @Override
+  @BeforeAll
+  public void startIntegrationTest() throws Exception {
+    // Set up Kerberos before starting integration test
+    kdc.startMiniKdc();
+    initKeyTab();
+
+    // Configure a Gravitino server with Kerberos
+    Map<String, String> configs = Maps.newHashMap();
+    configs.put(Configs.AUTHENTICATORS.getKey(), 
AuthenticatorType.KERBEROS.name().toLowerCase());
+    configs.put(PRINCIPAL.getKey(), serverPrincipal);
+    configs.put(KEYTAB.getKey(), keytabFile);
+    configs.put("client.kerberos.principal", clientPrincipal);
+    configs.put("client.kerberos.keytab", keytabFile);
+
+    registerCustomConfigs(configs);
+
+    // Start the integration test (starts Gravitino server)
+    super.startIntegrationTest();
+  }
+
+  @Override
+  @AfterAll
+  public void stopIntegrationTest() {
+    try {
+      super.stopIntegrationTest();
+      UserGroupInformation.setConfiguration(new 
org.apache.hadoop.conf.Configuration(false));
+      kdc.stopMiniKdc();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to stop Kerberos test", e);
+    }
+  }
+
+  @Override
+  protected void initFlinkEnv() {
+    // Initialize Kerberos authentication for Hadoop UGI.
+    // This approach follows Flink's HadoopLoginModule pattern for Kerberos 
authentication,
+    // which performs a login from keytab to establish a secure context before 
creating
+    // Flink table environment. See 
org.apache.flink.runtime.security.modules.HadoopLoginModule
+    // in Flink source code for reference.
+    org.apache.hadoop.conf.Configuration hadoopConfig = new 
org.apache.hadoop.conf.Configuration();
+
+    hadoopConfig.set("hadoop.security.authentication", "kerberos");
+    hadoopConfig.set("hadoop.security.auth_to_local", 
"RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//");
+
+    UserGroupInformation.setConfiguration(hadoopConfig);
+    UserGroupInformation proxyUser = null;
+    try {
+      proxyUser = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(clientPrincipal, 
keytabFile);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to obtain UGI for Kerberos user", e);
+    }
+
+    final Configuration configuration = new Configuration();
+    configuration.setString(
+        "table.catalog-store.kind", 
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+    
configuration.setString("table.catalog-store.gravitino.gravitino.metalake", 
GRAVITINO_METALAKE);
+    configuration.setString("table.catalog-store.gravitino.gravitino.uri", 
gravitinoUri);
+    EnvironmentSettings.Builder builder =
+        EnvironmentSettings.newInstance().withConfiguration(configuration);
+    tableEnv =
+        proxyUser.doAs(
+            (PrivilegedAction<TableEnvironment>)
+                () -> TableEnvironment.create(builder.inBatchMode().build()));
+  }
+
  /**
   * End-to-end check that a Gravitino Hive catalog can be created, inspected, used, and
   * dropped through Flink SQL while the client authenticates to Gravitino via Kerberos.
   */
  @Test
  public void testCreateGravitinoHiveCatalogWithKerberosAuth() {
    tableEnv.useCatalog(DEFAULT_CATALOG);
    int numCatalogs = tableEnv.listCatalogs().length;

    // Create a new catalog with Kerberos authentication using SQL
    String catalogName = "gravitino_hive_kerberos";
    tableEnv.executeSql(
        String.format(
            "CREATE CATALOG %s WITH ("
                + "'type'='gravitino-hive', "
                + "'hive-conf-dir'='src/test/resources/flink-tests',"
                + "'hive.metastore.uris'='%s'"
                + ")",
            catalogName, hiveMetastoreUri));

    // Verify catalog exists in Gravitino
    Assertions.assertTrue(metalake.catalogExists(catalogName));

    // Check the catalog properties
    org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
    Assertions.assertNotNull(gravitinoCatalog);
    Assertions.assertEquals(catalogName, gravitinoCatalog.name());

    // Assert creator is the Kerberos-authenticated client principal: "[email protected]"
    // is reduced to the short name "client" by the hadoop.security.auth_to_local rule
    // configured in initFlinkEnv.
    Assertions.assertEquals("client", gravitinoCatalog.auditInfo().creator());

    Map<String, String> properties = gravitinoCatalog.properties();
    Assertions.assertEquals(hiveMetastoreUri, properties.get(HiveConstants.METASTORE_URIS));

    // Verify Flink-specific properties are stored correctly: exactly the bypassed
    // hive-conf-dir and catalog-type entries written at creation time.
    Map<String, String> flinkProperties =
        properties.entrySet().stream()
            .filter(e -> e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    Assertions.assertEquals(2, flinkProperties.size());
    Assertions.assertEquals(
        "src/test/resources/flink-tests",
        flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
    Assertions.assertEquals(
        GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
        flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));

    // Get the created catalog
    Optional<Catalog> catalog = tableEnv.getCatalog(catalogName);
    Assertions.assertTrue(catalog.isPresent());
    Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());

    // List catalogs
    String[] catalogs = tableEnv.listCatalogs();
    Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog");
    Assertions.assertTrue(
        Arrays.asList(catalogs).contains(catalogName), "Should create the correct catalog.");

    // Use the catalog
    tableEnv.useCatalog(catalogName);
    Assertions.assertEquals(
        catalogName,
        tableEnv.getCurrentCatalog(),
        "Current catalog should be the Kerberos-authenticated catalog.");

    // Drop the catalog
    tableEnv.useCatalog(DEFAULT_CATALOG);
    tableEnv.executeSql("DROP CATALOG " + catalogName);
    Assertions.assertFalse(metalake.catalogExists(catalogName));

    Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
    Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be dropped");
  }
+
+  private static void initKeyTab() throws Exception {
+    File newKeytabFile = new File(keytabFile);
+    String newClientPrincipal = removeRealm(clientPrincipal);
+    String newServerPrincipal = removeRealm(serverPrincipal);
+    String newServerPrincipalAll = removeRealm(serverPrincipalWithAll);
+    kdc.getKdc()
+        .createPrincipal(
+            newKeytabFile, newClientPrincipal, newServerPrincipal, 
newServerPrincipalAll);
+  }
+
+  private static String removeRealm(String principal) {
+    return principal.substring(0, principal.lastIndexOf("@"));
+  }
+}
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
index 0cbf803655..b39bf40662 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
@@ -57,6 +57,7 @@ import org.apache.gravitino.Configs;
 import org.apache.gravitino.auth.AuthenticatorType;
 import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
 import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.KerberosTokenProvider;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.integration.test.MiniGravitino;
 import org.apache.gravitino.integration.test.MiniGravitinoContext;
@@ -138,6 +139,21 @@ public class BaseIT {
     customConfigs.putAll(configs);
   }
 
  /**
   * Creates a KerberosTokenProvider with the given principal and keytab file path. Used when
   * building the Gravitino admin client against a server whose authenticator is Kerberos.
   *
   * @param principal The Kerberos principal (e.g., "[email protected]")
   * @param keytabPath The path to the keytab file
   * @return A configured KerberosTokenProvider instance
   */
  protected static KerberosTokenProvider createKerberosTokenProvider(
      String principal, String keytabPath) {
    return KerberosTokenProvider.builder()
        .withClientPrincipal(principal)
        .withKeyTabFile(new File(keytabPath))
        .build();
  }
+
   protected int getLanceRESTServerPort() {
     JettyServerConfig lanceServerConfig =
         JettyServerConfig.fromConfig(serverConfig, LANCE_CONFIG_PREFIX);
@@ -406,7 +422,20 @@ public class BaseIT {
       }
     } else if 
(authenticators.contains(AuthenticatorType.KERBEROS.name().toLowerCase())) {
       serverUri = "http://localhost:"; + jettyServerConfig.getHttpPort();
-      client = null;
+      // Get Kerberos configuration from custom configs
+      String principal = customConfigs.get("client.kerberos.principal");
+      String keytabPath = customConfigs.get("client.kerberos.keytab");
+
+      if (principal != null && keytabPath != null) {
+        KerberosTokenProvider kerberosTokenProvider =
+            createKerberosTokenProvider(principal, keytabPath);
+        client =
+            
GravitinoAdminClient.builder(serverUri).withKerberosAuth(kerberosTokenProvider).build();
+      } else {
+        LOG.warn(
+            "Kerberos authentication configured but principal or keytab not 
provided. Client will be null.");
+        client = null;
+      }
     } else {
       client = GravitinoAdminClient.builder(serverUri).build();
     }


Reply via email to