This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d788bf912 [#10570] feat(iceberg): Support REST catalog backend 
(#10586)
4d788bf912 is described below

commit 4d788bf912f9a5ad3a549d9ba4b2ee9276b60dbb
Author: roryqi <[email protected]>
AuthorDate: Tue Apr 7 16:09:38 2026 +0800

    [#10570] feat(iceberg): Support REST catalog backend (#10586)
    
    ### What changes were proposed in this pull request?
    
    Support REST catalog backend.
    1. Use the user access token to access remote REST catalog
    2. When credential vending is enabled, use remote REST catalog
    vended-credentials instead of generating a credential
    3. Fetch remote catalog config and retrieve the properties required by
    the client
    
    ### Why are the changes needed?
    
    Fix: #10570
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added UTs.
    
    I deployed two Gravitino servers.
    One uses AWS; the other uses GCP.
    For each Gravitino server, I added a local PostgreSQL JDBC backend
    catalog and a remote catalog using the REST backend.
    
    I ran spark-sql to test credential vending and OAuth
    authentication:
    
    ```
    use aws_remote;
    create database sales;
    use sales;
    create table customers (customer_id int, customer_name varchar(100), 
customer_email varchar(100));
    describe extended customers;
    insert into customers (customer_id, customer_name, customer_email) values 
(11,'Rory Brown','[email protected]');
    insert into customers (customer_id, customer_name, customer_email) values 
(12,'Jerry Washington','[email protected]');
    ```
---
 .../lakehouse/iceberg/IcebergConstants.java        |   4 +
 .../lakehouse/iceberg/IcebergPropertiesUtils.java  |   1 +
 .../iceberg/IcebergCatalogPropertiesMetadata.java  |   9 +-
 .../credential/CredentialPropertyUtils.java        |  30 ++-
 .../java/org/apache/gravitino/UserPrincipal.java   |  49 ++++-
 .../org/apache/gravitino/utils/PrincipalUtils.java |  13 +-
 docs/iceberg-rest-service.md                       |  28 +--
 docs/lakehouse-iceberg-catalog.md                  |  12 +-
 .../auth/UserPrincipalForwardingAuthManager.java   |  87 +++++++++
 .../iceberg/common/utils/IcebergCatalogUtil.java   |   9 +-
 .../TestUserPrincipalForwardingAuthManager.java    | 105 +++++++++++
 .../iceberg/service/CatalogWrapperForREST.java     | 208 ++++++++++++++++++++-
 .../iceberg/service/IcebergExceptionMapper.java    |   2 +
 .../iceberg/service/TestCatalogWrapperForREST.java |  79 ++++++++
 .../service/TestIcebergExceptionMapper.java        |   2 +
 .../iceberg/service/rest/TestIcebergConfig.java    |   4 +-
 .../authentication/KerberosAuthenticator.java      |  11 +-
 .../authentication/OAuth2TokenAuthenticator.java   |   9 +-
 .../server/authentication/SimpleAuthenticator.java |   7 +-
 .../authentication/TestKerberosAuthenticator.java  |   9 +-
 .../TestOAuth2TokenAuthenticator.java              |  16 +-
 .../authentication/TestSimpleAuthenticator.java    |  35 +++-
 22 files changed, 681 insertions(+), 48 deletions(-)

diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
index d40f3dd8dd..c8418d7dc7 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergConstants.java
@@ -31,6 +31,10 @@ public class IcebergConstants {
   public static final String ICEBERG_JDBC_PASSWORD = "jdbc.password";
   public static final String ICEBERG_JDBC_INITIALIZE = "jdbc-initialize";
 
+  public static final String DATA_ACCESS = "data-access";
+
+  public static final String ICEBERG_ACCESS_DELEGATION = 
"header.X-Iceberg-Access-Delegation";
+
   public static final String GRAVITINO_JDBC_SCHEMA_VERSION = 
"jdbc-schema-version";
   public static final String ICEBERG_JDBC_SCHEMA_VERSION = 
"jdbc.schema-version";
 
diff --git 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
index 854b5f8b6c..e333787eb2 100644
--- 
a/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
+++ 
b/catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java
@@ -78,6 +78,7 @@ public class IcebergPropertiesUtils {
     map.put(
         IcebergConstants.TABLE_METADATA_CACHE_EXPIRE_MINUTES,
         IcebergConstants.TABLE_METADATA_CACHE_EXPIRE_MINUTES);
+    map.put(IcebergConstants.DATA_ACCESS, 
IcebergConstants.ICEBERG_ACCESS_DELEGATION);
 
     GRAVITINO_CONFIG_TO_ICEBERG = Collections.unmodifiableMap(map);
 
diff --git 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
index 82b063f242..7f4735dbf3 100644
--- 
a/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
+++ 
b/catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java
@@ -140,7 +140,14 @@ public class IcebergCatalogPropertiesMetadata extends 
BaseCatalogPropertiesMetad
                 "Table metadata cache TTL in minutes",
                 false /* immutable */,
                 60 /* defaultValue */,
-                false /* hidden */));
+                false /* hidden */),
+            stringOptionalPropertyEntry(
+                IcebergConstants.DATA_ACCESS,
+                "Iceberg REST data access mode. Supported values are 
vended-credentials and"
+                    + " remote-signing.",
+                false,
+                null,
+                false));
     HashMap<String, PropertyEntry<?>> result = Maps.newHashMap();
     result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
     result.putAll(KerberosConfig.KERBEROS_PROPERTY_ENTRIES);
diff --git 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
index 866de92420..b66b650178 100644
--- 
a/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
+++ 
b/common/src/main/java/org/apache/gravitino/credential/CredentialPropertyUtils.java
@@ -21,8 +21,11 @@ package org.apache.gravitino.credential;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Helper class to generate specific credential properties for different table 
format and engine.
@@ -46,6 +49,8 @@ public class CredentialPropertyUtils {
   @VisibleForTesting
   static final String ICEBERG_ADLS_ACCOUNT_KEY = 
"adls.auth.shared-key.account.key";
 
+  private static final String GCS_OAUTH_2_TOKEN_EXPIRES_AT = 
"gcs.oauth2.token-expires-at";
+
   private static Map<String, String> icebergCredentialPropertyMap =
       ImmutableMap.<String, String>builder()
           .put(GCSTokenCredential.GCS_TOKEN_NAME, ICEBERG_GCS_TOKEN)
@@ -90,7 +95,7 @@ public class CredentialPropertyUtils {
       Map<String, String> icebergGCSCredentialProperties =
           transformProperties(credential.credentialInfo(), 
icebergCredentialPropertyMap);
       icebergGCSCredentialProperties.put(
-          "gcs.oauth2.token-expires-at", 
String.valueOf(credential.expireTimeInMs()));
+          GCS_OAUTH_2_TOKEN_EXPIRES_AT, 
String.valueOf(credential.expireTimeInMs()));
       return icebergGCSCredentialProperties;
     }
 
@@ -109,6 +114,29 @@ public class CredentialPropertyUtils {
     return credential.toProperties();
   }
 
+  /**
+   * Filters a property map down to only Iceberg credential-related keys.
+   *
+   * <p>This is used when an Iceberg table load response contains many table 
and catalog properties,
+   * but the caller only needs the temporary credential properties that should 
be forwarded to the
+   * Iceberg client.
+   *
+   * @param properties the source properties to filter
+   * @return a map containing only credential properties recognized by Iceberg
+   */
+  public static Map<String, String> filterCredentialProperties(Map<String, 
String> properties) {
+    Set<String> credentialPropertyKeys = 
Sets.newHashSet(icebergCredentialPropertyMap.values());
+    credentialPropertyKeys.add(GCS_OAUTH_2_TOKEN_EXPIRES_AT);
+    Map<String, String> filteredProperties = Maps.newHashMap(properties);
+    filteredProperties
+        .entrySet()
+        .removeIf(
+            entry ->
+                !credentialPropertyKeys.contains(entry.getKey())
+                    && !entry.getKey().startsWith(ICEBERG_ADLS_TOKEN));
+    return filteredProperties;
+  }
+
   private static Map<String, String> transformProperties(
       Map<String, String> originProperties, Map<String, String> transformMap) {
     HashMap<String, String> properties = new HashMap();
diff --git a/core/src/main/java/org/apache/gravitino/UserPrincipal.java 
b/core/src/main/java/org/apache/gravitino/UserPrincipal.java
index 296dedaa56..e3980720eb 100644
--- a/core/src/main/java/org/apache/gravitino/UserPrincipal.java
+++ b/core/src/main/java/org/apache/gravitino/UserPrincipal.java
@@ -25,12 +25,19 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
+import javax.annotation.Nullable;
 
-/** A simple implementation of Principal that holds a username and group 
membership. */
+/**
+ * A simple implementation of Principal that holds a username, optional group 
membership, and
+ * optionally the raw {@code Authorization} header value for forwarding to 
downstream Iceberg REST
+ * catalogs.
+ */
 public class UserPrincipal implements Principal {
 
   private final String username;
   private final List<UserGroup> groups;
+  @Nullable private final String accessToken;
 
   /**
    * Constructs a UserPrincipal with the given username.
@@ -38,7 +45,18 @@ public class UserPrincipal implements Principal {
    * @param username the username of the principal
    */
   public UserPrincipal(final String username) {
-    this(username, Collections.emptyList());
+    this(username, Collections.emptyList(), null);
+  }
+
+  /**
+   * Constructs a UserPrincipal with the given username and optional raw 
{@code Authorization}
+   * header value.
+   *
+   * @param accessToken authorization header value (for example, {@code Bearer 
<jwt>} or {@code
+   *     Basic <base64>})
+   */
+  public UserPrincipal(final String username, @Nullable final String 
accessToken) {
+    this(username, Collections.emptyList(), accessToken);
   }
 
   /**
@@ -48,12 +66,27 @@ public class UserPrincipal implements Principal {
    * @param groups the groups of the principal
    */
   public UserPrincipal(final String username, final List<UserGroup> groups) {
+    this(username, groups, null);
+  }
+
+  /**
+   * Constructs a UserPrincipal with the given username, groups and optional 
raw {@code
+   * Authorization} header value.
+   *
+   * @param username the username of the principal
+   * @param groups the groups of the principal
+   * @param accessToken authorization header value (for example, {@code Bearer 
<jwt>} or {@code
+   *     Basic <base64>})
+   */
+  public UserPrincipal(
+      final String username, final List<UserGroup> groups, @Nullable final 
String accessToken) {
     Preconditions.checkArgument(username != null, "UserPrincipal must have the 
username");
     this.username = username;
     this.groups =
         groups != null
             ? Collections.unmodifiableList(new ArrayList<>(groups))
             : Collections.emptyList();
+    this.accessToken = accessToken;
   }
 
   /**
@@ -66,6 +99,11 @@ public class UserPrincipal implements Principal {
     return username;
   }
 
+  /** Returns the raw {@code Authorization} header value when the 
authenticator recorded it. */
+  public Optional<String> getAccessToken() {
+    return Optional.ofNullable(accessToken);
+  }
+
   /**
    * Returns the groups of this principal.
    *
@@ -94,6 +132,11 @@ public class UserPrincipal implements Principal {
 
   @Override
   public String toString() {
-    return "[principal: " + this.username + ", groups: " + this.groups + "]";
+    return "[principal: "
+        + this.username
+        + ", groups: "
+        + this.groups
+        + (accessToken != null ? ", token=***" : "")
+        + "]";
   }
 }
diff --git a/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java 
b/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
index 098ba50486..02e2a2670c 100644
--- a/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
+++ b/core/src/main/java/org/apache/gravitino/utils/PrincipalUtils.java
@@ -20,9 +20,11 @@
 package org.apache.gravitino.utils;
 
 import com.google.common.base.Throwables;
+import java.nio.charset.StandardCharsets;
 import java.security.Principal;
 import java.security.PrivilegedActionException;
 import java.security.PrivilegedExceptionAction;
+import java.util.Base64;
 import javax.security.auth.Subject;
 import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
@@ -31,11 +33,18 @@ import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("removal")
 public class PrincipalUtils {
-
   private static final Logger LOG = 
LoggerFactory.getLogger(PrincipalUtils.class);
 
   private PrincipalUtils() {}
 
+  public static final Principal ANONYMOUS_PRINCIPAL =
+      new UserPrincipal(
+          AuthConstants.ANONYMOUS_USER,
+          AuthConstants.AUTHORIZATION_BASIC_HEADER
+              + " "
+              + Base64.getEncoder()
+                  
.encodeToString(AuthConstants.ANONYMOUS_USER.getBytes(StandardCharsets.UTF_8)));
+
   public static <T> T doAs(Principal principal, PrivilegedExceptionAction<T> 
action)
       throws Exception {
     try {
@@ -57,7 +66,7 @@ public class PrincipalUtils {
     java.security.AccessControlContext context = 
java.security.AccessController.getContext();
     Subject subject = Subject.getSubject(context);
     if (subject == null || 
subject.getPrincipals(UserPrincipal.class).isEmpty()) {
-      return new UserPrincipal(AuthConstants.ANONYMOUS_USER);
+      return ANONYMOUS_PRINCIPAL;
     }
 
     return subject.getPrincipals(UserPrincipal.class).iterator().next();
diff --git a/docs/iceberg-rest-service.md b/docs/iceberg-rest-service.md
index aa5790aeff..59ac6529c4 100644
--- a/docs/iceberg-rest-service.md
+++ b/docs/iceberg-rest-service.md
@@ -107,17 +107,17 @@ The Gravitino Iceberg REST catalog service uses the 
memory catalog backend by de
 
 #### JDBC backend configuration
 
-| Configuration item                            | Description                  
                                                                                
                        | Default value            | Required | Since Version |
-|-----------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------|---------------|
-| `gravitino.iceberg-rest.catalog-backend`      | The Catalog backend of the 
Gravitino Iceberg REST catalog service. Use the value **`jdbc`** for the JDBC 
catalog backend.            | `memory`                 | Yes      | 0.2.0       
  |
-| `gravitino.iceberg-rest.uri`                  | The JDBC connection address, 
such as `jdbc:postgresql://127.0.0.1:5432` for Postgres, or 
`jdbc:mysql://127.0.0.1:3306/` for mysql.   | (none)                   | Yes    
  | 0.2.0         |
-| `gravitino.iceberg-rest.warehouse`            | The warehouse directory of 
JDBC catalog. Set the HDFS prefix if using HDFS, such as 
`hdfs://127.0.0.1:9000/user/hive/warehouse-jdbc` | (none)                   | 
Yes      | 0.2.0         |
-| `gravitino.iceberg-rest.catalog-backend-name` | The catalog name passed to 
underlying Iceberg catalog backend. Catalog name in JDBC backend is used to 
isolate namespace and tables. | `jdbc` for JDBC backend  | No       | 0.5.2     
    |
-| `gravitino.iceberg-rest.jdbc-user`            | The username of the JDBC 
connection.                                                                     
                            | (none)                   | No       | 0.2.0       
  |
-| `gravitino.iceberg-rest.jdbc-password`        | The password of the JDBC 
connection.                                                                     
                            | (none)                   | No       | 0.2.0       
  |
-| `gravitino.iceberg-rest.jdbc-initialize`      | Whether to initialize the 
meta tables when creating the JDBC catalog.                                     
                           | `true`                   | No       | 0.2.0        
 |
-| `gravitino.iceberg-rest.jdbc-driver`          | `com.mysql.jdbc.Driver` or 
`com.mysql.cj.jdbc.Driver` for MySQL, `org.postgresql.Driver` for PostgreSQL.   
                          | (none)                   | Yes      | 0.3.0         
|
-| `gravitino.iceberg-rest.jdbc-schema-version`  | The schema version of the 
JDBC catalog. Set to `V1` to enable view support. Once the underlying database 
is migrated to V1, this property is no longer required on subsequent restarts. 
| `V0`          | No       | 1.2.0         |
+| Configuration item                            | Description                  
                                                                                
                                                                             | 
Default value           | Required | Since Version |
+|-----------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|----------|---------------|
+| `gravitino.iceberg-rest.catalog-backend`      | The Catalog backend of the 
Gravitino Iceberg REST catalog service. Use the value **`jdbc`** for the JDBC 
catalog backend.                                                                
 | `memory`                | Yes      | 0.2.0         |
+| `gravitino.iceberg-rest.uri`                  | The JDBC connection address, 
such as `jdbc:postgresql://127.0.0.1:5432` for Postgres, or 
`jdbc:mysql://127.0.0.1:3306/` for mysql.                                       
                 | (none)                  | Yes      | 0.2.0         |
+| `gravitino.iceberg-rest.warehouse`            | The warehouse directory of 
JDBC catalog. Set the HDFS prefix if using HDFS, such as 
`hdfs://127.0.0.1:9000/user/hive/warehouse-jdbc`                                
                      | (none)                  | Yes      | 0.2.0         |
+| `gravitino.iceberg-rest.catalog-backend-name` | The catalog name passed to 
underlying Iceberg catalog backend. Catalog name in JDBC backend is used to 
isolate namespace and tables.                                                   
   | `jdbc` for JDBC backend | No       | 0.5.2         |
+| `gravitino.iceberg-rest.jdbc-user`            | The username of the JDBC 
connection.                                                                     
                                                                                
 | (none)                  | No       | 0.2.0         |
+| `gravitino.iceberg-rest.jdbc-password`        | The password of the JDBC 
connection.                                                                     
                                                                                
 | (none)                  | No       | 0.2.0         |
+| `gravitino.iceberg-rest.jdbc-initialize`      | Whether to initialize the 
meta tables when creating the JDBC catalog.                                     
                                                                                
| `true`                  | No       | 0.2.0         |
+| `gravitino.iceberg-rest.jdbc-driver`          | `com.mysql.jdbc.Driver` or 
`com.mysql.cj.jdbc.Driver` for MySQL, `org.postgresql.Driver` for PostgreSQL.   
                                                                               
| (none)                  | Yes      | 0.3.0         |
+| `gravitino.iceberg-rest.jdbc-schema-version`  | The schema version of the 
JDBC catalog. Set to `V1` to enable view support. Once the underlying database 
is migrated to V1, this property is no longer required on subsequent restarts.  
 | `V0`                    | No       | 1.2.0         |
 
 If you have a JDBC Iceberg catalog prior, you must set `catalog-backend-name` 
to keep consistent with your Jdbc Iceberg catalog name to operate the prior 
namespace and tables.
 
@@ -135,6 +135,7 @@ Use the REST backend to proxy another Iceberg REST catalog 
server (IRC2). The Gr
 | `gravitino.iceberg-rest.catalog-backend` | The Catalog backend of the 
Gravitino Iceberg REST catalog service. Use the value **`rest`** for the REST 
catalog backend.     | `memory`      | Yes      | 0.2.0         |
 | `gravitino.iceberg-rest.uri`             | The Iceberg REST catalog URI 
(IRC2), such as `http://127.0.0.1:9001/iceberg`.                                
                 | (none)        | Yes      | 0.2.0         |
 | `gravitino.iceberg-rest.warehouse`       | The catalog name in the Iceberg 
REST spec. Set to a specific catalog name, or leave empty to use the default 
catalog on IRC2. | (none)        | No       | 0.2.0         |
+| `gravitino.iceberg-rest.data-access`     | Data access mode exposed to 
Iceberg REST clients via `/v1/config`. Supported values: `vended-credentials`, 
`remote-signing`.  | (none)        | No       | 1.3.0         |
 
 IRC1 configuration example if IRC2 using HDFS storage:
 
@@ -157,6 +158,11 @@ gravitino.iceberg-rest.header.X-Iceberg-Access-Delegation 
= vended-credentials
 
 IRC1 must also configure S3 configurations if the client side requests 
credential vending.
 
+`data-access` is returned in `/v1/config` defaults for REST clients:
+
+- `vended-credentials`: clients should request credential vending 
(`X-Iceberg-Access-Delegation: vended-credentials`).
+- `remote-signing`: Gravitino doesn't support this mode yet.
+
 #### Custom backend configuration
 
 | Configuration item                            | Description                  
                                                                                
                 | Default value | Required | Since Version    |
diff --git a/docs/lakehouse-iceberg-catalog.md 
b/docs/lakehouse-iceberg-catalog.md
index b65a003467..dfd476d7bf 100644
--- a/docs/lakehouse-iceberg-catalog.md
+++ b/docs/lakehouse-iceberg-catalog.md
@@ -71,6 +71,15 @@ If you are using multiple JDBC catalog backends, setting 
`jdbc-initialize` to tr
 
 For the REST catalog backend, `warehouse` identifies the catalog in the 
Iceberg REST spec. In the Gravitino Iceberg REST server, `warehouse` maps to 
the catalog name. An empty value means the default catalog.
 
+`data-access` controls how the Iceberg REST client accesses table data when 
using a REST backend:
+
+| Property name  | Description                                                 
                                                            | Default value | 
Required | Since Version |
+|----------------|-------------------------------------------------------------------------------------------------------------------------|---------------|----------|---------------|
+| `data-access`  | Data access mode for REST catalog backend. Supported values 
are `vended-credentials` and `remote-signing`.              | (none)        | 
No       | 1.3.0         |
+
+- `vended-credentials`: request credential vending from the Iceberg REST 
server.
+- `remote-signing`: Gravitino doesn't support this mode yet.
+
 Example: create an Iceberg catalog with the REST backend. This targets the 
default catalog and uses a REST path like 
`http://127.0.0.1:9001/iceberg/v1/namespaces/db/tables/table`.
 
 ```shell
@@ -82,7 +91,8 @@ curl -X POST -H "Accept: application/vnd.gravitino.v1+json" \
   "provider": "lakehouse-iceberg",
   "properties": {
     "catalog-backend": "rest",
-    "uri": "http://localhost:9001/iceberg"
+    "uri": "http://localhost:9001/iceberg",
+    "data-access": "vended-credentials"
   }
 }' http://localhost:8090/api/metalakes/metalake/catalogs
 ```
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/rest/auth/UserPrincipalForwardingAuthManager.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/rest/auth/UserPrincipalForwardingAuthManager.java
new file mode 100644
index 0000000000..56af0ad727
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/rest/auth/UserPrincipalForwardingAuthManager.java
@@ -0,0 +1,87 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.iceberg.common.rest.auth;
+
+import java.security.Principal;
+import java.util.Map;
+import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.rest.HTTPHeaders;
+import org.apache.iceberg.rest.HTTPRequest;
+import org.apache.iceberg.rest.ImmutableHTTPRequest;
+import org.apache.iceberg.rest.RESTClient;
+import org.apache.iceberg.rest.auth.AuthManager;
+import org.apache.iceberg.rest.auth.AuthSession;
+
+/**
+ * Iceberg REST catalog {@link AuthManager} that adds an {@code Authorization} 
header on each
+ * outgoing request using the access token from the current {@link 
UserPrincipal}.
+ *
+ * <p>Enable by setting {@link 
org.apache.iceberg.rest.auth.AuthProperties#AUTH_TYPE} to this class
+ * name, or set catalog property {@code 
gravitino.iceberg-rest-catalog.forward-user-access-token} to
+ * {@code true} for REST catalog backend
+ */
+public class UserPrincipalForwardingAuthManager implements AuthManager {
+
+  /**
+   * @param name catalog name passed by Iceberg when loading this auth manager
+   */
+  @SuppressWarnings("unused")
+  public UserPrincipalForwardingAuthManager(String name) {}
+
+  @Override
+  public AuthSession catalogSession(RESTClient sharedClient, Map<String, 
String> properties) {
+    return new UserPrincipalAuthSession();
+  }
+
+  @Override
+  public void close() {
+    // no resources
+  }
+
+  private static final class UserPrincipalAuthSession implements AuthSession {
+    @Override
+    public HTTPRequest authenticate(HTTPRequest request) {
+      Principal principal = PrincipalUtils.getCurrentPrincipal();
+      if (!(principal instanceof UserPrincipal)) {
+        throw new IllegalStateException(
+            "Current principal must be a UserPrincipal to forward access token 
to Iceberg REST");
+      }
+      String authorizationHeaderValue =
+          ((UserPrincipal) principal)
+              .getAccessToken()
+              .orElseThrow(
+                  () ->
+                      new IllegalStateException("UserPrincipal has no 
authorization header value"));
+      HTTPHeaders newHeaders =
+          request
+              .headers()
+              .putIfAbsent(
+                  HTTPHeaders.of(
+                      Map.of(AuthConstants.HTTP_HEADER_AUTHORIZATION, 
authorizationHeaderValue)));
+      return newHeaders.equals(request.headers())
+          ? request
+          : 
ImmutableHTTPRequest.builder().from(request).headers(newHeaders).build();
+    }
+
+    @Override
+    public void close() {}
+  }
+}
diff --git 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
index 38f2643353..470276bfd0 100644
--- 
a/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
+++ 
b/iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java
@@ -22,6 +22,7 @@ import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Maps;
 import java.sql.SQLException;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +34,7 @@ import 
org.apache.gravitino.exceptions.ConnectionFailedException;
 import org.apache.gravitino.iceberg.common.ClosableHiveCatalog;
 import org.apache.gravitino.iceberg.common.IcebergConfig;
 import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
+import 
org.apache.gravitino.iceberg.common.rest.auth.UserPrincipalForwardingAuthManager;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.CatalogUtil;
@@ -45,6 +47,7 @@ import 
org.apache.iceberg.jdbc.JdbcCatalogWithMetadataLocationSupport;
 import org.apache.iceberg.jdbc.UncheckedSQLException;
 import org.apache.iceberg.memory.MemoryCatalogWithMetadataLocationSupport;
 import org.apache.iceberg.rest.RESTCatalog;
+import org.apache.iceberg.rest.auth.AuthProperties;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -124,7 +127,11 @@ public class IcebergCatalogUtil {
     String icebergCatalogName = icebergConfig.getCatalogBackendName();
     RESTCatalog restCatalog = new RESTCatalog();
     HdfsConfiguration hdfsConfiguration = new HdfsConfiguration();
-    Map<String, String> properties = 
icebergConfig.getIcebergCatalogProperties();
+    Map<String, String> properties = 
Maps.newHashMap(icebergConfig.getIcebergCatalogProperties());
+
+    // REST catalog must use forward access token from the user request
+    properties.put(AuthProperties.AUTH_TYPE, 
UserPrincipalForwardingAuthManager.class.getName());
+
     properties.forEach(hdfsConfiguration::set);
     restCatalog.setConf(hdfsConfiguration);
     restCatalog.initialize(icebergCatalogName, properties);
diff --git 
a/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/rest/auth/TestUserPrincipalForwardingAuthManager.java
 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/rest/auth/TestUserPrincipalForwardingAuthManager.java
new file mode 100644
index 0000000000..a375e250fb
--- /dev/null
+++ 
b/iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/rest/auth/TestUserPrincipalForwardingAuthManager.java
@@ -0,0 +1,105 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.gravitino.iceberg.common.rest.auth;
+
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.gravitino.UserPrincipal;
+import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.rest.HTTPHeaders;
+import org.apache.iceberg.rest.HTTPRequest;
+import org.apache.iceberg.rest.ImmutableHTTPRequest;
+import org.apache.iceberg.rest.auth.AuthSession;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class TestUserPrincipalForwardingAuthManager {
+
+  @Test
+  public void testAuthenticateAddsAuthorizationHeaderFromUserPrincipal() throws Exception {
+    UserPrincipalForwardingAuthManager manager = new UserPrincipalForwardingAuthManager("test");
+    AuthSession session = manager.catalogSession(null, Collections.emptyMap());
+
+    HTTPRequest request =
+        ImmutableHTTPRequest.builder()
+            .baseUri(URI.create("http://remote-iceberg:9001"))
+            .method(HTTPRequest.HTTPMethod.GET)
+            .path("v1/config")
+            .build();
+
+    HTTPRequest authenticated =
+        PrincipalUtils.doAs(
+            new UserPrincipal("alice", AuthConstants.AUTHORIZATION_BEARER_HEADER + "jwt-token-abc"),
+            () -> session.authenticate(request));
+
+    Assertions.assertTrue(
+        authenticated.headers().contains("Authorization"),
+        "Expected Authorization header on outgoing Iceberg REST request");
+    String authValue = authenticated.headers().entries("Authorization").iterator().next().value();
+    Assertions.assertEquals(AuthConstants.AUTHORIZATION_BEARER_HEADER + "jwt-token-abc", authValue);
+    manager.close();
+  }
+
+  @Test
+  public void testAuthenticateWithoutAccessTokenThrows() throws Exception {
+    UserPrincipalForwardingAuthManager manager = new UserPrincipalForwardingAuthManager("test");
+    AuthSession session = manager.catalogSession(null, Map.of());
+
+    HTTPRequest request =
+        ImmutableHTTPRequest.builder()
+            .baseUri(URI.create("http://remote-iceberg:9001"))
+            .method(HTTPRequest.HTTPMethod.GET)
+            .path("v1/config")
+            .build();
+
+    Assertions.assertThrows(
+        IllegalStateException.class,
+        () -> PrincipalUtils.doAs(new UserPrincipal("bob"), () -> session.authenticate(request)));
+    manager.close();
+  }
+
+  @Test
+  public void testPutIfAbsentDoesNotOverrideExistingAuthorization() throws Exception {
+    UserPrincipalForwardingAuthManager manager = new UserPrincipalForwardingAuthManager("test");
+    AuthSession session = manager.catalogSession(null, Map.of());
+
+    Map<String, String> headerMap = new HashMap<>();
+    headerMap.put("Authorization", "Bearer existing");
+    HTTPHeaders existing = HTTPHeaders.of(headerMap);
+    HTTPRequest request =
+        ImmutableHTTPRequest.builder()
+            .baseUri(URI.create("http://remote-iceberg:9001"))
+            .method(HTTPRequest.HTTPMethod.GET)
+            .path("v1/config")
+            .headers(existing)
+            .build();
+
+    HTTPRequest authenticated =
+        PrincipalUtils.doAs(
+            new UserPrincipal("alice", AuthConstants.AUTHORIZATION_BEARER_HEADER + "jwt-token-abc"),
+            () -> session.authenticate(request));
+
+    String authValue = authenticated.headers().entries("Authorization").iterator().next().value();
+    Assertions.assertEquals("Bearer existing", authValue);
+    manager.close();
+  }
+}
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
index be123aed45..35a5acc994 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/CatalogWrapperForREST.java
@@ -22,12 +22,16 @@ package org.apache.gravitino.iceberg.service;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import java.io.IOException;
 import java.net.URI;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -49,23 +53,34 @@ import org.apache.gravitino.storage.GCSProperties;
 import org.apache.gravitino.utils.ClassUtils;
 import org.apache.gravitino.utils.MapUtils;
 import org.apache.gravitino.utils.PrincipalUtils;
+import org.apache.iceberg.BaseMetadataTable;
+import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DeleteFile;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.IncrementalAppendScan;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Scan;
 import org.apache.iceberg.ScanTaskParser;
+import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.exceptions.ServiceUnavailableException;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
 import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.CatalogHandlers;
 import org.apache.iceberg.rest.PlanStatus;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
 import org.apache.iceberg.rest.requests.PlanTableScanRequest;
+import org.apache.iceberg.rest.requests.UpdateTableRequest;
 import org.apache.iceberg.rest.responses.ImmutableLoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadCredentialsResponse;
 import org.apache.iceberg.rest.responses.LoadTableResponse;
@@ -80,13 +95,17 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
 
   private final ScanPlanCache scanPlanCache;
 
+  private static final String DATA_ACCESS_VENDED_CREDENTIALS = "vended-credentials";
+  private static final String DATA_ACCESS_REMOTE_SIGNING = "remote-signing";
+
   private static final Set<String> catalogPropertiesToClientKeys =
       ImmutableSet.of(
           IcebergConstants.IO_IMPL,
           IcebergConstants.AWS_S3_REGION,
           IcebergConstants.ICEBERG_S3_ENDPOINT,
           IcebergConstants.ICEBERG_OSS_ENDPOINT,
-          IcebergConstants.ICEBERG_S3_PATH_STYLE_ACCESS);
+          IcebergConstants.ICEBERG_S3_PATH_STYLE_ACCESS,
+          IcebergConstants.ICEBERG_ACCESS_DELEGATION);
 
   @SuppressWarnings("deprecation")
   private static Map<String, String> deprecatedProperties =
@@ -98,10 +117,7 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
 
   public CatalogWrapperForREST(String catalogName, IcebergConfig config) {
     super(config);
-    this.catalogConfigToClients =
-        MapUtils.getFilteredMap(
-            config.getIcebergCatalogProperties(),
-            key -> catalogPropertiesToClientKeys.contains(key));
+    this.catalogConfigToClients = buildCatalogConfigToClients(config, getCatalog());
     // To be compatible with old properties
     Map<String, String> catalogProperties =
         checkForCompatibility(config.getAllConfig(), deprecatedProperties);
@@ -111,7 +127,12 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
 
   public LoadTableResponse createTable(
       Namespace namespace, CreateTableRequest request, boolean requestCredential) {
-    LoadTableResponse loadTableResponse = super.createTable(namespace, request);
+    LoadTableResponse loadTableResponse;
+    if (catalog instanceof RESTCatalog) {
+      loadTableResponse = createTableInternal(namespace, request);
+    } else {
+      loadTableResponse = super.createTable(namespace, request);
+    }
     if (shouldGenerateCredential(loadTableResponse, requestCredential)) {
       return injectCredentialConfig(
           TableIdentifier.of(namespace, request.name()),
@@ -123,13 +144,28 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
 
   public LoadTableResponse loadTable(
       TableIdentifier identifier, boolean requestCredential, CredentialPrivilege privilege) {
-    LoadTableResponse loadTableResponse = super.loadTable(identifier);
+    LoadTableResponse loadTableResponse;
+    if (catalog instanceof RESTCatalog) {
+      loadTableResponse = loadTableInternal(identifier);
+    } else {
+      loadTableResponse = super.loadTable(identifier);
+    }
     if (shouldGenerateCredential(loadTableResponse, requestCredential)) {
       return injectCredentialConfig(identifier, loadTableResponse, privilege);
     }
     return loadTableResponse;
   }
 
+  @Override
+  public LoadTableResponse updateTable(
+      TableIdentifier tableIdentifier, UpdateTableRequest updateTableRequest) {
+    if (catalog instanceof RESTCatalog) {
+      return CatalogHandlers.updateTable(catalog, tableIdentifier, updateTableRequest);
+    } else {
+      return super.updateTable(tableIdentifier, updateTableRequest);
+    }
+  }
+
   /**
    * Get table credentials.
    *
@@ -151,6 +187,7 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
 
             @Override
             public Map<String, String> config() {
+              // Convert Gravitino credentials to the Iceberg REST credential payload format.
               return CredentialPropertyUtils.toIcebergProperties(credential);
             }
 
@@ -186,6 +223,54 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
     return catalogConfigToClients;
   }
 
+  /**
+   * Builds properties exposed to Iceberg clients via the IRC {@code /v1/config} defaults.
+   *
+   * <p>For {@link RESTCatalog}, uses {@link RESTCatalog#properties()} so defaults reflect the
+   * remote catalog's config response merged with client properties (after REST handshake), not only
+   * static Gravitino catalog configuration.
+   */
+  @VisibleForTesting
+  static Map<String, String> buildCatalogConfigToClients(IcebergConfig config, Catalog catalog) {
+    Map<String, String> sourceProps;
+    if (catalog instanceof RESTCatalog) {
+      Map<String, String> merged = ((RESTCatalog) catalog).properties();
+      sourceProps = merged != null ? new HashMap<>(merged) : new HashMap<>();
+    } else {
+      sourceProps = new HashMap<>(config.getIcebergCatalogProperties());
+    }
+
+    Map<String, String> filtered =
+        MapUtils.getFilteredMap(sourceProps, key -> catalogPropertiesToClientKeys.contains(key));
+    filtered = new HashMap<>(filtered);
+    validateAndNormalizeDataAccessProperty(filtered);
+
+    return Collections.unmodifiableMap(filtered);
+  }
+
+  @VisibleForTesting
+  static void validateAndNormalizeDataAccessProperty(Map<String, String> properties) {
+    String dataAccess = properties.get(IcebergConstants.ICEBERG_ACCESS_DELEGATION);
+    if (StringUtils.isBlank(dataAccess)) {
+      return;
+    }
+
+    String normalizedDataAccess = dataAccess.toLowerCase(Locale.ROOT);
+    if (!DATA_ACCESS_VENDED_CREDENTIALS.equals(normalizedDataAccess)
+        && !DATA_ACCESS_REMOTE_SIGNING.equals(normalizedDataAccess)) {
+      throw new IllegalArgumentException(
+          "Invalid catalog property '"
+              + IcebergConstants.DATA_ACCESS
+              + "': "
+              + dataAccess
+              + ", supported values are ["
+              + DATA_ACCESS_VENDED_CREDENTIALS
+              + ","
+              + DATA_ACCESS_REMOTE_SIGNING
+              + "]");
+    }
+  }
+
   @Override
   protected boolean useDifferentClassLoader() {
     return false;
@@ -202,6 +287,8 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
         credential.credentialType(),
         tableIdentifier);
 
+    // Merge temporary credential fields as Iceberg client config entries in the load-table
+    // response.
     Map<String, String> credentialConfig = CredentialPropertyUtils.toIcebergProperties(credential);
     return LoadTableResponse.builder()
         .withTableMetadata(loadTableResponse.tableMetadata())
@@ -244,6 +331,12 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
     if (!requestCredential) {
       return false;
     }
+
+    // RESTCatalog will fetch credential from the remote catalog instead of generating credential
+    if (getCatalog() instanceof RESTCatalog) {
+      return false;
+    }
+
     validateCredentialLocation(loadTableResponse.tableMetadata().location());
     return !isLocalOrHdfsTable(loadTableResponse.tableMetadata());
   }
@@ -535,4 +628,105 @@ public class CatalogWrapperForREST extends IcebergCatalogWrapper {
       properties.put(newProperty, deprecatedValue);
     }
   }
+
+  private LoadTableResponse createTableInternal(Namespace namespace, CreateTableRequest request) {
+
+    request.validate();
+
+    if (request.stageCreate()) {
+      return stageTableCreateInternal(namespace, request);
+    }
+
+    TableIdentifier ident = TableIdentifier.of(namespace, request.name());
+    Table table =
+        catalog
+            .buildTable(ident, request.schema())
+            .withLocation(request.location())
+            .withPartitionSpec(request.spec())
+            .withSortOrder(request.writeOrder())
+            .withProperties(request.properties())
+            .create();
+
+    if (table instanceof BaseTable) {
+      Map<String, String> properties = retrieveFileIOProperties(table.io());
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .addAllConfig(
+              MapUtils.getFilteredMap(
+                  properties, key -> catalogPropertiesToClientKeys.contains(key)))
+          // Keep only credential fields from FileIO properties before returning them to the client.
+          .addAllConfig(CredentialPropertyUtils.filterCredentialProperties(properties))
+          .build();
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+  }
+
+  private LoadTableResponse stageTableCreateInternal(
+      Namespace namespace, CreateTableRequest request) {
+    TableIdentifier ident = TableIdentifier.of(namespace, request.name());
+    if (catalog.tableExists(ident)) {
+      throw new AlreadyExistsException("Table already exists: %s", ident);
+    }
+
+    Map<String, String> properties = Maps.newHashMap();
+    properties.put("created-at", OffsetDateTime.now(ZoneOffset.UTC).toString());
+    properties.putAll(request.properties());
+
+    Map<String, String> config = Maps.newHashMap();
+    String location;
+    if (request.location() != null) {
+      location = request.location();
+    } else {
+      Table table =
+          catalog
+              .buildTable(ident, request.schema())
+              .withPartitionSpec(request.spec())
+              .withSortOrder(request.writeOrder())
+              .withProperties(properties)
+              .createTransaction()
+              .table();
+      Map<String, String> tableProperties = retrieveFileIOProperties(table.io());
+      config.putAll(
+          MapUtils.getFilteredMap(
+              tableProperties, key -> catalogPropertiesToClientKeys.contains(key)));
+      config.putAll(CredentialPropertyUtils.filterCredentialProperties(tableProperties));
+      location = table.location();
+    }
+
+    TableMetadata metadata =
+        TableMetadata.newTableMetadata(
+            request.schema(),
+            request.spec() != null ? request.spec() : PartitionSpec.unpartitioned(),
+            request.writeOrder() != null ? request.writeOrder() : SortOrder.unsorted(),
+            location,
+            properties);
+
+    return LoadTableResponse.builder().withTableMetadata(metadata).addAllConfig(config).build();
+  }
+
+  private LoadTableResponse loadTableInternal(TableIdentifier ident) {
+    Table table = catalog.loadTable(ident);
+
+    if (table instanceof BaseTable) {
+      Map<String, String> properties = retrieveFileIOProperties(table.io());
+      return LoadTableResponse.builder()
+          .withTableMetadata(((BaseTable) table).operations().current())
+          .addAllConfig(
+              MapUtils.getFilteredMap(
+                  properties, key -> catalogPropertiesToClientKeys.contains(key)))
+          // Keep only credential fields from FileIO properties before returning them to the client.
+          .addAllConfig(CredentialPropertyUtils.filterCredentialProperties(properties))
+          .build();
+    } else if (table instanceof BaseMetadataTable) {
+      // metadata tables are loaded on the client side, return NoSuchTableException for now
+      throw new NoSuchTableException("Table does not exist: %s", ident.toString());
+    }
+
+    throw new IllegalStateException("Cannot wrap catalog that does not produce BaseTable");
+  }
+
+  private static Map<String, String> retrieveFileIOProperties(FileIO fileIO) {
+    return fileIO instanceof InMemoryFileIO ? Maps.newHashMap() : fileIO.properties();
+  }
 }
diff --git a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
index ecc16caca9..438d62fc0d 100644
--- a/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
+++ b/iceberg/iceberg-rest-server/src/main/java/org/apache/gravitino/iceberg/service/IcebergExceptionMapper.java
@@ -20,6 +20,7 @@ package org.apache.gravitino.iceberg.service;
 
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.ext.ExceptionMapper;
@@ -62,6 +63,7 @@ public class IcebergExceptionMapper implements ExceptionMapper<Exception> {
           .put(UnauthorizedException.class, 401)
           .put(org.apache.gravitino.exceptions.ForbiddenException.class, 403)
           .put(ForbiddenException.class, 403)
+          .put(NotFoundException.class, 404)
           .put(NoSuchNamespaceException.class, 404)
           .put(NoSuchTableException.class, 404)
           .put(NoSuchIcebergTableException.class, 404)
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
index b22ce13cec..17b69c5b2b 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestCatalogWrapperForREST.java
@@ -19,8 +19,15 @@
 
 package org.apache.gravitino.iceberg.service;
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.ImmutableMap;
 import java.util.Map;
+import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
+import org.apache.gravitino.iceberg.common.IcebergConfig;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.rest.RESTCatalog;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -74,4 +81,76 @@ public class TestCatalogWrapperForREST {
         IllegalArgumentException.class,
         () -> CatalogWrapperForREST.validateCredentialLocation("   "));
   }
+
+  @Test
+  void testCatalogConfigToClientsForRestBackendUsesMergedRemoteProperties() {
+    IcebergConfig config =
+        new IcebergConfig(
+            ImmutableMap.of(
+                IcebergConstants.CATALOG_BACKEND,
+                "rest",
+                IcebergConstants.URI,
+                "http://client-config-only:8181"));
+
+    RESTCatalog restCatalog = mock(RESTCatalog.class);
+    when(restCatalog.properties())
+        .thenReturn(
+            ImmutableMap.of(
+                IcebergConstants.URI,
+                "http://merged-from-remote-config:9999",
+                IcebergConstants.IO_IMPL,
+                "org.apache.iceberg.aws.s3.S3FileIO",
+                IcebergConstants.ICEBERG_S3_ENDPOINT,
+                "http://localhost:9000",
+                IcebergConstants.ICEBERG_ACCESS_DELEGATION,
+                "vended-credentials",
+                IcebergConstants.WAREHOUSE,
+                "/remote/warehouse"));
+
+    Map<String, String> configToClients =
+        CatalogWrapperForREST.buildCatalogConfigToClients(config, restCatalog);
+
+    Assertions.assertEquals(
+        "org.apache.iceberg.aws.s3.S3FileIO", configToClients.get(IcebergConstants.IO_IMPL));
+    Assertions.assertEquals(
+        "http://localhost:9000", configToClients.get(IcebergConstants.ICEBERG_S3_ENDPOINT));
+    Assertions.assertEquals(
+        "vended-credentials", configToClients.get(IcebergConstants.ICEBERG_ACCESS_DELEGATION));
+  }
+
+  @Test
+  void testCatalogConfigToClientsForNonRestBackend() {
+    Catalog catalog = mock(Catalog.class);
+    IcebergConfig config =
+        new IcebergConfig(
+            ImmutableMap.of(
+                IcebergConstants.CATALOG_BACKEND,
+                "hive",
+                IcebergConstants.URI,
+                "thrift://hive-metastore:9083",
+                IcebergConstants.IO_IMPL,
+                "org.apache.iceberg.aws.s3.S3FileIO"));
+    Map<String, String> configToClients =
+        CatalogWrapperForREST.buildCatalogConfigToClients(config, catalog);
+    Assertions.assertFalse(configToClients.containsKey(IcebergConstants.URI));
+    Assertions.assertEquals(
+        "org.apache.iceberg.aws.s3.S3FileIO", configToClients.get(IcebergConstants.IO_IMPL));
+    Assertions.assertFalse(configToClients.containsKey(IcebergConstants.DATA_ACCESS));
+  }
+
+  @Test
+  void testCatalogConfigToClientsRejectsInvalidDataAccessValue() {
+    Catalog catalog = mock(Catalog.class);
+    IcebergConfig config =
+        new IcebergConfig(
+            ImmutableMap.of(
+                IcebergConstants.CATALOG_BACKEND,
+                "hive",
+                IcebergConstants.ICEBERG_ACCESS_DELEGATION,
+                "invalid-mode"));
+
+    Assertions.assertThrows(
+        IllegalArgumentException.class,
+        () -> CatalogWrapperForREST.buildCatalogConfigToClients(config, catalog));
+  }
 }
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergExceptionMapper.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergExceptionMapper.java
index f893ff8221..f7766a04d4 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergExceptionMapper.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/TestIcebergExceptionMapper.java
@@ -18,6 +18,7 @@
  */
 package org.apache.gravitino.iceberg.service;
 
+import javax.ws.rs.NotFoundException;
 import javax.ws.rs.core.Response;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.CommitFailedException;
@@ -49,6 +50,7 @@ public class TestIcebergExceptionMapper {
     checkExceptionStatus(new NamespaceNotEmptyException(""), 400);
     checkExceptionStatus(new NotAuthorizedException(""), 401);
     checkExceptionStatus(new ForbiddenException(""), 403);
+    checkExceptionStatus(new NotFoundException(), 404);
     checkExceptionStatus(new NoSuchNamespaceException(""), 404);
     checkExceptionStatus(new NoSuchTableException(""), 404);
     checkExceptionStatus(new NoSuchIcebergTableException(""), 404);
diff --git a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
index 9fafca21bf..b02d9fa61b 100644
--- a/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
+++ b/iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/service/rest/TestIcebergConfig.java
@@ -103,9 +103,7 @@ public class TestIcebergConfig extends IcebergTestBase {
     // Per the Iceberg REST spec, the config endpoint does not accept a prefix.
     String path = injectPrefixToPath(IcebergRestTestUtil.CONFIG_PATH, IcebergRestTestUtil.PREFIX);
     Response response = getIcebergClientBuilder(path, Optional.empty()).get();
-    // Jersey returns 500 (not 404) for unmatched routes because the
-    // IcebergExceptionMapper converts the NotFoundException into an internal error response.
-    Assertions.assertEquals(500, response.getStatus());
+    Assertions.assertEquals(404, response.getStatus());
   }
 
   @Test
diff --git a/server-common/src/main/java/org/apache/gravitino/server/authentication/KerberosAuthenticator.java b/server-common/src/main/java/org/apache/gravitino/server/authentication/KerberosAuthenticator.java
index a14f3b3abb..801133834f 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/authentication/KerberosAuthenticator.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/authentication/KerberosAuthenticator.java
@@ -22,6 +22,7 @@ import javax.security.auth.kerberos.KerberosPrincipal;
 import javax.security.auth.kerberos.KeyTab;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.auth.KerberosUtils;
 import org.apache.gravitino.auth.PrincipalMapper;
@@ -126,12 +127,20 @@ public class KerberosAuthenticator implements Authenticator {
         throw new IllegalArgumentException("Principal must start with `HTTP/`");
       }
 
+      final String authHeaderValue = authData;
       return Subject.doAs(
           serverSubject,
           new PrivilegedExceptionAction<Principal>() {
             @Override
             public Principal run() throws Exception {
-              return retrievePrincipalFromToken(serverPrincipal, clientToken);
+              Principal principal = retrievePrincipalFromToken(serverPrincipal, clientToken);
+              // Keep the raw Authorization header value so downstream services can reuse it.
+              if (principal instanceof UserPrincipal) {
+                UserPrincipal userPrincipal = (UserPrincipal) principal;
+                return new UserPrincipal(
+                    userPrincipal.getName(), userPrincipal.getGroups(), authHeaderValue);
+              }
+              return new UserPrincipal(principal.getName(), authHeaderValue);
             }
           });
     } catch (Exception e) {
diff --git a/server-common/src/main/java/org/apache/gravitino/server/authentication/OAuth2TokenAuthenticator.java b/server-common/src/main/java/org/apache/gravitino/server/authentication/OAuth2TokenAuthenticator.java
index 4b1f3ff81f..8fb954bb5b 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/authentication/OAuth2TokenAuthenticator.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/authentication/OAuth2TokenAuthenticator.java
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
 import java.security.Principal;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.exceptions.UnauthorizedException;
 
@@ -71,7 +72,13 @@ class OAuth2TokenAuthenticator implements Authenticator {
     // TODO: If we support multiple OAuth 2.0 servers, we should use multiple
     // signing keys.
     try {
-      return tokenValidator.validateToken(token, serviceAudience);
+      Principal validated = tokenValidator.validateToken(token, serviceAudience);
+      if (validated instanceof UserPrincipal) {
+        UserPrincipal userPrincipal = (UserPrincipal) validated;
+        // Keep the raw Authorization header value so downstream services can reuse it.
+        return new UserPrincipal(userPrincipal.getName(), userPrincipal.getGroups(), authData);
+      }
+      return validated;
     } catch (UnauthorizedException e) {
       // Re-throw validation errors (audience, subject, etc.) without wrapping
       throw e;
diff --git a/server-common/src/main/java/org/apache/gravitino/server/authentication/SimpleAuthenticator.java b/server-common/src/main/java/org/apache/gravitino/server/authentication/SimpleAuthenticator.java
index 1ff2195f1c..92020fe458 100644
--- a/server-common/src/main/java/org/apache/gravitino/server/authentication/SimpleAuthenticator.java
+++ b/server-common/src/main/java/org/apache/gravitino/server/authentication/SimpleAuthenticator.java
@@ -19,6 +19,8 @@
 
 package org.apache.gravitino.server.authentication;
 
+import static org.apache.gravitino.utils.PrincipalUtils.ANONYMOUS_PRINCIPAL;
+
 import java.nio.charset.StandardCharsets;
 import java.security.Principal;
 import java.util.Base64;
@@ -33,8 +35,6 @@ import org.apache.gravitino.auth.AuthConstants;
  */
 class SimpleAuthenticator implements Authenticator {
 
-  private final Principal ANONYMOUS_PRINCIPAL = new UserPrincipal(AuthConstants.ANONYMOUS_USER);
-
   @Override
   public boolean isDataFromToken() {
     return true;
@@ -62,7 +62,8 @@ class SimpleAuthenticator implements Authenticator {
       if (userInformation.length < 1 || userInformation[0].isEmpty()) {
         return ANONYMOUS_PRINCIPAL;
       }
-      return new UserPrincipal(userInformation[0]);
+      // Keep the raw Authorization header value so downstream services can reuse it.
+      return new UserPrincipal(userInformation[0], authData);
     } catch (IllegalArgumentException ie) {
       return ANONYMOUS_PRINCIPAL;
     }
diff --git a/server-common/src/test/java/org/apache/gravitino/server/authentication/TestKerberosAuthenticator.java b/server-common/src/test/java/org/apache/gravitino/server/authentication/TestKerberosAuthenticator.java
index 0dace12792..2ee0d6fd95 100644
--- a/server-common/src/test/java/org/apache/gravitino/server/authentication/TestKerberosAuthenticator.java
+++ b/server-common/src/test/java/org/apache/gravitino/server/authentication/TestKerberosAuthenticator.java
@@ -24,9 +24,11 @@ import static 
org.apache.gravitino.server.authentication.KerberosConfig.PRINCIPA
 
 import java.io.File;
 import java.nio.charset.StandardCharsets;
+import java.security.Principal;
 import java.util.Base64;
 import java.util.concurrent.Callable;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.auth.KerberosUtils;
 import org.apache.gravitino.exceptions.UnauthorizedException;
@@ -181,8 +183,11 @@ public class TestKerberosAuthenticator extends KerberosSecurityTestcase {
                 }
               }
             });
-    kerberosAuthenticator.authenticateToken(
-        (AuthConstants.AUTHORIZATION_NEGOTIATE_HEADER + token).getBytes(StandardCharsets.UTF_8));
+    String authHeader = AuthConstants.AUTHORIZATION_NEGOTIATE_HEADER + token;
+    Principal principal =
+        kerberosAuthenticator.authenticateToken(authHeader.getBytes(StandardCharsets.UTF_8));
+    Assertions.assertTrue(principal instanceof UserPrincipal);
+    Assertions.assertEquals(authHeader, ((UserPrincipal) principal).getAccessToken().get());
   }
 
   private void initKeyTab() throws Exception {
diff --git a/server-common/src/test/java/org/apache/gravitino/server/authentication/TestOAuth2TokenAuthenticator.java b/server-common/src/test/java/org/apache/gravitino/server/authentication/TestOAuth2TokenAuthenticator.java
index d6571df6bd..12aff6666d 100644
--- a/server-common/src/test/java/org/apache/gravitino/server/authentication/TestOAuth2TokenAuthenticator.java
+++ b/server-common/src/test/java/org/apache/gravitino/server/authentication/TestOAuth2TokenAuthenticator.java
@@ -38,6 +38,7 @@ import java.util.Base64;
 import java.util.Date;
 import java.util.Map;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
 import org.apache.gravitino.exceptions.UnauthorizedException;
 import org.junit.jupiter.api.Test;
@@ -151,13 +152,14 @@ public class TestOAuth2TokenAuthenticator {
             .setAudience("service1")
             .signWith(keyPair.getPrivate(), SignatureAlgorithm.RS256)
             .compact();
+    Principal principal =
+        auth2TokenAuthenticator.authenticateToken(
+            (AuthConstants.AUTHORIZATION_BEARER_HEADER + token5).getBytes(StandardCharsets.UTF_8));
+    assertEquals("gravitino", principal.getName());
+    assertTrue(principal instanceof UserPrincipal);
     assertEquals(
-        "gravitino",
-        auth2TokenAuthenticator
-            .authenticateToken(
-                (AuthConstants.AUTHORIZATION_BEARER_HEADER + token5)
-                    .getBytes(StandardCharsets.UTF_8))
-            .getName());
+        AuthConstants.AUTHORIZATION_BEARER_HEADER + token5,
+        ((UserPrincipal) principal).getAccessToken().get());
   }
 
   @Test
@@ -264,6 +266,8 @@ public class TestOAuth2TokenAuthenticator {
     Principal principal = authenticator.authenticateToken(tokenData);
     assertNotNull(principal);
     assertEquals("test-user", principal.getName());
+    assertTrue(principal instanceof UserPrincipal);
+    assertEquals(bearerToken, ((UserPrincipal) principal).getAccessToken().get());
   }
 
   @Test
diff --git a/server-common/src/test/java/org/apache/gravitino/server/authentication/TestSimpleAuthenticator.java b/server-common/src/test/java/org/apache/gravitino/server/authentication/TestSimpleAuthenticator.java
index c98380b978..6d4e8cb2f5 100644
--- a/server-common/src/test/java/org/apache/gravitino/server/authentication/TestSimpleAuthenticator.java
+++ b/server-common/src/test/java/org/apache/gravitino/server/authentication/TestSimpleAuthenticator.java
@@ -21,7 +21,9 @@ package org.apache.gravitino.server.authentication;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import org.apache.gravitino.Config;
+import org.apache.gravitino.UserPrincipal;
 import org.apache.gravitino.auth.AuthConstants;
+import org.apache.gravitino.utils.PrincipalUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -35,27 +37,45 @@ public class TestSimpleAuthenticator {
     Assertions.assertTrue(simpleAuthenticator.isDataFromToken());
     Assertions.assertEquals(
         AuthConstants.ANONYMOUS_USER, simpleAuthenticator.authenticateToken(null).getName());
+    Assertions.assertEquals(
+        ((UserPrincipal) PrincipalUtils.ANONYMOUS_PRINCIPAL).getAccessToken(),
+        ((UserPrincipal) simpleAuthenticator.authenticateToken(null)).getAccessToken());
     Assertions.assertEquals(
         AuthConstants.ANONYMOUS_USER,
         simpleAuthenticator.authenticateToken("".getBytes(StandardCharsets.UTF_8)).getName());
+    Assertions.assertEquals(
+        ((UserPrincipal) PrincipalUtils.ANONYMOUS_PRINCIPAL).getAccessToken(),
+        ((UserPrincipal) simpleAuthenticator.authenticateToken("".getBytes(StandardCharsets.UTF_8)))
+            .getAccessToken());
     Assertions.assertEquals(
         AuthConstants.ANONYMOUS_USER,
         simpleAuthenticator.authenticateToken("abc".getBytes(StandardCharsets.UTF_8)).getName());
+    Assertions.assertEquals(
+        ((UserPrincipal) PrincipalUtils.ANONYMOUS_PRINCIPAL).getAccessToken(),
+        ((UserPrincipal)
+                simpleAuthenticator.authenticateToken("abc".getBytes(StandardCharsets.UTF_8)))
+            .getAccessToken());
     Assertions.assertEquals(
         AuthConstants.ANONYMOUS_USER,
         simpleAuthenticator
             .authenticateToken(
                 AuthConstants.AUTHORIZATION_BASIC_HEADER.getBytes(StandardCharsets.UTF_8))
             .getName());
+    Assertions.assertEquals(
+        ((UserPrincipal) PrincipalUtils.ANONYMOUS_PRINCIPAL).getAccessToken(),
+        ((UserPrincipal)
+                simpleAuthenticator.authenticateToken(
+                    AuthConstants.AUTHORIZATION_BASIC_HEADER.getBytes(StandardCharsets.UTF_8)))
+            .getAccessToken());
     String fullCredentials = "test-user:123";
     String basicToken =
         AuthConstants.AUTHORIZATION_BASIC_HEADER
             + Base64.getEncoder().encodeToString(fullCredentials.getBytes(StandardCharsets.UTF_8));
-    Assertions.assertEquals(
-        fullCredentials.split(":")[0],
-        simpleAuthenticator
-            .authenticateToken(basicToken.getBytes(StandardCharsets.UTF_8))
-            .getName());
+    UserPrincipal principal =
+        (UserPrincipal)
+            simpleAuthenticator.authenticateToken(basicToken.getBytes(StandardCharsets.UTF_8));
+    Assertions.assertEquals(fullCredentials.split(":")[0], principal.getName());
+    Assertions.assertEquals(basicToken, principal.getAccessToken().get());
     String credentialsOnlyHaveUsername = "test-user:";
     basicToken =
         AuthConstants.AUTHORIZATION_BASIC_HEADER
@@ -76,6 +96,11 @@ public class TestSimpleAuthenticator {
         simpleAuthenticator
             .authenticateToken(basicToken.getBytes(StandardCharsets.UTF_8))
             .getName());
+    Assertions.assertEquals(
+        ((UserPrincipal) PrincipalUtils.ANONYMOUS_PRINCIPAL).getAccessToken(),
+        ((UserPrincipal)
+                simpleAuthenticator.authenticateToken(basicToken.getBytes(StandardCharsets.UTF_8)))
+            .getAccessToken());
     Assertions.assertEquals(
         "gravitino",
         simpleAuthenticator

Reply via email to