This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 60de61930b [#9564] feat(flink): Flink connector supports user
authentication (#9565)
60de61930b is described below
commit 60de61930b1761f01a8aa98e65554d06f761147a
Author: roryqi <[email protected]>
AuthorDate: Sun Jan 4 19:30:45 2026 +0800
[#9564] feat(flink): Flink connector supports user authentication (#9565)
### What changes were proposed in this pull request?
Flink connector supports user authentication.
### Why are the changes needed?
Fix: #9564
### Does this PR introduce _any_ user-facing change?
Add the documents.
### How was this patch tested?
Add integration tests.
---------
Co-authored-by: Copilot <[email protected]>
---
.../gravitino/client/KerberosTokenProvider.java | 205 +++++++++++++----
.../flink-authentication-with-gravitino.md | 50 +++++
flink-connector/flink/build.gradle.kts | 1 +
.../connector/catalog/GravitinoCatalogManager.java | 97 +++++++-
.../store/GravitinoCatalogStoreFactoryOptions.java | 8 +
.../connector/integration/test/FlinkEnvIT.java | 6 +-
.../integration/test/hive/FlinkHiveCatalogIT.java | 24 ++
.../test/hive/FlinkHiveKerberosClientIT.java | 243 +++++++++++++++++++++
.../gravitino/integration/test/util/BaseIT.java | 31 ++-
9 files changed, 622 insertions(+), 43 deletions(-)
diff --git
a/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
b/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
index 849c13b282..f3688700fa 100644
---
a/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
+++
b/clients/client-java/src/main/java/org/apache/gravitino/client/KerberosTokenProvider.java
@@ -24,10 +24,14 @@ import com.google.common.base.Splitter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.time.Instant;
import java.util.Base64;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosKey;
+import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.kerberos.KerberosTicket;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
@@ -49,7 +53,7 @@ public final class KerberosTokenProvider implements
AuthDataProvider {
private String clientPrincipal;
private String keytabFile;
private String host = "localhost";
- private LoginContext loginContext;
+ private SubjectProvider subjectProvider;
private KerberosTokenProvider() {}
@@ -82,22 +86,11 @@ public final class KerberosTokenProvider implements
AuthDataProvider {
private byte[] getTokenInternal() throws Exception {
@SuppressWarnings("null")
List<String> principalComponents =
Splitter.on('@').splitToList(clientPrincipal);
- // Gravitino server's principal must start with HTTP. This restriction
follows
- // the style of Apache Hadoop.
String serverPrincipal = "HTTP/" + host + "@" + principalComponents.get(1);
-
- synchronized (this) {
- if (loginContext == null) {
- loginContext = KerberosUtils.login(clientPrincipal, keytabFile);
- } else if (isLoginTicketExpired() && keytabFile != null) {
- // We only support use keytab to re-login context
- loginContext.logout();
- loginContext = KerberosUtils.login(clientPrincipal, keytabFile);
- }
- }
+ Subject currentSubject = subjectProvider.get();
return KerberosUtils.doAs(
- loginContext.getSubject(),
+ currentSubject,
new Callable<byte[]>() {
@Override
public byte[] call() throws Exception {
@@ -127,24 +120,11 @@ public final class KerberosTokenProvider implements
AuthDataProvider {
});
}
- @SuppressWarnings("JavaUtilDate")
- private boolean isLoginTicketExpired() {
- Set<KerberosTicket> tickets =
- loginContext.getSubject().getPrivateCredentials(KerberosTicket.class);
-
- if (tickets.isEmpty()) {
- return false;
- }
-
- return tickets.iterator().next().getEndTime().getTime() <
System.currentTimeMillis();
- }
-
- /** Closes the KerberosTokenProvider and releases any underlying resources.
*/
@Override
public void close() throws IOException {
try {
- if (loginContext != null) {
- loginContext.logout();
+ if (subjectProvider != null) {
+ subjectProvider.close();
}
} catch (LoginException le) {
throw new IOException("Fail to close login context", le);
@@ -155,6 +135,113 @@ public final class KerberosTokenProvider implements
AuthDataProvider {
this.host = host;
}
+ /**
+ * Strategy interface for providing Kerberos Subject credentials.
+ *
+ * <p>There are two strategies:
+ *
+ * <ul>
+ * <li>{@link ExistingSubjectProvider} - Reuses credentials maintained by
the framework (e.g.,
+ * Flink)
+ * <li>{@link LoginSubjectProvider} - Creates and manages new credentials
using keytab and
+ * principal
+ * </ul>
+ */
+ private interface SubjectProvider {
+ Subject get() throws LoginException;
+
+ void close() throws LoginException;
+ }
+
+ /**
+ * SubjectProvider that reuses existing Kerberos credentials from the
framework.
+ *
+ * <p>When Flink (or another framework) has already logged in using keytab
and principal, this
+ * provider reuses those credentials instead of creating new ones. This
approach:
+ *
+ * <ul>
+ * <li>Avoids redundant Kerberos logins
+ * <li>Lets the framework manage credential lifecycle (renewal, expiration)
+ * <li>Reduces complexity - the client doesn't need to maintain credentials
+ * </ul>
+ *
+ * <p>The Subject is obtained from the current AccessControlContext and
contains KerberosKey or
+ * KerberosTicket credentials managed by the framework.
+ */
+ private static final class ExistingSubjectProvider implements
SubjectProvider {
+ private final Subject subject;
+
+ ExistingSubjectProvider(Subject subject) {
+ this.subject = subject;
+ }
+
+ @Override
+ public Subject get() {
+ return subject;
+ }
+
+ @Override
+ public void close() {
+ // no-op: The framework owns the Subject and is responsible for its
lifecycle
+ }
+ }
+
+ /**
+ * SubjectProvider that performs Kerberos login using keytab and principal.
+ *
+ * <p>This provider is used when no existing Kerberos credentials are
available from the
+ * framework. It:
+ *
+ * <ul>
+ * <li>Performs initial login using the provided keytab file and principal
+ * <li>Manages the LoginContext lifecycle
+ * <li>Automatically re-authenticates when the TGT (Ticket Granting
Ticket) expires
+ * </ul>
+ *
+ * <p>This provider is responsible for credential lifecycle management
including renewal and
+ * cleanup.
+ */
+ private static final class LoginSubjectProvider implements SubjectProvider {
+ private final String principal;
+ private final String keytabFile;
+ private LoginContext loginContext;
+
+ LoginSubjectProvider(String principal, String keytabFile) {
+ this.principal = principal;
+ this.keytabFile = keytabFile;
+ }
+
+ @Override
+ public synchronized Subject get() throws LoginException {
+ // Perform initial login if not already logged in
+ if (loginContext == null) {
+ loginContext = KerberosUtils.login(principal, keytabFile);
+ } else if (keytabFile != null && isLoginTicketExpired(loginContext)) {
+ // If the TGT (Ticket Granting Ticket) has expired, logout and
re-authenticate
+ // This ensures we always have valid credentials for GSS context
negotiation
+ loginContext.logout();
+ loginContext = KerberosUtils.login(principal, keytabFile);
+ }
+ return loginContext.getSubject();
+ }
+
+ @Override
+ public void close() throws LoginException {
+ if (loginContext != null) {
+ loginContext.logout();
+ }
+ }
+
+ private boolean isLoginTicketExpired(LoginContext ctx) {
+ Set<KerberosTicket> tickets =
ctx.getSubject().getPrivateCredentials(KerberosTicket.class);
+ if (tickets.isEmpty()) {
+ return false;
+ }
+ // For one principal, there should be only one TGT ticket
+ return
tickets.iterator().next().getEndTime().toInstant().isBefore(Instant.now());
+ }
+ }
+
/**
* Creates a new instance of the KerberosTokenProvider.Builder
*
@@ -194,19 +281,41 @@ public final class KerberosTokenProvider implements
AuthDataProvider {
/**
* Builds the instance of the KerberosTokenProvider.
*
+ * <p>This method determines the authentication strategy:
+ *
+ * <ul>
+ * <li>If Kerberos credentials already exist in the current context
(e.g., Flink has logged
+ * in), use {@link ExistingSubjectProvider} to reuse those
credentials
+ * <li>Otherwise, use {@link LoginSubjectProvider} to perform a new
Kerberos login using the
+ * provided keytab and principal
+ * </ul>
+ *
* @return The built KerberosTokenProvider instance.
*/
- @SuppressWarnings("null")
+ @SuppressWarnings("removal")
public KerberosTokenProvider build() {
KerberosTokenProvider provider = new KerberosTokenProvider();
- Preconditions.checkArgument(
- StringUtils.isNotBlank(clientPrincipal),
- "KerberosTokenProvider must set clientPrincipal");
- Preconditions.checkArgument(
- Splitter.on('@').splitToList(clientPrincipal).size() == 2,
- "Principal has the wrong format");
- provider.clientPrincipal = clientPrincipal;
+ // Check if the framework (e.g., Flink) has already established Kerberos
credentials
+ java.security.AccessControlContext context =
java.security.AccessController.getContext();
+ Subject subject = Subject.getSubject(context);
+
+ // If credentials exist (KerberosKey or KerberosTicket), reuse them
+ // This avoids redundant logins when Flink has already authenticated
with Kerberos
+ if (subject != null
+ && (!subject.getPrivateCredentials(KerberosKey.class).isEmpty()
+ ||
!subject.getPrivateCredentials(KerberosTicket.class).isEmpty())) {
+ // Use ExistingSubjectProvider: framework manages the credentials
+ provider.subjectProvider = new ExistingSubjectProvider(subject);
+
+ extractPrincipalFromSubject(subject);
+ setProviderClientPrincipal(provider);
+
+ return provider;
+ }
+
+ // No existing credentials found - we need to login ourselves
+ setProviderClientPrincipal(provider);
if (keyTabFile != null) {
Preconditions.checkArgument(
@@ -216,7 +325,29 @@ public final class KerberosTokenProvider implements
AuthDataProvider {
provider.keytabFile = keyTabFile.getAbsolutePath();
}
+ // Use LoginSubjectProvider: client manages the credentials
+ // This provider will login using keytab/principal and handle ticket
renewal
+ provider.subjectProvider =
+ new LoginSubjectProvider(provider.clientPrincipal,
provider.keytabFile);
return provider;
}
+
+ private void setProviderClientPrincipal(KerberosTokenProvider provider) {
+ Preconditions.checkArgument(
+ StringUtils.isNotBlank(clientPrincipal),
+ "KerberosTokenProvider must set clientPrincipal");
+ Preconditions.checkArgument(
+ Splitter.on('@').splitToList(clientPrincipal).size() == 2,
+ "Principal has the wrong format");
+ provider.clientPrincipal = clientPrincipal;
+ }
+
+ private void extractPrincipalFromSubject(Subject subject) {
+ clientPrincipal =
+ subject.getPrincipals(KerberosPrincipal.class).stream()
+ .findFirst()
+ .map(Object::toString)
+ .orElse(null);
+ }
}
}
diff --git a/docs/flink-connector/flink-authentication-with-gravitino.md
b/docs/flink-connector/flink-authentication-with-gravitino.md
new file mode 100644
index 0000000000..da5c099fd3
--- /dev/null
+++ b/docs/flink-connector/flink-authentication-with-gravitino.md
@@ -0,0 +1,50 @@
+---
+title: "Flink authentication with Gravitino server"
+slug: /flink-connector/flink-authentication
+keyword: flink connector authentication oauth2 kerberos
+license: "This software is licensed under the Apache License version 2."
+---
+
+## Overview
+
+The Flink connector supports `simple`, `oauth2`, and `kerberos` authentication when accessing the Gravitino server.
+
+| Property                                                 | Type   | Default Value | Description                                                                                                                            | Required | Since Version |
+|----------------------------------------------------------|--------|---------------|----------------------------------------------------------------------------------------------------------------------------------------|----------|---------------|
+| table.catalog-store.gravitino.gravitino.client.auth.type | string | (none)        | When explicitly set, only `oauth` is supported. If unset, Flink selects Kerberos or simple authentication based on its security settings. | No       | 1.2.0         |
+
+## Simple mode
+
+In simple mode, the username originates from Flink. The resolution order is:
+1. `HADOOP_USER_NAME` environment variable
+2. The logged-in OS user
+
+## OAuth2 mode
+
+In OAuth2 mode, configure the following settings to fetch an OAuth2 token to
access the Gravitino server:
+
+| Property                                                        | Type   | Default Value | Description                                      | Required             | Since Version |
+|-----------------------------------------------------------------|--------|---------------|--------------------------------------------------|----------------------|---------------|
+| table.catalog-store.gravitino.gravitino.client.oauth2.serverUri | string | (none)        | The OAuth2 server URI.                           | Yes, for OAuth2 mode | 1.2.0         |
+| table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath | string | (none)        | The token endpoint path on the OAuth2 server.    | Yes, for OAuth2 mode | 1.2.0         |
+| table.catalog-store.gravitino.gravitino.client.oauth2.credential | string | (none)       | The credential used to request the OAuth2 token. | Yes, for OAuth2 mode | 1.2.0         |
+| table.catalog-store.gravitino.gravitino.client.oauth2.scope     | string | (none)        | The scope used to request the OAuth2 token.      | Yes, for OAuth2 mode | 1.2.0         |
+
+### OAuth2 Configuration Example
+
+```yaml
+table.catalog-store.kind: gravitino
+table.catalog-store.gravitino.gravitino.uri: http://localhost:8090
+table.catalog-store.gravitino.gravitino.metalake: my_metalake
+table.catalog-store.gravitino.gravitino.client.auth.type: oauth
+table.catalog-store.gravitino.gravitino.client.oauth2.serverUri: https://oauth-server.example.com
+table.catalog-store.gravitino.gravitino.client.oauth2.tokenPath: /oauth/token
+table.catalog-store.gravitino.gravitino.client.oauth2.credential: your-client-credentials
+table.catalog-store.gravitino.gravitino.client.oauth2.scope: your-scope
+```
+
+## Kerberos mode
+
+In Kerberos mode, use Flink security configurations to obtain a Kerberos
ticket for accessing the Gravitino server. Configure
`security.kerberos.login.principal` and `security.kerberos.login.keytab` for
the Kerberos principal and keytab.
+
+The Gravitino server principal follows the pattern `HTTP/$host@$realm`; ensure
`$host` matches the host specified in the Gravitino server URI. Ensure
`krb5.conf` is available to Flink, for example via
`-Djava.security.krb5.conf=/path/to/krb5.conf` in Flink JVM options.
diff --git a/flink-connector/flink/build.gradle.kts
b/flink-connector/flink/build.gradle.kts
index 9e5659fb0c..b653814cba 100644
--- a/flink-connector/flink/build.gradle.kts
+++ b/flink-connector/flink/build.gradle.kts
@@ -96,6 +96,7 @@ dependencies {
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)
testImplementation(libs.flinkjdbc)
+ testImplementation(libs.minikdc)
testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
index 0735be2519..bd355ab6f5 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/GravitinoCatalogManager.java
@@ -21,12 +21,19 @@ package org.apache.gravitino.flink.connector.catalog;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Sets;
+import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.StringUtils;
import org.apache.gravitino.Catalog;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.client.DefaultOAuth2TokenProvider;
import org.apache.gravitino.client.GravitinoAdminClient;
import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.client.KerberosTokenProvider;
+import
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,8 +61,35 @@ public class GravitinoCatalogManager {
this.gravitinoUri = gravitinoUri;
this.metalakeName = metalakeName;
this.gravitinoClientConfig = gravitinoClientConfig;
- this.gravitinoClient =
-
GravitinoAdminClient.builder(gravitinoUri).withClientConfig(gravitinoClientConfig).build();
+
+ String authType =
gravitinoClientConfig.get(GravitinoCatalogStoreFactoryOptions.AUTH_TYPE);
+
+ // Only OAuth is explicitly configured; otherwise follow Flink security
(Kerberos if enabled,
+ // simple auth otherwise).
+ if (AuthenticatorType.OAUTH.name().equalsIgnoreCase(authType)) {
+ this.gravitinoClient = buildOAuthClient(gravitinoUri,
gravitinoClientConfig);
+ } else {
+ if (authType != null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Unsupported auth type '%s'. Only OAUTH is supported; leave %s
unset to use Flink Kerberos settings (or simple auth if security is disabled).",
+ authType, GravitinoCatalogStoreFactoryOptions.AUTH_TYPE));
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ if (getUgi().getAuthenticationMethod()
+ != UserGroupInformation.AuthenticationMethod.KERBEROS) {
+ throw new IllegalStateException(
+ String.format(
+ "Flink security is enabled, but current user authentication
method is %s rather than KERBEROS",
+ getUgi().getAuthenticationMethod()));
+ }
+ this.gravitinoClient = buildKerberosClient(gravitinoUri,
gravitinoClientConfig);
+ } else {
+ this.gravitinoClient = buildSimpleClient(gravitinoUri,
gravitinoClientConfig);
+ }
+ }
+
this.metalake = gravitinoClient.loadMetalake(metalakeName);
}
@@ -206,4 +240,63 @@ public class GravitinoCatalogManager {
&& gravitinoCatalogManager.metalakeName.equals(metalakeName)
&&
gravitinoCatalogManager.gravitinoClientConfig.equals(gravitinoClientConfig);
}
+
+ private static GravitinoAdminClient buildOAuthClient(
+ String gravitinoUri, Map<String, String> config) {
+ String serverUri =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI);
+ String credential =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL);
+ String path =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH);
+ String scope =
config.get(GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE);
+ Preconditions.checkArgument(
+ StringUtils.isNoneBlank(serverUri, credential, path, scope),
+ String.format(
+ "OAuth2 authentication requires: %s, %s, %s, and %s",
+ GravitinoCatalogStoreFactoryOptions.OAUTH2_SERVER_URI,
+ GravitinoCatalogStoreFactoryOptions.OAUTH2_CREDENTIAL,
+ GravitinoCatalogStoreFactoryOptions.OAUTH2_TOKEN_PATH,
+ GravitinoCatalogStoreFactoryOptions.OAUTH2_SCOPE));
+
+ DefaultOAuth2TokenProvider provider =
+ DefaultOAuth2TokenProvider.builder()
+ .withUri(serverUri)
+ .withCredential(credential)
+ .withPath(path)
+ .withScope(scope)
+ .build();
+
+ return GravitinoAdminClient.builder(gravitinoUri)
+ .withOAuth(provider)
+ .withClientConfig(config)
+ .build();
+ }
+
+ private static GravitinoAdminClient buildKerberosClient(
+ String gravitinoUri, Map<String, String> config) {
+
+ return getUgi()
+ .doAs(
+ (PrivilegedAction<GravitinoAdminClient>)
+ () ->
+ GravitinoAdminClient.builder(gravitinoUri)
+
.withKerberosAuth(KerberosTokenProvider.builder().build())
+ .withClientConfig(config)
+ .build());
+ }
+
+ private static GravitinoAdminClient buildSimpleClient(
+ String gravitinoUri, Map<String, String> config) {
+ String userName = getUgi().getUserName();
+ return GravitinoAdminClient.builder(gravitinoUri)
+ .withSimpleAuth(userName)
+ .withClientConfig(config)
+ .build();
+ }
+
+ private static UserGroupInformation getUgi() {
+ try {
+ return UserGroupInformation.getCurrentUser();
+ } catch (Exception e) {
+ throw new IllegalStateException("Unable to get current user group
information", e);
+ }
+ }
}
diff --git
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
index fb304c4ab3..f989c3d138 100644
---
a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
+++
b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStoreFactoryOptions.java
@@ -46,4 +46,12 @@ public class GravitinoCatalogStoreFactoryOptions {
.mapType()
.defaultValue(ImmutableMap.of())
.withDescription("The config of Gravitino client");
+
+ public static final String AUTH_TYPE = "gravitino.client.auth.type";
+
+ // OAuth2 config keys
+ public static final String OAUTH2_SERVER_URI =
"gravitino.client.oauth2.serverUri";
+ public static final String OAUTH2_CREDENTIAL =
"gravitino.client.oauth2.credential";
+ public static final String OAUTH2_TOKEN_PATH =
"gravitino.client.oauth2.tokenPath";
+ public static final String OAUTH2_SCOPE = "gravitino.client.oauth2.scope";
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
index 404d220aea..b275d81a78 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java
@@ -68,7 +68,7 @@ public abstract class FlinkEnvIT extends BaseIT {
protected static FileSystem hdfs;
- private static String gravitinoUri = "http://127.0.0.1:8090";
+ protected static String gravitinoUri = "http://localhost:8090";
private final String lakeHouseIcebergProvider = "lakehouse-iceberg";
@@ -143,7 +143,7 @@ public abstract class FlinkEnvIT extends BaseIT {
private void initGravitinoEnv() {
// Gravitino server is already started by AbstractIT, just construct
gravitinoUrl
int gravitinoPort = getGravitinoServerPort();
- gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
+ gravitinoUri = String.format("http://localhost:%d", gravitinoPort);
if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
this.icebergRestServiceUri = getIcebergRestServiceUri();
}
@@ -183,7 +183,7 @@ public abstract class FlinkEnvIT extends BaseIT {
}
}
- private static void initFlinkEnv() {
+ protected void initFlinkEnv() {
final Configuration configuration = new Configuration();
configuration.setString(
"table.catalog-store.kind",
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
index 8274d166eb..f64170f811 100644
---
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java
@@ -25,6 +25,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
@@ -58,6 +59,7 @@ import
org.apache.gravitino.rel.expressions.transforms.Transforms;
import org.apache.gravitino.rel.types.Types;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.security.UserGroupInformation;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@@ -67,6 +69,7 @@ import org.junit.jupiter.api.Test;
@Tag("gravitino-docker-test")
public class FlinkHiveCatalogIT extends FlinkCommonIT {
private static final String DEFAULT_HIVE_CATALOG =
"test_flink_hive_schema_catalog";
+ private static final String FLINK_USER_NAME = "gravitino";
private static org.apache.gravitino.Catalog hiveCatalog;
@@ -103,7 +106,9 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
int numCatalogs = tableEnv.listCatalogs().length;
// Create a new catalog.
+
String catalogName = "gravitino_hive";
+
Configuration configuration = new Configuration();
configuration.set(
CommonCatalogOptions.CATALOG_TYPE,
GravitinoHiveCatalogFactoryOptions.IDENTIFIER);
@@ -115,6 +120,8 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
// Check the catalog properties.
org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
+ Assertions.assertEquals(FLINK_USER_NAME,
gravitinoCatalog.auditInfo().creator());
+
Map<String, String> properties = gravitinoCatalog.properties();
Assertions.assertEquals(hiveMetastoreUri,
properties.get(HiveConstants.METASTORE_URIS));
Map<String, String> flinkProperties =
@@ -605,4 +612,21 @@ public class FlinkHiveCatalogIT extends FlinkCommonIT {
protected boolean supportDropCascade() {
return true;
}
+
+ @Override
+ protected void initFlinkEnv() {
+ try {
+ UserGroupInformation proxyUser =
+ UserGroupInformation.createProxyUser(
+ FLINK_USER_NAME, UserGroupInformation.getCurrentUser());
+ proxyUser.doAs(
+ (PrivilegedAction<Void>)
+ () -> {
+ super.initFlinkEnv();
+ return null;
+ });
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to obtain UGI for Flink user", e);
+ }
+ }
}
diff --git
a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
new file mode 100644
index 0000000000..e87f60af1b
--- /dev/null
+++
b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveKerberosClientIT.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.flink.connector.integration.test.hive;
+
+import static org.apache.gravitino.server.authentication.KerberosConfig.KEYTAB;
+import static
org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPAL;
+import static org.apache.hadoop.minikdc.MiniKdc.MAX_TICKET_LIFETIME;
+
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CommonCatalogOptions;
+import org.apache.flink.table.catalog.hive.factories.HiveCatalogFactoryOptions;
+import org.apache.gravitino.Configs;
+import org.apache.gravitino.auth.AuthenticatorType;
+import org.apache.gravitino.catalog.hive.HiveConstants;
+import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalog;
+import
org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions;
+import org.apache.gravitino.flink.connector.integration.test.FlinkEnvIT;
+import
org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
+import org.apache.hadoop.minikdc.KerberosSecurityTestcase;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Integration test for creating Gravitino Hive catalogs with Kerberos
authentication. This test
+ * verifies that catalog creation works correctly when the Gravitino server is
configured with
+ * Kerberos authentication. The test extends FlinkEnvIT directly to keep the
test scope focused on
+ * Kerberos-specific scenarios.
+ */
+@Tag("gravitino-docker-test")
+public class FlinkHiveKerberosClientIT extends FlinkEnvIT {
+
+ private static final KerberosSecurityTestcase kdc =
+ new KerberosSecurityTestcase() {
+ @Override
+ public void createMiniKdcConf() {
+ super.createMiniKdcConf();
+ // Use a very short ticket lifetime to speed up Kerberos expiration
in tests.
+ // The test operations are executed immediately after the MiniKdc is
started and
+ // are not long-running, so a 5-second lifetime is sufficient and
keeps the test
+ // fast. If this test ever becomes flaky on slower environments or
CI systems,
+ // consider increasing this value to a more forgiving lifetime (for
example, 60s).
+ getConf().setProperty(MAX_TICKET_LIFETIME, "5");
+ }
+ };
+
+ private static final String keytabFile =
+ new File(System.getProperty("test.dir", "target"),
UUID.randomUUID().toString())
+ .getAbsolutePath();
+
+ // Server principals for Gravitino server
+ private static final String serverPrincipal = "HTTP/[email protected]";
+ private static final String serverPrincipalWithAll =
"HTTP/[email protected]";
+
+ // Client principal for authentication
+ private static final String clientPrincipal = "[email protected]";
+
+ @Override
+ protected String getProvider() {
+ return "hive";
+ }
+
+ @Override
+ @BeforeAll
+ public void startIntegrationTest() throws Exception {
+ // Set up Kerberos before starting integration test
+ kdc.startMiniKdc();
+ initKeyTab();
+
+ // Configure a Gravitino server with Kerberos
+ Map<String, String> configs = Maps.newHashMap();
+ configs.put(Configs.AUTHENTICATORS.getKey(),
AuthenticatorType.KERBEROS.name().toLowerCase());
+ configs.put(PRINCIPAL.getKey(), serverPrincipal);
+ configs.put(KEYTAB.getKey(), keytabFile);
+ configs.put("client.kerberos.principal", clientPrincipal);
+ configs.put("client.kerberos.keytab", keytabFile);
+
+ registerCustomConfigs(configs);
+
+ // Start the integration test (starts Gravitino server)
+ super.startIntegrationTest();
+ }
+
+ @Override
+ @AfterAll
+ public void stopIntegrationTest() {
+ try {
+ super.stopIntegrationTest();
+ UserGroupInformation.setConfiguration(new
org.apache.hadoop.conf.Configuration(false));
+ kdc.stopMiniKdc();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to stop Kerberos test", e);
+ }
+ }
+
+ @Override
+ protected void initFlinkEnv() {
+ // Initialize Kerberos authentication for Hadoop UGI.
+ // This approach follows Flink's HadoopLoginModule pattern for Kerberos
authentication,
+ // which performs a login from keytab to establish a secure context before
creating
+ // Flink table environment. See
org.apache.flink.runtime.security.modules.HadoopLoginModule
+ // in Flink source code for reference.
+ org.apache.hadoop.conf.Configuration hadoopConfig = new
org.apache.hadoop.conf.Configuration();
+
+ hadoopConfig.set("hadoop.security.authentication", "kerberos");
+ hadoopConfig.set("hadoop.security.auth_to_local",
"RULE:[1:$1@$0](.*@EXAMPLE.COM)s/@.*//");
+
+ UserGroupInformation.setConfiguration(hadoopConfig);
+ UserGroupInformation proxyUser = null;
+ try {
+ proxyUser =
UserGroupInformation.loginUserFromKeytabAndReturnUGI(clientPrincipal,
keytabFile);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to obtain UGI for Kerberos user", e);
+ }
+
+ final Configuration configuration = new Configuration();
+ configuration.setString(
+ "table.catalog-store.kind",
GravitinoCatalogStoreFactoryOptions.GRAVITINO);
+
configuration.setString("table.catalog-store.gravitino.gravitino.metalake",
GRAVITINO_METALAKE);
+ configuration.setString("table.catalog-store.gravitino.gravitino.uri",
gravitinoUri);
+ EnvironmentSettings.Builder builder =
+ EnvironmentSettings.newInstance().withConfiguration(configuration);
+ tableEnv =
+ proxyUser.doAs(
+ (PrivilegedAction<TableEnvironment>)
+ () -> TableEnvironment.create(builder.inBatchMode().build()));
+ }
+
+ @Test
+ public void testCreateGravitinoHiveCatalogWithKerberosAuth() {
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ int numCatalogs = tableEnv.listCatalogs().length;
+
+ // Create a new catalog with Kerberos authentication using SQL
+ String catalogName = "gravitino_hive_kerberos";
+ tableEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH ("
+ + "'type'='gravitino-hive', "
+ + "'hive-conf-dir'='src/test/resources/flink-tests',"
+ + "'hive.metastore.uris'='%s'"
+ + ")",
+ catalogName, hiveMetastoreUri));
+
+ // Verify catalog exists in Gravitino
+ Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+ // Check the catalog properties
+ org.apache.gravitino.Catalog gravitinoCatalog =
metalake.loadCatalog(catalogName);
+ Assertions.assertNotNull(gravitinoCatalog);
+ Assertions.assertEquals(catalogName, gravitinoCatalog.name());
+
+ // Assert creator is the Kerberos-authenticated client principal
+ Assertions.assertEquals("client", gravitinoCatalog.auditInfo().creator());
+
+ Map<String, String> properties = gravitinoCatalog.properties();
+ Assertions.assertEquals(hiveMetastoreUri,
properties.get(HiveConstants.METASTORE_URIS));
+
+ // Verify Flink-specific properties are stored correctly
+ Map<String, String> flinkProperties =
+ properties.entrySet().stream()
+ .filter(e ->
e.getKey().startsWith(PropertiesConverter.FLINK_PROPERTY_PREFIX))
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ Assertions.assertEquals(2, flinkProperties.size());
+ Assertions.assertEquals(
+ "src/test/resources/flink-tests",
+
flinkProperties.get(flinkByPass(HiveCatalogFactoryOptions.HIVE_CONF_DIR.key())));
+ Assertions.assertEquals(
+ GravitinoHiveCatalogFactoryOptions.IDENTIFIER,
+
flinkProperties.get(flinkByPass(CommonCatalogOptions.CATALOG_TYPE.key())));
+
+ // Get the created catalog
+ Optional<Catalog> catalog = tableEnv.getCatalog(catalogName);
+ Assertions.assertTrue(catalog.isPresent());
+ Assertions.assertInstanceOf(GravitinoHiveCatalog.class, catalog.get());
+
+ // List catalogs
+ String[] catalogs = tableEnv.listCatalogs();
+ Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a
new catalog");
+ Assertions.assertTrue(
+ Arrays.asList(catalogs).contains(catalogName), "Should create the
correct catalog.");
+
+ // Use the catalog
+ tableEnv.useCatalog(catalogName);
+ Assertions.assertEquals(
+ catalogName,
+ tableEnv.getCurrentCatalog(),
+ "Current catalog should be the Kerberos-authenticated catalog.");
+
+ // Drop the catalog
+ tableEnv.useCatalog(DEFAULT_CATALOG);
+ tableEnv.executeSql("DROP CATALOG " + catalogName);
+ Assertions.assertFalse(metalake.catalogExists(catalogName));
+
+ Optional<Catalog> droppedCatalog = tableEnv.getCatalog(catalogName);
+ Assertions.assertFalse(droppedCatalog.isPresent(), "Catalog should be
dropped");
+ }
+
+ private static void initKeyTab() throws Exception {
+ File newKeytabFile = new File(keytabFile);
+ String newClientPrincipal = removeRealm(clientPrincipal);
+ String newServerPrincipal = removeRealm(serverPrincipal);
+ String newServerPrincipalAll = removeRealm(serverPrincipalWithAll);
+ kdc.getKdc()
+ .createPrincipal(
+ newKeytabFile, newClientPrincipal, newServerPrincipal,
newServerPrincipalAll);
+ }
+
+ private static String removeRealm(String principal) {
+ return principal.substring(0, principal.lastIndexOf("@"));
+ }
+}
diff --git
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
index 0cbf803655..b39bf40662 100644
---
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
+++
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/util/BaseIT.java
@@ -57,6 +57,7 @@ import org.apache.gravitino.Configs;
import org.apache.gravitino.auth.AuthenticatorType;
import org.apache.gravitino.auxiliary.AuxiliaryServiceManager;
import org.apache.gravitino.client.GravitinoAdminClient;
+import org.apache.gravitino.client.KerberosTokenProvider;
import org.apache.gravitino.config.ConfigConstants;
import org.apache.gravitino.integration.test.MiniGravitino;
import org.apache.gravitino.integration.test.MiniGravitinoContext;
@@ -138,6 +139,21 @@ public class BaseIT {
customConfigs.putAll(configs);
}
+ /**
+ * Creates a KerberosTokenProvider with the given principal and keytab file
path.
+ *
+ * @param principal The Kerberos principal (e.g., "[email protected]")
+ * @param keytabPath The path to the keytab file
+ * @return A configured KerberosTokenProvider instance
+ */
+ protected static KerberosTokenProvider createKerberosTokenProvider(
+ String principal, String keytabPath) {
+ return KerberosTokenProvider.builder()
+ .withClientPrincipal(principal)
+ .withKeyTabFile(new File(keytabPath))
+ .build();
+ }
+
protected int getLanceRESTServerPort() {
JettyServerConfig lanceServerConfig =
JettyServerConfig.fromConfig(serverConfig, LANCE_CONFIG_PREFIX);
@@ -406,7 +422,20 @@ public class BaseIT {
}
} else if
(authenticators.contains(AuthenticatorType.KERBEROS.name().toLowerCase())) {
serverUri = "http://localhost:" + jettyServerConfig.getHttpPort();
- client = null;
+ // Get Kerberos configuration from custom configs
+ String principal = customConfigs.get("client.kerberos.principal");
+ String keytabPath = customConfigs.get("client.kerberos.keytab");
+
+ if (principal != null && keytabPath != null) {
+ KerberosTokenProvider kerberosTokenProvider =
+ createKerberosTokenProvider(principal, keytabPath);
+ client =
+
GravitinoAdminClient.builder(serverUri).withKerberosAuth(kerberosTokenProvider).build();
+ } else {
+ LOG.warn(
+ "Kerberos authentication configured but principal or keytab not
provided. Client will be null.");
+ client = null;
+ }
} else {
client = GravitinoAdminClient.builder(serverUri).build();
}