This is an automated email from the ASF dual-hosted git repository.

yuqi4733 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 5188c3e466 [#8952] feat (fileset): Fileset support multiple clusters 
(#8777)
5188c3e466 is described below

commit 5188c3e4660d0132a552d04b0dec223aab02761e
Author: Yuhui <[email protected]>
AuthorDate: Mon Dec 1 17:00:11 2025 +0800

    [#8952] feat (fileset): Fileset support multiple clusters (#8777)
    
    <!--
    1. Title: [#<issue>] <type>(<scope>): <subject>
       Examples:
         - "[#123] feat(operator): support xxx"
         - "[#233] fix: check null before access result in xxx"
         - "[MINOR] refactor: fix typo in variable name"
         - "[MINOR] docs: fix typo in README"
         - "[#255] test: fix flaky test NameOfTheTest"
       Reference: https://www.conventionalcommits.org/en/v1.0.0/
    2. If the PR is unfinished, please mark this PR as draft.
    -->
    
    ### What changes were proposed in this pull request?
    
    Support multiple clusters within a single fileset catalog, so that
    filesets belonging to the same catalog can reside in different
    clusters. Each fileset can be configured to use a different cluster.
    
    ### Why are the changes needed?
    
    Fix: #8952
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Documentation is added describing how to configure filesets
    across multiple clusters.
    
    ### How was this patch tested?
    
    Added integration tests (ITs) covering fileset operations across
    multiple clusters.
---
 .../catalog/fileset/FilesetCatalogOperations.java  |  18 +-
 .../fileset/SecureFilesetCatalogOperations.java    |  43 +++-
 .../authentication/AuthenticationConfig.java       |  11 +-
 .../authentication/KerberosUserContext.java        |   1 -
 .../fileset/authentication/UserContext.java        |  60 +----
 .../authentication/kerberos/KerberosClient.java    |  12 +-
 .../authentication/kerberos/KerberosConfig.java    |  16 +-
 .../fileset/TestFilesetCatalogOperations.java      |  70 +++---
 .../test/HadoopUserImpersonationIT.java            |   4 +-
 .../catalog/hive/HiveCatalogOperations.java        |   4 +-
 .../integration/test/HiveUserAuthenticationIT.java |   4 +-
 .../test/HudiCatalogKerberosHiveIT.java            |   4 +-
 .../test/CatalogIcebergKerberosHiveIT.java         |   4 +-
 .../test/CatalogPaimonKerberosFilesystemIT.java    |   4 +-
 .../gravitino/catalog/hadoop/fs/Constants.java     |  19 ++
 .../catalog/hadoop/fs/FileSystemUtils.java         |  26 +-
 .../catalog/hadoop/fs/HDFSFileSystemProvider.java  |   5 +-
 clients/filesystem-hadoop3/build.gradle.kts        |   1 +
 .../filesystem/hadoop/BaseGVFSOperations.java      | 181 ++++++++++----
 .../filesystem/hadoop/FilesetMetadataCache.java    |  34 +++
 .../GravitinoVirtualFileSystemConfiguration.java   |   3 +
 .../hadoop/GravitinoVirtualFileSystemUtils.java    |  23 ++
 .../filesystem/hadoop/HDFSFileSystemProxy.java     | 205 ++++++++++++++++
 .../hadoop/TestBaseGVFSOperationsUserConfigs.java  |  90 +++++++
 .../gravitino/filesystem/hadoop/TestGvfsBase.java  |  32 +++
 .../integration/test/GvfsMultipleClusterIT.java    | 261 +++++++++++++++++++++
 .../src/test/resources/hd_kbs_conf/core-site.xml   |  36 +++
 .../src/test/resources/hd_kbs_conf/hdfs-site.xml   |  24 ++
 .../src/main/java/org/apache/gravitino/Config.java |   3 +-
 .../cache/it/MixEntityStorageBenchmark.java        |   1 -
 docs/fileset-catalog.md                            | 106 ++++++---
 docs/how-to-use-gvfs.md                            |  37 ++-
 docs/manage-fileset-metadata-using-gravitino.md    | 119 ++++++++++
 .../test/IcebergRestKerberosHiveCatalogIT.java     |   4 +-
 .../integration/test/container/HiveContainer.java  |  10 +-
 35 files changed, 1253 insertions(+), 222 deletions(-)

diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
index 0aed858430..8b3f2cb140 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/FilesetCatalogOperations.java
@@ -18,7 +18,6 @@
  */
 package org.apache.gravitino.catalog.fileset;
 
-import static org.apache.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static org.apache.gravitino.file.Fileset.PROPERTY_CATALOG_PLACEHOLDER;
 import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
@@ -200,12 +199,6 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
     return catalogInfo;
   }
 
-  public Configuration getHadoopConf() {
-    Configuration configuration = new Configuration();
-    conf.forEach((k, v) -> configuration.set(k.replace(CATALOG_BYPASS_PREFIX, 
""), v));
-    return configuration;
-  }
-
   public Map<String, String> getConf() {
     return conf;
   }
@@ -467,13 +460,18 @@ public class FilesetCatalogOperations extends 
ManagedSchemaOperations
       try {
         // formalize the path to avoid path without scheme, uri, authority, 
etc.
         for (Map.Entry<String, Path> entry : filesetPaths.entrySet()) {
-
-          FileSystem tmpFs = getFileSystemWithCache(entry.getValue(), conf);
+          // merge the properties from catalog, schema and fileset to get the 
final configuration
+          // for fileset.
+          // the priority is: fileset properties > schema properties > catalog 
properties
+          Map<String, String> fsConf = new HashMap<>(conf);
+          fsConf.putAll(schemaEntity.properties());
+          fsConf.putAll(properties);
+          FileSystem tmpFs = getFileSystemWithCache(entry.getValue(), fsConf);
           Path formalizePath =
               entry.getValue().makeQualified(tmpFs.getUri(), 
tmpFs.getWorkingDirectory());
 
           filesetPathsBuilder.put(entry.getKey(), formalizePath);
-          FileSystem fs = getFileSystemWithCache(formalizePath, conf);
+          FileSystem fs = getFileSystemWithCache(formalizePath, fsConf);
 
           if (fs.exists(formalizePath) && 
fs.getFileStatus(formalizePath).isFile()) {
             throw new IllegalArgumentException(
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
index bdc7f48735..b6d5e9cf0c 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/SecureFilesetCatalogOperations.java
@@ -24,10 +24,9 @@ import static 
org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.IOException;
-import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Collectors;
 import javax.security.auth.Subject;
@@ -103,11 +102,7 @@ public class SecureFilesetCatalogOperations
       throws RuntimeException {
     filesetCatalogOperations.initialize(config, info, propertiesMetadata);
     this.catalogUserContext =
-        UserContext.getUserContext(
-            NameIdentifier.of(info.namespace(), info.name()),
-            config,
-            filesetCatalogOperations.getHadoopConf(),
-            info);
+        UserContext.getUserContext(NameIdentifier.of(info.namespace(), 
info.name()), config, info);
     this.catalogProperties = info.properties();
   }
 
@@ -126,9 +121,10 @@ public class SecureFilesetCatalogOperations
       throws NoSuchSchemaException, FilesetAlreadyExistsException {
     String apiUser = PrincipalUtils.getCurrentUserName();
 
+    Map<String, String> filesetProperties = mergeUpLevelConfigurations(ident, 
properties);
     UserContext userContext =
         UserContext.getUserContext(
-            ident, properties, null, 
filesetCatalogOperations.getCatalogInfo());
+            ident, filesetProperties, 
filesetCatalogOperations.getCatalogInfo());
     return userContext.doAs(
         () -> {
           try {
@@ -157,21 +153,42 @@ public class SecureFilesetCatalogOperations
       throw new RuntimeException("Failed to delete fileset " + ident, ioe);
     }
 
+    Map<String, String> filesetProperties =
+        mergeUpLevelConfigurations(ident, filesetEntity.properties());
     UserContext userContext =
         UserContext.getUserContext(
-            ident, filesetEntity.properties(), null, 
filesetCatalogOperations.getCatalogInfo());
+            ident, filesetProperties, 
filesetCatalogOperations.getCatalogInfo());
     boolean r = userContext.doAs(() -> 
filesetCatalogOperations.dropFileset(ident), ident);
     UserContext.clearUserContext(ident);
     return r;
   }
 
+  public Map<String, String> mergeUpLevelConfigurations(
+      NameIdentifier ident, Map<String, String> entityProperties) {
+    Map<String, String> mergedProperties = new 
HashMap<>(filesetCatalogOperations.getConf());
+    if (ident.namespace().levels().length == 2) {
+      // schema level
+      mergedProperties.putAll(entityProperties);
+      return mergedProperties;
+    }
+
+    // fileset level
+    NameIdentifierUtil.checkFileset(ident);
+    NameIdentifier schemaIdent = NameIdentifierUtil.getSchemaIdentifier(ident);
+    Schema schema = filesetCatalogOperations.loadSchema(schemaIdent);
+    mergedProperties.putAll(schema.properties());
+    mergedProperties.putAll(entityProperties);
+    return mergedProperties;
+  }
+
   @Override
   public Schema createSchema(NameIdentifier ident, String comment, Map<String, 
String> properties)
       throws NoSuchCatalogException, SchemaAlreadyExistsException {
+    Map<String, String> schemaProperties = mergeUpLevelConfigurations(ident, 
properties);
     String apiUser = PrincipalUtils.getCurrentUserName();
     UserContext userContext =
         UserContext.getUserContext(
-            ident, properties, null, 
filesetCatalogOperations.getCatalogInfo());
+            ident, schemaProperties, 
filesetCatalogOperations.getCatalogInfo());
     return userContext.doAs(
         () -> {
           try {
@@ -189,12 +206,12 @@ public class SecureFilesetCatalogOperations
     try {
       SchemaEntity schemaEntity =
           filesetCatalogOperations.store().get(ident, 
Entity.EntityType.SCHEMA, SchemaEntity.class);
-      Map<String, String> properties =
-          
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
 
+      Map<String, String> schemaProperties =
+          mergeUpLevelConfigurations(ident, schemaEntity.properties());
       UserContext userContext =
           UserContext.getUserContext(
-              ident, properties, null, 
filesetCatalogOperations.getCatalogInfo());
+              ident, schemaProperties, 
filesetCatalogOperations.getCatalogInfo());
       boolean r =
           userContext.doAs(() -> filesetCatalogOperations.dropSchema(ident, 
cascade), ident);
       UserContext.clearUserContext(ident);
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/AuthenticationConfig.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/AuthenticationConfig.java
index ea7778aa3c..d70e1bae2f 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/AuthenticationConfig.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/AuthenticationConfig.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.catalog.fileset.authentication;
 
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.gravitino.Config;
@@ -26,6 +28,7 @@ import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.hadoop.conf.Configuration;
 
 public class AuthenticationConfig extends Config {
 
@@ -45,11 +48,17 @@ public class AuthenticationConfig extends Config {
 
   public static final boolean KERBEROS_DEFAULT_IMPERSONATION_ENABLE = false;
 
-  public AuthenticationConfig(Map<String, String> properties) {
+  public AuthenticationConfig(Map<String, String> properties, Configuration 
configuration) {
     super(false);
+    loadFromHdfsConfiguration(configuration);
     loadFromMap(properties, k -> true);
   }
 
+  private void loadFromHdfsConfiguration(Configuration configuration) {
+    String authType = configuration.get(HADOOP_SECURITY_AUTHENTICATION, 
"simple");
+    configMap.put(AUTH_TYPE_KEY, authType);
+  }
+
   public static final ConfigEntry<String> AUTH_TYPE_ENTRY =
       new ConfigBuilder(AUTH_TYPE_KEY)
           .doc(
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/KerberosUserContext.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/KerberosUserContext.java
index 03a428f6cd..2d04defa85 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/KerberosUserContext.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/KerberosUserContext.java
@@ -58,7 +58,6 @@ public class KerberosUserContext extends UserContext {
         HADOOP_SECURITY_AUTHENTICATION,
         AuthenticationMethod.KERBEROS.name().toLowerCase(Locale.ROOT));
     try {
-      UserGroupInformation.setConfiguration(configuration);
       KerberosClient client = new KerberosClient(properties, configuration, 
refreshCredentials);
       // Add the kerberos client to the closable to close resources.
       this.kerberosClient = client;
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/UserContext.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/UserContext.java
index 9985c8164d..5837b2823c 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/UserContext.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/UserContext.java
@@ -20,9 +20,9 @@
 package org.apache.gravitino.catalog.fileset.authentication;
 
 import static 
org.apache.gravitino.catalog.fileset.SecureFilesetCatalogOperations.GRAVITINO_KEYTAB_FORMAT;
-import static 
org.apache.gravitino.catalog.fileset.authentication.AuthenticationConfig.AUTH_TYPE_ENTRY;
 import static 
org.apache.gravitino.catalog.fileset.authentication.AuthenticationConfig.ENABLE_IMPERSONATION_ENTRY;
 import static 
org.apache.gravitino.catalog.fileset.authentication.AuthenticationConfig.IMPERSONATION_ENABLE_KEY;
+import static 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;
 
 import com.google.common.collect.Maps;
 import java.io.Closeable;
@@ -34,6 +34,7 @@ import java.security.PrivilegedExceptionAction;
 import java.util.Map;
 import org.apache.gravitino.NameIdentifier;
 import 
org.apache.gravitino.catalog.fileset.authentication.AuthenticationConfig.AuthenticationType;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -63,50 +64,20 @@ public abstract class UserContext implements Closeable {
   }
 
   public static UserContext getUserContext(
-      NameIdentifier nameIdentifier,
-      Map<String, String> properties,
-      Configuration configuration,
-      CatalogInfo catalogInfo) {
-    // Try to get the parent user context.
-    NameIdentifier currentNameIdentifier = 
NameIdentifier.of(nameIdentifier.namespace().levels());
-    UserContext parentContext = null;
-    while (!currentNameIdentifier.namespace().isEmpty()) {
-      if (userContextMap.containsKey(currentNameIdentifier)) {
-        parentContext = userContextMap.get(currentNameIdentifier);
-        break;
-      }
-      currentNameIdentifier = 
NameIdentifier.of(currentNameIdentifier.namespace().levels());
-    }
-
-    if (configuration == null) {
-      configuration = new Configuration();
-    }
-    AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(properties);
+      NameIdentifier nameIdentifier, Map<String, String> properties, 
CatalogInfo catalogInfo) {
+    Configuration configuration = 
FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, properties);
+    AuthenticationConfig authenticationConfig = new 
AuthenticationConfig(properties, configuration);
 
     // If we do not set the impersonation, we will use the parent context;
     boolean enableUserImpersonation = 
ENABLE_IMPERSONATION_ENTRY.getDefaultValue();
     if (properties.containsKey(IMPERSONATION_ENABLE_KEY)) {
       enableUserImpersonation = authenticationConfig.isImpersonationEnabled();
-    } else if (parentContext != null) {
-      enableUserImpersonation = parentContext.enableUserImpersonation();
     }
 
     AuthenticationType authenticationType =
-        AuthenticationType.fromString(AUTH_TYPE_ENTRY.getDefaultValue());
-    // If we do not set the authentication type explicitly, we will use the 
parent context. If the
-    // parent is null, then we will use the default value.
-    if (properties.containsKey(AuthenticationConfig.AUTH_TYPE_KEY)) {
-      authenticationType =
-          authenticationConfig.isSimpleAuth()
-              ? AuthenticationType.SIMPLE
-              : AuthenticationType.KERBEROS;
-
-    } else if (parentContext != null) {
-      authenticationType =
-          parentContext instanceof SimpleUserContext
-              ? AuthenticationType.SIMPLE
-              : AuthenticationType.KERBEROS;
-    }
+        authenticationConfig.isSimpleAuth()
+            ? AuthenticationType.SIMPLE
+            : AuthenticationType.KERBEROS;
 
     UserGroupInformation currentUser;
     try {
@@ -116,29 +87,18 @@ public abstract class UserContext implements Closeable {
     }
 
     if (authenticationType == AuthenticationType.SIMPLE) {
-      UserGroupInformation userGroupInformation =
-          parentContext != null ? parentContext.getUser() : currentUser;
       SimpleUserContext simpleUserContext =
-          new SimpleUserContext(userGroupInformation, enableUserImpersonation);
+          new SimpleUserContext(currentUser, enableUserImpersonation);
       addUserContext(nameIdentifier, simpleUserContext);
       return simpleUserContext;
     } else if (authenticationType == AuthenticationType.KERBEROS) {
-      // if the kerberos authentication is inherited from the parent context, 
we will use the
-      // parent context's kerberos configuration.
-      if (parentContext != null && authenticationConfig.isSimpleAuth()) {
-        KerberosUserContext kerberosUserContext = ((KerberosUserContext) 
parentContext).deepCopy();
-        
kerberosUserContext.setEnableUserImpersonation(enableUserImpersonation);
-        addUserContext(nameIdentifier, kerberosUserContext);
-        return kerberosUserContext;
-      }
-
       String keytabPath =
           String.format(
               GRAVITINO_KEYTAB_FORMAT,
               catalogInfo.id() + "-" + nameIdentifier.toString().replace(".", 
"-"));
       KerberosUserContext kerberosUserContext =
           new KerberosUserContext(enableUserImpersonation, keytabPath);
-      kerberosUserContext.initKerberos(properties, configuration, 
parentContext == null);
+      kerberosUserContext.initKerberos(properties, configuration, true);
       addUserContext(nameIdentifier, kerberosUserContext);
       return kerberosUserContext;
     } else {
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosClient.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosClient.java
index 9a839f2ac0..09e6405177 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosClient.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosClient.java
@@ -19,6 +19,9 @@
 
 package org.apache.gravitino.catalog.fileset.authentication.kerberos;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_KRB5_CONF;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.SECURITY_KRB5_ENV;
+
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
@@ -52,7 +55,7 @@ public class KerberosClient implements Closeable {
   }
 
   public String login(String keytabFilePath) throws IOException {
-    KerberosConfig kerberosConfig = new KerberosConfig(conf);
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
 
     // Check the principal and keytab file
     String catalogPrincipal = kerberosConfig.getPrincipalName();
@@ -63,6 +66,11 @@ public class KerberosClient implements Closeable {
     Preconditions.checkArgument(
         principalComponents.size() == 2, "The principal has the wrong format");
 
+    String krb5Config = hadoopConf.get(HADOOP_KRB5_CONF);
+    if (krb5Config != null) {
+      System.setProperty(SECURITY_KRB5_ENV, krb5Config);
+    }
+
     // Login
     UserGroupInformation.setConfiguration(hadoopConf);
     UserGroupInformation.loginUserFromKeytab(catalogPrincipal, keytabFilePath);
@@ -89,7 +97,7 @@ public class KerberosClient implements Closeable {
   }
 
   public File saveKeyTabFileFromUri(String keytabPath) throws IOException {
-    KerberosConfig kerberosConfig = new KerberosConfig(conf);
+    KerberosConfig kerberosConfig = new KerberosConfig(conf, hadoopConf);
 
     String keyTabUri = kerberosConfig.getKeytab();
     Preconditions.checkArgument(StringUtils.isNotBlank(keyTabUri), "Keytab uri 
can't be blank");
diff --git 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosConfig.java
 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosConfig.java
index b3bb56375f..d9e7fe1d05 100644
--- 
a/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosConfig.java
+++ 
b/catalogs/catalog-fileset/src/main/java/org/apache/gravitino/catalog/fileset/authentication/kerberos/KerberosConfig.java
@@ -19,6 +19,9 @@
 
 package org.apache.gravitino.catalog.fileset.authentication.kerberos;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_SECURITY_KEYTAB;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_SECURITY_PRINCIPAL;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
@@ -27,6 +30,7 @@ import org.apache.gravitino.config.ConfigBuilder;
 import org.apache.gravitino.config.ConfigConstants;
 import org.apache.gravitino.config.ConfigEntry;
 import org.apache.gravitino.connector.PropertyEntry;
+import org.apache.hadoop.conf.Configuration;
 
 public class KerberosConfig extends AuthenticationConfig {
   public static final String KEY_TAB_URI_KEY = 
"authentication.kerberos.keytab-uri";
@@ -70,11 +74,19 @@ public class KerberosConfig extends AuthenticationConfig {
           .checkValue(value -> value > 0, 
ConfigConstants.POSITIVE_NUMBER_ERROR_MSG)
           .createWithDefault(2);
 
-  public KerberosConfig(Map<String, String> properties) {
-    super(properties);
+  public KerberosConfig(Map<String, String> properties, Configuration 
configuration) {
+    super(properties, configuration);
+    loadFromHdfsConfiguration(configuration);
     loadFromMap(properties, k -> true);
   }
 
+  private void loadFromHdfsConfiguration(Configuration configuration) {
+    String keyTab = configuration.get(HADOOP_SECURITY_KEYTAB, "");
+    configMap.put(KEY_TAB_URI_KEY, keyTab);
+    String principal = configuration.get(HADOOP_SECURITY_PRINCIPAL, "");
+    configMap.put(PRINCIPAL_KEY, principal);
+  }
+
   public String getPrincipalName() {
     return get(PRINCIPAL_ENTRY);
   }
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
index a9d26fdf6a..6951ced7d6 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/TestFilesetCatalogOperations.java
@@ -37,6 +37,7 @@ import static 
org.apache.gravitino.catalog.fileset.FilesetCatalogImpl.FILESET_PR
 import static 
org.apache.gravitino.catalog.fileset.FilesetCatalogImpl.SCHEMA_PROPERTIES_META;
 import static 
org.apache.gravitino.catalog.fileset.FilesetCatalogPropertiesMetadata.DISABLE_FILESYSTEM_OPS;
 import static 
org.apache.gravitino.catalog.fileset.FilesetCatalogPropertiesMetadata.LOCATION;
+import static 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;
 import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
 import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import static 
org.apache.gravitino.file.Fileset.PROPERTY_MULTIPLE_LOCATIONS_PREFIX;
@@ -79,6 +80,7 @@ import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import org.apache.gravitino.connector.CatalogInfo;
 import org.apache.gravitino.connector.HasPropertyMetadata;
 import org.apache.gravitino.connector.PropertiesMetadata;
@@ -350,7 +352,7 @@ public class TestFilesetCatalogOperations {
 
     CatalogInfo catalogInfo = randomCatalogInfo();
     ops.initialize(emptyProps, catalogInfo, FILESET_PROPERTIES_METADATA);
-    Configuration conf = ops.getHadoopConf();
+    Configuration conf = FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, 
ops.getConf());
     String value = conf.get("fs.defaultFS");
     Assertions.assertEquals("file:///", value);
 
@@ -1145,7 +1147,7 @@ public class TestFilesetCatalogOperations {
   }
 
   @Test
-  public void testFormalizePath() throws IOException, IllegalAccessException {
+  public void testFormalizePath() throws IOException, IllegalAccessException, 
InterruptedException {
 
     String[] paths =
         new String[] {"tmp/catalog", "/tmp/catalog", "file:/tmp/catalog", 
"file:///tmp/catalog"};
@@ -1724,21 +1726,6 @@ public class TestFilesetCatalogOperations {
                   createMultiLocationSchema("s1", "comment", 
ImmutableMap.of(), illegalLocations));
       Assertions.assertEquals("Location name must not be blank", 
exception.getMessage());
 
-      // empty location name in storage location
-      exception =
-          Assertions.assertThrows(
-              IllegalArgumentException.class,
-              () ->
-                  createMultiLocationFileset(
-                      "fileset_test",
-                      "s1",
-                      null,
-                      Fileset.Type.MANAGED,
-                      ImmutableMap.of(),
-                      ImmutableMap.of("", TEST_ROOT_PATH + "/fileset31"),
-                      null));
-      Assertions.assertEquals("Location name must not be blank", 
exception.getMessage());
-
       // empty location in catalog location
       Map<String, String> illegalLocations2 =
           new HashMap<String, String>() {
@@ -1767,23 +1754,6 @@ public class TestFilesetCatalogOperations {
       Assertions.assertEquals(
           "The value of the schema property location must not be blank", 
exception.getMessage());
 
-      // empty fileset storage location
-      exception =
-          Assertions.assertThrows(
-              IllegalArgumentException.class,
-              () ->
-                  createMultiLocationFileset(
-                      "fileset_test",
-                      "s1",
-                      null,
-                      Fileset.Type.MANAGED,
-                      ImmutableMap.of(),
-                      ImmutableMap.of("location1", ""),
-                      null));
-      Assertions.assertEquals(
-          "Storage location must not be blank for location name: location1",
-          exception.getMessage());
-
       // storage location is parent of schema location
       Schema multipLocationSchema =
           createMultiLocationSchema(
@@ -1810,6 +1780,38 @@ public class TestFilesetCatalogOperations {
               .contains(
                   "The fileset property default-location-name must be set and 
must be one of the fileset locations"),
           "Exception message: " + exception.getMessage());
+
+      // empty location name in storage location
+      exception =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  createMultiLocationFileset(
+                      "fileset_test",
+                      "s1",
+                      null,
+                      Fileset.Type.MANAGED,
+                      ImmutableMap.of(),
+                      ImmutableMap.of("", TEST_ROOT_PATH + "/fileset31"),
+                      null));
+      Assertions.assertEquals("Location name must not be blank", 
exception.getMessage());
+
+      // empty fileset storage location
+      exception =
+          Assertions.assertThrows(
+              IllegalArgumentException.class,
+              () ->
+                  createMultiLocationFileset(
+                      "fileset_test",
+                      "s1",
+                      null,
+                      Fileset.Type.MANAGED,
+                      ImmutableMap.of(),
+                      ImmutableMap.of("location1", ""),
+                      null));
+      Assertions.assertEquals(
+          "Storage location must not be blank for location name: location1",
+          exception.getMessage());
     }
   }
 
diff --git 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
index f00f130fe4..10665e3872 100644
--- 
a/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
+++ 
b/catalogs/catalog-fileset/src/test/java/org/apache/gravitino/catalog/fileset/integration/test/HadoopUserImpersonationIT.java
@@ -103,8 +103,8 @@ public class HadoopUserImpersonationIT extends BaseIT {
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
index 8334ff0dad..f5f3c5bcaa 100644
--- 
a/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
+++ 
b/catalogs/catalog-hive/src/main/java/org/apache/gravitino/catalog/hive/HiveCatalogOperations.java
@@ -280,8 +280,8 @@ public class HiveCatalogOperations implements 
CatalogOperations, SupportsSchemas
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java
 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java
index 861bb44edf..7c3232f94f 100644
--- 
a/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java
+++ 
b/catalogs/catalog-hive/src/test/java/org/apache/gravitino/catalog/hive/integration/test/HiveUserAuthenticationIT.java
@@ -181,8 +181,8 @@ public class HiveUserAuthenticationIT extends BaseIT {
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
index 9be5a02bee..e5168729a2 100644
--- 
a/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
+++ 
b/catalogs/catalog-lakehouse-hudi/src/test/java/org/apache/gravitino/catalog/lakehouse/hudi/integration/test/HudiCatalogKerberosHiveIT.java
@@ -171,8 +171,8 @@ public class HudiCatalogKerberosHiveIT extends BaseIT {
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      java.lang.reflect.Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      java.lang.reflect.Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
index 5591a4f26f..15493d9c47 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergKerberosHiveIT.java
@@ -194,8 +194,8 @@ public class CatalogIcebergKerberosHiveIT extends BaseIT {
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonKerberosFilesystemIT.java
 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonKerberosFilesystemIT.java
index 0102611420..e10caa5005 100644
--- 
a/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonKerberosFilesystemIT.java
+++ 
b/catalogs/catalog-lakehouse-paimon/src/test/java/org/apache/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonKerberosFilesystemIT.java
@@ -185,8 +185,8 @@ public class CatalogPaimonKerberosFilesystemIT extends 
BaseIT {
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
index 96a4c060bf..bd931a5dc9 100644
--- 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/Constants.java
@@ -27,4 +27,23 @@ public class Constants {
 
   // Name of the built-in HDFS file system provider
   public static final String BUILTIN_HDFS_FS_PROVIDER = "builtin-hdfs";
+
+  // Name of the configuration property for HDFS config resources
+  public static final String CONFIG_RESOURCES = "config.resources";
+  // Name of the configuration property to disable HDFS FileSystem cache
+  public static final String FS_DISABLE_CACHE = "fs.hdfs.impl.disable.cache";
+  // Name of the configuration property for Kerberos principal
+  public static final String HADOOP_SECURITY_PRINCIPAL =
+      "hadoop.security.authentication.kerberos.principal";
+  // Name of the configuration property for Kerberos keytab
+  public static final String HADOOP_SECURITY_KEYTAB =
+      "hadoop.security.authentication.kerberos.keytab";
+  // Name of the configuration property for Kerberos krb5.conf location
+  public static final String HADOOP_KRB5_CONF = 
"hadoop.security.authentication.kerberos.krb5.conf";
+  // Environment variable for Java Kerberos configuration
+  public static final String SECURITY_KRB5_ENV = "java.security.krb5.conf";
+  // Supported authentication types
+  public static final String AUTH_KERBEROS = "kerberos";
+  // Simple authentication type
+  public static final String AUTH_SIMPLE = "simple";
 }
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
index 16b87bfe38..fac32500ce 100644
--- 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/FileSystemUtils.java
@@ -20,7 +20,10 @@ package org.apache.gravitino.catalog.hadoop.fs;
 
 import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.BUILTIN_HDFS_FS_PROVIDER;
 import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.BUILTIN_LOCAL_FS_PROVIDER;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.CONFIG_RESOURCES;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.FS_DISABLE_CACHE;
 import static 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;
+import static 
org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider.IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -37,6 +40,7 @@ import javax.xml.stream.XMLOutputFactory;
 import javax.xml.stream.XMLStreamWriter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
 public class FileSystemUtils {
 
@@ -215,6 +219,18 @@ public class FileSystemUtils {
    */
   public static Configuration createConfiguration(String bypass, Map<String, 
String> config) {
     try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
+      Configuration configuration = new Configuration();
+
+      String hdfsConfigResources = config.get(CONFIG_RESOURCES);
+      if (StringUtils.isNotBlank(hdfsConfigResources)) {
+        for (String resource : hdfsConfigResources.split(",")) {
+          resource = resource.trim();
+          if (StringUtils.isNotBlank(resource)) {
+            configuration.addResource(new Path(resource));
+          }
+        }
+      }
+
       XMLStreamWriter writer = 
XMLOutputFactory.newInstance().createXMLStreamWriter(out);
       writer.writeStartDocument();
       writer.writeStartElement(CONFIG_ROOT);
@@ -225,12 +241,12 @@ public class FileSystemUtils {
       writer.writeEndElement();
       writer.writeEndDocument();
       writer.close();
+      configuration.addResource(new ByteArrayInputStream(out.toByteArray()));
 
-      return new Configuration() {
-        {
-          addResource(new ByteArrayInputStream(out.toByteArray()));
-        }
-      };
+      configuration.setBoolean(FS_DISABLE_CACHE, true);
+      configuration.setBoolean(IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED, true);
+
+      return configuration;
     } catch (Exception e) {
       throw new RuntimeException("Failed to create configuration", e);
     }
diff --git 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
index 00f14fccf4..bf83e75a9f 100644
--- 
a/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
+++ 
b/catalogs/hadoop-common/src/main/java/org/apache/gravitino/catalog/hadoop/fs/HDFSFileSystemProvider.java
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 public class HDFSFileSystemProvider implements FileSystemProvider {
+  public static final String IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED =
+      "hadoop.rpc.protection.fallback-to-simple-auth-allowed";
+  public static final String SCHEME_HDFS = "hdfs";
 
   @Override
   public FileSystem getFileSystem(@Nonnull Path path, @Nonnull Map<String, 
String> config)
@@ -38,7 +41,7 @@ public class HDFSFileSystemProvider implements 
FileSystemProvider {
 
   @Override
   public String scheme() {
-    return "hdfs";
+    return SCHEME_HDFS;
   }
 
   @Override
diff --git a/clients/filesystem-hadoop3/build.gradle.kts 
b/clients/filesystem-hadoop3/build.gradle.kts
index ace9fa42d9..48c2d9b2c6 100644
--- a/clients/filesystem-hadoop3/build.gradle.kts
+++ b/clients/filesystem-hadoop3/build.gradle.kts
@@ -36,6 +36,7 @@ dependencies {
   }
 
   implementation(libs.caffeine)
+  implementation(libs.cglib)
   implementation(libs.commons.lang3)
   implementation(libs.guava)
   implementation(libs.slf4j.api)
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
index e61b6abc0f..bead984912 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/BaseGVFSOperations.java
@@ -18,12 +18,14 @@
  */
 package org.apache.gravitino.filesystem.hadoop;
 
+import static 
org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider.GRAVITINO_BYPASS;
+import static 
org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider.SCHEME_HDFS;
 import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CURRENT_LOCATION_NAME;
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_FILESET_METADATA_CACHE_ENABLE;
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_FILESET_METADATA_CACHE_ENABLE_DEFAULT;
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractIdentifier;
-import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getConfigMap;
+import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.extractNonDefaultConfig;
 import static 
org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystemUtils.getSubPathFromGvfsPath;
 
 import com.github.benmanes.caffeine.cache.Cache;
@@ -42,6 +44,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -57,11 +60,13 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
 import org.apache.gravitino.audit.CallerContext;
 import org.apache.gravitino.audit.FilesetAuditConstants;
 import org.apache.gravitino.audit.FilesetDataOperation;
 import org.apache.gravitino.audit.InternalClientType;
 import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
+import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
 import 
org.apache.gravitino.catalog.hadoop.fs.GravitinoFileSystemCredentialsProvider;
 import org.apache.gravitino.catalog.hadoop.fs.SupportsCredentialVending;
 import org.apache.gravitino.client.GravitinoClient;
@@ -661,44 +666,24 @@ public abstract class BaseGVFSOperations implements 
Closeable {
                         NameIdentifier.of(filesetIdent.namespace().level(2), 
filesetIdent.name())));
   }
 
-  @VisibleForTesting
-  Cache<FileSystemCacheKey, FileSystem> internalFileSystemCache() {
-    return fileSystemCache;
-  }
-
   /**
-   * Lazy initialization of GravitinoClient using double-checked locking 
pattern. This ensures the
-   * expensive client creation only happens when actually needed.
+   * Get the schema by the schema identifier from the cache or load it from 
the server if the cache
+   * is disabled.
    *
-   * @return the GravitinoClient
+   * @param schemaIdent the schema identifier.
+   * @return the schema.
    */
-  @VisibleForTesting
-  GravitinoClient getGravitinoClient() {
-    if (gravitinoClient == null) {
-      synchronized (this) {
-        if (gravitinoClient == null) {
-          this.gravitinoClient = 
GravitinoVirtualFileSystemUtils.createClient(conf);
-        }
-      }
-    }
-    return gravitinoClient;
-  }
-
-  private void setCallerContextForGetFileLocation(FilesetDataOperation 
operation) {
-    Map<String, String> contextMap = Maps.newHashMap();
-    contextMap.put(
-        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
-        InternalClientType.HADOOP_GVFS.name());
-    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, 
operation.name());
-    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
-    CallerContext.CallerContextHolder.set(callerContext);
-  }
-
-  private void setCallerContextForGetCredentials(String locationName) {
-    Map<String, String> contextMap = Maps.newHashMap();
-    contextMap.put(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME, 
locationName);
-    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
-    CallerContext.CallerContextHolder.set(callerContext);
+  protected Schema getSchema(NameIdentifier schemaIdent) {
+    return filesetMetadataCache
+        .map(cache -> cache.getSchema(schemaIdent))
+        .orElseGet(
+            () -> {
+              NameIdentifier catalogIdent =
+                  NameIdentifier.of(
+                      schemaIdent.namespace().level(0), 
schemaIdent.namespace().level(1));
+              Catalog c = gravitinoClient.loadCatalog(catalogIdent.name());
+              return c.asSchemas().loadSchema(schemaIdent.name());
+            });
   }
 
   /**
@@ -727,8 +712,16 @@ public abstract class BaseGVFSOperations implements 
Closeable {
           filesetIdent);
 
       Path targetLocation = new 
Path(fileset.storageLocations().get(targetLocationName));
-      Map<String, String> allProperties =
-          getAllProperties(filesetIdent, targetLocation.toUri().getScheme(), 
targetLocationName);
+      Map<String, String> allProperties = getAllProperties(filesetIdent, 
fileset.properties());
+      
allProperties.putAll(getUserDefinedConfigs(getBaseLocation(targetLocation)));
+
+      if (enableCredentialVending()) {
+        allProperties.putAll(
+            getCredentialProperties(
+                
getFileSystemProviderByScheme(targetLocation.toUri().getScheme()),
+                filesetIdent,
+                locationName));
+      }
 
       FileSystem actualFileSystem = getActualFileSystemByPath(targetLocation, 
allProperties);
       createFilesetLocationIfNeed(filesetIdent, actualFileSystem, 
targetLocation);
@@ -762,6 +755,92 @@ public abstract class BaseGVFSOperations implements 
Closeable {
     }
   }
 
+  @VisibleForTesting
+  Cache<FileSystemCacheKey, FileSystem> internalFileSystemCache() {
+    return fileSystemCache;
+  }
+
+  /**
+   * Lazy initialization of GravitinoClient using double-checked locking 
pattern. This ensures the
+   * expensive client creation only happens when actually needed.
+   *
+   * @return the GravitinoClient
+   */
+  @VisibleForTesting
+  GravitinoClient getGravitinoClient() {
+    if (gravitinoClient == null) {
+      synchronized (this) {
+        if (gravitinoClient == null) {
+          this.gravitinoClient = 
GravitinoVirtualFileSystemUtils.createClient(conf);
+        }
+      }
+    }
+    return gravitinoClient;
+  }
+
+  private void setCallerContextForGetFileLocation(FilesetDataOperation 
operation) {
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(
+        FilesetAuditConstants.HTTP_HEADER_INTERNAL_CLIENT_TYPE,
+        InternalClientType.HADOOP_GVFS.name());
+    contextMap.put(FilesetAuditConstants.HTTP_HEADER_FILESET_DATA_OPERATION, 
operation.name());
+    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+  }
+
+  private void setCallerContextForGetCredentials(String locationName) {
+    Map<String, String> contextMap = Maps.newHashMap();
+    contextMap.put(CredentialConstants.HTTP_HEADER_CURRENT_LOCATION_NAME, 
locationName);
+    CallerContext callerContext = 
CallerContext.builder().withContext(contextMap).build();
+    CallerContext.CallerContextHolder.set(callerContext);
+  }
+
+  /**
+   * Get user-defined configurations for a specific location. Configuration format:
+   * {@code fs.path.config.<location_name>.<property_name> = <property_value>}.
+   *
+   * @param baseLocation the base location of fileset
+   * @return a map of configuration properties for the specified location
+   */
+  private Map<String, String> getUserDefinedConfigs(String baseLocation) {
+    // Prepare a map to hold the properties for the specified location
+    // fs.path.config.<location_name> = location
+    // fs.path.config.<location_name>.<property_name> = <property_value>
+    // eg:
+    //   fs.path.config.cluster1 = s3://bucket/path/
+    //   fs.path.config.cluster1.aws-access-key = XXX
+    //   fs.path.config.cluster1.aws-secret-key = XXX
+    Map<String, String> properties = new HashMap<>();
+    if (StringUtils.isBlank(baseLocation)) {
+      return properties;
+    }
+    String locationPrefix =
+        GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_PATH_CONFIG_PREFIX
+            + baseLocation
+            + ".";
+
+    // Iterate through all configuration entries
+    for (Map.Entry<String, String> entry : conf) {
+      String key = entry.getKey();
+
+      // Check if this key is a property for the specified location
+      // e.g., "fs.path.config.cluster1.aws-ak" matches prefix 
"fs.path.config.cluster1."
+      if (key.startsWith(locationPrefix)) {
+        // Extract the property name after the location prefix
+        // e.g., "fs.path.config.cluster1.aws-ak" -> "aws-ak"
+        String propertyName = key.substring(locationPrefix.length());
+        if (!propertyName.isEmpty()) {
+          properties.put(propertyName, entry.getValue());
+        }
+      }
+    }
+
+    return properties;
+  }
+
+  private String getBaseLocation(Path targetLocation) {
+    return targetLocation.toUri().getScheme() + "://" + 
targetLocation.toUri().getAuthority();
+  }
+
   /**
    * Get the actual file system by the given actual file path and properties.
    *
@@ -793,7 +872,16 @@ public abstract class BaseGVFSOperations implements 
Closeable {
           // https://github.com/apache/gravitino/issues/5609
           resetFileSystemServiceLoader(scheme);
           try {
-            return provider.getFileSystem(actualFilePath, allProperties);
+            FileSystem fs;
+            if (scheme.equals(SCHEME_HDFS)) {
+              Configuration fsConfig =
+                  FileSystemUtils.createConfiguration(GRAVITINO_BYPASS, 
allProperties);
+              HDFSFileSystemProxy proxy = new 
HDFSFileSystemProxy(actualFilePath, fsConfig);
+              fs = proxy.getProxy();
+            } else {
+              fs = provider.getFileSystem(actualFilePath, allProperties);
+            }
+            return fs;
           } catch (IOException e) {
             throw new GravitinoRuntimeException(
                 e, "Cannot get FileSystem for path: %s", actualFilePath);
@@ -867,20 +955,19 @@ public abstract class BaseGVFSOperations implements 
Closeable {
   }
 
   private Map<String, String> getAllProperties(
-      NameIdentifier filesetIdent, String scheme, String locationName) {
+      NameIdentifier filesetIdent, Map<String, String> filesetProperties) {
+    Map<String, String> allProperties = new HashMap<>();
     Catalog catalog =
         (Catalog)
             getFilesetCatalog(
                 NameIdentifier.of(
                     filesetIdent.namespace().level(0), 
filesetIdent.namespace().level(1)));
+    allProperties.putAll(catalog.properties());
 
-    Map<String, String> allProperties = 
getNecessaryProperties(catalog.properties());
-    allProperties.putAll(getConfigMap(conf));
-    if (enableCredentialVending()) {
-      allProperties.putAll(
-          getCredentialProperties(
-              getFileSystemProviderByScheme(scheme), filesetIdent, 
locationName));
-    }
+    Schema schema = 
getSchema(NameIdentifier.parse(filesetIdent.namespace().toString()));
+    allProperties.putAll(schema.properties());
+    allProperties.putAll(filesetProperties);
+    allProperties.putAll(extractNonDefaultConfig(conf));
     return allProperties;
   }
 
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetMetadataCache.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetMetadataCache.java
index 239e1a144b..7c84994fd6 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetMetadataCache.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/FilesetMetadataCache.java
@@ -28,7 +28,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
+import org.apache.gravitino.Catalog;
 import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.Schema;
 import org.apache.gravitino.client.GravitinoClient;
 import org.apache.gravitino.file.Fileset;
 import org.apache.gravitino.file.FilesetCatalog;
@@ -39,6 +41,7 @@ public class FilesetMetadataCache implements Closeable {
   private final GravitinoClient client;
   private final Cache<NameIdentifier, FilesetCatalog> catalogCache;
   private final Cache<NameIdentifier, Fileset> filesetCache;
+  private final Cache<NameIdentifier, Schema> schemaCache;
 
   /**
    * Creates a new instance of {@link FilesetMetadataCache}.
@@ -49,6 +52,7 @@ public class FilesetMetadataCache implements Closeable {
     this.client = client;
     this.catalogCache = newCatalogCache();
     this.filesetCache = newFilesetCache();
+    this.schemaCache = newSchemaCache();
   }
 
   /**
@@ -84,6 +88,23 @@ public class FilesetMetadataCache implements Closeable {
                 NameIdentifier.of(filesetIdent.namespace().level(2), 
filesetIdent.name())));
   }
 
+  /**
+   * Gets the schema by the given schema identifier.
+   *
+   * @param schemaIdent the schema identifier.
+   * @return the schema.
+   */
+  public Schema getSchema(NameIdentifier schemaIdent) {
+    NameIdentifier catalogIdent =
+        NameIdentifier.of(schemaIdent.namespace().level(0), 
schemaIdent.namespace().level(1));
+    return schemaCache.get(
+        schemaIdent,
+        ident -> {
+          Catalog c = client.loadCatalog(catalogIdent.name());
+          return c.asSchemas().loadSchema(schemaIdent.name());
+        });
+  }
+
   private Cache<NameIdentifier, FilesetCatalog> newCatalogCache() {
     // In most scenarios, it will not read so many catalog filesets at the 
same time, so we can just
     // set a default value for this cache.
@@ -110,6 +131,18 @@ public class FilesetMetadataCache implements Closeable {
         .build();
   }
 
+  private Cache<NameIdentifier, Schema> newSchemaCache() {
+    return Caffeine.newBuilder()
+        .maximumSize(10000)
+        // Since Caffeine does not guarantee that the removalListener will be
+        // invoked immediately after expiration, we use a scheduler with one
+        // thread to clean up expired schema entries.
+        .scheduler(
+            Scheduler.forScheduledExecutorService(
+                new ScheduledThreadPoolExecutor(
+                    1, newDaemonThreadFactory("gvfs-schema-cache-cleaner"))))
+        .build();
+  }
+
   private ThreadFactory newDaemonThreadFactory(String name) {
     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(name + 
"-%d").build();
   }
@@ -118,6 +151,7 @@ public class FilesetMetadataCache implements Closeable {
   public void close() throws IOException {
     catalogCache.invalidateAll();
     filesetCache.invalidateAll();
+    schemaCache.invalidateAll();
     // Note: We don't close the client here since it's owned and managed by 
BaseGVFSOperations
   }
 }
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
index 80d9b6341e..95fc7bafb9 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemConfiguration.java
@@ -184,5 +184,8 @@ public class GravitinoVirtualFileSystemConfiguration {
   /** The default value for whether to enable auto-creation of fileset 
location. */
   public static final boolean FS_GRAVITINO_AUTO_CREATE_LOCATION_DEFAULT = true;
 
+  /** The prefix for user-defined location configs: {@code 
fs.path.config.<locationName>=<path>}. */
+  public static final String FS_GRAVITINO_PATH_CONFIG_PREFIX = 
"fs.path.config.";
+
   private GravitinoVirtualFileSystemConfiguration() {}
 }
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
index 3a28fd6ac0..430d2b798c 100644
--- 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/GravitinoVirtualFileSystemUtils.java
@@ -65,6 +65,29 @@ public class GravitinoVirtualFileSystemUtils {
     return maps;
   }
 
+  /**
+   * Extract non-default configuration from Hadoop Configuration.
+   *
+   * @param configuration The Hadoop configuration.
+   * @return The configuration map.
+   */
+  public static Map<String, String> extractNonDefaultConfig(Configuration 
configuration) {
+    Map<String, String> maps = Maps.newHashMap();
+    // Don't use entry.getValue() directly in the lambda: it does not apply
+    // variable expansion, so read values via configuration.get(key) instead.
+    Configuration defaultConf = new Configuration();
+    configuration.forEach(
+        entry -> {
+          String key = entry.getKey();
+          String value = configuration.get(key);
+          // ignore the default configuration
+          if (!StringUtils.equals(value, defaultConf.get(key))) {
+            maps.put(key, value);
+          }
+        });
+    return maps;
+  }
+
   /**
    * Get Gravitino client by the configuration.
    *
diff --git 
a/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/HDFSFileSystemProxy.java
 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/HDFSFileSystemProxy.java
new file mode 100644
index 0000000000..776b844859
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/main/java/org/apache/gravitino/filesystem/hadoop/HDFSFileSystemProxy.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_KERBEROS;
+import static org.apache.gravitino.catalog.hadoop.fs.Constants.AUTH_SIMPLE;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.FS_DISABLE_CACHE;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_KRB5_CONF;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_SECURITY_KEYTAB;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.HADOOP_SECURITY_PRINCIPAL;
+import static 
org.apache.gravitino.catalog.hadoop.fs.Constants.SECURITY_KRB5_ENV;
+import static 
org.apache.gravitino.catalog.hadoop.fs.HDFSFileSystemProvider.IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED;
+import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import net.sf.cglib.proxy.Enhancer;
+import net.sf.cglib.proxy.MethodInterceptor;
+import net.sf.cglib.proxy.MethodProxy;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.gravitino.exceptions.GravitinoRuntimeException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A FileSystem wrapper that runs all operations under a specific UGI 
(UserGroupInformation).
+ * Supports both simple and Kerberos authentication, with automatic ticket 
renewal.
+ */
+public class HDFSFileSystemProxy implements MethodInterceptor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(HDFSFileSystemProxy.class);
+
+  private static final long DEFAULT_RENEW_INTERVAL_MS = 10 * 60 * 1000L;
+  private static final String SYSTEM_USER_NAME = 
System.getProperty("user.name");
+  private static final String SYSTEM_ENV_HADOOP_USER_NAME = "HADOOP_USER_NAME";
+
+  private final UserGroupInformation ugi;
+  private final FileSystem fs;
+  private final Configuration configuration;
+  private ScheduledExecutorService kerberosRenewExecutor;
+
+  /**
+   * Create a HDFSFileSystemProxy with the given path and 
configuration. Supports both
+   * simple and Kerberos authentication, with automatic ticket renewal for 
Kerberos.
+   *
+   * @param path the HDFS path
+   * @param conf the Hadoop configuration
+   */
+  public HDFSFileSystemProxy(Path path, Configuration conf) {
+    try {
+      conf.setBoolean(FS_DISABLE_CACHE, true);
+      conf.setBoolean(IPC_FALLBACK_TO_SIMPLE_AUTH_ALLOWED, true);
+      this.configuration = conf;
+
+      String authType = conf.get(HADOOP_SECURITY_AUTHENTICATION, AUTH_SIMPLE);
+      if (AUTH_KERBEROS.equalsIgnoreCase(authType)) {
+        String krb5Config = conf.get(HADOOP_KRB5_CONF);
+
+        if (krb5Config != null) {
+          System.setProperty(SECURITY_KRB5_ENV, krb5Config);
+        }
+        UserGroupInformation.setConfiguration(conf);
+        String principal = conf.get(HADOOP_SECURITY_PRINCIPAL, null);
+        String keytab = conf.get(HADOOP_SECURITY_KEYTAB, null);
+
+        if (principal == null || keytab == null) {
+          throw new GravitinoRuntimeException(
+              "Kerberos principal and keytab must be provided for kerberos 
authentication");
+        }
+
+        this.ugi = 
UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab);
+        startKerberosRenewalTask(principal);
+      } else {
+        String userName = System.getenv(SYSTEM_ENV_HADOOP_USER_NAME);
+        if (StringUtils.isEmpty(userName)) {
+          userName = SYSTEM_USER_NAME;
+        }
+        this.ugi = UserGroupInformation.createRemoteUser(userName);
+      }
+
+      this.fs =
+          ugi.doAs(
+              (PrivilegedExceptionAction<FileSystem>)
+                  () -> FileSystem.newInstance(path.toUri(), conf));
+
+    } catch (Exception e) {
+      throw new GravitinoRuntimeException(e, "Failed to create HDFS FileSystem 
with UGI: %s", path);
+    }
+  }
+
+  /**
+   * Get the proxied FileSystem instance.
+   *
+   * @return the proxied FileSystem
+   * @throws IOException if an I/O error occurs
+   */
+  public FileSystem getProxy() throws IOException {
+    Enhancer e = new Enhancer();
+    e.setClassLoader(fs.getClass().getClassLoader());
+    e.setSuperclass(fs.getClass());
+    e.setCallback(this);
+    FileSystem proxyFs = (FileSystem) e.create();
+    fs.setConf(configuration);
+    return proxyFs;
+  }
+
+  @Override
+  public Object intercept(Object o, Method method, Object[] objects, 
MethodProxy methodProxy)
+      throws Throwable {
+    // Intercept close() method to clean up the Kerberos renewal executor
+    boolean isCloseMethod = "close".equals(method.getName());
+    try {
+      Object result = invokeWithUgi(methodProxy, objects);
+      // Close the Kerberos renewal executor after FileSystem.close()
+      if (isCloseMethod) {
+        close();
+      }
+      return result;
+    } catch (Throwable e) {
+      if (isCloseMethod) {
+        close();
+      }
+      throw e;
+    }
+  }
+
+  /** Schedule periodic Kerberos re-login to refresh TGT before expiry. */
+  private void startKerberosRenewalTask(String principal) {
+    kerberosRenewExecutor =
+        Executors.newSingleThreadScheduledExecutor(
+            r -> {
+              Thread t = new Thread(r, "HDFSFileSystemProxy 
Kerberos-Renewal-Thread");
+              t.setDaemon(true);
+              return t;
+            });
+
+    kerberosRenewExecutor.scheduleAtFixedRate(
+        () -> {
+          try {
+            if (ugi.hasKerberosCredentials()) {
+              ugi.checkTGTAndReloginFromKeytab();
+            }
+          } catch (Exception e) {
+            LOG.error(
+                "[Kerberos] Failed to renew TGT for principal {}: {}",
+                principal,
+                e.getMessage(),
+                e);
+          }
+        },
+        DEFAULT_RENEW_INTERVAL_MS,
+        DEFAULT_RENEW_INTERVAL_MS,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /** Close the Kerberos renewal executor service to prevent resource leaks. */
+  private void close() {
+    if (kerberosRenewExecutor != null) {
+      kerberosRenewExecutor.shutdownNow();
+      kerberosRenewExecutor = null;
+    }
+  }
+
+  /** Invoke the method on the underlying FileSystem using ugi.doAs. */
+  private Object invokeWithUgi(MethodProxy methodProxy, Object[] objects) 
throws Throwable {
+    return ugi.doAs(
+        (PrivilegedExceptionAction<Object>)
+            () -> {
+              try {
+                return methodProxy.invoke(fs, objects);
+              } catch (IOException e) {
+                throw e;
+              } catch (Throwable e) {
+                if (RuntimeException.class.isAssignableFrom(e.getClass())) {
+                  throw (RuntimeException) e;
+                }
+                throw new RuntimeException("Failed to invoke method", e);
+              }
+            });
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestBaseGVFSOperationsUserConfigs.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestBaseGVFSOperationsUserConfigs.java
new file mode 100644
index 0000000000..5d552b1991
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestBaseGVFSOperationsUserConfigs.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.withSettings;
+
+import java.lang.reflect.Method;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+/** Unit test covering the user-defined configuration extraction logic in 
BaseGVFSOperations. */
+public class TestBaseGVFSOperationsUserConfigs {
+
+  private Configuration createBaseConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set(
+        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_CLIENT_METALAKE_KEY, 
"test_metalake");
+    conf.set(
+        GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_SERVER_URI_KEY,
+        "http://localhost:8090");
+    return conf;
+  }
+
+  @SuppressWarnings("unchecked")
+  private Map<String, String> invokeGetUserDefinedConfigs(
+      BaseGVFSOperations operations, String locationName) {
+    try {
+      Method method =
+          BaseGVFSOperations.class.getDeclaredMethod("getUserDefinedConfigs", 
String.class);
+      method.setAccessible(true);
+      return (Map<String, String>) method.invoke(operations, locationName);
+    } catch (ReflectiveOperationException e) {
+      throw new RuntimeException("Unable to invoke getUserDefinedConfigs", e);
+    }
+  }
+
+  @Test
+  public void testGetUserDefinedConfigsReturnsExpectedEntries() throws 
Exception {
+    Configuration conf = createBaseConfiguration();
+    conf.set(
+        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_PATH_CONFIG_PREFIX + 
"cluster1",
+        "hdfs://cluster1/");
+    conf.set(
+        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_PATH_CONFIG_PREFIX + 
"cluster1.aws-ak",
+        "AK1");
+    conf.set(
+        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_PATH_CONFIG_PREFIX + 
"cluster1.aws-sk",
+        "SK1");
+    conf.set(
+        GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_PATH_CONFIG_PREFIX
+            + "cluster1.config.resource",
+        "/etc/core-site.xml,hdfs-site.xml");
+    // Another location's property which should be ignored
+    conf.set(
+        
GravitinoVirtualFileSystemConfiguration.FS_GRAVITINO_PATH_CONFIG_PREFIX + 
"cluster2.aws-ak",
+        "AK2");
+
+    BaseGVFSOperations operations =
+        mock(
+            BaseGVFSOperations.class,
+            
withSettings().useConstructor(conf).defaultAnswer(Mockito.CALLS_REAL_METHODS));
+
+    Map<String, String> properties = invokeGetUserDefinedConfigs(operations, 
"cluster1");
+
+    assertEquals(3, properties.size(), "Only cluster1 scoped properties should 
be returned");
+    assertEquals("AK1", properties.get("aws-ak"));
+    assertEquals("SK1", properties.get("aws-sk"));
+    assertEquals("/etc/core-site.xml,hdfs-site.xml", 
properties.get("config.resource"));
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
index 53a04ddc65..d39bf26b3b 100644
--- 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/TestGvfsBase.java
@@ -66,6 +66,7 @@ import org.apache.gravitino.NameIdentifier;
 import org.apache.gravitino.Version;
 import org.apache.gravitino.dto.AuditDTO;
 import org.apache.gravitino.dto.CatalogDTO;
+import org.apache.gravitino.dto.SchemaDTO;
 import org.apache.gravitino.dto.credential.CredentialDTO;
 import org.apache.gravitino.dto.file.FilesetDTO;
 import org.apache.gravitino.dto.responses.CatalogResponse;
@@ -73,6 +74,7 @@ import org.apache.gravitino.dto.responses.CredentialResponse;
 import org.apache.gravitino.dto.responses.ErrorResponse;
 import org.apache.gravitino.dto.responses.FileLocationResponse;
 import org.apache.gravitino.dto.responses.FilesetResponse;
+import org.apache.gravitino.dto.responses.SchemaResponse;
 import org.apache.gravitino.dto.responses.VersionResponse;
 import org.apache.gravitino.exceptions.NoSuchCatalogException;
 import org.apache.gravitino.exceptions.NoSuchFilesetException;
@@ -1142,6 +1144,24 @@ public class TestGvfsBase extends 
GravitinoMockServerBase {
         null,
         catalogResponse,
         SC_OK);
+    SchemaResponse schemaResponse =
+        new SchemaResponse(
+            SchemaDTO.builder()
+                .withName(schemaNameLocal)
+                .withAudit(AuditDTO.builder().build())
+                .withProperties(ImmutableMap.of())
+                .build());
+    buildMockResource(
+        Method.GET,
+        "/api/metalakes/"
+            + metalakeName
+            + "/catalogs/"
+            + catalogNameWithFsOpsDisabled
+            + "/schemas/"
+            + schemaNameLocal,
+        null,
+        schemaResponse,
+        SC_OK);
 
     Path managedFilesetPath =
         FileSystemTestUtils.createFilesetPath(
@@ -1468,6 +1488,17 @@ public class TestGvfsBase extends 
GravitinoMockServerBase {
 
   private void buildMockResourceForCredential(String filesetName, String 
filesetLocation)
       throws JsonProcessingException {
+    String schemaPath =
+        String.format(
+            "/api/metalakes/%s/catalogs/%s/schemas/%s", metalakeName, 
catalogName, schemaName);
+    SchemaResponse schemaResponse =
+        new SchemaResponse(
+            SchemaDTO.builder()
+                .withName(schemaName)
+                .withAudit(AuditDTO.builder().build())
+                .withProperties(ImmutableMap.of())
+                .build());
+
     String filesetPath =
         String.format(
             "/api/metalakes/%s/catalogs/%s/schemas/%s/filesets/%s",
@@ -1488,6 +1519,7 @@ public class TestGvfsBase extends GravitinoMockServerBase 
{
                 .build());
     CredentialResponse credentialResponse = new CredentialResponse(new 
CredentialDTO[] {});
 
+    buildMockResource(Method.GET, schemaPath, ImmutableMap.of(), null, 
schemaResponse, SC_OK);
     buildMockResource(Method.GET, filesetPath, ImmutableMap.of(), null, 
filesetResponse, SC_OK);
     buildMockResource(
         Method.GET, credentialsPath, ImmutableMap.of(), null, 
credentialResponse, SC_OK);
diff --git 
a/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GvfsMultipleClusterIT.java
 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GvfsMultipleClusterIT.java
new file mode 100644
index 0000000000..2833088639
--- /dev/null
+++ 
b/clients/filesystem-hadoop3/src/test/java/org/apache/gravitino/filesystem/hadoop/integration/test/GvfsMultipleClusterIT.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.filesystem.hadoop.integration.test;
+
+import static org.apache.gravitino.file.Fileset.LOCATION_NAME_UNKNOWN;
+import static org.apache.gravitino.file.Fileset.PROPERTY_DEFAULT_LOCATION_NAME;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.gravitino.Catalog;
+import org.apache.gravitino.NameIdentifier;
+import org.apache.gravitino.client.GravitinoMetalake;
+import org.apache.gravitino.file.Fileset;
+import org.apache.gravitino.integration.test.container.ContainerSuite;
+import org.apache.gravitino.integration.test.container.HiveContainer;
+import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.integration.test.util.GravitinoITUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.authentication.util.KerberosName;
+import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIf;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/*
+In embedded mode, the Hadoop FileSystem within a single JVM process has some
+static variables and caches that interfere with each other,
+ leading to authentication failures.
+*/
+@Tag("gravitino-docker-test")
+@DisabledIf("org.apache.gravitino.integration.test.util.ITUtils#isEmbedded")
+public class GvfsMultipleClusterIT extends BaseIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(GvfsMultipleClusterIT.class);
+  protected static final ContainerSuite containerSuite = 
ContainerSuite.getInstance();
+  protected HiveContainer hiveContainer;
+  protected HiveContainer kerberosHiveContainer;
+
+  protected String metalakeName = 
GravitinoITUtils.genRandomName("gvfs_it_metalake");
+  protected String catalogName = GravitinoITUtils.genRandomName("catalog");
+  protected String schemaName = GravitinoITUtils.genRandomName("schema");
+  protected GravitinoMetalake metalake;
+  protected Configuration conf = new Configuration();
+  protected Map<String, String> properties = Maps.newHashMap();
+  protected String configResourcesPath;
+
+  @BeforeAll
+  public void startUp() throws Exception {
+    containerSuite.startHiveContainer();
+    hiveContainer = containerSuite.getHiveContainer();
+
+    containerSuite.startKerberosHiveContainer();
+    kerberosHiveContainer = containerSuite.getKerberosHiveContainer();
+
+    setupKerberosEnv();
+
+    Assertions.assertFalse(client.metalakeExists(metalakeName));
+    metalake = client.createMetalake(metalakeName, "metalake comment", 
Collections.emptyMap());
+    Assertions.assertTrue(client.metalakeExists(metalakeName));
+
+    Catalog catalog =
+        metalake.createCatalog(
+            catalogName, Catalog.Type.FILESET, "hadoop", "catalog comment", 
properties);
+    Assertions.assertTrue(metalake.catalogExists(catalogName));
+
+    catalog.asSchemas().createSchema(schemaName, "schema comment", properties);
+    Assertions.assertTrue(catalog.asSchemas().schemaExists(schemaName));
+
+    conf.set("fs.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.GravitinoVirtualFileSystem");
+    conf.set("fs.AbstractFileSystem.gvfs.impl", 
"org.apache.gravitino.filesystem.hadoop.Gvfs");
+    conf.set("fs.gvfs.impl.disable.cache", "true");
+    conf.set("fs.gravitino.server.uri", serverUri);
+    conf.set("fs.gravitino.client.metalake", metalakeName);
+  }
+
+  private void setupKerberosEnv() throws Exception {
+    File baseDir = new File(System.getProperty("java.io.tmpdir"));
+    File file = Files.createTempDirectory(baseDir.toPath(), "test").toFile();
+    file.deleteOnExit();
+    String tmpDir = file.getAbsolutePath();
+    this.configResourcesPath = tmpDir;
+
+    // Keytab of the Gravitino SDK client
+    String keytabPath = tmpDir + "/admin.keytab";
+    
kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/admin.keytab", 
keytabPath);
+
+    String tmpKrb5Path = tmpDir + "/krb5.conf_tmp";
+    String krb5Path = tmpDir + "/krb5.conf";
+    
kerberosHiveContainer.getContainer().copyFileFromContainer("/etc/krb5.conf", 
tmpKrb5Path);
+
+    // Modify the krb5.conf and change the kdc and admin_server to the 
container IP
+    String ip = 
containerSuite.getKerberosHiveContainer().getContainerIpAddress();
+    String content = FileUtils.readFileToString(new File(tmpKrb5Path), 
StandardCharsets.UTF_8);
+    content = content.replace("kdc = localhost:88", "kdc = " + ip + ":88");
+    content = content.replace("admin_server = localhost", "admin_server = " + 
ip + ":749");
+    FileUtils.write(new File(krb5Path), content, StandardCharsets.UTF_8);
+
+    LOG.info("Kerberos kdc config:\n{}, path: {}", content, krb5Path);
+    System.setProperty("java.security.krb5.conf", krb5Path);
+    System.setProperty("sun.security.krb5.debug", "true");
+
+    // create hdfs-site.xml and core-site.xml
+    // read hdfs-site.xml from resources
+    String hdfsSiteXml = readResourceFile("hd_kbs_conf/hdfs-site.xml");
+    FileUtils.write(new File(tmpDir + "/hdfs-site.xml"), hdfsSiteXml, 
StandardCharsets.UTF_8);
+    String coreSiteXml = readResourceFile("hd_kbs_conf/core-site.xml");
+    coreSiteXml = coreSiteXml.replace("XXX_KEYTAB_XXX", keytabPath);
+    coreSiteXml = coreSiteXml.replace("XXX_KRB_CONF_XXX", krb5Path);
+    FileUtils.write(new File(tmpDir + "/core-site.xml"), coreSiteXml, 
StandardCharsets.UTF_8);
+
+    LOG.info("Kerberos config resources created in {}", tmpDir);
+    refreshKerberosConfig();
+    KerberosName.resetDefaultRealm();
+
+    LOG.info("Kerberos default realm: {}", KerberosUtil.getDefaultRealm());
+  }
+
+  private static String readResourceFile(String resourcePath) throws 
IOException {
+    return new String(
+        GvfsMultipleClusterIT.class
+            .getClassLoader()
+            .getResourceAsStream(resourcePath)
+            .readAllBytes(),
+        StandardCharsets.UTF_8);
+  }
+
+  private static void refreshKerberosConfig() {
+    Class<?> classRef;
+    try {
+      if (System.getProperty("java.vendor").contains("IBM")) {
+        classRef = Class.forName("com.ibm.security.krb5.internal.Config");
+      } else {
+        classRef = Class.forName("sun.security.krb5.Config");
+      }
+
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @AfterAll
+  public void tearDown() throws IOException {
+    if (metalake == null) {
+      return;
+    }
+
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    catalog.asSchemas().dropSchema(schemaName, true);
+    metalake.dropCatalog(catalogName, true);
+    client.dropMetalake(metalakeName, true);
+
+    if (client != null) {
+      client.close();
+      client = null;
+    }
+
+    try {
+      closer.close();
+    } catch (Exception e) {
+      LOG.error("Exception in closing CloseableGroup", e);
+    }
+  }
+
+  protected Path genGvfsPath(String fileset) {
+    return new Path(String.format("gvfs://fileset/%s/%s/%s", catalogName, 
schemaName, fileset));
+  }
+
+  private String baseHdfsPath(String ip, String filesetName) {
+    return String.format(
+        "hdfs://%s:%d/tmp/%s/%s/%s",
+        ip, HiveContainer.HDFS_DEFAULTFS_PORT, catalogName, schemaName, 
filesetName);
+  }
+
+  @Test
+  public void testFsOperation() throws IOException {
+    // create a fileset with normal cluster
+    String normalFilesetName = 
GravitinoITUtils.genRandomName("fileset_normal");
+    NameIdentifier normalFilesetIdent = NameIdentifier.of(schemaName, 
normalFilesetName);
+    Catalog catalog = metalake.loadCatalog(catalogName);
+    String location = baseHdfsPath(hiveContainer.getContainerIpAddress(), 
normalFilesetName);
+    catalog
+        .asFilesetCatalog()
+        .createMultipleLocationFileset(
+            normalFilesetIdent,
+            "fileset comment",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(LOCATION_NAME_UNKNOWN, location),
+            ImmutableMap.of(PROPERTY_DEFAULT_LOCATION_NAME, 
LOCATION_NAME_UNKNOWN));
+    
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(normalFilesetIdent));
+
+    // create a fileset with kerberos cluster
+    String kerberosFilesetName = 
GravitinoITUtils.genRandomName("fileset_kerberos");
+    NameIdentifier kerberosFilesetIdent = NameIdentifier.of(schemaName, 
kerberosFilesetName);
+    location = baseHdfsPath(kerberosHiveContainer.getContainerIpAddress(), 
kerberosFilesetName);
+    String configResources =
+        configResourcesPath + "/core-site.xml," + configResourcesPath + 
"/hdfs-site.xml";
+    catalog
+        .asFilesetCatalog()
+        .createMultipleLocationFileset(
+            kerberosFilesetIdent,
+            "fileset comment",
+            Fileset.Type.MANAGED,
+            ImmutableMap.of(LOCATION_NAME_UNKNOWN, location),
+            ImmutableMap.of(
+                PROPERTY_DEFAULT_LOCATION_NAME,
+                LOCATION_NAME_UNKNOWN,
+                "gravitino.bypass.dfs.namenode.kerberos.principal.pattern",
+                "*",
+                "config.resources",
+                configResources));
+    
Assertions.assertTrue(catalog.asFilesetCatalog().filesetExists(kerberosFilesetIdent));
+
+    Path normalGvfsPath = genGvfsPath(normalFilesetName);
+    Path kerberosGvfsPath = genGvfsPath(kerberosFilesetName);
+    try (FileSystem gvfs = normalGvfsPath.getFileSystem(conf)) {
+      if (!gvfs.exists(normalGvfsPath)) {
+        gvfs.mkdirs(normalGvfsPath);
+      }
+      if (!gvfs.exists(kerberosGvfsPath)) {
+        gvfs.mkdirs(kerberosGvfsPath);
+      }
+
+      gvfs.create(new Path(normalGvfsPath + "/file1.txt")).close();
+      gvfs.create(new Path(kerberosGvfsPath + "/file1.txt")).close();
+      gvfs.create(new Path(normalGvfsPath + "/file2.txt")).close();
+      gvfs.create(new Path(kerberosGvfsPath + "/file2.txt")).close();
+    }
+  }
+}
diff --git 
a/clients/filesystem-hadoop3/src/test/resources/hd_kbs_conf/core-site.xml 
b/clients/filesystem-hadoop3/src/test/resources/hd_kbs_conf/core-site.xml
new file mode 100644
index 0000000000..81cbfcc191
--- /dev/null
+++ b/clients/filesystem-hadoop3/src/test/resources/hd_kbs_conf/core-site.xml
@@ -0,0 +1,36 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<configuration>
+    <property>
+        <name>hadoop.security.authentication</name>
+        <value>kerberos</value>
+    </property>
+    <property>
+        <name>hadoop.security.authentication.kerberos.principal</name>
+        <value>cli@HADOOPKRB</value>
+    </property>
+    <property>
+        <name>hadoop.security.authentication.kerberos.keytab</name>
+        <value>XXX_KEYTAB_XXX</value>
+    </property>
+    <property>
+        <name>hadoop.security.authentication.kerberos.krb5.conf</name>
+        <value>XXX_KRB_CONF_XXX</value>
+    </property>
+</configuration>
diff --git 
a/clients/filesystem-hadoop3/src/test/resources/hd_kbs_conf/hdfs-site.xml 
b/clients/filesystem-hadoop3/src/test/resources/hd_kbs_conf/hdfs-site.xml
new file mode 100644
index 0000000000..56478dc0ec
--- /dev/null
+++ b/clients/filesystem-hadoop3/src/test/resources/hd_kbs_conf/hdfs-site.xml
@@ -0,0 +1,24 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing,
+  software distributed under the License is distributed on an
+  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  KIND, either express or implied.  See the License for the
+  specific language governing permissions and limitations
+  under the License.
+  -->
+<configuration>
+    <property>
+        <name>dfs.namenode.kerberos.principal</name>
+        <value>cli@HADOOPKRB</value>
+    </property>
+</configuration>
diff --git a/common/src/main/java/org/apache/gravitino/Config.java 
b/common/src/main/java/org/apache/gravitino/Config.java
index 3200854db9..91acfb0018 100644
--- a/common/src/main/java/org/apache/gravitino/Config.java
+++ b/common/src/main/java/org/apache/gravitino/Config.java
@@ -43,7 +43,8 @@ public abstract class Config {
 
   private static final String CONFIG_PREPEND = "gravitino.";
 
-  private final ConcurrentMap<String, String> configMap;
+  /** The concurrent map to hold configuration key-value pairs. */
+  protected final ConcurrentMap<String, String> configMap;
 
   private final Map<String, DeprecatedConfig> deprecatedConfigMap;
   // Constant Array to hold all deprecated configuration keys, when a 
configuration is deprecated,
diff --git 
a/core/src/jmh/java/org/apache/gravitino/cache/it/MixEntityStorageBenchmark.java
 
b/core/src/jmh/java/org/apache/gravitino/cache/it/MixEntityStorageBenchmark.java
index 1c2e07498b..ffca2b3be3 100644
--- 
a/core/src/jmh/java/org/apache/gravitino/cache/it/MixEntityStorageBenchmark.java
+++ 
b/core/src/jmh/java/org/apache/gravitino/cache/it/MixEntityStorageBenchmark.java
@@ -82,7 +82,6 @@ public class MixEntityStorageBenchmark<E extends Entity & 
HasIdentifier>
    * Test get from store.
    *
    * @return The entity.
-   * @throws IOException if an I/O error occurs.
    */
   @Benchmark
   @Group("ops")
diff --git a/docs/fileset-catalog.md b/docs/fileset-catalog.md
index 6bb194a5f4..f1b9407ef6 100644
--- a/docs/fileset-catalog.md
+++ b/docs/fileset-catalog.md
@@ -9,12 +9,12 @@ license: "This software is licensed under the Apache License 
version 2."
 ## Introduction
 
 Fileset catalog is a fileset catalog that using Hadoop Compatible File System 
(HCFS) to manage
-the storage location of the fileset. Currently, it supports the local 
filesystem and HDFS. Since 
+the storage location of the fileset. Currently, it supports the local 
filesystem and HDFS. Since
 0.7.0-incubating, Gravitino supports [S3](fileset-catalog-with-s3.md), 
[GCS](fileset-catalog-with-gcs.md),
-[OSS](fileset-catalog-with-oss.md) and [Azure Blob 
Storage](fileset-catalog-with-adls.md) through Fileset catalog. 
+[OSS](fileset-catalog-with-oss.md) and [Azure Blob 
Storage](fileset-catalog-with-adls.md) through Fileset catalog.
 
 The rest of this document will use HDFS or local file as an example to 
illustrate how to use the Fileset catalog.
-For S3, GCS, OSS and Azure Blob Storage, the configuration is similar to HDFS, 
+For S3, GCS, OSS and Azure Blob Storage, the configuration is similar to HDFS,
 please refer to the corresponding document for more details.
 
 Note that Gravitino uses Hadoop 3 dependencies to build Fileset catalog. 
Theoretically, it should be
@@ -25,7 +25,8 @@ Hadoop 3. If there's any compatibility issue, please create 
an [issue](https://g
 
 ### Catalog properties
 
-Besides the [common catalog 
properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration),
 the Fileset catalog has the following properties:
+Besides the [common catalog 
properties](./gravitino-server-config.md#apache-gravitino-catalog-properties-configuration),
+the Fileset catalog has the following properties:
 
 | Property Name                        | Description                           
                                                                                
                                                                                
                                                                                
                              | Default Value   | Required | Since Version    |
 
|--------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------|----------|------------------|
@@ -38,23 +39,40 @@ Besides the [common catalog 
properties](./gravitino-server-config.md#apache-grav
 | `disable-filesystem-ops`             | The configuration to disable file 
system operations in the server side. If set to true, the Fileset catalog in 
the server side will not create, drop files or folder when the schema, fileset 
is created, dropped.                                                            
                                      | false           | No       | 
0.9.0-incubating |
 | `fileset-cache-eviction-interval-ms` | The interval in milliseconds to evict 
the fileset cache, -1 means never evict.                                        
                                                                                
                                                                                
                              | 3600000         | No       | 0.9.0-incubating |
 | `fileset-cache-max-size`             | The maximum number of the filesets 
the cache may contain, -1 means no limit.                                       
                                                                                
                                                                                
                                 | 200000          | No       | 
0.9.0-incubating |
+| `config.resources`                   | The configuration resources, 
separated by comma. For example, `hdfs-site.xml,core-site.xml`.                 
                                                                                
                                                                                
                                       | (none)          | No       | 1.1.0     
       |
 
 Please refer to [Credential vending](./security/credential-vending.md) for 
more details about credential vending.
 
-### HDFS fileset 
+### HDFS fileset
 
-Apart from the above properties, to access fileset like HDFS fileset, you need 
to configure the following extra properties.
+Apart from the above properties, to access fileset like HDFS fileset, you need 
to configure the following extra
+properties.
 
-| Property Name                                      | Description             
                                                                       | 
Default Value | Required                                                    | 
Since Version |
-|----------------------------------------------------|------------------------------------------------------------------------------------------------|---------------|-------------------------------------------------------------|---------------|
+| Property Name                                      | Description             
                                                                        | 
Default Value | Required                                                    | 
Since Version |
+|----------------------------------------------------|-------------------------------------------------------------------------------------------------|---------------|-------------------------------------------------------------|---------------|
 | `authentication.impersonation-enable`              | Whether to enable 
impersonation for the Fileset catalog.                                        | 
`false`       | No                                                          | 
0.5.1         |
 | `authentication.type`                              | The type of 
authentication for Fileset catalog, currently we only support `kerberos`, 
`simple`. | `simple`      | No                                                  
        | 0.5.1         |
-| `authentication.kerberos.principal`                | The principal of the 
Kerberos authentication                                                   | 
(none)        | required if the value of `authentication.type` is Kerberos. | 
0.5.1         |
-| `authentication.kerberos.keytab-uri`               | The URI of The keytab 
for the Kerberos authentication.                                         | 
(none)        | required if the value of `authentication.type` is Kerberos. | 
0.5.1         |
+| `authentication.kerberos.principal`                | The principal of the 
Kerberos authentication                                                    | 
(none)        | required if the value of `authentication.type` is Kerberos. | 
0.5.1         |
+| `authentication.kerberos.keytab-uri`               | The URI of The keytab 
for the Kerberos authentication.                                          | 
(none)        | required if the value of `authentication.type` is Kerberos. | 
0.5.1         |
 | `authentication.kerberos.check-interval-sec`       | The check interval of 
Kerberos credential for Fileset catalog.                                  | 60  
          | No                                                          | 0.5.1 
        |
-| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of 
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.     | 60  
          | No                                                          | 0.5.1 
        |
+| `authentication.kerberos.keytab-fetch-timeout-sec` | The fetch timeout of 
retrieving Kerberos keytab from `authentication.kerberos.keytab-uri`.      | 60 
           | No                                                          | 
0.5.1         |
+
+The `config.resources` property allows users to specify custom configuration 
files.
+
+The Gravitino Fileset catalog recognizes the following additional properties in the `xxx-site.xml` files:
+
+| Property Name                                     | Description              
                                               | Default Value | Required       
                                             | Since Version |
+|---------------------------------------------------|-------------------------------------------------------------------------|---------------|-------------------------------------------------------------|---------------|
+| hadoop.security.authentication.kerberos.principal | The principal of the 
Kerberos authentication for HDFS client.           | (none)        | required 
if the value of `authentication.type` is Kerberos. | 1.1.0         |
+| hadoop.security.authentication.kerberos.keytab    | The keytab file path of 
the Kerberos authentication for HDFS client.    | (none)        | required if 
the value of `authentication.type` is Kerberos. | 1.1.0         |
+| hadoop.security.authentication.kerberos.krb5.conf | The krb5.conf file path 
of the Kerberos authentication for HDFS client. | (none)        | No            
                                              | 1.1.0         |
 
 ### Fileset catalog with Cloud Storage
+
+In the current implementation, the fileset uses the HDFS protocol to access 
its location. If users use S3, GCS, OSS,
+or Azure Blob Storage, they can also configure the `config.resources` to 
specify custom configuration
+files.
+
 - For S3, please refer to 
[Fileset-catalog-with-s3](./fileset-catalog-with-s3.md) for more details.
 - For GCS, please refer to 
[Fileset-catalog-with-gcs](./fileset-catalog-with-gcs.md) for more details.
 - For OSS, please refer to 
[Fileset-catalog-with-oss](./fileset-catalog-with-oss.md) for more details.
@@ -62,7 +80,9 @@ Apart from the above properties, to access fileset like HDFS 
fileset, you need t
 
 ### How to custom your own HCFS file system fileset?
 
-Developers and users can custom their own HCFS file system fileset by 
implementing the`FileSystemProvider` interface in the jar 
[gravitino-hadoop-common](https://repo1.maven.org/maven2/org/apache/gravitino/gravitino-hadoop-common/).
 The `FileSystemProvider` interface is defined as follows:
+Developers and users can custom their own HCFS file system fileset by 
implementing the`FileSystemProvider` interface in
+the jar 
[gravitino-hadoop-common](https://repo1.maven.org/maven2/org/apache/gravitino/gravitino-hadoop-common/).
 The
+`FileSystemProvider` interface is defined as follows:
 
 ```java
   
@@ -90,16 +110,22 @@ After implementing the `FileSystemProvider` interface, you 
need to put the jar f
 `$GRAVITINO_HOME/catalogs/fileset/libs` directory. Then you can set the 
`filesystem-providers`
 property to use your custom file system provider.
 
-### Authentication for Fileset Catalog
+### Authentication for fileset catalog
 
-The Fileset catalog supports multi-level authentication to control access, 
allowing different authentication settings for the catalog, schema, and 
fileset. The priority of authentication settings is as follows: catalog < 
schema < fileset. Specifically:
+The Fileset catalog supports multi-level authentication to control access, 
allowing different authentication settings
+for the catalog, schema, and fileset. The priority of authentication settings 
is as follows: catalog < schema < fileset.
+Specifically:
 
 - **Catalog**: The default authentication is `simple`.
-- **Schema**: Inherits the authentication setting from the catalog if not 
explicitly set. For more information about schema settings, please refer to 
[Schema properties](#schema-properties).
-- **Fileset**: Inherits the authentication setting from the schema if not 
explicitly set. For more information about fileset settings, please refer to 
[Fileset properties](#fileset-properties).
+- **Schema**: Inherits the authentication setting from the catalog if not 
explicitly set. For more information about
+  schema settings, please refer to [Schema properties](#schema-properties).
+- **Fileset**: Inherits the authentication setting from the schema if not 
explicitly set. For more information about
+  fileset settings, please refer to [Fileset properties](#fileset-properties).
 
-The default value of `authentication.impersonation-enable` is false, and the 
default value for catalogs about this configuration is false, for
-schemas and filesets, the default value is inherited from the parent. Value 
set by the user will override the parent value, and the priority mechanism is 
the same as authentication.
+The default value of `authentication.impersonation-enable` is false for catalogs; for
+schemas and filesets, the default value is inherited from the parent. A value set by the user will override the parent
+value, and the priority mechanism is the same as for authentication.
 
 ### Catalog operations
 
@@ -113,6 +139,9 @@ The Fileset catalog supports creating, updating, deleting, 
and listing schema.
 
 ### Schema properties
 
+All the catalog properties are inherited by the schema. Besides, the Fileset 
catalog schema has the following
+properties:
+
 | Property name                         | Description                          
                                                                                
     | Default value             | Required | Since Version    |
 
|---------------------------------------|---------------------------------------------------------------------------------------------------------------------------|---------------------------|----------|------------------|
 | `location`                            | The storage location managed by 
schema. Its location name is `unknown`. It's also should be a directory or path 
prefix.   | (none)                    | No       | 0.5.0            |
@@ -122,14 +151,17 @@ The Fileset catalog supports creating, updating, 
deleting, and listing schema.
 | `authentication.kerberos.principal`   | The principal of the Kerberos 
authentication for this schema.                                                 
            | The parent(catalog) value | No       | 0.6.0-incubating |
 | `authentication.kerberos.keytab-uri`  | The URI of The keytab for the 
Kerberos authentication for this schema.                                        
            | The parent(catalog) value | No       | 0.6.0-incubating |
 | `credential-providers`                | The credential provider types, 
separated by comma.                                                             
           | (none)                    | No       | 0.8.0-incubating |
+| `config.resources`                    | The configuration resources, 
separated by comma. For example, `hdfs-site.xml,core-site.xml`.                 
             | (none)                    | No       | 1.1.0            |
 
 ### Schema operations
 
 Refer to [Schema 
operation](./manage-fileset-metadata-using-gravitino.md#schema-operations) for 
more details.
 
 :::note
-During schema creation or deletion, Gravitino automatically creates or removes 
the corresponding filesystem directories for the schema locations. 
+During schema creation or deletion, Gravitino automatically creates or removes 
the corresponding filesystem directories
+for the schema locations.
 This behavior is skipped in either of these cases:
+
 1. When the catalog property `disable-filesystem-ops` is set to `true`
 2. When the location contains 
[placeholders](./manage-fileset-metadata-using-gravitino.md#placeholder)
 :::
@@ -142,25 +174,31 @@ This behavior is skipped in either of these cases:
 
 ### Fileset properties
 
-| Property name                         | Description                          
                                                                                
| Default value                                                                 
                                 | Required                                   | 
Immutable | Since Version    |
-|---------------------------------------|----------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------|--------------------------------------------|-----------|------------------|
-| `authentication.impersonation-enable` | Whether to enable impersonation for 
the Fileset catalog fileset.                                                    
 | The parent(schema) value                                                     
                                  | No                                         
| Yes       | 0.6.0-incubating |
-| `authentication.type`                 | The type of authentication for 
Fileset catalog fileset, currently we only support `kerberos`, `simple`.        
      | The parent(schema) value                                                
                                       | No                                     
    | No        | 0.6.0-incubating |
-| `authentication.kerberos.principal`   | The principal of the Kerberos 
authentication for the fileset.                                                 
       | The parent(schema) value                                               
                                        | No                                    
     | No        | 0.6.0-incubating |
-| `authentication.kerberos.keytab-uri`  | The URI of The keytab for the 
Kerberos authentication for the fileset.                                        
       | The parent(schema) value                                               
                                        | No                                    
     | No        | 0.6.0-incubating |
-| `credential-providers`                | The credential provider types, 
separated by comma.                                                             
      | (none)                                                                  
                                       | No                                     
    | No        | 0.8.0-incubating |
-| `placeholder-`                        | Properties that start with 
`placeholder-` are used to replace placeholders in the location.                
          | (none)                                                              
                                           | No                                 
        | Yes       | 0.9.0-incubating |
-| `default-location-name`               | The name of the default location of 
the fileset, mainly used for GVFS operations without specifying a location 
name. | When the fileset has only one location, its location name will be 
automatically selected as the default value. | Yes, if the fileset has multiple 
locations | Yes       | 0.9.0-incubating |
+All the schema properties are inherited by the fileset, including the properties inherited from the catalog.
+Besides, the Fileset catalog fileset has the following properties:
+
+| Property name                         | Description                          
                                                                                
   | Default value                                                              
                                    | Required                                  
 | Immutable | Since Version    |
+|---------------------------------------|-------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------|--------------------------------------------|-----------|------------------|
+| `location`                            | The storage location managed by the fileset. Its location name is `unknown`. It should also be a directory or path prefix. | (none)                                                                                                         | No                                         | No        | 0.5.0            |
+| `authentication.impersonation-enable` | Whether to enable impersonation for 
the Fileset catalog fileset.                                                    
    | The parent(schema) value                                                  
                                     | No                                       
  | Yes       | 0.6.0-incubating |
+| `authentication.type`                 | The type of authentication for 
Fileset catalog fileset, currently we only support `kerberos`, `simple`.        
         | The parent(schema) value                                             
                                          | No                                  
       | No        | 0.6.0-incubating |
+| `authentication.kerberos.principal`   | The principal of the Kerberos 
authentication for the fileset.                                                 
          | The parent(schema) value                                            
                                           | No                                 
        | No        | 0.6.0-incubating |
+| `authentication.kerberos.keytab-uri`  | The URI of The keytab for the 
Kerberos authentication for the fileset.                                        
          | The parent(schema) value                                            
                                           | No                                 
        | No        | 0.6.0-incubating |
+| `credential-providers`                | The credential provider types, 
separated by comma.                                                             
         | (none)                                                               
                                          | No                                  
       | No        | 0.8.0-incubating |
+| `placeholder-`                        | Properties that start with 
`placeholder-` are used to replace placeholders in the location.                
             | (none)                                                           
                                              | No                              
           | Yes       | 0.9.0-incubating |
+| `default-location-name`               | The name of the default location of 
the fileset, mainly used for GVFS operations without specifying a location 
name.    | When the fileset has only one location, its location name will be 
automatically selected as the default value. | Yes, if the fileset has multiple 
locations | Yes       | 0.9.0-incubating |
+| `config.resources`                    | The configuration resources, separated by comma. For example, `hdfs-site.xml,core-site.xml`.                            | (none)                                                                                                         | No                                         | No        | 1.1.0            |
 
 Some properties are reserved and cannot be set by users:
 
-| Property name         | Description                                          
                                                    | Default value             
  | Since Version    |
-|-----------------------|----------------------------------------------------------------------------------------------------------|-----------------------------|------------------|
-| `placeholder-catalog` | The placeholder for the catalog name.                
                                                    | catalog name of the 
fileset | 0.9.0-incubating |
-| `placeholder-schema`  | The placeholder for the schema name.                 
                                                    | schema name of the 
fileset  | 0.9.0-incubating |
-| `placeholder-fileset` | The placeholder for the fileset name.                
                                                    | fileset name              
  | 0.9.0-incubating |
+| Property name         | Description                           | Default 
value               | Since Version    |
+|-----------------------|---------------------------------------|-----------------------------|------------------|
+| `placeholder-catalog` | The placeholder for the catalog name. | catalog name 
of the fileset | 0.9.0-incubating |
+| `placeholder-schema`  | The placeholder for the schema name.  | schema name 
of the fileset  | 0.9.0-incubating |
+| `placeholder-fileset` | The placeholder for the fileset name. | fileset name 
               | 0.9.0-incubating |
 
-Credential providers can be specified in several places, as listed below. 
Gravitino checks the `credential-providers` setting in the following order of 
precedence:
+Credential providers can be specified in several places, as listed below. 
Gravitino checks the `credential-providers`
+setting in the following order of precedence:
 
 1. Fileset properties
 2. Schema properties
diff --git a/docs/how-to-use-gvfs.md b/docs/how-to-use-gvfs.md
index 478d1ce6dc..dc67abf58f 100644
--- a/docs/how-to-use-gvfs.md
+++ b/docs/how-to-use-gvfs.md
@@ -71,11 +71,44 @@ the path mapping and convert automatically.
 | `fs.gravitino.client.request.header.`                 | The configuration 
key prefix for the Gravitino client request header. You can set the request 
header for the Gravitino client.                                                
                                                                                
                                                       | (none)                 
                                        | No                                  | 
0.9.0-incubating |
 | `fs.gravitino.enableCredentialVending`                | Whether to enable 
credential vending for the Gravitino Virtual File System.                       
                                                                                
                                                                                
                                                   | `false`                    
                                    | No                                  | 
0.9.0-incubating |
 | `fs.gravitino.client.`                                | The configuration 
key prefix for the Gravitino client config.                                     
                                                                                
                                                                                
                                                   | (none)                     
                                    | No                                  | 
1.0.0            |
-| `fs.gravitino.filesetMetadataCache.enable`            | Whether to cache the 
fileset or fileset catalog metadata in the Gravitino Virtual File System. Note 
that this cache causes a side effect: if you modify the fileset or fileset 
catalog metadata, the client can not see the latest changes.                    
                                                      | `false`                 
                                       | No                                  | 
1.0.0            |
-| `fs.gravitino.autoCreateLocation`                     | The configuration 
key for whether to enable auto-creation of fileset location when the 
server-side filesystem ops are disabled and the location does not exist.        
                                                                                
                                                              | `true`          
                                               | No                             
     | 1.1.0           |
+| `fs.gravitino.filesetMetadataCache.enable`            | Whether to cache the 
fileset, fileset schema or fileset catalog metadata in the Gravitino Virtual 
File System. Note that this cache causes a side effect: if you modify the 
fileset or fileset catalog metadata, the client can not see the latest changes. 
                                                         | `false`              
                                          | No                                  
| 1.0.0            |
+| `fs.gravitino.autoCreateLocation`                     | The configuration 
key for whether to enable auto-creation of fileset location when the 
server-side filesystem ops are disabled and the location does not exist.        
                                                                                
                                                              | `true`          
                                               | No                             
     | 1.1.0            |
+| `fs.path.config.<name>`                               | Defines a logical 
location entry. Set `fs.path.config.<name>` to the real base URI (for example, 
`hdfs://cluster1/`). Any key that starts with the same prefix (such as 
`fs.path.config.<name>.config.resource`) is treated as a location-scoped 
property and will be forwarded to the underlying filesystem client. | (none)    
                                                     | No                       
           | 1.1.0            |
 
 To configure the Gravitino client, use properties prefixed with 
`fs.gravitino.client.`. These properties will be passed to the Gravitino client 
after removing the `fs.` prefix.
 
+:::note
+When users work with a multi-cluster fileset catalog, they can configure 
separate sets of properties for the base paths
+of the different clusters. [Manage fileset with multiple 
clusters](./manage-fileset-metadata-using-gravitino.md#manage-fileset-with-multiple-clusters)
+
+For example, a complex catalog structure might look like this:
+
+```text
+catalog1 -> hdfs://cluster1/catalog1
+    schema1 -> hdfs://cluster1/catalog1/schema1
+        fileset1 -> hdfs://cluster1/catalog1/schema1/fileset1
+        fileset2 -> hdfs://cluster1/catalog1/schema1/fileset2
+    schema2 -> hdfs://cluster2/tmp/schema2
+        fileset3 -> hdfs://cluster2/tmp/schema2/fsd
+        fileset4 -> hdfs://cluster3/customers
+```
+
+In this case, users can configure different client properties for each base 
path:
+
+```text
+fs.path.config.cluster1 = hdfs://cluster1/
+fs.path.config.cluster1.config.resource = /etc/core-site.xml,/etc/hdfs-site.xml
+
+fs.path.config.cluster2 = hdfs://cluster2/
+fs.path.config.cluster2.config.resource = /etc/fs2/core-site.xml,/etc/fs2/hdfs-site.xml
+
+fs.path.config.cluster3 = hdfs://cluster3/
+fs.path.config.cluster3.config.resource = /etc/fs3/core-site.xml,/etc/fs3/hdfs-site.xml
+```
+
+The plain `fs.path.config.<name>` entry specifies the base path of the 
filesystem. Any additional key under the same prefix 
(`fs.path.config.<name>.<config_key>`) is treated as a location-scoped 
configuration (for example, `config.resource` for HDFS) and is forwarded 
directly to the underlying filesystem client.
+:::
+
 **Example:** Setting `fs.gravitino.client.socketTimeoutMs` is equivalent to 
setting `gravitino.client.socketTimeoutMs` for the Gravitino client.
 
 **Note:** Invalid configuration properties will result in exceptions. Please 
see [Gravitino Java client 
configurations](./how-to-use-gravitino-client.md#gravitino-java-client-configuration)
 for more support client configuration.
diff --git a/docs/manage-fileset-metadata-using-gravitino.md 
b/docs/manage-fileset-metadata-using-gravitino.md
index 0b666eb38f..cb86df26dc 100644
--- a/docs/manage-fileset-metadata-using-gravitino.md
+++ b/docs/manage-fileset-metadata-using-gravitino.md
@@ -802,3 +802,122 @@ fileset_list: List[NameIdentifier] = 
catalog.as_fileset_catalog().list_filesets(
 
 </TabItem>
 </Tabs>
+
+
+## Manage fileset with multiple clusters
+
+
+In general, the locations of all schemas and filesets under a fileset
+catalog belong to a single Hadoop cluster if they are HDFS locations.
+
+The example for creating a fileset is as follows:
+```text
+# create fileset catalog
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "fileset_catalog",
+  "type": "FILESET",
+  "comment": "This is a fileset catalog",
+  "provider": "fileset",
+  "properties": {
+    "location": "hdfs://172.17.0.2:9000/fileset_catalog"
+  }
+}' http://localhost:8090/api/metalakes/test/catalogs
+
+# create a fileset schema under the catalog with inherited properties
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "test_schema",
+  "comment": "This is a schema",
+  "properties": {
+  }
+}' http://localhost:8090/api/metalakes/test/catalogs/fileset_catalog/schemas
+
+# create a fileset under the schema with inherited properties
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "fs1",
+  "comment": "This is an example fileset",
+  "type": "MANAGED",
+  "properties": {
+  }
+}' 
http://localhost:8090/api/metalakes/test/catalogs/fileset_catalog/schemas/test_schema/filesets
+```
+
+Within a fileset catalog, schemas and filesets can automatically inherit 
configuration properties
+from their parent catalog. For example, the location property can be inherited 
— a schema can inherit
+the catalog’s location as its base path, and a fileset can in turn inherit the 
schema’s location as its base path.
+
+The property inheritance priority is as follows: catalog < schema < fileset.
+
+If a fileset needs to use a different storage path, it can specify its own 
location configuration to
+override the inherited one.
+
+The fileset catalog also supports multiple clusters. Each schema and fileset 
under a catalog can independently
+specify their own cluster locations and connection configurations.
+
+For example, a complex catalog structure might look like this:
+
+```text
+catalog1 -> hdfs://cluster1/catalog1
+    schema1 -> hdfs://cluster1/catalog1/schema1
+        fileset1 -> hdfs://cluster1/catalog1/schema1/fileset1
+        fileset2 -> hdfs://cluster1/catalog1/schema1/fileset2
+    schema2 -> hdfs://cluster2/tmp/schema2
+        fileset3 -> hdfs://cluster2/tmp/schema2/fsd
+        fileset4 -> hdfs://cluster3/customers
+```
+
+In this example, the default location of `catalog1` is `hdfs://cluster1/catalog1`.
+`schema1` and its filesets are stored in the same cluster as defined by the 
catalog (`cluster1`).
+However, `schema2` and its filesets (`fileset3`, `fileset4`) are located in 
different clusters (`cluster2` and `cluster3`, respectively).
+
+
+The example for creating filesets with different clusters is as follows:
+
+```text
+# create fileset catalog
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "fileset_catalog",
+  "type": "FILESET",
+  "comment": "This is a fileset catalog",
+  "provider": "fileset",
+  "properties": {
+    "location": "hdfs://172.17.0.2:9000/fileset_catalog"
+  }
+}' http://localhost:8090/api/metalakes/test/catalogs
+
+# create a fileset schema under the catalog with inherited properties
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "test_schema",
+  "comment": "This is a schema",
+  "properties": {
+  }
+}' http://localhost:8090/api/metalakes/test/catalogs/fileset_catalog/schemas
+
+
+# create fileset fs1 in the default HDFS cluster
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "fs1",
+  "comment": "This is an example fileset",
+  "type": "MANAGED",
+  "properties": {
+  }
+}' 
http://localhost:8090/api/metalakes/test/catalogs/fileset_catalog/schemas/test_schema/filesets
+
+# create fileset fs2 in another HDFS cluster
+curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
+-H "Content-Type: application/json" -d '{
+  "name": "fs2",
+  "comment": "This is an example fileset",
+  "type": "MANAGED",
+  "storageLocation": "hdfs://172.17.0.3:9000/tmp/fs2",
+  "properties": {
+    "config.resources": 
"/etc/conf/cluster1/core-site.xml,/etc/conf/cluster1/hdfs-site.xml"
+  }
+}' 
http://localhost:8090/api/metalakes/test/catalogs/fileset_catalog/schemas/test_schema/filesets
+```
+
diff --git 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
index 7ab4be49a7..90d6907d67 100644
--- 
a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
+++ 
b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java
@@ -168,8 +168,8 @@ public class IcebergRestKerberosHiveCatalogIT extends 
IcebergRESTHiveCatalogIT {
         classRef = Class.forName("sun.security.krb5.Config");
       }
 
-      Method refershMethod = classRef.getMethod("refresh");
-      refershMethod.invoke(null);
+      Method refreshMethod = classRef.getMethod("refresh");
+      refreshMethod.invoke(null);
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
diff --git 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/HiveContainer.java
 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/HiveContainer.java
index d152432c1d..874116d31e 100644
--- 
a/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/HiveContainer.java
+++ 
b/integration-test-common/src/test/java/org/apache/gravitino/integration/test/container/HiveContainer.java
@@ -153,9 +153,11 @@ public class HiveContainer extends BaseContainer {
             });
 
     final String showDatabaseSQL = "show databases";
+    // Starting both the regular Hive cluster and the Kerberos-enabled Hive 
cluster at the same time
+    // takes a long time.
     await()
-        .atMost(30, TimeUnit.SECONDS)
-        .pollInterval(30 / retryLimit, TimeUnit.SECONDS)
+        .atMost(120, TimeUnit.SECONDS)
+        .pollInterval(120 / retryLimit, TimeUnit.SECONDS)
         .until(
             () -> {
               try {
@@ -172,8 +174,8 @@ public class HiveContainer extends BaseContainer {
         "CREATE TABLE IF NOT EXISTS default.employee ( eid int, name String, "
             + "salary String, destination String) ";
     await()
-        .atMost(30, TimeUnit.SECONDS)
-        .pollInterval(30 / retryLimit, TimeUnit.SECONDS)
+        .atMost(120, TimeUnit.SECONDS)
+        .pollInterval(120 / retryLimit, TimeUnit.SECONDS)
         .until(
             () -> {
               try {

Reply via email to