This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 4ed1d41bc Service: Always validate allowed locations from Storage Config (#2473)
4ed1d41bc is described below

commit 4ed1d41bce1c2789c98b59b9ea3a25830e11b479
Author: Yufei Gu <[email protected]>
AuthorDate: Sat Aug 30 17:13:56 2025 -0700

    Service: Always validate allowed locations from Storage Config (#2473)
---
 .../PolarisRestCatalogViewIntegrationBase.java     |  46 +----
 .../core/storage/InMemoryStorageIntegration.java   |  16 +-
 .../polaris/core/storage/LocationRestrictions.java | 123 ++++++++++++++
 .../storage/PolarisStorageConfigurationInfo.java   |  17 +-
 .../core/storage/StorageConfigurationOverride.java |  68 --------
 .../apache/polaris/core/storage/StorageUtil.java   |   8 +-
 .../polaris/core/storage/StorageUtilTest.java      |  21 ++-
 .../service/catalog/iceberg/IcebergCatalog.java    |  76 +++------
 .../iceberg/IcebergAllowedLocationTest.java        | 189 +++++++++++++++++++++
 9 files changed, 367 insertions(+), 197 deletions(-)

diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java
index 8cf61a5c4..ccb9bc18d 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogViewIntegrationBase.java
@@ -20,7 +20,6 @@ package org.apache.polaris.service.it.test;
 
 import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
 
-import com.google.common.collect.ImmutableMap;
 import java.lang.reflect.Method;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -88,6 +87,7 @@ public abstract class PolarisRestCatalogViewIntegrationBase extends ViewCatalogT
   private static ManagementApi managementApi;
 
   private RESTCatalog restCatalog;
+  private StorageConfigInfo storageConfig;
 
   @BeforeAll
   static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials 
credentials) {
@@ -114,7 +114,7 @@ public abstract class PolarisRestCatalogViewIntegrationBase extends ViewCatalogT
     Method method = testInfo.getTestMethod().orElseThrow();
     String catalogName = client.newEntityName(method.getName());
 
-    StorageConfigInfo storageConfig = getStorageConfigInfo();
+    storageConfig = getStorageConfigInfo();
     String defaultBaseLocation =
         storageConfig.getAllowedLocations().getFirst()
             + "/"
@@ -201,16 +201,10 @@ public abstract class PolarisRestCatalogViewIntegrationBase extends ViewCatalogT
     TableIdentifier identifier = TableIdentifier.of("ns", "view");
 
     String location = Paths.get(tempDir.toUri().toString()).toString();
-    String customLocation = Paths.get(tempDir.toUri().toString(), 
"custom-location").toString();
-    String customLocation2 = Paths.get(tempDir.toUri().toString(), 
"custom-location2").toString();
-    String customLocationChild =
-        Paths.get(tempDir.toUri().toString(), 
"custom-location/child").toString();
+    String customLocation =
+        Paths.get(storageConfig.getAllowedLocations().getFirst(), 
"/custom-location1").toString();
 
-    catalog()
-        .createNamespace(
-            identifier.namespace(),
-            ImmutableMap.of(
-                
IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY, location));
+    catalog().createNamespace(identifier.namespace());
 
     Assertions.assertThat(catalog().viewExists(identifier)).as("View should 
not exist").isFalse();
 
@@ -234,35 +228,5 @@ public abstract class PolarisRestCatalogViewIntegrationBase extends ViewCatalogT
     Assertions.assertThat(((BaseView) 
view).operations().current().metadataFileLocation())
         .isNotNull()
         .startsWith(customLocation);
-
-    // CANNOT update the view with a new metadata location 
`baseLocation/customLocation2`,
-    // even though the new location is still under the parent namespace's
-    // `write.metadata.path=baseLocation`.
-    Assertions.assertThatThrownBy(
-            () ->
-                catalog()
-                    .loadView(identifier)
-                    .updateProperties()
-                    .set(
-                        
IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY,
-                        customLocation2)
-                    .commit())
-        .isInstanceOf(ForbiddenException.class)
-        .hasMessageContaining("Forbidden: Invalid locations");
-
-    // CANNOT update the view with a child metadata location 
`baseLocation/customLocation/child`,
-    // even though it is a subpath of the original view's
-    // `write.metadata.path=baseLocation/customLocation`.
-    Assertions.assertThatThrownBy(
-            () ->
-                catalog()
-                    .loadView(identifier)
-                    .updateProperties()
-                    .set(
-                        
IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY,
-                        customLocationChild)
-                    .commit())
-        .isInstanceOf(ForbiddenException.class)
-        .hasMessageContaining("Forbidden: Invalid locations");
   }
 }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/InMemoryStorageIntegration.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/InMemoryStorageIntegration.java
index 7e719a91d..ea4ca1c59 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/InMemoryStorageIntegration.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/InMemoryStorageIntegration.java
@@ -53,16 +53,15 @@ public abstract class InMemoryStorageIntegration<T extends PolarisStorageConfigu
    *     implementation, all actions have the same validation result, as we 
only verify the
    *     locations are equal to or subdirectories of the allowed locations.
    */
-  public static Map<String, Map<PolarisStorageActions, ValidationResult>>
-      validateSubpathsOfAllowedLocations(
-          @Nonnull RealmConfig realmConfig,
-          @Nonnull PolarisStorageConfigurationInfo storageConfig,
-          @Nonnull Set<PolarisStorageActions> actions,
-          @Nonnull Set<String> locations) {
+  public static Map<String, Map<PolarisStorageActions, ValidationResult>> 
validateAllowedLocations(
+      @Nonnull RealmConfig realmConfig,
+      @Nonnull List<String> allowedLocationsToValid,
+      @Nonnull Set<PolarisStorageActions> actions,
+      @Nonnull Set<String> locations) {
     // trim trailing / from allowed locations so that locations missing the 
trailing slash still
     // match
     Set<String> allowedLocationStrings =
-        storageConfig.getAllowedLocations().stream()
+        allowedLocationsToValid.stream()
             .map(
                 str -> {
                   if (str.endsWith("/") && str.length() > 1) {
@@ -123,6 +122,7 @@ public abstract class InMemoryStorageIntegration<T extends PolarisStorageConfigu
       @Nonnull T storageConfig,
       @Nonnull Set<PolarisStorageActions> actions,
       @Nonnull Set<String> locations) {
-    return validateSubpathsOfAllowedLocations(realmConfig, storageConfig, 
actions, locations);
+    return validateAllowedLocations(
+        realmConfig, storageConfig.getAllowedLocations(), actions, locations);
   }
 }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/LocationRestrictions.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/LocationRestrictions.java
new file mode 100644
index 000000000..90ccf50d6
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/LocationRestrictions.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.storage;
+
+import jakarta.annotation.Nonnull;
+import java.util.List;
+import java.util.Set;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.polaris.core.config.RealmConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Defines storage location access restrictions for Polaris entities within a 
specific context. */
+public class LocationRestrictions {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LocationRestrictions.class);
+
+  /**
+   * The complete set of storage locations that are permitted for access.
+   *
+   * <p>This list contains all storage URIs that entities can read from or 
write to, including both
+   * catalog-level allowed locations and any additional user-specified 
locations when unstructured
+   * table access is enabled.
+   *
+   * <p>All locations in this list have been validated to conform to the 
storage type's URI scheme
+   * requirements during construction.
+   */
+  private final List<String> allowedLocations;
+
+  /**
+   * The parent location for structured table enforcement.
+   *
+   * <p>When non-null, this location represents the root under which all new 
tables must be created,
+   * enforcing a structured hierarchy in addition to residing under {@code 
allowedLocations}. When
+   * null, table creation is allowed anywhere within the {@code 
allowedLocations}.
+   */
+  private final String parentLocation;
+
+  public LocationRestrictions(
+      @Nonnull PolarisStorageConfigurationInfo storageConfigurationInfo, 
String parentLocation) {
+    this.allowedLocations = storageConfigurationInfo.getAllowedLocations();
+    
allowedLocations.forEach(storageConfigurationInfo::validatePrefixForStorageType);
+    this.parentLocation = parentLocation;
+  }
+
+  public LocationRestrictions(@Nonnull PolarisStorageConfigurationInfo 
storageConfigurationInfo) {
+    this(storageConfigurationInfo, null);
+  }
+
+  /**
+   * Validates that the requested storage locations are permitted for the 
given table identifier.
+   *
+   * <p>This method performs location validation by checking the requested 
locations against:
+   *
+   * <ul>
+   *   <li>The parent location (if configured) for structured table enforcement
+   *   <li>The allowed locations list for general access permissions
+   * </ul>
+   *
+   * <p>The validation ensures that all requested locations conform to the 
storage access
+   * restrictions defined for this context. If a parent location is 
configured, all requests must be
+   * under that location hierarchy in addition to being within the allowed 
locations.
+   *
+   * @param realmConfig the realm configuration containing storage validation 
rules
+   * @param identifier the table identifier for which locations are being 
validated
+   * @param requestLocations the set of storage locations that need validation
+   * @throws ForbiddenException if any of the requested locations violate the 
configured
+   *     restrictions
+   */
+  public void validate(
+      RealmConfig realmConfig, TableIdentifier identifier, Set<String> 
requestLocations) {
+    if (parentLocation != null) {
+      validateLocations(realmConfig, List.of(parentLocation), 
requestLocations, identifier);
+    }
+
+    validateLocations(realmConfig, allowedLocations, requestLocations, 
identifier);
+  }
+
+  private void validateLocations(
+      RealmConfig realmConfig,
+      List<String> allowedLocations,
+      Set<String> requestLocations,
+      TableIdentifier identifier) {
+    var validationResults =
+        InMemoryStorageIntegration.validateAllowedLocations(
+            realmConfig, allowedLocations, Set.of(PolarisStorageActions.ALL), 
requestLocations);
+    validationResults
+        .values()
+        .forEach(
+            actionResult ->
+                actionResult
+                    .values()
+                    .forEach(
+                        result -> {
+                          if (!result.isSuccess()) {
+                            throw new ForbiddenException(
+                                "Invalid locations '%s' for identifier '%s': 
%s",
+                                requestLocations, identifier, 
result.getMessage());
+                          } else {
+                            LOGGER.debug(
+                                "Validated locations '{}' for identifier '{}'",
+                                requestLocations,
+                                identifier);
+                          }
+                        }));
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index e3947a20b..d94879e28 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -26,14 +26,12 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
 import jakarta.annotation.Nonnull;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Optional;
-import java.util.Set;
 import org.apache.polaris.core.admin.model.Catalog;
 import org.apache.polaris.core.config.FeatureConfiguration;
 import org.apache.polaris.core.config.RealmConfig;
@@ -116,7 +114,7 @@ public abstract class PolarisStorageConfigurationInfo {
     }
   }
 
-  public static Optional<PolarisStorageConfigurationInfo> forEntityPath(
+  public static Optional<LocationRestrictions> forEntityPath(
       RealmConfig realmConfig, List<PolarisEntity> entityPath) {
     return findStorageInfoFromHierarchy(entityPath)
         .map(
@@ -150,22 +148,13 @@ public abstract class PolarisStorageConfigurationInfo {
                 LOGGER.debug(
                     "Not allowing unstructured table location for entity: {}",
                     entityPathReversed.get(0).getName());
-                return new StorageConfigurationOverride(configInfo, 
List.of(baseLocation));
+                return new LocationRestrictions(configInfo, baseLocation);
               } else {
                 LOGGER.debug(
                     "Allowing unstructured table location for entity: {}",
                     entityPathReversed.get(0).getName());
 
-                // TODO: figure out the purpose of adding 
`userSpecifiedWriteLocations`
-                Set<String> locations =
-                    StorageUtil.getLocationsAllowedToBeAccessed(
-                        null, entityPathReversed.get(0).getPropertiesAsMap());
-                return new StorageConfigurationOverride(
-                    configInfo,
-                    ImmutableList.<String>builder()
-                        .addAll(configInfo.getAllowedLocations())
-                        .addAll(locations)
-                        .build());
+                return new LocationRestrictions(configInfo);
               }
             });
   }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageConfigurationOverride.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageConfigurationOverride.java
deleted file mode 100644
index b38ebf2fc..000000000
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageConfigurationOverride.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.polaris.core.storage;
-
-import jakarta.annotation.Nonnull;
-import java.util.List;
-
-/**
- * Allows overriding the allowed locations for specific entities. Only the 
allowedLocations
- * specified in the constructor are allowed. allowedLocations are not 
inherited from the parent
- * storage configuration. All other storage configuration is inherited from 
the parent configuration
- * and cannot be overridden.
- */
-public class StorageConfigurationOverride extends 
PolarisStorageConfigurationInfo {
-
-  private final PolarisStorageConfigurationInfo parentStorageConfiguration;
-  private final List<String> allowedLocations;
-
-  public StorageConfigurationOverride(
-      @Nonnull PolarisStorageConfigurationInfo parentStorageConfiguration,
-      List<String> allowedLocations) {
-    this.parentStorageConfiguration = parentStorageConfiguration;
-    this.allowedLocations = List.copyOf(allowedLocations);
-    allowedLocations.forEach(this::validatePrefixForStorageType);
-  }
-
-  @Override
-  public List<String> getAllowedLocations() {
-    return allowedLocations;
-  }
-
-  @Override
-  public StorageType getStorageType() {
-    return parentStorageConfiguration.getStorageType();
-  }
-
-  @Override
-  public String getFileIoImplClassName() {
-    return parentStorageConfiguration.getFileIoImplClassName();
-  }
-
-  @Override
-  public boolean validatePrefix() {
-    return false;
-  }
-
-  // delegate to the wrapped class in case they override the parent behavior
-  @Override
-  protected void validatePrefixForStorageType(String loc) {
-    parentStorageConfiguration.validatePrefixForStorageType(loc);
-  }
-}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageUtil.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageUtil.java
index 3a67d1e73..16fdf8024 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageUtil.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageUtil.java
@@ -70,12 +70,12 @@ public class StorageUtil {
   }
 
   /** Given a TableMetadata, extracts the locations where the table's 
[meta]data might be found. */
-  public static @Nonnull Set<String> 
getLocationsAllowedToBeAccessed(TableMetadata tableMetadata) {
-    return getLocationsAllowedToBeAccessed(tableMetadata.location(), 
tableMetadata.properties());
+  public static @Nonnull Set<String> getLocationsUsedByTable(TableMetadata 
tableMetadata) {
+    return getLocationsUsedByTable(tableMetadata.location(), 
tableMetadata.properties());
   }
 
   /** Given a baseLocation and entity (table?) properties, extracts the 
relevant locations */
-  public static @Nonnull Set<String> getLocationsAllowedToBeAccessed(
+  public static @Nonnull Set<String> getLocationsUsedByTable(
       String baseLocation, Map<String, String> properties) {
     Set<String> locations = new HashSet<>();
     locations.add(baseLocation);
@@ -87,7 +87,7 @@ public class StorageUtil {
   }
 
   /** Given a ViewMetadata, extracts the locations where the view's [meta]data 
might be found. */
-  public static @Nonnull Set<String> 
getLocationsAllowedToBeAccessed(ViewMetadata viewMetadata) {
+  public static @Nonnull Set<String> getLocationsUsedByTable(ViewMetadata 
viewMetadata) {
     return Set.of(viewMetadata.location());
   }
 
diff --git a/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java b/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
index 593aecc6a..b3b9cfb29 100644
--- a/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
+++ b/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
@@ -70,38 +70,37 @@ public class StorageUtilTest {
   }
 
   @Test
-  public void getLocationsAllowedToBeAccessed() {
-    Assertions.assertThat(StorageUtil.getLocationsAllowedToBeAccessed(null, 
Map.of())).isEmpty();
-    Assertions.assertThat(StorageUtil.getLocationsAllowedToBeAccessed("", 
Map.of())).isNotEmpty();
-    Assertions.assertThat(StorageUtil.getLocationsAllowedToBeAccessed("/foo/", 
Map.of()))
-        .contains("/foo/");
+  public void getLocationsUsedByTable() {
+    Assertions.assertThat(StorageUtil.getLocationsUsedByTable(null, 
Map.of())).isEmpty();
+    Assertions.assertThat(StorageUtil.getLocationsUsedByTable("", 
Map.of())).isNotEmpty();
+    Assertions.assertThat(StorageUtil.getLocationsUsedByTable("/foo/", 
Map.of())).contains("/foo/");
     Assertions.assertThat(
-            StorageUtil.getLocationsAllowedToBeAccessed(
+            StorageUtil.getLocationsUsedByTable(
                 "/foo/",
                 
Map.of(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY, "/foo/")))
         .contains("/foo/");
     Assertions.assertThat(
-            StorageUtil.getLocationsAllowedToBeAccessed(
+            StorageUtil.getLocationsUsedByTable(
                 "/foo/",
                 
Map.of(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY, "/bar/")))
         .contains("/foo/", "/bar/");
     Assertions.assertThat(
-            StorageUtil.getLocationsAllowedToBeAccessed(
+            StorageUtil.getLocationsUsedByTable(
                 "/foo/",
                 
Map.of(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY, 
"/foo/bar/")))
         .contains("/foo/");
     Assertions.assertThat(
-            StorageUtil.getLocationsAllowedToBeAccessed(
+            StorageUtil.getLocationsUsedByTable(
                 "/foo/bar/",
                 
Map.of(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY, "/foo/")))
         .contains("/foo/");
     Assertions.assertThat(
-            StorageUtil.getLocationsAllowedToBeAccessed(
+            StorageUtil.getLocationsUsedByTable(
                 "/foo/bar/",
                 
Map.of(IcebergTableLikeEntity.USER_SPECIFIED_WRITE_METADATA_LOCATION_KEY, 
"/foo/")))
         .contains("/foo/");
     Assertions.assertThat(
-            StorageUtil.getLocationsAllowedToBeAccessed(
+            StorageUtil.getLocationsUsedByTable(
                 "/1/",
                 Map.of(
                     
IcebergTableLikeEntity.USER_SPECIFIED_WRITE_DATA_LOCATION_KEY, "/2/",
diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index 5da1eeb12..03aa41c80 100644
--- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -119,11 +119,9 @@ import org.apache.polaris.core.persistence.resolver.ResolverFactory;
 import org.apache.polaris.core.persistence.resolver.ResolverPath;
 import org.apache.polaris.core.persistence.resolver.ResolverStatus;
 import org.apache.polaris.core.storage.AccessConfig;
-import org.apache.polaris.core.storage.InMemoryStorageIntegration;
 import org.apache.polaris.core.storage.PolarisCredentialVendor;
 import org.apache.polaris.core.storage.PolarisStorageActions;
 import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
-import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.StorageLocation;
 import org.apache.polaris.core.storage.StorageUtil;
 import org.apache.polaris.core.storage.cache.StorageCredentialCache;
@@ -845,7 +843,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
         storageCredentialCache,
         getCredentialVendor(),
         tableIdentifier,
-        StorageUtil.getLocationsAllowedToBeAccessed(tableMetadata),
+        StorageUtil.getLocationsUsedByTable(tableMetadata),
         storageActions,
         storageInfo.get(),
         refreshCredentialsEndpoint);
@@ -1006,51 +1004,27 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       TableIdentifier identifier,
       Set<String> locations,
       PolarisResolvedPathWrapper resolvedStorageEntity) {
-    Optional<PolarisStorageConfigurationInfo> optStorageConfiguration =
-        PolarisStorageConfigurationInfo.forEntityPath(
-            realmConfig, resolvedStorageEntity.getRawFullPath());
-
-    optStorageConfiguration.ifPresentOrElse(
-        storageConfigInfo -> {
-          Map<String, Map<PolarisStorageActions, 
PolarisStorageIntegration.ValidationResult>>
-              validationResults =
-                  
InMemoryStorageIntegration.validateSubpathsOfAllowedLocations(
-                      realmConfig, storageConfigInfo, 
Set.of(PolarisStorageActions.ALL), locations);
-          validationResults
-              .values()
-              .forEach(
-                  actionResult ->
-                      actionResult
-                          .values()
-                          .forEach(
-                              result -> {
-                                if (!result.isSuccess()) {
-                                  throw new ForbiddenException(
-                                      "Invalid locations '%s' for identifier 
'%s': %s",
-                                      locations, identifier, 
result.getMessage());
-                                } else {
-                                  LOGGER.debug(
-                                      "Validated locations '{}' for identifier 
'{}'",
-                                      locations,
-                                      identifier);
-                                }
-                              }));
-        },
-        () -> {
-          List<String> allowedStorageTypes =
-              
realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_STORAGE_TYPES);
-          if 
(!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
-            List<String> invalidLocations =
-                locations.stream()
-                    .filter(location -> location.startsWith("file:") || 
location.startsWith("http"))
-                    .collect(Collectors.toList());
-            if (!invalidLocations.isEmpty()) {
-              throw new ForbiddenException(
-                  "Invalid locations '%s' for identifier '%s': File locations 
are not allowed",
-                  invalidLocations, identifier);
-            }
-          }
-        });
+
+    PolarisStorageConfigurationInfo.forEntityPath(
+            realmConfig, resolvedStorageEntity.getRawFullPath())
+        .ifPresentOrElse(
+            restrictions -> restrictions.validate(realmConfig, identifier, 
locations),
+            () -> {
+              List<String> allowedStorageTypes =
+                  
realmConfig.getConfig(FeatureConfiguration.SUPPORTED_CATALOG_STORAGE_TYPES);
+              if 
(!allowedStorageTypes.contains(StorageConfigInfo.StorageTypeEnum.FILE.name())) {
+                List<String> invalidLocations =
+                    locations.stream()
+                        .filter(
+                            location -> location.startsWith("file:") || 
location.startsWith("http"))
+                        .collect(Collectors.toList());
+                if (!invalidLocations.isEmpty()) {
+                  throw new ForbiddenException(
+                      "Invalid locations '%s' for identifier '%s': File 
locations are not allowed",
+                      invalidLocations, identifier);
+                }
+              }
+            });
   }
 
   /**
@@ -1437,7 +1411,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       tableFileIO =
           loadFileIOForTableLike(
               tableIdentifier,
-              StorageUtil.getLocationsAllowedToBeAccessed(metadata),
+              StorageUtil.getLocationsUsedByTable(metadata),
               resolvedStorageEntity,
               new HashMap<>(metadata.properties()),
               Set.of(
@@ -1460,7 +1434,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
         // If location is changing then we must validate that the requested 
location is valid
         // for the storage configuration inherited under this entity's path.
         Set<String> dataLocations =
-            StorageUtil.getLocationsAllowedToBeAccessed(metadata.location(), 
metadata.properties());
+            StorageUtil.getLocationsUsedByTable(metadata.location(), 
metadata.properties());
         validateLocationsForTableLike(tableIdentifier, dataLocations, 
resolvedStorageEntity);
         // also validate that the table location doesn't overlap an existing 
table
         dataLocations.forEach(
@@ -1821,7 +1795,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
       viewFileIO =
           loadFileIOForTableLike(
               identifier,
-              StorageUtil.getLocationsAllowedToBeAccessed(metadata),
+              StorageUtil.getLocationsUsedByTable(metadata),
               resolvedStorageEntity,
               tableProperties,
               Set.of(PolarisStorageActions.READ, PolarisStorageActions.WRITE));
diff --git a/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergAllowedLocationTest.java b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergAllowedLocationTest.java
new file mode 100644
index 000000000..f24d3dd37
--- /dev/null
+++ b/runtime/service/src/test/java/org/apache/polaris/service/catalog/iceberg/IcebergAllowedLocationTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.catalog.iceberg;
+
+import static 
org.apache.polaris.core.config.FeatureConfiguration.OPTIMIZED_SIBLING_CHECK;
+import static org.apache.polaris.service.admin.PolarisAuthzTestBase.SCHEMA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import jakarta.ws.rs.core.Response;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.TestServices;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+/**
+ * Tests table creation against the allowed locations of a catalog's storage configuration.
+ *
+ * <p>Each test creates a FILE-storage catalog and a namespace whose {@code location} property
+ * points at a sibling directory of the catalog location, then attempts to create a table in that
+ * namespace. The request is expected to be rejected unless the namespace location was explicitly
+ * listed in the catalog's allowed locations.
+ */
+public class IcebergAllowedLocationTest {
+  private static final String NAMESPACE = "ns";
+  private static final String CATALOG = "test-catalog";
+
+  /** Returns a fresh, random table name so repeated table creations do not collide. */
+  private static String getTableName() {
+    return "table_" + UUID.randomUUID();
+  }
+
+  /**
+   * Creating a table must fail with {@link ForbiddenException} when the namespace location is not
+   * among the catalog's allowed locations.
+   */
+  @Test
+  void testCreateTableOutSideOfCatalogAllowedLocations(@TempDir Path tmpDir) {
+    var services = getTestServices();
+
+    var catalogLocation = tmpDir.resolve(CATALOG).toAbsolutePath().toUri().toString();
+    var namespaceLocation = tmpDir.resolve(NAMESPACE).toAbsolutePath().toUri().toString();
+    assertNotEquals(catalogLocation, namespaceLocation);
+
+    // no extra allowed locations are passed to the storage configuration
+    createCatalog(services, Map.of(), catalogLocation, null);
+
+    // create a namespace outside of catalog allowed locations
+    createNamespace(services, namespaceLocation);
+
+    // create a table under the namespace
+    var createTableRequest =
+        CreateTableRequest.builder().withName(getTableName()).withSchema(SCHEMA).build();
+
+    assertThrows(
+        ForbiddenException.class,
+        () ->
+            services
+                .restApi()
+                .createTable(
+                    CATALOG,
+                    NAMESPACE,
+                    createTableRequest,
+                    null,
+                    services.realmContext(),
+                    services.securityContext()));
+  }
+
+  /**
+   * Creating a table must succeed when the namespace location has been added to the catalog's
+   * allowed locations.
+   */
+  @Test
+  void testCreateTableInsideOfCatalogAllowedLocations(@TempDir Path tmpDir) {
+    var services = getTestServices();
+
+    var catalogLocation = tmpDir.resolve(CATALOG).toAbsolutePath().toUri().toString();
+    var namespaceLocation = tmpDir.resolve(NAMESPACE).toAbsolutePath().toUri().toString();
+    assertNotEquals(catalogLocation, namespaceLocation);
+
+    // add the namespace location to the allowed locations of the catalog
+    createCatalog(services, Map.of(), catalogLocation, List.of(namespaceLocation));
+
+    createNamespace(services, namespaceLocation);
+
+    // create a table under the namespace
+    var createTableRequest =
+        CreateTableRequest.builder().withName(getTableName()).withSchema(SCHEMA).build();
+
+    // close the response like the helpers below do, and keep (expected, actual) argument order
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                CATALOG,
+                NAMESPACE,
+                createTableRequest,
+                null,
+                services.realmContext(),
+                services.securityContext())) {
+      assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());
+    }
+  }
+
+  /**
+   * Builds the {@link TestServices} shared by all tests: FILE storage is enabled, table location
+   * overlap is allowed, and the optimized sibling check is switched on.
+   */
+  private static TestServices getTestServices() {
+    Map<String, Object> serviceConfig =
+        Map.of(
+            "ALLOW_TABLE_LOCATION_OVERLAP",
+            "true",
+            "ALLOW_INSECURE_STORAGE_TYPES",
+            "true",
+            "SUPPORTED_CATALOG_STORAGE_TYPES",
+            List.of("FILE"),
+            OPTIMIZED_SIBLING_CHECK.key(),
+            "true");
+    return TestServices.builder().config(serviceConfig).build();
+  }
+
+  /**
+   * Creates the test catalog through the catalogs API and asserts a {@code CREATED} response.
+   *
+   * @param services services used to issue the request
+   * @param catalogConfig additional catalog properties
+   * @param catalogLocation location the default base location is derived from
+   * @param allowedLocations allowed locations handed to the storage configuration as-is; the
+   *     tests pass {@code null} when no extra location should be allowed
+   */
+  private void createCatalog(
+      TestServices services,
+      Map<String, String> catalogConfig,
+      String catalogLocation,
+      List<String> allowedLocations) {
+    CatalogProperties.Builder propertiesBuilder =
+        CatalogProperties.builder()
+            .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, CATALOG))
+            .putAll(catalogConfig);
+
+    StorageConfigInfo config =
+        FileStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
+            .setAllowedLocations(allowedLocations)
+            .build();
+
+    // NOTE(review): the numeric arguments are presumably create/update timestamps and the entity
+    // version - confirm against the Catalog model.
+    Catalog catalogObject =
+        new Catalog(
+            Catalog.TypeEnum.INTERNAL,
+            CATALOG,
+            propertiesBuilder.build(),
+            1725487592064L,
+            1725487592064L,
+            1,
+            config);
+    try (Response response =
+        services
+            .catalogsApi()
+            .createCatalog(
+                new CreateCatalogRequest(catalogObject),
+                services.realmContext(),
+                services.securityContext())) {
+      assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
+    }
+  }
+
+  /**
+   * Creates the test namespace with the given {@code location} property and asserts an {@code OK}
+   * response.
+   *
+   * @param services services used to issue the request
+   * @param location value stored under the namespace's {@code location} property
+   */
+  private void createNamespace(TestServices services, String location) {
+    Map<String, String> properties = new HashMap<>();
+    properties.put("location", location);
+    CreateNamespaceRequest createNamespaceRequest =
+        CreateNamespaceRequest.builder()
+            .withNamespace(Namespace.of(NAMESPACE))
+            .setProperties(properties)
+            .build();
+    try (Response response =
+        services
+            .restApi()
+            .createNamespace(
+                CATALOG,
+                createNamespaceRequest,
+                services.realmContext(),
+                services.securityContext())) {
+      assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+    }
+  }
+}


Reply via email to