singhpk234 commented on code in PR #2280:
URL: https://github.com/apache/polaris/pull/2280#discussion_r2505113398


##########
CHANGELOG.md:
##########
@@ -36,6 +36,14 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
 
 [Iceberg Metrics Reporting]: 
https://iceberg.apache.org/docs/latest/metrics-reporting/
 
+- **S3 remote request signing** has been added, allowing Polaris to work with 
S3-compatible object storage systems.
+  *Remote signing is currently experimental and not enabled by default*. In 
particular, RBAC checks are currently not
+  production-ready. One new table privilege was introduced: 
`TABLE_REMOTE_SIGN`. To enable remote signing:
+    1. Set the system-wide property `REMOTE_SIGNING_ENABLED` or the 
catalog-level `polaris.request-signing.enabled`
+       property to `true`.
+    2. Grant the `TABLE_REMOTE_SIGN` privilege to a catalog role. The role 
must also be granted the `TABLE_READ_DATA`
+       and `TABLE_WRITE_DATA` privileges.

Review Comment:
   [doubt] is there an assertion when even TABLE_REMOTE_SIGN is granted these 
two privieldges would be there ? 
   
   nit : 
   ```suggestion
       2. Grant the `TABLE_REMOTE_SIGN` privilege to a catalog role. The 
catalog role must also be granted the `TABLE_READ_DATA`
          and `TABLE_WRITE_DATA` privileges.
   ```



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/common/CatalogHandler.java:
##########
@@ -394,6 +394,47 @@ protected void authorizeRenameTableLikeOperationOrThrow(
     initializeCatalog();
   }
 
+  protected void authorizeRemoteSigningOrThrow(
+      EnumSet<PolarisAuthorizableOperation> ops, TableIdentifier identifier) {
+
+    ensureResolutionManifestForTable(identifier);
+    PolarisResolvedPathWrapper target =
+        resolutionManifest.getResolvedPath(
+            identifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE, true);
+
+    // If the table doesn't exist, we still need to check allowed locations 
from the parent
+    // namespace.

Review Comment:
   [doubt] is this a valid state ? 



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, 
getResolvedCatalogEntity())) {
     return responseBuilder;
   }
 
+  private AccessDelegationMode selectAccessDelegationMode(
+      Set<AccessDelegationMode> delegationModes) {

Review Comment:
   does linking this part of spec 
https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml#L1859
 helps in putting some context that server is allowed to decide which one to 
choose ?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/io/AccessConfigProvider.java:
##########
@@ -101,4 +120,58 @@ public AccessConfig getAccessConfig(
         storageInfo.get(),
         refreshCredentialsEndpoint);
   }
+
+  /**
+   * Generates a remote signing configuration for accessing table storage at 
explicit locations.
+   *
+   * @param callContext the call context containing realm, principal, and 
security context
+   * @param catalogName the name of the catalog
+   * @param tableIdentifier the table identifier, used for logging and refresh 
endpoint construction
+   * @param resolvedPath the entity hierarchy to search for storage 
configuration
+   * @return {@link AccessConfig} with scoped credentials and metadata; empty 
if no storage config
+   *     found
+   */
+  public AccessConfig getAccessConfigForRemoteSigning(
+      @Nonnull CallContext callContext,
+      @Nonnull String catalogName,
+      @Nonnull TableIdentifier tableIdentifier,
+      @Nonnull PolarisResolvedPathWrapper resolvedPath) {
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .log("Fetching remote signing config for table");
+
+    Optional<PolarisEntity> storageInfo = 
FileIOUtil.findStorageInfoFromHierarchy(resolvedPath);
+    Optional<PolarisStorageConfigurationInfo> configurationInfo =
+        storageInfo
+            .map(PolarisEntity::getInternalPropertiesAsMap)
+            .map(info -> 
info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .map(PolarisStorageConfigurationInfo::deserialize);
+
+    if (configurationInfo.isEmpty()) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Table entity has no storage configuration in its hierarchy");
+      return AccessConfig.EMPTY;
+    }
+
+    PolarisStorageIntegration<AwsStorageConfigurationInfo> storageIntegration =
+        
storageIntegrationProvider.getStorageIntegrationForConfig(configurationInfo.get());
+
+    if (!(storageIntegration
+        instanceof AwsCredentialsStorageIntegration 
awsCredentialsStorageIntegration)) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Table entity storage integration is not an AWS credentials 
storage integration");
+      return AccessConfig.EMPTY;
+    }
+
+    String prefix = 
prefixParser.catalogNameToPrefix(callContext.getRealmContext(), catalogName);
+    URI signerUri = 
uriInfo.getBaseUriBuilder().path(PolarisResourcePaths.API_PATH_SEGMENT).build();

Review Comment:
   can we add a TODO to handle proxies here ? 



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, 
getResolvedCatalogEntity())) {
     return responseBuilder;
   }
 
+  private AccessDelegationMode selectAccessDelegationMode(
+      Set<AccessDelegationMode> delegationModes) {
+
+    if (delegationModes.isEmpty()) {
+      return UNKNOWN;
+    }
+
+    if (delegationModes.size() == 1) {
+      return delegationModes.iterator().next();
+    }
+
+    if (delegationModes.contains(VENDED_CREDENTIALS) && 
delegationModes.contains(REMOTE_SIGNING)) {
+
+      boolean skipCredIndirection =
+          
realmConfig.getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION);
+
+      boolean credentialSubscopingAllowed =
+          baseCatalog instanceof IcebergCatalog

Review Comment:
   If its an Iceberg catalog but doesn't support sts wouldn't we prefer 
remote-signing then ?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/io/AccessConfigProvider.java:
##########
@@ -101,4 +120,58 @@ public AccessConfig getAccessConfig(
         storageInfo.get(),
         refreshCredentialsEndpoint);
   }
+
+  /**
+   * Generates a remote signing configuration for accessing table storage at 
explicit locations.
+   *
+   * @param callContext the call context containing realm, principal, and 
security context
+   * @param catalogName the name of the catalog
+   * @param tableIdentifier the table identifier, used for logging and refresh 
endpoint construction
+   * @param resolvedPath the entity hierarchy to search for storage 
configuration
+   * @return {@link AccessConfig} with scoped credentials and metadata; empty 
if no storage config
+   *     found
+   */
+  public AccessConfig getAccessConfigForRemoteSigning(
+      @Nonnull CallContext callContext,
+      @Nonnull String catalogName,
+      @Nonnull TableIdentifier tableIdentifier,
+      @Nonnull PolarisResolvedPathWrapper resolvedPath) {
+    LOGGER
+        .atDebug()
+        .addKeyValue("tableIdentifier", tableIdentifier)
+        .log("Fetching remote signing config for table");
+
+    Optional<PolarisEntity> storageInfo = 
FileIOUtil.findStorageInfoFromHierarchy(resolvedPath);
+    Optional<PolarisStorageConfigurationInfo> configurationInfo =
+        storageInfo
+            .map(PolarisEntity::getInternalPropertiesAsMap)
+            .map(info -> 
info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+            .map(PolarisStorageConfigurationInfo::deserialize);
+
+    if (configurationInfo.isEmpty()) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Table entity has no storage configuration in its hierarchy");
+      return AccessConfig.EMPTY;
+    }
+
+    PolarisStorageIntegration<AwsStorageConfigurationInfo> storageIntegration =
+        
storageIntegrationProvider.getStorageIntegrationForConfig(configurationInfo.get());
+
+    if (!(storageIntegration
+        instanceof AwsCredentialsStorageIntegration 
awsCredentialsStorageIntegration)) {
+      LOGGER
+          .atWarn()
+          .addKeyValue("tableIdentifier", tableIdentifier)
+          .log("Table entity storage integration is not an AWS credentials 
storage integration");
+      return AccessConfig.EMPTY;
+    }
+
+    String prefix = 
prefixParser.catalogNameToPrefix(callContext.getRealmContext(), catalogName);
+    URI signerUri = 
uriInfo.getBaseUriBuilder().path(PolarisResourcePaths.API_PATH_SEGMENT).build();

Review Comment:
   can we add a TODO in code to handle proxies ?



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, 
getResolvedCatalogEntity())) {
     return responseBuilder;
   }
 
+  private AccessDelegationMode selectAccessDelegationMode(
+      Set<AccessDelegationMode> delegationModes) {
+
+    if (delegationModes.isEmpty()) {
+      return UNKNOWN;
+    }
+
+    if (delegationModes.size() == 1) {
+      return delegationModes.iterator().next();
+    }
+
+    if (delegationModes.contains(VENDED_CREDENTIALS) && 
delegationModes.contains(REMOTE_SIGNING)) {
+
+      boolean skipCredIndirection =
+          
realmConfig.getConfig(FeatureConfiguration.SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION);
+
+      boolean credentialSubscopingAllowed =
+          baseCatalog instanceof IcebergCatalog

Review Comment:
   what if its an IcebergCatalog but doesn't support STS ? 



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.s3.sign;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageUtil;
+import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.apache.polaris.service.catalog.common.CatalogUtils;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3RemoteSigningCatalogHandler extends CatalogHandler implements 
AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class);
+
+  private final CallContextCatalogFactory catalogFactory;
+  private final S3RequestSigner s3RequestSigner;
+
+  private CatalogEntity catalogEntity;
+  private Catalog baseCatalog;
+
+  public S3RemoteSigningCatalogHandler(
+      PolarisDiagnostics diagnostics,
+      CallContext callContext,
+      ResolutionManifestFactory resolutionManifestFactory,
+      CallContextCatalogFactory catalogFactory,
+      PolarisPrincipal polarisPrincipal,
+      String catalogName,
+      PolarisAuthorizer authorizer,
+      S3RequestSigner s3RequestSigner) {
+    super(
+        diagnostics,
+        callContext,
+        resolutionManifestFactory,
+        polarisPrincipal,
+        catalogName,
+        authorizer,
+        // external catalogs are not supported for S3 remote signing
+        null,
+        null);
+    this.catalogFactory = catalogFactory;
+    this.s3RequestSigner = s3RequestSigner;
+  }
+
+  @Override
+  protected void initializeCatalog() {
+    catalogEntity =
+        
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Cannot use S3 remote signing with 
federated catalogs.");
+    }
+    baseCatalog =
+        catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, 
resolutionManifest);
+  }
+
+  public PolarisS3SignResponse signS3Request(
+      PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) {
+
+    LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, 
s3SignRequest);
+
+    PolarisAuthorizableOperation authzOp =
+        s3SignRequest.write()
+            ? PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST
+            : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST;
+
+    authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier);
+
+    // Must be done after the authorization check, as the auth check creates 
the catalog entity
+    throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), 
catalogEntity);
+
+    var result =
+        InMemoryStorageIntegration.validateAllowedLocations(
+            callContext.getRealmConfig(),
+            getAllowedLocations(tableIdentifier),
+            getStorageActions(s3SignRequest),
+            getTargetLocations(s3SignRequest));
+
+    if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v 
-> !v.isSuccess()))) {
+      throw new ForbiddenException("Requested S3 location is not allowed.");
+    }
+
+    PolarisS3SignResponse s3SignResponse = 
s3RequestSigner.signRequest(s3SignRequest);
+    LOGGER.debug("S3 signing response: {}", s3SignResponse);
+
+    return s3SignResponse;
+  }
+
+  private Collection<String> getAllowedLocations(TableIdentifier 
tableIdentifier) {
+
+    if (baseCatalog.tableExists(tableIdentifier)) {
+
+      // If the table exists, get allowed locations from the table metadata
+      Table table = baseCatalog.loadTable(tableIdentifier);
+      if (table instanceof BaseTable baseTable) {
+        return 
StorageUtil.getLocationsUsedByTable(baseTable.operations().current());
+      }
+
+      throw new ForbiddenException("No storage configuration found for 
table.");
+
+    } else {
+
+      // If the table or view doesn't exist, the engine might be writing the 
manifests before the
+      // table creation is committed. In this case, we still need to check 
allowed locations from
+      // the parent entities.

Review Comment:
   is this the case of staged table ? i wondering if we should block on this 
case ? becasue it may happen the table is different from namespace location ? 



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.s3.sign;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageUtil;
+import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.apache.polaris.service.catalog.common.CatalogUtils;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3RemoteSigningCatalogHandler extends CatalogHandler implements 
AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class);
+
+  private final CallContextCatalogFactory catalogFactory;
+  private final S3RequestSigner s3RequestSigner;
+
+  private CatalogEntity catalogEntity;
+  private Catalog baseCatalog;
+
+  public S3RemoteSigningCatalogHandler(
+      PolarisDiagnostics diagnostics,
+      CallContext callContext,
+      ResolutionManifestFactory resolutionManifestFactory,
+      CallContextCatalogFactory catalogFactory,
+      PolarisPrincipal polarisPrincipal,
+      String catalogName,
+      PolarisAuthorizer authorizer,
+      S3RequestSigner s3RequestSigner) {
+    super(
+        diagnostics,
+        callContext,
+        resolutionManifestFactory,
+        polarisPrincipal,
+        catalogName,
+        authorizer,
+        // external catalogs are not supported for S3 remote signing
+        null,
+        null);
+    this.catalogFactory = catalogFactory;
+    this.s3RequestSigner = s3RequestSigner;
+  }
+
+  @Override
+  protected void initializeCatalog() {
+    catalogEntity =
+        
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Cannot use S3 remote signing with 
federated catalogs.");
+    }
+    baseCatalog =
+        catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, 
resolutionManifest);
+  }
+
+  public PolarisS3SignResponse signS3Request(
+      PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) {
+
+    LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, 
s3SignRequest);
+
+    PolarisAuthorizableOperation authzOp =
+        s3SignRequest.write()
+            ? PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST
+            : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST;
+
+    authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier);
+
+    // Must be done after the authorization check, as the auth check creates 
the catalog entity
+    throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), 
catalogEntity);
+
+    var result =
+        InMemoryStorageIntegration.validateAllowedLocations(
+            callContext.getRealmConfig(),
+            getAllowedLocations(tableIdentifier),
+            getStorageActions(s3SignRequest),
+            getTargetLocations(s3SignRequest));
+
+    if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v 
-> !v.isSuccess()))) {
+      throw new ForbiddenException("Requested S3 location is not allowed.");
+    }
+
+    PolarisS3SignResponse s3SignResponse = 
s3RequestSigner.signRequest(s3SignRequest);
+    LOGGER.debug("S3 signing response: {}", s3SignResponse);
+
+    return s3SignResponse;
+  }
+
+  private Collection<String> getAllowedLocations(TableIdentifier 
tableIdentifier) {
+
+    if (baseCatalog.tableExists(tableIdentifier)) {
+
+      // If the table exists, get allowed locations from the table metadata
+      Table table = baseCatalog.loadTable(tableIdentifier);
+      if (table instanceof BaseTable baseTable) {
+        return 
StorageUtil.getLocationsUsedByTable(baseTable.operations().current());
+      }
+
+      throw new ForbiddenException("No storage configuration found for 
table.");
+
+    } else {
+
+      // If the table or view doesn't exist, the engine might be writing the 
manifests before the
+      // table creation is committed. In this case, we still need to check 
allowed locations from
+      // the parent entities.
+
+      PolarisResolvedPathWrapper resolvedPath =
+          CatalogUtils.findResolvedStorageEntity(resolutionManifest, 
tableIdentifier);
+
+      Optional<PolarisEntity> storageInfo = 
FileIOUtil.findStorageInfoFromHierarchy(resolvedPath);
+
+      var configurationInfo =
+          storageInfo
+              .map(PolarisEntity::getInternalPropertiesAsMap)
+              .map(info -> 
info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+              .map(PolarisStorageConfigurationInfo::deserialize);
+
+      if (configurationInfo.isEmpty()) {
+        throw new ForbiddenException("No storage configuration found for 
table.");
+      }
+
+      return configurationInfo.get().getAllowedLocations();
+    }
+  }
+
+  private Set<PolarisStorageActions> getStorageActions(PolarisS3SignRequest 
s3SignRequest) {
+    // TODO M2: better handling of DELETE and LIST
+    return s3SignRequest.write()
+        ? Set.of(PolarisStorageActions.WRITE)
+        : Set.of(PolarisStorageActions.READ);
+  }
+
+  private Set<String> getTargetLocations(PolarisS3SignRequest s3SignRequest) {
+    // TODO M2: map http URI to s3 URI
+    return Set.of();
+  }
+
+  public static void throwIfRemoteSigningNotEnabled(
+      RealmConfig realmConfig, CatalogEntity catalogEntity) {
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Remote signing is not enabled for external 
catalogs.");
+    }

Review Comment:
   I understand we discussed this before for federated catalog we should 
disable remote signing but during that time we didn;t support cred vending for 
them either, i wonder if its  a good time to reconsider that remote signing 
would be possible too (ofc in a new PR) 



##########
runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -844,6 +869,37 @@ ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, 
getResolvedCatalogEntity())) {
     return responseBuilder;
   }
 
+  private AccessDelegationMode selectAccessDelegationMode(

Review Comment:
   would it be helpful to add a comment here and link irc spec here to pick the 
delegation mode they prefer ?



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.s3.sign;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageUtil;
+import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.apache.polaris.service.catalog.common.CatalogUtils;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3RemoteSigningCatalogHandler extends CatalogHandler implements 
AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class);
+
+  private final CallContextCatalogFactory catalogFactory;
+  private final S3RequestSigner s3RequestSigner;
+
+  private CatalogEntity catalogEntity;
+  private Catalog baseCatalog;
+
+  public S3RemoteSigningCatalogHandler(
+      PolarisDiagnostics diagnostics,
+      CallContext callContext,
+      ResolutionManifestFactory resolutionManifestFactory,
+      CallContextCatalogFactory catalogFactory,
+      PolarisPrincipal polarisPrincipal,
+      String catalogName,
+      PolarisAuthorizer authorizer,
+      S3RequestSigner s3RequestSigner) {
+    super(
+        diagnostics,
+        callContext,
+        resolutionManifestFactory,
+        polarisPrincipal,
+        catalogName,
+        authorizer,
+        // external catalogs are not supported for S3 remote signing
+        null,
+        null);
+    this.catalogFactory = catalogFactory;
+    this.s3RequestSigner = s3RequestSigner;
+  }
+
+  @Override
+  protected void initializeCatalog() {
+    catalogEntity =
+        
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Cannot use S3 remote signing with 
federated catalogs.");
+    }
+    baseCatalog =
+        catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, 
resolutionManifest);
+  }
+
+  public PolarisS3SignResponse signS3Request(
+      PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) {
+
+    LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, 
s3SignRequest);
+
+    PolarisAuthorizableOperation authzOp =
+        s3SignRequest.write()
+            ? PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST
+            : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST;
+
+    authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier);
+
+    // Must be done after the authorization check, as the auth check creates 
the catalog entity
+    throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), 
catalogEntity);
+
+    var result =
+        InMemoryStorageIntegration.validateAllowedLocations(
+            callContext.getRealmConfig(),
+            getAllowedLocations(tableIdentifier),
+            getStorageActions(s3SignRequest),
+            getTargetLocations(s3SignRequest));
+
+    if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v 
-> !v.isSuccess()))) {
+      throw new ForbiddenException("Requested S3 location is not allowed.");
+    }
+
+    PolarisS3SignResponse s3SignResponse = 
s3RequestSigner.signRequest(s3SignRequest);
+    LOGGER.debug("S3 signing response: {}", s3SignResponse);
+
+    return s3SignResponse;
+  }
+
+  private Collection<String> getAllowedLocations(TableIdentifier 
tableIdentifier) {
+
+    if (baseCatalog.tableExists(tableIdentifier)) {
+
+      // If the table exists, get allowed locations from the table metadata
+      Table table = baseCatalog.loadTable(tableIdentifier);

Review Comment:
   for each SIGN request a loadTable call is super expensive, if someone 
enables this in prod by accident for a table million of files the service might 
become unresponsive ? 
   [suggestion / optional to address] wondering for now if we should add a prod 
readiness check to keep this disabled for prod ?



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.s3.sign;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageUtil;
+import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.apache.polaris.service.catalog.common.CatalogUtils;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3RemoteSigningCatalogHandler extends CatalogHandler implements 
AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class);
+
+  private final CallContextCatalogFactory catalogFactory;
+  private final S3RequestSigner s3RequestSigner;
+
+  private CatalogEntity catalogEntity;
+  private Catalog baseCatalog;
+
+  public S3RemoteSigningCatalogHandler(
+      PolarisDiagnostics diagnostics,
+      CallContext callContext,
+      ResolutionManifestFactory resolutionManifestFactory,
+      CallContextCatalogFactory catalogFactory,
+      PolarisPrincipal polarisPrincipal,
+      String catalogName,
+      PolarisAuthorizer authorizer,
+      S3RequestSigner s3RequestSigner) {
+    super(
+        diagnostics,
+        callContext,
+        resolutionManifestFactory,
+        polarisPrincipal,
+        catalogName,
+        authorizer,
+        // external catalogs are not supported for S3 remote signing
+        null,
+        null);
+    this.catalogFactory = catalogFactory;
+    this.s3RequestSigner = s3RequestSigner;
+  }
+
+  @Override
+  protected void initializeCatalog() {
+    catalogEntity =
+        
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Cannot use S3 remote signing with 
federated catalogs.");
+    }
+    baseCatalog =
+        catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, 
resolutionManifest);
+  }
+
+  public PolarisS3SignResponse signS3Request(
+      PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) {
+
+    LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, 
s3SignRequest);

Review Comment:
   do we need to explicity log it here or quarkus supports Access log, asking 
because i don't see such logging in Iceberg Catalog Handler



##########
runtime/service/src/main/java/org/apache/polaris/service/storage/s3/sign/S3RemoteSigningCatalogHandler.java:
##########
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.storage.s3.sign;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+import org.apache.iceberg.BaseTable;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
+import org.apache.polaris.core.auth.PolarisAuthorizer;
+import org.apache.polaris.core.auth.PolarisPrincipal;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.RealmConfig;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.resolver.ResolutionManifestFactory;
+import org.apache.polaris.core.storage.InMemoryStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageActions;
+import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
+import org.apache.polaris.core.storage.StorageUtil;
+import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.apache.polaris.service.catalog.common.CatalogUtils;
+import org.apache.polaris.service.catalog.io.FileIOUtil;
+import org.apache.polaris.service.context.catalog.CallContextCatalogFactory;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignRequest;
+import org.apache.polaris.service.s3.sign.model.PolarisS3SignResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3RemoteSigningCatalogHandler extends CatalogHandler implements 
AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(S3RemoteSigningCatalogHandler.class);
+
+  private final CallContextCatalogFactory catalogFactory;
+  private final S3RequestSigner s3RequestSigner;
+
+  private CatalogEntity catalogEntity;
+  private Catalog baseCatalog;
+
+  public S3RemoteSigningCatalogHandler(
+      PolarisDiagnostics diagnostics,
+      CallContext callContext,
+      ResolutionManifestFactory resolutionManifestFactory,
+      CallContextCatalogFactory catalogFactory,
+      PolarisPrincipal polarisPrincipal,
+      String catalogName,
+      PolarisAuthorizer authorizer,
+      S3RequestSigner s3RequestSigner) {
+    super(
+        diagnostics,
+        callContext,
+        resolutionManifestFactory,
+        polarisPrincipal,
+        catalogName,
+        authorizer,
+        // external catalogs are not supported for S3 remote signing
+        null,
+        null);
+    this.catalogFactory = catalogFactory;
+    this.s3RequestSigner = s3RequestSigner;
+  }
+
+  @Override
+  protected void initializeCatalog() {
+    catalogEntity =
+        
CatalogEntity.of(resolutionManifest.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    if (catalogEntity.isExternal()) {
+      throw new ForbiddenException("Cannot use S3 remote signing with 
federated catalogs.");
+    }
+    baseCatalog =
+        catalogFactory.createCallContextCatalog(callContext, polarisPrincipal, 
resolutionManifest);
+  }
+
+  public PolarisS3SignResponse signS3Request(
+      PolarisS3SignRequest s3SignRequest, TableIdentifier tableIdentifier) {
+
+    LOGGER.debug("Requesting s3 signing for {}: {}", tableIdentifier, 
s3SignRequest);
+
+    PolarisAuthorizableOperation authzOp =
+        s3SignRequest.write()
+            ? PolarisAuthorizableOperation.SIGN_S3_WRITE_REQUEST
+            : PolarisAuthorizableOperation.SIGN_S3_READ_REQUEST;
+
+    authorizeRemoteSigningOrThrow(EnumSet.of(authzOp), tableIdentifier);
+
+    // Must be done after the authorization check, as the auth check creates 
the catalog entity
+    throwIfRemoteSigningNotEnabled(callContext.getRealmConfig(), 
catalogEntity);
+
+    var result =
+        InMemoryStorageIntegration.validateAllowedLocations(
+            callContext.getRealmConfig(),
+            getAllowedLocations(tableIdentifier),
+            getStorageActions(s3SignRequest),
+            getTargetLocations(s3SignRequest));
+
+    if (result.values().stream().anyMatch(r -> r.values().stream().anyMatch(v 
-> !v.isSuccess()))) {
+      throw new ForbiddenException("Requested S3 location is not allowed.");
+    }
+
+    PolarisS3SignResponse s3SignResponse = 
s3RequestSigner.signRequest(s3SignRequest);
+    LOGGER.debug("S3 signing response: {}", s3SignResponse);
+
+    return s3SignResponse;
+  }
+
+  private Collection<String> getAllowedLocations(TableIdentifier 
tableIdentifier) {
+
+    if (baseCatalog.tableExists(tableIdentifier)) {
+
+      // If the table exists, get allowed locations from the table metadata
+      Table table = baseCatalog.loadTable(tableIdentifier);
+      if (table instanceof BaseTable baseTable) {
+        return 
StorageUtil.getLocationsUsedByTable(baseTable.operations().current());
+      }
+
+      throw new ForbiddenException("No storage configuration found for 
table.");
+
+    } else {
+
+      // If the table or view doesn't exist, the engine might be writing the 
manifests before the
+      // table creation is committed. In this case, we still need to check 
allowed locations from
+      // the parent entities.
+
+      PolarisResolvedPathWrapper resolvedPath =
+          CatalogUtils.findResolvedStorageEntity(resolutionManifest, 
tableIdentifier);
+
+      Optional<PolarisEntity> storageInfo = 
FileIOUtil.findStorageInfoFromHierarchy(resolvedPath);
+
+      var configurationInfo =
+          storageInfo
+              .map(PolarisEntity::getInternalPropertiesAsMap)
+              .map(info -> 
info.get(PolarisEntityConstants.getStorageConfigInfoPropertyName()))
+              .map(PolarisStorageConfigurationInfo::deserialize);
+
+      if (configurationInfo.isEmpty()) {
+        throw new ForbiddenException("No storage configuration found for 
table.");
+      }
+
+      return configurationInfo.get().getAllowedLocations();
+    }
+  }
+
+  private Set<PolarisStorageActions> getStorageActions(PolarisS3SignRequest 
s3SignRequest) {
+    // TODO M2: better handling of DELETE and LIST

Review Comment:
   [for my understanding] can you please elaborate this case more ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to