This is an automated email from the ASF dual-hosted git repository.

dimas pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 27ae68430 feat: pass principal name as part of aws subscoped 
credentials session (#3224)
27ae68430 is described below

commit 27ae684304c6c62e5ee516404eee1419bc5d67f8
Author: Tornike Gurgenidze <[email protected]>
AuthorDate: Fri Dec 12 18:09:02 2025 +0400

    feat: pass principal name as part of aws subscoped credentials session 
(#3224)
    
    * feat: pass principal name as part of aws subscoped credentials session 
name
    
    * feat: resolve principal from CurrentIdentityAssociation
    
    * fix: handle principal injection for async tasks
    
    * add feature flag for including the principal name
    
    * add changelog, address comments
    
    * handle null identity, refactor tests
---
 CHANGELOG.md                                       |   1 +
 .../polaris/core/config/FeatureConfiguration.java  |  12 ++
 .../AtomicOperationMetaStoreManager.java           |   3 +
 .../TransactionWorkspaceMetaStoreManager.java      |   3 +
 .../TransactionalMetaStoreManagerImpl.java         |   3 +
 .../core/storage/PolarisCredentialVendor.java      |   2 +
 .../core/storage/PolarisStorageIntegration.java    |   2 +
 .../core/storage/StorageCredentialsVendor.java     |   3 +
 .../aws/AwsCredentialsStorageIntegration.java      |  15 ++-
 .../azure/AzureCredentialsStorageIntegration.java  |   2 +
 .../core/storage/cache/StorageCredentialCache.java |  12 +-
 .../storage/cache/StorageCredentialCacheKey.java   |  10 +-
 .../gcp/GcpCredentialsStorageIntegration.java      |   2 +
 .../storage/InMemoryStorageIntegrationTest.java    |   2 +
 .../storage/cache/StorageCredentialCacheTest.java  | 122 ++++++++++++++++++++-
 .../aws/AwsCredentialsStorageIntegrationTest.java  | 117 ++++++++++++++++++++
 .../AzureCredentialStorageIntegrationTest.java     |   4 +
 .../gcp/GcpCredentialsStorageIntegrationTest.java  |   3 +
 .../catalog/io/StorageAccessConfigProvider.java    |   7 +-
 .../polaris/service/config/ServiceProducers.java   |  18 ---
 .../context/catalog/PolarisPrincipalHolder.java    |  70 ++++++++++++
 .../PolarisStorageIntegrationProviderImpl.java     |   2 +
 .../polaris/service/task/TaskExecutorImpl.java     |  27 ++++-
 .../service/catalog/RootPrincipalAugmentor.java    |  67 +++++++++++
 .../iceberg/AbstractIcebergCatalogTest.java        |  10 +-
 .../polaris/service/task/TaskExecutorImplTest.java |   5 +-
 .../org/apache/polaris/service/TestServices.java   |   3 +-
 27 files changed, 491 insertions(+), 36 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 32eec35cb..596b86316 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -59,6 +59,7 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
 - Support credential vending for federated catalogs. 
`ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to 
toggle this feature.
 - Enhanced catalog federation with SigV4 authentication support, additional 
authentication types for credential vending, and location-based access 
restrictions to block credential vending for remote tables outside allowed 
location lists.
 - Added `topologySpreadConstraints` support in Helm chart.
+- Added support for including the principal name in subscoped credentials 
(currently AWS only, where it becomes part of the role session name). 
`INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL` (default: false) can be used 
to toggle this feature. If enabled, cached credentials issued to one principal 
will no longer be reused for other principals.
 
 ### Changes
 
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index b843fea58..fbc2ba44a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -79,6 +79,18 @@ public class FeatureConfiguration<T> extends 
PolarisConfiguration<T> {
           .defaultValue(false)
           .buildFeatureConfiguration();
 
+  public static final FeatureConfiguration<Boolean> 
INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL =
+      PolarisConfiguration.<Boolean>builder()
+          .key("INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL")
+          .description(
+              "If set to true, principal name will be included in temporary 
subscoped credentials.\n"
+                  + "Currently only AWS credentials are supported for which 
session name of the generated credentials \n"
+                  + "will look like 'polaris-<principal>' rather than simple 
'polaris'.\n"
+                  + "Note that enabling this feature leads to degradation in 
temporary credential caching as \n"
+                  + "catalog will no longer be able to reuse credentials for 
multiple principals.")
+          .defaultValue(false)
+          .buildFeatureConfiguration();
+
   public static final FeatureConfiguration<Boolean> ALLOW_SETTING_S3_ENDPOINTS 
=
       PolarisConfiguration.<Boolean>builder()
           .key("ALLOW_SETTING_S3_ENDPOINTS")
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
index c31886d56..9e702a0a1 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/AtomicOperationMetaStoreManager.java
@@ -35,6 +35,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.entity.AsyncTaskType;
 import org.apache.polaris.core.entity.CatalogEntity;
@@ -1600,6 +1601,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
 
     // get meta store session we should be using
@@ -1641,6 +1643,7 @@ public class AtomicOperationMetaStoreManager extends 
BaseMetaStoreManager {
               allowListOperation,
               allowedReadLocations,
               allowedWriteLocations,
+              polarisPrincipal,
               refreshCredentialsEndpoint);
       return new ScopedCredentialsResult(storageAccessConfig);
     } catch (Exception ex) {
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
index e788b999c..6ae68a370 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/TransactionWorkspaceMetaStoreManager.java
@@ -28,6 +28,7 @@ import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.entity.LocationBasedEntity;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntity;
@@ -324,6 +325,7 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
     return delegate.getSubscopedCredsForEntity(
         callCtx,
@@ -333,6 +335,7 @@ public class TransactionWorkspaceMetaStoreManager 
implements PolarisMetaStoreMan
         allowListOperation,
         allowedReadLocations,
         allowedWriteLocations,
+        polarisPrincipal,
         refreshCredentialsEndpoint);
   }
 
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index 658b7cf94..e0e145cde 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -34,6 +34,7 @@ import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.entity.AsyncTaskType;
 import org.apache.polaris.core.entity.CatalogEntity;
@@ -2094,6 +2095,7 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
 
     // get meta store session we should be using
@@ -2130,6 +2132,7 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
               allowListOperation,
               allowedReadLocations,
               allowedWriteLocations,
+              polarisPrincipal,
               refreshCredentialsEndpoint);
       return new ScopedCredentialsResult(storageAccessConfig);
     } catch (Exception ex) {
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
index ee90294c6..19e38a251 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisCredentialVendor.java
@@ -22,6 +22,7 @@ import jakarta.annotation.Nonnull;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.dao.entity.ScopedCredentialsResult;
 
@@ -53,5 +54,6 @@ public interface PolarisCredentialVendor {
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint);
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
index 8a2ae7c3a..b7b7d3ae2 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageIntegration.java
@@ -22,6 +22,7 @@ import jakarta.annotation.Nonnull;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.RealmConfig;
 
 /**
@@ -67,6 +68,7 @@ public abstract class PolarisStorageIntegration<T extends 
PolarisStorageConfigur
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint);
 
   /**
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
index 59bcf86c8..d634f2870 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageCredentialsVendor.java
@@ -22,6 +22,7 @@ package org.apache.polaris.core.storage;
 import jakarta.annotation.Nonnull;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.context.RealmContext;
@@ -67,6 +68,7 @@ public class StorageCredentialsVendor {
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
     return polarisCredentialVendor.getSubscopedCredsForEntity(
         callContext.getPolarisCallContext(),
@@ -76,6 +78,7 @@ public class StorageCredentialsVendor {
         allowListOperation,
         allowedReadLocations,
         allowedWriteLocations,
+        polarisPrincipal,
         refreshCredentialsEndpoint);
   }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
index 299600695..78958d315 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/AwsCredentialsStorageIntegration.java
@@ -28,6 +28,8 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Stream;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.InMemoryStorageIntegration;
 import org.apache.polaris.core.storage.StorageAccessConfig;
@@ -81,6 +83,7 @@ public class AwsCredentialsStorageIntegration
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
     int storageCredentialDurationSeconds =
         realmConfig.getConfig(STORAGE_CREDENTIAL_DURATION_SECONDS);
@@ -89,12 +92,22 @@ public class AwsCredentialsStorageIntegration
     String accountId = storageConfig.getAwsAccountId();
     StorageAccessConfig.Builder accessConfig = StorageAccessConfig.builder();
 
+    boolean includePrincipalNameInSubscopedCredential =
+        
realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL);
+
+    String roleSessionName =
+        includePrincipalNameInSubscopedCredential
+            ? "polaris-" + polarisPrincipal.getName()
+            : "PolarisAwsCredentialsStorageIntegration";
+    String cappedRoleSessionName =
+        roleSessionName.substring(0, Math.min(roleSessionName.length(), 64));
+
     if (shouldUseSts(storageConfig)) {
       AssumeRoleRequest.Builder request =
           AssumeRoleRequest.builder()
               .externalId(storageConfig.getExternalId())
               .roleArn(storageConfig.getRoleARN())
-              .roleSessionName("PolarisAwsCredentialsStorageIntegration")
+              .roleSessionName(cappedRoleSessionName)
               .policy(
                   policyString(
                           storageConfig,
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
index 7763178b9..b78696aa3 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/azure/AzureCredentialsStorageIntegration.java
@@ -53,6 +53,7 @@ import java.util.HashSet;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.InMemoryStorageIntegration;
 import org.apache.polaris.core.storage.StorageAccessConfig;
@@ -84,6 +85,7 @@ public class AzureCredentialsStorageIntegration
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
     String loc =
         !allowedWriteLocations.isEmpty()
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
index 0f22863e5..9f9cf7c40 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCache.java
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.function.Function;
 import org.apache.iceberg.exceptions.UnprocessableEntityException;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.context.RealmContext;
@@ -109,6 +110,7 @@ public class StorageCredentialCache {
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
     RealmContext realmContext = storageCredentialsVendor.getRealmContext();
     RealmConfig realmConfig = storageCredentialsVendor.getRealmConfig();
@@ -116,6 +118,10 @@ public class StorageCredentialCache {
       diagnostics.fail(
           "entity_type_not_suppported_to_scope_creds", "type={}", 
polarisEntity.getType());
     }
+
+    boolean includePrincipalNameInSubscopedCredential =
+        
realmConfig.getConfig(FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL);
+
     StorageCredentialCacheKey key =
         StorageCredentialCacheKey.of(
             realmContext.getRealmIdentifier(),
@@ -123,7 +129,10 @@ public class StorageCredentialCache {
             allowListOperation,
             allowedReadLocations,
             allowedWriteLocations,
-            refreshCredentialsEndpoint);
+            refreshCredentialsEndpoint,
+            includePrincipalNameInSubscopedCredential
+                ? Optional.of(polarisPrincipal)
+                : Optional.empty());
     LOGGER.atDebug().addKeyValue("key", key).log("subscopedCredsCache");
     Function<StorageCredentialCacheKey, StorageCredentialCacheEntry> loader =
         k -> {
@@ -134,6 +143,7 @@ public class StorageCredentialCache {
                   allowListOperation,
                   allowedReadLocations,
                   allowedWriteLocations,
+                  polarisPrincipal,
                   refreshCredentialsEndpoint);
           if (scopedCredentialsResult.isSuccess()) {
             long maxCacheDurationMs = maxCacheDurationMs(realmConfig);
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
index 8b9d0542d..ce6777c4a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheKey.java
@@ -21,6 +21,7 @@ package org.apache.polaris.core.storage.cache;
 import jakarta.annotation.Nullable;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
 import org.apache.polaris.immutables.PolarisImmutable;
@@ -51,13 +52,17 @@ public interface StorageCredentialCacheKey {
   @Value.Parameter(order = 7)
   Optional<String> refreshCredentialsEndpoint();
 
+  @Value.Parameter(order = 8)
+  Optional<String> principalName();
+
   static StorageCredentialCacheKey of(
       String realmId,
       PolarisEntity entity,
       boolean allowedListAction,
       Set<String> allowedReadLocations,
       Set<String> allowedWriteLocations,
-      Optional<String> refreshCredentialsEndpoint) {
+      Optional<String> refreshCredentialsEndpoint,
+      Optional<PolarisPrincipal> polarisPrincipal) {
     String storageConfigSerializedStr =
         entity
             .getInternalPropertiesAsMap()
@@ -69,6 +74,7 @@ public interface StorageCredentialCacheKey {
         allowedListAction,
         allowedReadLocations,
         allowedWriteLocations,
-        refreshCredentialsEndpoint);
+        refreshCredentialsEndpoint,
+        polarisPrincipal.map(PolarisPrincipal::getName));
   }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
index 6964e77c7..427f715c1 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/gcp/GcpCredentialsStorageIntegration.java
@@ -47,6 +47,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.stream.Stream;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.InMemoryStorageIntegration;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
@@ -89,6 +90,7 @@ public class GcpCredentialsStorageIntegration
       boolean allowListOperation,
       @Nonnull Set<String> allowedReadLocations,
       @Nonnull Set<String> allowedWriteLocations,
+      @Nonnull PolarisPrincipal polarisPrincipal,
       Optional<String> refreshCredentialsEndpoint) {
     try {
       sourceCredentials.refresh();
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
index e9640cef8..bd596d0d2 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
@@ -23,6 +23,7 @@ import jakarta.annotation.Nullable;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.config.RealmConfigImpl;
@@ -199,6 +200,7 @@ class InMemoryStorageIntegrationTest {
         boolean allowListOperation,
         @Nonnull Set<String> allowedReadLocations,
         @Nonnull Set<String> allowedWriteLocations,
+        @Nonnull PolarisPrincipal polarisPrincipal,
         Optional<String> refreshCredentialsEndpoint) {
       return null;
     }
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
index c4db87231..b69eb3ceb 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/cache/StorageCredentialCacheTest.java
@@ -28,6 +28,8 @@ import java.util.Set;
 import org.apache.iceberg.exceptions.UnprocessableEntityException;
 import org.apache.polaris.core.PolarisDefaultDiagServiceImpl;
 import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.config.RealmConfigImpl;
@@ -54,7 +56,6 @@ public class StorageCredentialCacheTest {
   private final RealmConfig realmConfig =
       new RealmConfigImpl(new PolarisConfigurationStore() {}, realmContext);
   private final StorageCredentialsVendor storageCredentialsVendor;
-
   private StorageCredentialCache storageCredentialCache;
 
   public StorageCredentialCacheTest() {
@@ -80,12 +81,16 @@ public class StorageCredentialCacheTest {
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
                 Mockito.anySet(),
+                Mockito.any(),
                 Mockito.any()))
         .thenReturn(badResult);
     PolarisEntity polarisEntity =
         new PolarisEntity(
             new PolarisBaseEntity(
                 1, 2, PolarisEntityType.CATALOG, 
PolarisEntitySubType.ICEBERG_TABLE, 0, "name"));
+
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
+
     Assertions.assertThatThrownBy(
             () ->
                 storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -94,6 +99,7 @@ public class StorageCredentialCacheTest {
                     true,
                     Set.of("s3://bucket1/path"),
                     Set.of("s3://bucket3/path"),
+                    polarisPrincipal,
                     Optional.empty()))
         .isInstanceOf(UnprocessableEntityException.class)
         .hasMessage("Failed to get subscoped credentials: extra_error_info");
@@ -109,6 +115,7 @@ public class StorageCredentialCacheTest {
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
                 Mockito.anySet(),
+                Mockito.any(),
                 Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
@@ -117,6 +124,7 @@ public class StorageCredentialCacheTest {
         new PolarisBaseEntity(
             1, 2, PolarisEntityType.CATALOG, 
PolarisEntitySubType.ICEBERG_TABLE, 0, "name");
     PolarisEntity polarisEntity = new PolarisEntity(baseEntity);
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
 
     // add an item to the cache
     storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -125,6 +133,7 @@ public class StorageCredentialCacheTest {
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
         Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        polarisPrincipal,
         Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1);
 
@@ -135,8 +144,92 @@ public class StorageCredentialCacheTest {
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
         Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        polarisPrincipal,
+        Optional.empty());
+    
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1);
+
+    Optional<PolarisPrincipal> emptyPrincipal = Optional.empty();
+
+    storageCredentialCache.getOrGenerateSubScopeCreds(
+        storageCredentialsVendor,
+        polarisEntity,
+        true,
+        Set.of("s3://bucket1/path", "s3://bucket2/path"),
+        Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        polarisPrincipal,
+        Optional.empty());
+    
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1);
+  }
+
+  private void testCacheForAnotherPrincipal(boolean hitExpected) {
+    List<ScopedCredentialsResult> mockedScopedCreds =
+        getFakeScopedCreds(3, /* expireSoon= */ false);
+    Mockito.when(
+            storageCredentialsVendor.getSubscopedCredsForEntity(
+                Mockito.any(),
+                Mockito.anyBoolean(),
+                Mockito.anySet(),
+                Mockito.anySet(),
+                Mockito.any(),
+                Mockito.any()))
+        .thenReturn(mockedScopedCreds.get(0))
+        .thenReturn(mockedScopedCreds.get(1))
+        .thenReturn(mockedScopedCreds.get(1));
+    PolarisBaseEntity baseEntity =
+        new PolarisBaseEntity(
+            1, 2, PolarisEntityType.CATALOG, 
PolarisEntitySubType.ICEBERG_TABLE, 0, "name");
+    PolarisEntity polarisEntity = new PolarisEntity(baseEntity);
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
+    PolarisPrincipal anotherPolarisPrincipal =
+        PolarisPrincipal.of("anotherPrincipal", Map.of(), Set.of());
+
+    // add an item to the cache
+    storageCredentialCache.getOrGenerateSubScopeCreds(
+        storageCredentialsVendor,
+        polarisEntity,
+        true,
+        Set.of("s3://bucket1/path", "s3://bucket2/path"),
+        Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        polarisPrincipal,
         Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(1);
+
+    storageCredentialCache.getOrGenerateSubScopeCreds(
+        storageCredentialsVendor,
+        polarisEntity,
+        true,
+        Set.of("s3://bucket1/path", "s3://bucket2/path"),
+        Set.of("s3://bucket3/path", "s3://bucket4/path"),
+        anotherPolarisPrincipal,
+        Optional.empty());
+    
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(hitExpected
 ? 1 : 2);
+  }
+
+  @Test
+  public void testCacheHitForAnotherPrincipal() {
+    testCacheForAnotherPrincipal(true);
+  }
+
+  @Test
+  public void testCacheMissForAnotherPrincipal() {
+    Mockito.when(storageCredentialsVendor.getRealmConfig())
+        .thenReturn(
+            new RealmConfigImpl(
+                new PolarisConfigurationStore() {
+                  @SuppressWarnings("unchecked")
+                  @Override
+                  public String getConfiguration(@Nonnull RealmContext ctx, 
String configName) {
+                    if (configName.equals(
+                        
FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL
+                            .key())) {
+                      return "true";
+                    }
+                    return null;
+                  }
+                },
+                () -> "realm"));
+
+    testCacheForAnotherPrincipal(false);
   }
 
   @RepeatedTest(10)
@@ -149,6 +242,7 @@ public class StorageCredentialCacheTest {
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
                 Mockito.anySet(),
+                Mockito.any(),
                 Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
@@ -157,6 +251,9 @@ public class StorageCredentialCacheTest {
         new PolarisBaseEntity(
             1, 2, PolarisEntityType.CATALOG, 
PolarisEntitySubType.ICEBERG_TABLE, 0, "name");
     PolarisEntity polarisEntity = new PolarisEntity(baseEntity);
+
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
+
     StorageCredentialCacheKey cacheKey =
         StorageCredentialCacheKey.of(
             realmContext.getRealmIdentifier(),
@@ -164,7 +261,8 @@ public class StorageCredentialCacheTest {
             true,
             Set.of("s3://bucket1/path", "s3://bucket2/path"),
             Set.of("s3://bucket/path"),
-            Optional.empty());
+            Optional.empty(),
+            Optional.of(polarisPrincipal));
 
     // the entry will be evicted immediately because the token is expired
     storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -173,6 +271,7 @@ public class StorageCredentialCacheTest {
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
         Set.of("s3://bucket/path"),
+        polarisPrincipal,
         Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
 
@@ -182,6 +281,7 @@ public class StorageCredentialCacheTest {
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
         Set.of("s3://bucket/path"),
+        polarisPrincipal,
         Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
 
@@ -191,6 +291,7 @@ public class StorageCredentialCacheTest {
         true,
         Set.of("s3://bucket1/path", "s3://bucket2/path"),
         Set.of("s3://bucket/path"),
+        polarisPrincipal,
         Optional.empty());
     
Assertions.assertThat(storageCredentialCache.getIfPresent(cacheKey)).isNull();
   }
@@ -205,11 +306,13 @@ public class StorageCredentialCacheTest {
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
                 Mockito.anySet(),
+                Mockito.any(),
                 Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
         .thenReturn(mockedScopedCreds.get(2));
     List<PolarisEntity> entityList = getPolarisEntities();
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
     int cacheSize = 0;
     // different catalog will generate new cache entries
     for (PolarisEntity entity : entityList) {
@@ -219,6 +322,7 @@ public class StorageCredentialCacheTest {
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://bucket/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
@@ -236,6 +340,7 @@ public class StorageCredentialCacheTest {
           /* allowedListAction= */ true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://bucket/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
@@ -247,6 +352,7 @@ public class StorageCredentialCacheTest {
           /* allowedListAction= */ false,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://bucket/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
@@ -258,6 +364,7 @@ public class StorageCredentialCacheTest {
           /* allowedListAction= */ false,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://differentbucket/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
@@ -274,6 +381,7 @@ public class StorageCredentialCacheTest {
           /* allowedListAction= */ false,
           Set.of("s3://differentbucket/path", "s3://bucket2/path"),
           Set.of("s3://bucket/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(++cacheSize);
     }
@@ -290,11 +398,13 @@ public class StorageCredentialCacheTest {
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
                 Mockito.anySet(),
+                Mockito.any(),
                 Mockito.any()))
         .thenReturn(mockedScopedCreds.get(0))
         .thenReturn(mockedScopedCreds.get(1))
         .thenReturn(mockedScopedCreds.get(2));
     List<PolarisEntity> entityList = getPolarisEntities();
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
     for (PolarisEntity entity : entityList) {
       storageCredentialCache.getOrGenerateSubScopeCreds(
           storageCredentialsVendor,
@@ -302,6 +412,7 @@ public class StorageCredentialCacheTest {
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          polarisPrincipal,
           Optional.empty());
     }
     
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
@@ -314,6 +425,7 @@ public class StorageCredentialCacheTest {
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
@@ -326,6 +438,7 @@ public class StorageCredentialCacheTest {
           true,
           Set.of("s3://bucket1/path", "s3://bucket2/path"),
           Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
@@ -337,6 +450,7 @@ public class StorageCredentialCacheTest {
           true,
           Set.of("s3://bucket2/path", "s3://bucket1/path"),
           Set.of("s3://bucket3/path", "s3://bucket4/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
@@ -349,6 +463,7 @@ public class StorageCredentialCacheTest {
           true,
           Set.of("s3://bucket2/path", "s3://bucket1/path"),
           Set.of("s3://bucket4/path", "s3://bucket3/path"),
+          polarisPrincipal,
           Optional.empty());
       
Assertions.assertThat(storageCredentialCache.getEstimatedSize()).isEqualTo(entityList.size());
     }
@@ -428,9 +543,11 @@ public class StorageCredentialCacheTest {
                 Mockito.anyBoolean(),
                 Mockito.anySet(),
                 Mockito.anySet(),
+                Mockito.any(),
                 Mockito.any()))
         .thenReturn(properties);
     List<PolarisEntity> entityList = getPolarisEntities();
+    PolarisPrincipal polarisPrincipal = PolarisPrincipal.of("principal", 
Map.of(), Set.of());
 
     StorageAccessConfig config =
         storageCredentialCache.getOrGenerateSubScopeCreds(
@@ -439,6 +556,7 @@ public class StorageCredentialCacheTest {
             true,
             Set.of("s3://bucket1/path", "s3://bucket2/path"),
             Set.of("s3://bucket3/path", "s3://bucket4/path"),
+            polarisPrincipal,
             Optional.empty());
     Assertions.assertThat(config.credentials())
         .containsExactly(Map.entry("s3.secret-access-key", 
"super-secret-123"));
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
index e74274656..dc6e98cf8 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
@@ -23,8 +23,15 @@ import static org.assertj.core.api.Assertions.assertThat;
 import jakarta.annotation.Nonnull;
 import java.time.Instant;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.config.RealmConfigImpl;
+import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.storage.BaseStorageIntegrationTest;
 import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.core.storage.StorageAccessProperty;
@@ -52,6 +59,21 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
 
   public static final Instant EXPIRE_TIME = Instant.now().plusMillis(3600_000);
 
+  /**
+   * Realm config for realm "realm" whose configuration store reports
+   * INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL as "true" and has no value for any other key.
+   */
+  public static final RealmConfig PRINCIPAL_INCLUDER_REALM_CONFIG =
+      new RealmConfigImpl(
+          new PolarisConfigurationStore() {
+            @SuppressWarnings("unchecked")
+            @Override
+            public String getConfiguration(@Nonnull RealmContext ctx, String configName) {
+              // Only the principal-name flag is answered; every other lookup yields null.
+              return configName.equals(
+                      FeatureConfiguration.INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL.key())
+                  ? "true"
+                  : null;
+            }
+          },
+          () -> "realm");
+
   public static final AssumeRoleResponse ASSUME_ROLE_RESPONSE =
       AssumeRoleResponse.builder()
           .credentials(
@@ -63,6 +85,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                   .build())
           .build();
   public static final String AWS_PARTITION = "aws";
+  // Default principal ("test-principal", no properties, no roles) passed to getSubscopedCreds
+  // by the tests in this class.
+  public static final PolarisPrincipal POLARIS_PRINCIPAL =
+      PolarisPrincipal.of("test-principal", Map.of(), Set.of());
 
   @ParameterizedTest
   @ValueSource(strings = {"s3a", "s3"})
@@ -70,6 +94,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
     StsClient stsClient = Mockito.mock(StsClient.class);
     String roleARN = "arn:aws:iam::012345678901:role/jdoe";
     String externalId = "externalId";
+
     Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
         .thenAnswer(
             invocation -> {
@@ -78,6 +103,8 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                   
.asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
                   .returns(externalId, AssumeRoleRequest::externalId)
                   .returns(roleARN, AssumeRoleRequest::roleArn)
+                  .returns(
+                      "PolarisAwsCredentialsStorageIntegration", 
AssumeRoleRequest::roleSessionName)
                   // ensure that the policy content does not refer to S3A
                   .extracting(AssumeRoleRequest::policy)
                   .doesNotMatch(s -> s.contains("s3a"));
@@ -97,6 +124,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 true,
                 Set.of(warehouseDir + "/namespace/table"),
                 Set.of(warehouseDir + "/namespace/table"),
+                POLARIS_PRINCIPAL,
                 Optional.of("/namespace/table/credentials"));
     assertThat(storageAccessConfig.credentials())
         .isNotEmpty()
@@ -112,6 +140,43 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
             "/namespace/table/credentials");
   }
 
+  /**
+   * Uses a realm config with INCLUDE_PRINCIPAL_NAME_IN_SUBSCOPED_CREDENTIAL set to true and
+   * checks that the role session name sent to STS includes the principal name.
+   *
+   * <p>The assertions live in the mocked {@code assumeRole} answer, so the value returned by
+   * {@code getSubscopedCreds} is intentionally discarded.
+   */
+  @Test
+  public void testGetSubscopedCredsRoleSessionNameWithPrincipalIncluded() {
+    StsClient stsClient = Mockito.mock(StsClient.class);
+    String roleARN = "arn:aws:iam::012345678901:role/jdoe";
+    String externalId = "externalId";
+
+    Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
+        .thenAnswer(
+            invocation -> {
+              // Inspect the request the integration built before handing back canned credentials.
+              assertThat(invocation.getArguments()[0])
+                  .isInstanceOf(AssumeRoleRequest.class)
+                  .asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
+                  .returns(externalId, AssumeRoleRequest::externalId)
+                  .returns(roleARN, AssumeRoleRequest::roleArn)
+                  .returns("polaris-test-principal", AssumeRoleRequest::roleSessionName);
+              return ASSUME_ROLE_RESPONSE;
+            });
+    String warehouseDir = "s3://bucket/path/to/warehouse";
+    new AwsCredentialsStorageIntegration(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocation(warehouseDir)
+                .roleARN(roleARN)
+                .externalId(externalId)
+                .build(),
+            stsClient)
+        .getSubscopedCreds(
+            PRINCIPAL_INCLUDER_REALM_CONFIG,
+            true,
+            Set.of(warehouseDir + "/namespace/table"),
+            Set.of(warehouseDir + "/namespace/table"),
+            POLARIS_PRINCIPAL,
+            Optional.of("/namespace/table/credentials"));
+  }
+
   @ParameterizedTest
   @ValueSource(strings = {AWS_PARTITION, "aws-cn", "aws-us-gov"})
   public void testGetSubscopedCredsInlinePolicy(String awsPartition) {
@@ -250,6 +315,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     true,
                     Set.of(s3Path(bucket, firstPath), s3Path(bucket, 
secondPath)),
                     Set.of(s3Path(bucket, firstPath)),
+                    POLARIS_PRINCIPAL,
                     Optional.empty());
         assertThat(storageAccessConfig.credentials())
             .isNotEmpty()
@@ -351,6 +417,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 false, /* allowList = false*/
                 Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
                 Set.of(s3Path(bucket, firstPath)),
+                POLARIS_PRINCIPAL,
                 Optional.empty());
     assertThat(storageAccessConfig.credentials())
         .isNotEmpty()
@@ -466,6 +533,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 true, /* allowList = true */
                 Set.of(s3Path(bucket, firstPath), s3Path(bucket, secondPath)),
                 Set.of(),
+                POLARIS_PRINCIPAL,
                 Optional.empty());
     assertThat(storageAccessConfig.credentials())
         .isNotEmpty()
@@ -553,6 +621,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                 true, /* allowList = true */
                 Set.of(),
                 Set.of(),
+                POLARIS_PRINCIPAL,
                 Optional.empty());
     assertThat(storageAccessConfig.credentials())
         .isNotEmpty()
@@ -596,6 +665,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     true, /* allowList = true */
                     Set.of(),
                     Set.of(),
+                    POLARIS_PRINCIPAL,
                     Optional.empty());
         assertThat(storageAccessConfig.credentials())
             .containsEntry(StorageAccessProperty.AWS_TOKEN.getPropertyName(), 
"sess")
@@ -637,6 +707,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                     true, /* allowList = true */
                     Set.of(),
                     Set.of(),
+                    POLARIS_PRINCIPAL,
                     Optional.empty());
         assertThat(storageAccessConfig.credentials())
             .isNotEmpty()
@@ -657,6 +728,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                             true, /* allowList = true */
                             Set.of(),
                             Set.of(),
+                            POLARIS_PRINCIPAL,
                             Optional.empty()))
             .isInstanceOf(IllegalArgumentException.class);
         break;
@@ -720,6 +792,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
             true,
             Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
             Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            POLARIS_PRINCIPAL,
             Optional.empty());
 
     // Test with allowed KMS keys and read-only permissions
@@ -765,6 +838,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
             true,
             Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
             Set.of(),
+            POLARIS_PRINCIPAL,
             Optional.empty());
 
     // Test with no KMS keys and read-only (should add wildcard KMS access)
@@ -801,6 +875,7 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
             true,
             Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
             Set.of(),
+            POLARIS_PRINCIPAL,
             Optional.empty());
 
     // Test with no KMS keys and write permissions (should not add KMS 
statement)
@@ -834,9 +909,51 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
             true,
             Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
             Set.of(s3Path(bucket, warehouseKeyPrefix + "/table")),
+            POLARIS_PRINCIPAL,
             Optional.empty());
   }
 
+  /**
+   * With the principal-name flag enabled, a principal with an over-long name must yield a role
+   * session name equal to the first 64 characters of {@code "polaris-" + name}.
+   */
+  @Test
+  public void testGetSubscopedCredsLongPrincipalName() {
+    String externalId = "externalId";
+    String roleARN = "arn:aws:iam::012345678901:role/jdoe";
+    String warehouseDir = "s3://bucket/path/to/warehouse";
+    String tableLocation = warehouseDir + "/namespace/table";
+    // 8 characters of prefix plus the first 56 characters of the principal name.
+    String expectedSessionName =
+        "polaris-very-long-principal-name-that-exceeds-the-maximum-allowe";
+    PolarisPrincipal longNamedPrincipal =
+        PolarisPrincipal.of(
+            "very-long-principal-name-that-exceeds-the-maximum-allowed-length-of-64-characters",
+            Map.of(),
+            Set.of());
+
+    StsClient stsClient = Mockito.mock(StsClient.class);
+    Mockito.when(stsClient.assumeRole(Mockito.isA(AssumeRoleRequest.class)))
+        .thenAnswer(
+            invocation -> {
+              Object request = invocation.getArguments()[0];
+              assertThat(request)
+                  .isInstanceOf(AssumeRoleRequest.class)
+                  .asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
+                  .returns(externalId, AssumeRoleRequest::externalId)
+                  .returns(roleARN, AssumeRoleRequest::roleArn)
+                  .returns(expectedSessionName, AssumeRoleRequest::roleSessionName);
+              return ASSUME_ROLE_RESPONSE;
+            });
+
+    AwsCredentialsStorageIntegration integration =
+        new AwsCredentialsStorageIntegration(
+            AwsStorageConfigurationInfo.builder()
+                .addAllowedLocation(warehouseDir)
+                .roleARN(roleARN)
+                .externalId(externalId)
+                .build(),
+            stsClient);
+    integration.getSubscopedCreds(
+        PRINCIPAL_INCLUDER_REALM_CONFIG,
+        true,
+        Set.of(tableLocation),
+        Set.of(tableLocation),
+        longNamedPrincipal,
+        Optional.of("/namespace/table/credentials"));
+  }
+
   private static @Nonnull String s3Arn(String partition, String bucket, String 
keyPrefix) {
     String bucketArn = "arn:" + partition + ":s3:::" + bucket;
     if (keyPrefix == null) {
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
index 42a8bd327..e371afde2 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/azure/AzureCredentialStorageIntegrationTest.java
@@ -45,8 +45,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Stream;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.storage.BaseStorageIntegrationTest;
 import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.core.storage.StorageAccessProperty;
@@ -354,6 +357,7 @@ public class AzureCredentialStorageIntegrationTest extends 
BaseStorageIntegratio
         allowListAction,
         new HashSet<>(allowedReadLoc),
         new HashSet<>(allowedWriteLoc),
+        PolarisPrincipal.of("principal", Map.of(), Set.of()),
         Optional.empty());
   }
 
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
index 74ec73303..00c203d81 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/gcp/GcpCredentialsStorageIntegrationTest.java
@@ -47,8 +47,10 @@ import java.util.Arrays;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.storage.BaseStorageIntegrationTest;
 import org.apache.polaris.core.storage.StorageAccessConfig;
 import org.apache.polaris.core.storage.StorageAccessProperty;
@@ -181,6 +183,7 @@ class GcpCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
         allowListAction,
         new HashSet<>(allowedReadLoc),
         new HashSet<>(allowedWriteLoc),
+        PolarisPrincipal.of("principal", Map.of(), Set.of()),
         Optional.of(REFRESH_ENDPOINT));
   }
 
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
index d6316c6e7..e49bee99a 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/catalog/io/StorageAccessConfigProvider.java
@@ -25,6 +25,7 @@ import jakarta.inject.Inject;
 import java.util.Optional;
 import java.util.Set;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
@@ -49,13 +50,16 @@ public class StorageAccessConfigProvider {
 
   private final StorageCredentialCache storageCredentialCache;
   private final StorageCredentialsVendor storageCredentialsVendor;
+  private final PolarisPrincipal polarisPrincipal;
 
   @Inject
   public StorageAccessConfigProvider(
       StorageCredentialCache storageCredentialCache,
-      StorageCredentialsVendor storageCredentialsVendor) {
+      StorageCredentialsVendor storageCredentialsVendor,
+      PolarisPrincipal polarisPrincipal) {
     this.storageCredentialCache = storageCredentialCache;
     this.storageCredentialsVendor = storageCredentialsVendor;
+    this.polarisPrincipal = polarisPrincipal;
   }
 
   /**
@@ -119,6 +123,7 @@ public class StorageAccessConfigProvider {
             allowList,
             tableLocations,
             writeLocations,
+            polarisPrincipal,
             refreshCredentialsEndpoint);
 
     LOGGER
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
index c9726326f..0042ac84e 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/config/ServiceProducers.java
@@ -30,9 +30,6 @@ import jakarta.enterprise.inject.Disposes;
 import jakarta.enterprise.inject.Instance;
 import jakarta.enterprise.inject.Produces;
 import jakarta.inject.Singleton;
-import jakarta.ws.rs.core.Context;
-import jakarta.ws.rs.core.SecurityContext;
-import java.security.Principal;
 import java.time.Clock;
 import java.util.stream.Collectors;
 import org.apache.polaris.core.PolarisCallContext;
@@ -41,7 +38,6 @@ import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.auth.DefaultPolarisAuthorizerFactory;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
 import org.apache.polaris.core.auth.PolarisAuthorizerFactory;
-import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.context.CallContext;
@@ -190,20 +186,6 @@ public class ServiceProducers {
     return new ResolutionManifestFactoryImpl(diagnostics, realmContext, 
resolverFactory);
   }
 
-  @Produces
-  @RequestScoped
-  public PolarisPrincipal polarisPrincipal(
-      PolarisDiagnostics diagnostics, @Context SecurityContext 
securityContext) {
-    Principal userPrincipal = securityContext.getUserPrincipal();
-    diagnostics.checkNotNull(userPrincipal, "null_security_context_principal");
-    diagnostics.check(
-        userPrincipal instanceof PolarisPrincipal,
-        "unexpected_principal_type",
-        "class={}",
-        userPrincipal.getClass().getName());
-    return (PolarisPrincipal) userPrincipal;
-  }
-
   // Polaris service beans - selected from @Identifier-annotated beans
 
   @Produces
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisPrincipalHolder.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisPrincipalHolder.java
new file mode 100644
index 000000000..5c01ad7b2
--- /dev/null
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/context/catalog/PolarisPrincipalHolder.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.context.catalog;
+
+import io.quarkus.security.identity.CurrentIdentityAssociation;
+import io.quarkus.security.identity.SecurityIdentity;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.enterprise.inject.Produces;
+import jakarta.enterprise.inject.UnsatisfiedResolutionException;
+import java.security.Principal;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+
+/**
+ * Request-scoped holder and CDI producer for the current {@link PolarisPrincipal}.
+ *
+ * <p>A principal installed through {@link #set(PolarisPrincipal)} takes precedence; otherwise the
+ * principal is read from the {@link SecurityIdentity} exposed by the injected {@link
+ * CurrentIdentityAssociation}.
+ */
+@RequestScoped
+public class PolarisPrincipalHolder {
+
+  private final AtomicReference<PolarisPrincipal> polarisPrincipal = new AtomicReference<>();
+
+  /**
+   * Produces the request's {@link PolarisPrincipal}.
+   *
+   * @param diagnostics used to validate the principal taken from the security identity
+   * @param currentIdentityAssociation source of the (deferred) security identity
+   * @return the explicitly set principal if there is one, otherwise the identity's principal
+   * @throws UnsatisfiedResolutionException if no principal was set and the deferred identity has
+   *     no value available yet or resolved to {@code null}
+   */
+  @Produces
+  @RequestScoped
+  public PolarisPrincipal get(
+      PolarisDiagnostics diagnostics, CurrentIdentityAssociation currentIdentityAssociation) {
+    PolarisPrincipal setPrincipal = polarisPrincipal.get();
+
+    if (setPrincipal != null) {
+      return setPrincipal;
+    }
+
+    // getNow(null) does not wait: a still-pending identity is treated as "not authenticated".
+    SecurityIdentity identity =
+        currentIdentityAssociation.getDeferredIdentity().subscribeAsCompletionStage().getNow(null);
+
+    if (identity == null) {
+      throw new UnsatisfiedResolutionException("Not authenticated");
+    }
+
+    Principal userPrincipal = identity.getPrincipal();
+
+    // The arguments of the check below are evaluated eagerly, so a null principal has to be
+    // rejected first; otherwise userPrincipal.getClass() fails with a bare NullPointerException.
+    diagnostics.checkNotNull(userPrincipal, "null_security_identity_principal");
+    diagnostics.check(
+        userPrincipal instanceof PolarisPrincipal,
+        "unexpected_principal_type",
+        "class={}",
+        userPrincipal.getClass().getName());
+
+    return (PolarisPrincipal) userPrincipal;
+  }
+
+  /**
+   * Installs the principal that {@link #get} should return for this holder instance.
+   *
+   * @param rc the principal to install
+   * @throws IllegalStateException if a principal has already been set
+   */
+  public void set(PolarisPrincipal rc) {
+    if (!polarisPrincipal.compareAndSet(null, rc)) {
+      throw new IllegalStateException("PolarisPrincipal already set");
+    }
+  }
+}
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
index 706acb422..504922290 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/storage/PolarisStorageIntegrationProviderImpl.java
@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.function.Supplier;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.config.RealmConfig;
 import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
@@ -114,6 +115,7 @@ public class PolarisStorageIntegrationProviderImpl 
implements PolarisStorageInte
                   boolean allowListOperation,
                   @Nonnull Set<String> allowedReadLocations,
                   @Nonnull Set<String> allowedWriteLocations,
+                  @Nonnull PolarisPrincipal polarisPrincipal,
                   Optional<String> refreshCredentialsEndpoint) {
                 return 
StorageAccessConfig.builder().supportsCredentialVending(false).build();
               }
diff --git 
a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
 
b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
index aeaa53260..ca25b9357 100644
--- 
a/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
+++ 
b/runtime/service/src/main/java/org/apache/polaris/service/task/TaskExecutorImpl.java
@@ -40,12 +40,15 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.function.TriConsumer;
+import org.apache.polaris.core.auth.ImmutablePolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisPrincipal;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder;
 import org.apache.polaris.service.context.catalog.RealmContextHolder;
 import org.apache.polaris.service.events.AfterAttemptTaskEvent;
 import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
@@ -70,6 +73,8 @@ public class TaskExecutorImpl implements TaskExecutor {
   private final MetaStoreManagerFactory metaStoreManagerFactory;
   private final TaskFileIOSupplier fileIOSupplier;
   private final RealmContextHolder realmContextHolder;
+  private final PolarisPrincipalHolder polarisPrincipalHolder;
+  private final PolarisPrincipal polarisPrincipal;
   private final List<TaskHandler> taskHandlers = new CopyOnWriteArrayList<>();
   private final Optional<TriConsumer<Long, Boolean, Throwable>> errorHandler;
   private final PolarisEventListener polarisEventListener;
@@ -78,7 +83,7 @@ public class TaskExecutorImpl implements TaskExecutor {
 
   @SuppressWarnings("unused") // Required by CDI
   protected TaskExecutorImpl() {
-    this(null, null, null, null, null, null, null, null, null);
+    this(null, null, null, null, null, null, null, null, null, null, null);
   }
 
   @Inject
@@ -92,7 +97,9 @@ public class TaskExecutorImpl implements TaskExecutor {
       RealmContextHolder realmContextHolder,
       PolarisEventListener polarisEventListener,
       PolarisEventMetadataFactory eventMetadataFactory,
-      @Nullable Tracer tracer) {
+      @Nullable Tracer tracer,
+      PolarisPrincipalHolder polarisPrincipalHolder,
+      PolarisPrincipal polarisPrincipal) {
     this.executor = executor;
     this.clock = clock;
     this.metaStoreManagerFactory = metaStoreManagerFactory;
@@ -101,6 +108,8 @@ public class TaskExecutorImpl implements TaskExecutor {
     this.polarisEventListener = polarisEventListener;
     this.eventMetadataFactory = eventMetadataFactory;
     this.tracer = tracer;
+    this.polarisPrincipalHolder = polarisPrincipalHolder;
+    this.polarisPrincipal = polarisPrincipal;
 
     if (errorHandler != null && errorHandler.isResolvable()) {
       this.errorHandler = Optional.of(errorHandler.get());
@@ -145,6 +154,7 @@ public class TaskExecutorImpl implements TaskExecutor {
     // Note: PolarisCallContext has request-scoped beans as well, and must be cloned.
     // FIXME replace with context propagation?
     CallContext clone = callContext.copy();
+
     // Capture the metadata now in order to capture the principal and request ID, if any.
     PolarisEventMetadata eventMetadata = eventMetadataFactory.create();
     tryHandleTask(taskEntityId, clone, eventMetadata, null, 1);
@@ -160,9 +170,14 @@ public class TaskExecutorImpl implements TaskExecutor {
       return CompletableFuture.failedFuture(e);
     }
     String realmId = callContext.getRealmContext().getRealmIdentifier();
+
+    PolarisPrincipal principalClone =
+        ImmutablePolarisPrincipal.builder().from(polarisPrincipal).build();
+
     return CompletableFuture.runAsync(
             () -> {
-              handleTaskWithTracing(realmId, taskEntityId, callContext, 
eventMetadata, attempt);
+              handleTaskWithTracing(
+                  realmId, taskEntityId, callContext, principalClone, 
eventMetadata, attempt);
               errorHandler.ifPresent(h -> h.accept(taskEntityId, false, null));
             },
             executor)
@@ -234,10 +249,16 @@ public class TaskExecutorImpl implements TaskExecutor {
       String realmId,
       long taskEntityId,
       CallContext callContext,
+      PolarisPrincipal principal,
       PolarisEventMetadata eventMetadata,
       int attempt) {
     // Note: each call to this method runs in a new CDI request context
+
     realmContextHolder.set(() -> realmId);
+    // Since this is now a different CDI request context, store a clone of the principal in a
+    // holder object, which effectively re-authenticates it. The PolarisPrincipal bean always
+    // looks for a principal set in PolarisPrincipalHolder first and assumes that identity if set.
+    polarisPrincipalHolder.set(principal);
 
     if (tracer == null) {
       handleTask(taskEntityId, callContext, eventMetadata, attempt);
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/RootPrincipalAugmentor.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/RootPrincipalAugmentor.java
new file mode 100644
index 000000000..b6127a498
--- /dev/null
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/RootPrincipalAugmentor.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.catalog;
+
+import io.quarkus.security.identity.AuthenticationRequestContext;
+import io.quarkus.security.identity.SecurityIdentity;
+import io.quarkus.security.identity.SecurityIdentityAugmentor;
+import io.quarkus.security.runtime.QuarkusSecurityIdentity;
+import io.smallrye.mutiny.Uni;
+import jakarta.enterprise.context.RequestScoped;
+import jakarta.inject.Inject;
+import java.util.Set;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PrincipalEntity;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+@RequestScoped
+public class RootPrincipalAugmentor implements SecurityIdentityAugmentor {
+
+  @Inject PolarisMetaStoreManager innerMetaStoreManager;
+  @Inject CallContext innerCallContext;
+
+  @ConfigProperty(name = "polaris.test.rootAugmentor.enabled", defaultValue = 
"false")
+  boolean enabled;
+
+  @Override
+  public Uni<SecurityIdentity> augment(
+      SecurityIdentity identity, AuthenticationRequestContext context) {
+    if (!enabled || !identity.isAnonymous()) {
+      return Uni.createFrom().item(identity);
+    }
+
+    PrincipalEntity rootPrincipal =
+        innerMetaStoreManager
+            .findRootPrincipal(innerCallContext.getPolarisCallContext())
+            .orElseThrow();
+
+    PolarisPrincipal principal = PolarisPrincipal.of(rootPrincipal, 
Set.of("service_admin"));
+
+    return Uni.createFrom()
+        .item(
+            QuarkusSecurityIdentity.builder()
+                .setPrincipal(principal)
+                .addRole("service_admin")
+                .setAnonymous(false)
+                .build());
+  }
+}
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
index 01b91b60f..b59876140 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/AbstractIcebergCatalogTest.java
@@ -108,7 +108,6 @@ import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
-import org.apache.polaris.core.entity.PrincipalEntity;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.entity.table.IcebergTableLikeEntity;
 import org.apache.polaris.core.exceptions.CommitConflictException;
@@ -201,6 +200,7 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
           .put("polaris.features.\"ALLOW_TABLE_LOCATION_OVERLAP\"", "true")
           .put("polaris.features.\"LIST_PAGINATION_ENABLED\"", "true")
           .put("polaris.behavior-changes.\"ALLOW_NAMESPACE_CUSTOM_LOCATION\"", 
"true")
+          .put("polaris.test.rootAugmentor.enabled", "true")
           .build();
     }
   }
@@ -241,6 +241,7 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
   @Inject StorageAccessConfigProvider storageAccessConfigProvider;
   @Inject FileIOFactory fileIOFactory;
   @Inject TaskFileIOSupplier taskFileIOSupplier;
+  @Inject PolarisPrincipal authenticatedRoot;
 
   private IcebergCatalog catalog;
   private String realmName;
@@ -249,7 +250,7 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
   private ResolverFactory resolverFactory;
   private InMemoryFileIO fileIO;
   private PolarisEntity catalogEntity;
-  private PolarisPrincipal authenticatedRoot;
+
   private TestPolarisEventListener testPolarisEventListener;
   private ReservedProperties reservedProperties;
 
@@ -295,10 +296,6 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
                 referenceCatalogName);
     QuarkusMock.installMockForType(resolverFactory, ResolverFactory.class);
 
-    PrincipalEntity rootPrincipal =
-        metaStoreManager.findRootPrincipal(polarisContext).orElseThrow();
-    authenticatedRoot = PolarisPrincipal.of(rootPrincipal, Set.of());
-
     PolarisAuthorizer authorizer = new PolarisAuthorizerImpl(realmConfig);
     reservedProperties = new ReservedProperties() {};
 
@@ -1891,6 +1888,7 @@ public abstract class AbstractIcebergCatalogTest extends 
CatalogTests<IcebergCat
                 true,
                 Set.of(tableMetadata.location()),
                 Set.of(tableMetadata.location()),
+                authenticatedRoot,
                 Optional.empty())
             .getStorageAccessConfig()
             .credentials();
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
index 4151c9fce..a75c82bea 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/task/TaskExecutorImplTest.java
@@ -24,6 +24,7 @@ import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.TaskEntity;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.context.catalog.PolarisPrincipalHolder;
 import org.apache.polaris.service.context.catalog.RealmContextHolder;
 import org.apache.polaris.service.events.AfterAttemptTaskEvent;
 import org.apache.polaris.service.events.BeforeAttemptTaskEvent;
@@ -71,7 +72,9 @@ public class TaskExecutorImplTest {
             new RealmContextHolder(),
             testServices.polarisEventListener(),
             testServices.eventMetadataFactory(),
-            null);
+            null,
+            new PolarisPrincipalHolder(),
+            testServices.principal());
 
     executor.addTaskHandler(
         new TaskHandler() {
diff --git 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
index 1406ddb39..59af4b5a6 100644
--- 
a/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
+++ 
b/runtime/service/src/testFixtures/java/org/apache/polaris/service/TestServices.java
@@ -305,7 +305,8 @@ public record TestServices(
       StorageCredentialsVendor storageCredentialsVendor =
           new StorageCredentialsVendor(metaStoreManager, callContext);
       StorageAccessConfigProvider storageAccessConfigProvider =
-          new StorageAccessConfigProvider(storageCredentialCache, 
storageCredentialsVendor);
+          new StorageAccessConfigProvider(
+              storageCredentialCache, storageCredentialsVendor, principal);
       FileIOFactory fileIOFactory = fileIOFactorySupplier.get();
 
       TaskExecutor taskExecutor = Mockito.mock(TaskExecutor.class);

Reply via email to