This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new bd8325208 Introduce an option to add object storage prefix to table 
locations (#1966)
bd8325208 is described below

commit bd8325208675c2b6505888cdd12d2c5abaa8dd2a
Author: Eric Maynard <[email protected]>
AuthorDate: Tue Jul 8 13:43:52 2025 -0600

    Introduce an option to add object storage prefix to table locations (#1966)
    
    ### Problem
    
    Currently, Polaris enforces that the physical layout of entities maps to 
the logical layout:
    ```
    catalog
    └── ns1
        ├── ns2
        │   └── table_b
        └── table_a
    ```
    
    In the above example, the base locations of `table_a` and `ns2` are 
expected to be children of `ns1`, and the location of `table_b` is expected to 
be a child of `ns2`.
    
    This behavior is controlled by `ALLOW_UNSTRUCTURED_TABLE_LOCATION` and is 
the basis for the sibling overlap check when `OPTIMIZED_SIBLING_CHECK` is 
disabled or persistence cannot support the optimized check.
    
    However, some users have reported that this physical organization of data 
can lead to undesirable performance characteristics when hotspotting occurs 
across namespaces. If the underlying storage is range partitioned by key, this 
organization will tend to physically collocate logically-similar entities.
    
    ### Solution
    
    To solve this problem, this PR introduces a new option 
`DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED` which alters the behavior of 
the catalog when creating a table without a user-specified location. With the 
feature disabled, a table such as `ns1.table_a` will have a path like this:
    
    ```
    s3://catalog/base/ns1/table_a/
    ```
    
    With the feature enabled, a prefix is added before the namespace:
    ```
    s3://catalog/base/0010/0101/0110/10010100/ns1/table_a/
    ```
    
    This serves to eliminate the physical collocation of tables in the same 
namespace (or with similarly-named namespaces or table names).
    
    This functionality is similar to Iceberg's `write.object-storage.enabled`, 
but it applies across tables and namespaces. The two features can and should be 
combined to achieve the best distribution of data files throughout the key 
space.
    
    ### Configuration & Sibling Overlap Check
    
    If an admin doesn't care about the risk of vending credentials with the 
sibling overlap check disabled, they can enable the feature with these configs:
    ```
    polaris.features.DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED=true
    polaris.features.ALLOW_UNSTRUCTURED_TABLE_LOCATION=true
    polaris.features.ALLOW_TABLE_LOCATION_OVERLAP=true
    polaris.behavior-changes.VALIDATE_VIEW_LOCATION_OVERLAP=false
    ```
    
    In order to use this feature and to preserve the sibling overlap check, you 
can configure the service with:
    ```
    polaris.features.DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED=true
    polaris.features.ALLOW_UNSTRUCTURED_TABLE_LOCATION=true
    polaris.features.OPTIMIZED_SIBLING_CHECK=true
    ```
    
    However, note that the `OPTIMIZED_SIBLING_CHECK` comes with some caveats as 
outlined in its description. Namely, it currently only works with some 
persistence implementations and it requires all location-based entities to have 
a recently-introduced field set. These locations are expected to be suffixed 
with `/`, and locations with many `/` may not be eligible for the optimized 
check.
    
    Older Polaris deployments may not meet these requirements without a 
migration or backfill. Accordingly combining these two features should be 
considered experimental for the time being.
---
 CHANGELOG.md                                       |   3 +
 LICENSE                                            |   1 +
 .../polaris/core/config/FeatureConfiguration.java  |  13 +
 .../TransactionalMetaStoreManagerImpl.java         |   7 +-
 .../transactional/TreeMapMetaStore.java            |   4 +
 .../TreeMapTransactionalPersistenceImpl.java       |  38 ++-
 .../storage/PolarisStorageConfigurationInfo.java   |  11 +-
 .../quarkus/admin/PolarisOverlappingTableTest.java | 291 ++++++++++++++++++---
 .../service/catalog/common/LocationUtils.java      |  84 ++++++
 .../service/catalog/iceberg/IcebergCatalog.java    | 119 ++++++++-
 .../catalog/iceberg/IcebergCatalogHandler.java     |   5 +-
 .../service/catalog/common/LocationUtilsTest.java  |  75 ++++++
 12 files changed, 589 insertions(+), 62 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index e5712f0b5..fd1a5340d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -41,6 +41,9 @@ request adding CHANGELOG notes for breaking (!) changes and 
possibly other secti
 providing authentication parameters to Polaris. When the authentication type 
is set to `IMPLICIT`, 
 the authentication parameters are picked from the environment or configuration 
files. 
 
+- The `DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED` feature was added to 
support placing tables
+at locations that better optimize for object storage.
+
 ### Changes
 
 ### Deprecations
diff --git a/LICENSE b/LICENSE
index be1be2d17..dff385d90 100644
--- a/LICENSE
+++ b/LICENSE
@@ -222,6 +222,7 @@ This product includes code from Apache Iceberg.
 * 
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/CatalogHandlerUtils.java
 * 
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisRESTCatalog.java
 * 
plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/SparkCatalog.java
+* 
service/common/src/main/java/org/apache/polaris/service/catalog/common/LocationUtils.java
 
 Copyright: Copyright 2017-2025 The Apache Software Foundation
 Home page: https://iceberg.apache.org
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
index 5ee36d030..59de973be 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java
@@ -339,4 +339,17 @@ public class FeatureConfiguration<T> extends 
PolarisConfiguration<T> {
                   + "to enforce this when new locations are added. Only 
supported by the JDBC metastore.")
           .defaultValue(false)
           .buildFeatureConfiguration();
+
+  public static final FeatureConfiguration<Boolean> 
DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED =
+      PolarisConfiguration.<Boolean>builder()
+          .key("DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED")
+          
.catalogConfig("polaris.config.default-table-location-object-storage-prefix.enabled")
+          .description(
+              "When enabled, Iceberg tables and views created without a 
location specified will have a prefix "
+                  + "applied to the location within the catalog's base 
location, rather than a location directly "
+                  + "inside the parent namespace. Note that this requires 
ALLOW_EXTERNAL_TABLE_LOCATION to be "
+                  + "enabled, but with OPTIMIZED_SIBLING_CHECK enabled "
+                  + "it is still possible to enforce the uniqueness of table 
locations within a catalog.")
+          .defaultValue(false)
+          .buildFeatureConfiguration();
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
index d477ce14c..1286b67e7 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TransactionalMetaStoreManagerImpl.java
@@ -2328,7 +2328,12 @@ public class TransactionalMetaStoreManagerImpl extends 
BaseMetaStoreManager {
   public <T extends PolarisEntity & LocationBasedEntity>
       Optional<Optional<String>> hasOverlappingSiblings(
           @Nonnull PolarisCallContext callContext, T entity) {
-    return Optional.empty();
+    TransactionalPersistence ms = ((TransactionalPersistence) 
callContext.getMetaStore());
+    return ms.runInTransaction(
+        callContext,
+        () -> {
+          return 
callContext.getMetaStore().hasOverlappingSiblings(callContext, entity);
+        });
   }
 
   /** {@inheritDoc} */
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapMetaStore.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapMetaStore.java
index 3e8553b2d..7716b1ad1 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapMetaStore.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapMetaStore.java
@@ -81,6 +81,10 @@ public class TreeMapMetaStore {
      */
     public List<T> readRange(String prefix) {
       TreeMapMetaStore.this.ensureReadTr();
+      if (prefix.isEmpty()) {
+        return new ArrayList<>(this.slice.values());
+      }
+
       // end of the key
       String endKey =
           prefix.substring(0, prefix.length() - 1)
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
index bf0517a1b..12907b08d 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/transactional/TreeMapTransactionalPersistenceImpl.java
@@ -35,8 +35,10 @@ import org.apache.polaris.core.entity.PolarisBaseEntity;
 import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
 import org.apache.polaris.core.entity.PolarisEntitiesActiveKey;
 import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntityConstants;
 import org.apache.polaris.core.entity.PolarisEntityCore;
 import org.apache.polaris.core.entity.PolarisEntityId;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.entity.PolarisGrantRecord;
 import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
@@ -50,6 +52,7 @@ import org.apache.polaris.core.policy.PolicyEntity;
 import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
+import org.apache.polaris.core.storage.StorageLocation;
 
 public class TreeMapTransactionalPersistenceImpl extends 
AbstractTransactionalPersistence {
 
@@ -667,11 +670,44 @@ public class TreeMapTransactionalPersistenceImpl extends 
AbstractTransactionalPe
         .readRange(this.store.buildPrefixKeyComposite(policyTypeCode, 
policyCatalogId, policyId));
   }
 
+  private Optional<String> getEntityLocationWithoutScheme(PolarisBaseEntity 
entity) {
+    if (entity.getType() == PolarisEntityType.TABLE_LIKE) {
+      if (entity.getSubType() == PolarisEntitySubType.ICEBERG_TABLE
+          || entity.getSubType() == PolarisEntitySubType.ICEBERG_VIEW) {
+        return Optional.of(
+            StorageLocation.of(
+                    
entity.getPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION))
+                .withoutScheme());
+      }
+    }
+    if (entity.getType() == PolarisEntityType.NAMESPACE) {
+      return Optional.of(
+          StorageLocation.of(
+                  
entity.getPropertiesAsMap().get(PolarisEntityConstants.ENTITY_BASE_LOCATION))
+              .withoutScheme());
+    }
+    return Optional.empty();
+  }
+
   /** {@inheritDoc} */
   @Override
   public <T extends PolarisEntity & LocationBasedEntity>
       Optional<Optional<String>> hasOverlappingSiblings(
           @Nonnull PolarisCallContext callContext, T entity) {
-    return Optional.empty();
+    // TODO we could optimize this full scan
+    StorageLocation entityLocationWithoutScheme =
+        
StorageLocation.of(StorageLocation.of(entity.getBaseLocation()).withoutScheme());
+    List<PolarisBaseEntity> allEntities = 
this.store.getSliceEntities().readRange("");
+    for (PolarisBaseEntity siblingEntity : allEntities) {
+      Optional<StorageLocation> maybeSiblingLocationWithoutScheme =
+          
getEntityLocationWithoutScheme(siblingEntity).map(StorageLocation::of);
+      if (maybeSiblingLocationWithoutScheme.isPresent()) {
+        if 
(maybeSiblingLocationWithoutScheme.get().isChildOf(entityLocationWithoutScheme)
+            || 
entityLocationWithoutScheme.isChildOf(maybeSiblingLocationWithoutScheme.get())) 
{
+          return 
Optional.of(Optional.of(maybeSiblingLocationWithoutScheme.toString()));
+        }
+      }
+    }
+    return Optional.of(Optional.empty());
   }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index 4d54599e4..1d391015f 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -36,10 +36,10 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.Catalog;
 import org.apache.polaris.core.config.FeatureConfiguration;
-import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.PolarisEntity;
 import org.apache.polaris.core.entity.PolarisEntityConstants;
@@ -136,12 +136,12 @@ public abstract class PolarisStorageConfigurationInfo {
   }
 
   public static Optional<PolarisStorageConfigurationInfo> forEntityPath(
-      PolarisDiagnostics diagnostics, List<PolarisEntity> entityPath) {
+      PolarisCallContext callContext, List<PolarisEntity> entityPath) {
     return findStorageInfoFromHierarchy(entityPath)
         .map(
             storageInfo ->
                 deserialize(
-                    diagnostics,
+                    callContext.getDiagServices(),
                     storageInfo
                         .getInternalPropertiesAsMap()
                         
.get(PolarisEntityConstants.getStorageConfigInfoPropertyName())))
@@ -162,11 +162,10 @@ public abstract class PolarisStorageConfigurationInfo {
                       .orElse(null);
               CatalogEntity catalog = CatalogEntity.of(entityPath.get(0));
               boolean allowEscape =
-                  CallContext.getCurrentContext()
-                      .getPolarisCallContext()
+                  callContext
                       .getConfigurationStore()
                       .getConfiguration(
-                          CallContext.getCurrentContext().getRealmContext(),
+                          callContext.getRealmContext(),
                           catalog,
                           
FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION);
               if (!allowEscape
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
index bc93c51da..a658b8e88 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingTableTest.java
@@ -20,6 +20,8 @@ package org.apache.polaris.service.quarkus.admin;
 
 import static 
org.apache.polaris.core.config.FeatureConfiguration.ALLOW_TABLE_LOCATION_OVERLAP;
 import static 
org.apache.polaris.core.config.FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED;
+import static 
org.apache.polaris.core.config.FeatureConfiguration.OPTIMIZED_SIBLING_CHECK;
 import static 
org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase.SCHEMA;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -33,13 +35,17 @@ import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.exceptions.ForbiddenException;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.polaris.core.admin.model.Catalog;
 import org.apache.polaris.core.admin.model.CatalogProperties;
 import org.apache.polaris.core.admin.model.CreateCatalogRequest;
 import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
 import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.common.LocationUtils;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -50,10 +56,15 @@ public class PolarisOverlappingTableTest {
   private static final String namespace = "ns";
   private static final String catalog = "test-catalog";
 
+  private String getTableName() {
+    return "table_" + UUID.randomUUID();
+  }
+
+  /** Attempt to create a table at a given location, and return the response 
code */
   private int createTable(TestServices services, String location) {
     CreateTableRequest createTableRequest =
         CreateTableRequest.builder()
-            .withName("table_" + UUID.randomUUID())
+            .withName(getTableName())
             .withLocation(location)
             .withSchema(SCHEMA)
             .build();
@@ -73,6 +84,77 @@ public class PolarisOverlappingTableTest {
     }
   }
 
+  /**
+   * Attempt to create a table without a location, and return the location it 
gets created at If the
+   * creation fails, this should return null
+   */
+  private String createTableWithName(TestServices services, String name) {
+    CreateTableRequest createTableRequest =
+        CreateTableRequest.builder().withName(name).withSchema(SCHEMA).build();
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                catalog,
+                namespace,
+                createTableRequest,
+                null,
+                services.realmContext(),
+                services.securityContext())) {
+      if (response.getStatus() != Response.Status.OK.getStatusCode()) {
+        return null;
+      } else {
+        return 
response.readEntity(LoadTableResponse.class).tableMetadata().location();
+      }
+    } catch (ForbiddenException e) {
+      return null;
+    }
+  }
+
+  private void createCatalogAndNamespace(
+      TestServices services, Map<String, String> catalogConfig, String 
catalogLocation) {
+    CatalogProperties.Builder propertiesBuilder =
+        CatalogProperties.builder()
+            .setDefaultBaseLocation(String.format("%s/%s", catalogLocation, 
catalog))
+            .putAll(catalogConfig);
+
+    StorageConfigInfo config =
+        FileStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
+            .build();
+    Catalog catalogObject =
+        new Catalog(
+            Catalog.TypeEnum.INTERNAL,
+            catalog,
+            propertiesBuilder.build(),
+            1725487592064L,
+            1725487592064L,
+            1,
+            config);
+    try (Response response =
+        services
+            .catalogsApi()
+            .createCatalog(
+                new CreateCatalogRequest(catalogObject),
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
+    }
+
+    CreateNamespaceRequest createNamespaceRequest =
+        
CreateNamespaceRequest.builder().withNamespace(Namespace.of(namespace)).build();
+    try (Response response =
+        services
+            .restApi()
+            .createNamespace(
+                catalog,
+                createNamespaceRequest,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+    }
+  }
+
   static Stream<Arguments> testTableLocationRestrictions() {
     Map<String, Object> laxServices =
         Map.of(
@@ -129,47 +211,7 @@ public class PolarisOverlappingTableTest {
     if (baseLocation.endsWith("/")) {
       baseLocation = baseLocation.substring(0, baseLocation.length() - 1);
     }
-
-    CatalogProperties.Builder propertiesBuilder =
-        CatalogProperties.builder()
-            .setDefaultBaseLocation(String.format("%s/%s", baseLocation, 
catalog))
-            .putAll(catalogConfig);
-
-    StorageConfigInfo config =
-        FileStorageConfigInfo.builder()
-            .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
-            .build();
-    Catalog catalogObject =
-        new Catalog(
-            Catalog.TypeEnum.INTERNAL,
-            catalog,
-            propertiesBuilder.build(),
-            1725487592064L,
-            1725487592064L,
-            1,
-            config);
-    try (Response response =
-        services
-            .catalogsApi()
-            .createCatalog(
-                new CreateCatalogRequest(catalogObject),
-                services.realmContext(),
-                services.securityContext())) {
-      
assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
-    }
-
-    CreateNamespaceRequest createNamespaceRequest =
-        
CreateNamespaceRequest.builder().withNamespace(Namespace.of(namespace)).build();
-    try (Response response =
-        services
-            .restApi()
-            .createNamespace(
-                catalog,
-                createNamespaceRequest,
-                services.realmContext(),
-                services.securityContext())) {
-      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
-    }
+    createCatalogAndNamespace(services, catalogConfig, baseLocation);
 
     // Original table
     assertThat(
@@ -214,4 +256,169 @@ public class PolarisOverlappingTableTest {
     assertThat(createTable(services, String.format("%s", baseLocation)))
         .isEqualTo(Response.Status.FORBIDDEN.getStatusCode());
   }
+
+  static Stream<Arguments> testStandardTableLocations() {
+    Map<String, Object> noPrefixCatalog =
+        Map.of(
+            ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(),
+            "true",
+            ALLOW_TABLE_LOCATION_OVERLAP.catalogConfig(),
+            "false",
+            DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED.catalogConfig(),
+            "false");
+    return Stream.of(Arguments.of(Map.of()), Arguments.of(noPrefixCatalog));
+  }
+
+  @ParameterizedTest
+  @MethodSource()
+  @DisplayName("Test tables getting created at standard locations")
+  void testStandardTableLocations(Map<String, String> catalogConfig, @TempDir 
Path tempDir) {
+    Map<String, Object> strictServices =
+        Map.of(
+            "ALLOW_UNSTRUCTURED_TABLE_LOCATION",
+            "false",
+            "ALLOW_TABLE_LOCATION_OVERLAP",
+            "false",
+            "ALLOW_INSECURE_STORAGE_TYPES",
+            "true",
+            "SUPPORTED_CATALOG_STORAGE_TYPES",
+            List.of("FILE", "S3"));
+
+    TestServices services = 
TestServices.builder().config(strictServices).build();
+
+    String baseLocation = tempDir.toAbsolutePath().toUri().toString();
+    if (baseLocation.endsWith("/")) {
+      baseLocation = baseLocation.substring(0, baseLocation.length() - 1);
+    }
+    createCatalogAndNamespace(services, catalogConfig, baseLocation);
+
+    String tableName;
+
+    tableName = getTableName();
+    Assertions.assertEquals(
+        String.format("%s/%s/%s/%s", baseLocation, catalog, namespace, 
tableName),
+        createTableWithName(services, tableName));
+
+    // Overlap fails:
+    assertThat(
+            createTable(
+                services,
+                String.format("%s/%s/%s/%s", baseLocation, catalog, namespace, 
tableName)))
+        .isEqualTo(Response.Status.FORBIDDEN.getStatusCode());
+  }
+
+  static Stream<Arguments> testInvalidSetupsForObjectStorageLocation() {
+    Map<String, Object> prefixAndNoOverlapCatalog =
+        Map.of(
+            DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED.catalogConfig(),
+            "true",
+            ALLOW_TABLE_LOCATION_OVERLAP.catalogConfig(),
+            "false");
+    Map<String, Object> prefixAndOverlapButNoOptimizedCatalog =
+        Map.of(
+            DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED.catalogConfig(),
+            "true",
+            ALLOW_TABLE_LOCATION_OVERLAP.catalogConfig(),
+            "true");
+    return Stream.of(
+        Arguments.of(prefixAndNoOverlapCatalog),
+        Arguments.of(prefixAndOverlapButNoOptimizedCatalog));
+  }
+
+  @ParameterizedTest
+  @MethodSource()
+  @DisplayName("Test invalid configurations for enabling prefixed locations")
+  void testInvalidSetupsForObjectStorageLocation(
+      Map<String, String> catalogConfig, @TempDir Path tempDir) {
+    Map<String, Object> strictServicesNoOptimizedOverlapCheck =
+        Map.of(
+            "ALLOW_UNSTRUCTURED_TABLE_LOCATION",
+            "false",
+            "ALLOW_TABLE_LOCATION_OVERLAP",
+            "false",
+            "ALLOW_INSECURE_STORAGE_TYPES",
+            "true",
+            "SUPPORTED_CATALOG_STORAGE_TYPES",
+            List.of("FILE", "S3"),
+            OPTIMIZED_SIBLING_CHECK.key,
+            "false");
+
+    TestServices services =
+        
TestServices.builder().config(strictServicesNoOptimizedOverlapCheck).build();
+
+    String baseLocation = tempDir.toAbsolutePath().toUri().toString();
+    if (baseLocation.endsWith("/")) {
+      baseLocation = baseLocation.substring(0, baseLocation.length() - 1);
+    }
+    createCatalogAndNamespace(services, catalogConfig, baseLocation);
+
+    Assertions.assertThrows(
+        IllegalStateException.class, () -> createTableWithName(services, 
getTableName()));
+  }
+
+  @Test
+  @DisplayName("Test tables getting created at locations with a hash prefix")
+  public void testHashedTableLocations(@TempDir Path tempDir) {
+    Map<String, Object> strictServicesWithOptimizedOverlapCheck =
+        Map.of(
+            "ALLOW_UNSTRUCTURED_TABLE_LOCATION",
+            "false",
+            "ALLOW_TABLE_LOCATION_OVERLAP",
+            "false",
+            "ALLOW_INSECURE_STORAGE_TYPES",
+            "true",
+            "SUPPORTED_CATALOG_STORAGE_TYPES",
+            List.of("FILE", "S3"),
+            OPTIMIZED_SIBLING_CHECK.key,
+            "true");
+    Map<String, String> hashedAndOverlapButNoOptimizedCatalog =
+        Map.of(
+            DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED.catalogConfig(),
+            "true",
+            ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(),
+            "true");
+
+    TestServices services =
+        
TestServices.builder().config(strictServicesWithOptimizedOverlapCheck).build();
+
+    String baseLocation = tempDir.toAbsolutePath().toUri().toString();
+    if (baseLocation.endsWith("/")) {
+      baseLocation = baseLocation.substring(0, baseLocation.length() - 1);
+    }
+    createCatalogAndNamespace(services, hashedAndOverlapButNoOptimizedCatalog, 
baseLocation);
+
+    String tableName;
+    String tableLocation;
+
+    // Location check works:
+    tableName = getTableName();
+    Assertions.assertNotNull(createTableWithName(services, tableName));
+
+    // Non-default pattern:
+    tableName = getTableName();
+    Assertions.assertNotEquals(
+        String.format("%s/%s/%s/%s", baseLocation, catalog, namespace, 
tableName),
+        createTableWithName(services, tableName));
+
+    // Verify components:
+    tableName = getTableName();
+    tableLocation = createTableWithName(services, tableName);
+    Assertions.assertEquals(
+        String.format("%s/%s/", baseLocation, catalog),
+        tableLocation.substring(0, String.format("%s/%s/", baseLocation, 
catalog).length()));
+    Assertions.assertEquals(
+        String.format("%s/%s", namespace, tableName),
+        tableLocation.substring(
+            String.format("%s/%s/", baseLocation, catalog).length()
+                + (LocationUtils.HASH_BINARY_STRING_BITS + 
LocationUtils.ENTROPY_DIR_LENGTH)));
+
+    // Overlap fails:
+    assertThat(createTable(services, tableLocation))
+        .isEqualTo(Response.Status.FORBIDDEN.getStatusCode());
+
+    // The hashed prefix does not actually have to be stable, so this test
+    // is okay to change in the future.
+    assertThat(createTableWithName(services, 
"determinism_check").substring(baseLocation.length()))
+        
.isEqualTo("/test-catalog/1110/1010/0001/01111010/ns/determinism_check");
+  }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/common/LocationUtils.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/common/LocationUtils.java
new file mode 100644
index 000000000..5f1130da7
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/common/LocationUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.catalog.common;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.HashFunction;
+import com.google.common.hash.Hashing;
+import jakarta.annotation.Nonnull;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * A collection of utilities related to table locations CODE_COPIED_TO_POLARIS 
From Apache Iceberg
+ * Version: 1.9.1
+ */
+public class LocationUtils {
+
+  private static final HashFunction HASH_FUNC = Hashing.murmur3_32_fixed();
+  // Length of entropy generated in the file location
+  public static final int HASH_BINARY_STRING_BITS = 20;
+  // Entropy generated will be divided into dirs with this lengths
+  public static final int ENTROPY_DIR_LENGTH = 4;
+  // Will create DEPTH many dirs from the entropy
+  public static final int ENTROPY_DIR_DEPTH = 3;
+
+  /**
+   * Given a file path, compute a path fragment derived from its hash. This is 
taken from
+   * LocationProviders.computeHash in Iceberg.
+   *
+   * @param fileName file.txt
+   * @return 1001/1001/1001/10011001
+   */
+  public static String computeHash(@Nonnull String fileName) {
+    HashCode hashCode = HASH_FUNC.hashString(fileName, StandardCharsets.UTF_8);
+
+    // {@link Integer#toBinaryString} excludes leading zeros, which we want to 
preserve.
+    // force the first bit to be set to get around that.
+    String hashAsBinaryString = Integer.toBinaryString(hashCode.asInt() | 
Integer.MIN_VALUE);
+    // Limit hash length to HASH_BINARY_STRING_BITS
+    String hash =
+        hashAsBinaryString.substring(hashAsBinaryString.length() - 
HASH_BINARY_STRING_BITS);
+    return dirsFromHash(hash);
+  }
+
+  /**
+   * Divides hash into directories for optimized orphan removal operation 
using ENTROPY_DIR_DEPTH
+   * and ENTROPY_DIR_LENGTH
+   *
+   * @param hash 10011001100110011001
+   * @return 1001/1001/1001/10011001 with depth 3 and length 4
+   */
+  private static String dirsFromHash(String hash) {
+    StringBuilder hashWithDirs = new StringBuilder();
+
+    for (int i = 0; i < ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH; i += 
ENTROPY_DIR_LENGTH) {
+      if (i > 0) {
+        hashWithDirs.append("/");
+      }
+      hashWithDirs.append(hash, i, Math.min(i + ENTROPY_DIR_LENGTH, 
hash.length()));
+    }
+
+    if (hash.length() > ENTROPY_DIR_DEPTH * ENTROPY_DIR_LENGTH) {
+      hashWithDirs.append("/").append(hash, ENTROPY_DIR_DEPTH * 
ENTROPY_DIR_LENGTH, hash.length());
+    }
+
+    return hashWithDirs.toString();
+  }
+}
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
index f8b7edf33..e1b317a91 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalog.java
@@ -30,6 +30,8 @@ import jakarta.annotation.Nullable;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.Closeable;
 import java.io.IOException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -91,7 +93,9 @@ import org.apache.polaris.core.admin.model.StorageConfigInfo;
 import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
 import org.apache.polaris.core.config.BehaviorChangeConfiguration;
 import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
 import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.LocationBasedEntity;
 import org.apache.polaris.core.entity.NamespaceEntity;
@@ -123,6 +127,7 @@ import 
org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
 import org.apache.polaris.core.storage.PolarisStorageIntegration;
 import org.apache.polaris.core.storage.StorageLocation;
 import org.apache.polaris.service.catalog.SupportsNotifications;
+import org.apache.polaris.service.catalog.common.LocationUtils;
 import org.apache.polaris.service.catalog.io.FileIOFactory;
 import org.apache.polaris.service.catalog.io.FileIOUtil;
 import 
org.apache.polaris.service.catalog.validation.IcebergPropertiesValidation;
@@ -169,6 +174,7 @@ public class IcebergCatalog extends BaseMetastoreViewCatalog
   private final TaskExecutor taskExecutor;
   private final SecurityContext securityContext;
   private final PolarisEventListener polarisEventListener;
+  private final AtomicBoolean loggedPrefixOverlapWarning = new 
AtomicBoolean(false);
 
   private String ioImplClassName;
   private FileIO catalogFileIO;
@@ -893,12 +899,91 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
         storageInfo.get());
   }
 
+  private String buildPrefixedLocation(TableIdentifier tableIdentifier) {
+    StringBuilder locationBuilder = new StringBuilder();
+    locationBuilder.append(defaultBaseLocation);
+    if (!defaultBaseLocation.endsWith("/")) {
+      locationBuilder.append("/");
+    }
+
+    
locationBuilder.append(LocationUtils.computeHash(tableIdentifier.toString()));
+
+    for (String ns : tableIdentifier.namespace().levels()) {
+      locationBuilder.append("/").append(URLEncoder.encode(ns, 
Charset.defaultCharset()));
+    }
+    locationBuilder
+        .append("/")
+        .append(URLEncoder.encode(tableIdentifier.name(), 
Charset.defaultCharset()))
+        .append("/");
+    return locationBuilder.toString();
+  }
+
   /**
-   * Based on configuration settings, for callsites that need to handle 
potentially setting a new
-   * base location for a TableLike entity, produces the transformed location 
if applicable, or else
-   * the unaltered specified location.
+   * Applies the rule controlled by 
DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED to a tablelike
+   * location
+   */
+  private String applyDefaultLocationObjectStoragePrefix(
+      TableIdentifier tableIdentifier, String location) {
+    RealmContext realmContext = callContext.getRealmContext();
+    PolarisConfigurationStore configurationStore =
+        callContext.getPolarisCallContext().getConfigurationStore();
+    boolean prefixEnabled =
+        configurationStore.getConfiguration(
+            realmContext,
+            catalogEntity,
+            
FeatureConfiguration.DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED);
+    boolean allowUnstructuredTableLocation =
+        configurationStore.getConfiguration(
+            realmContext, catalogEntity, 
FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION);
+    boolean allowTableLocationOverlap =
+        configurationStore.getConfiguration(
+            realmContext, catalogEntity, 
FeatureConfiguration.ALLOW_TABLE_LOCATION_OVERLAP);
+    boolean optimizedSiblingCheck =
+        configurationStore.getConfiguration(
+            realmContext, catalogEntity, 
FeatureConfiguration.OPTIMIZED_SIBLING_CHECK);
+    if (location != null) {
+      return location;
+    } else if (!prefixEnabled) {
+      return location;
+    } else if (!allowUnstructuredTableLocation) {
+      throw new IllegalStateException(
+          String.format(
+              "The configuration %s is enabled, but %s is not enabled",
+              
FeatureConfiguration.DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED.key,
+              FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.key));
+    } else if (!allowTableLocationOverlap) {
+      // TODO consider doing this check any time ALLOW_EXTERNAL_TABLE_LOCATION 
is enabled, not just
+      // here
+      if (!optimizedSiblingCheck) {
+        throw new IllegalStateException(
+            String.format(
+                "%s and %s are both disabled, which means that table location 
overlap checkes are being"
+                    + " performed, but only within each namespace. However, %s 
is enabled, which indicates"
+                    + " that tables may be created outside of their parent 
namespace. This is not a safe"
+                    + " combination of configurations.",
+                FeatureConfiguration.ALLOW_TABLE_LOCATION_OVERLAP.key,
+                FeatureConfiguration.OPTIMIZED_SIBLING_CHECK.key,
+                FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.key));
+      } else if (!loggedPrefixOverlapWarning.getAndSet(true)) {
+        LOGGER.warn(
+            "A table is being created with {} and {} enabled, but with {} 
disabled. "
+                + "This is a safe combination of configurations which may 
prevent table overlap, but only if the "
+                + "underlying persistence actually implements %s. Exercise 
caution.",
+            
FeatureConfiguration.DEFAULT_LOCATION_OBJECT_STORAGE_PREFIX_ENABLED.key,
+            FeatureConfiguration.OPTIMIZED_SIBLING_CHECK.key,
+            FeatureConfiguration.ALLOW_TABLE_LOCATION_OVERLAP.key);
+      }
+      return buildPrefixedLocation(tableIdentifier);
+    } else {
+      return buildPrefixedLocation(tableIdentifier);
+    }
+  }
+
+  /**
+   * Applies the rule controlled by 
REPLACE_NEW_LOCATION_PREFIX_WITH_CATALOG_DEFAULT_KEY to a
+   * tablelike location
    */
-  public String transformTableLikeLocation(String specifiedTableLikeLocation) {
+  private String applyReplaceNewLocationWithCatalogDefault(String 
specifiedTableLikeLocation) {
     String replaceNewLocationPrefix = 
catalogEntity.getReplaceNewLocationPrefixWithCatalogDefault();
     if (specifiedTableLikeLocation != null
         && replaceNewLocationPrefix != null
@@ -916,6 +1001,16 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
     return specifiedTableLikeLocation;
   }
 
+  /**
+   * Based on configuration settings, for callsites that need to handle 
potentially setting a new
+   * base location for a TableLike entity, produces the transformed location 
if applicable, or else
+   * the unaltered specified location.
+   */
+  public String transformTableLikeLocation(TableIdentifier tableIdentifier, 
String location) {
+    return applyDefaultLocationObjectStoragePrefix(
+        tableIdentifier, applyReplaceNewLocationWithCatalogDefault(location));
+  }
+
   private @Nonnull Optional<PolarisEntity> findStorageInfo(TableIdentifier 
tableIdentifier) {
     PolarisResolvedPathWrapper resolvedTableEntities =
         resolvedEntityView.getResolvedPath(
@@ -968,8 +1063,7 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
       PolarisResolvedPathWrapper resolvedStorageEntity) {
     Optional<PolarisStorageConfigurationInfo> optStorageConfiguration =
         PolarisStorageConfigurationInfo.forEntityPath(
-            callContext.getPolarisCallContext().getDiagServices(),
-            resolvedStorageEntity.getRawFullPath());
+            callContext.getPolarisCallContext(), 
resolvedStorageEntity.getRawFullPath());
 
     optStorageConfiguration.ifPresentOrElse(
         storageConfigInfo -> {
@@ -1215,28 +1309,32 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
 
   private class PolarisIcebergCatalogTableBuilder
       extends BaseMetastoreViewCatalog.BaseMetastoreViewCatalogTableBuilder {
+    private final TableIdentifier identifier;
 
     public PolarisIcebergCatalogTableBuilder(TableIdentifier identifier, 
Schema schema) {
       super(identifier, schema);
+      this.identifier = identifier;
     }
 
     @Override
     public TableBuilder withLocation(String newLocation) {
-      return super.withLocation(transformTableLikeLocation(newLocation));
+      return super.withLocation(transformTableLikeLocation(identifier, 
newLocation));
     }
   }
 
   private class PolarisIcebergCatalogViewBuilder extends 
BaseMetastoreViewCatalog.BaseViewBuilder {
+    private final TableIdentifier identifier;
 
     public PolarisIcebergCatalogViewBuilder(TableIdentifier identifier) {
       super(identifier);
       withProperties(
           PropertyUtil.propertiesWithPrefix(IcebergCatalog.this.properties(), 
"table-default."));
+      this.identifier = identifier;
     }
 
     @Override
     public ViewBuilder withLocation(String newLocation) {
-      return super.withLocation(transformTableLikeLocation(newLocation));
+      return super.withLocation(transformTableLikeLocation(identifier, 
newLocation));
     }
   }
 
@@ -2421,7 +2519,7 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
 
       // Validate location against the resolvedStorageEntity
       String metadataLocation =
-          
transformTableLikeLocation(request.getPayload().getMetadataLocation());
+          transformTableLikeLocation(tableIdentifier, 
request.getPayload().getMetadataLocation());
       validateLocationForTableLike(tableIdentifier, metadataLocation, 
resolvedStorageEntity);
 
       // Validate that we can construct a FileIO
@@ -2450,7 +2548,8 @@ public class IcebergCatalog extends 
BaseMetastoreViewCatalog
               resolvedEntities == null ? null : 
resolvedEntities.getRawLeafEntity());
 
       String existingLocation;
-      String newLocation = 
transformTableLikeLocation(request.getPayload().getMetadataLocation());
+      String newLocation =
+          transformTableLikeLocation(tableIdentifier, 
request.getPayload().getMetadataLocation());
       if (null == entity) {
         existingLocation = null;
         entity =
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index b39e24822..c84253aa8 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -474,7 +474,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       // Even if the request provides a location, run it through the catalog's 
TableBuilder
       // to inherit any override behaviors if applicable.
       if (baseCatalog instanceof IcebergCatalog) {
-        location = ((IcebergCatalog) 
baseCatalog).transformTableLikeLocation(request.location());
+        location =
+            ((IcebergCatalog) baseCatalog).transformTableLikeLocation(ident, 
request.location());
       } else {
         location = request.location();
       }
@@ -800,7 +801,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
                     String requestedLocation = ((MetadataUpdate.SetLocation) 
update).location();
                     String filteredLocation =
                         ((IcebergCatalog) baseCatalog)
-                            .transformTableLikeLocation(requestedLocation);
+                            .transformTableLikeLocation(identifier, 
requestedLocation);
                     return new MetadataUpdate.SetLocation(filteredLocation);
                   } else {
                     return update;
diff --git 
a/service/common/src/test/java/org/apache/polaris/service/catalog/common/LocationUtilsTest.java
 
b/service/common/src/test/java/org/apache/polaris/service/catalog/common/LocationUtilsTest.java
new file mode 100644
index 000000000..cd60fe80e
--- /dev/null
+++ 
b/service/common/src/test/java/org/apache/polaris/service/catalog/common/LocationUtilsTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.service.catalog.common;
+
+import java.util.List;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class LocationUtilsTest {
+
+  @Test
+  public void testHashFormat() {
+    for (String input : List.of("", " ", "foo", "かな")) {
+      String hash = LocationUtils.computeHash(input);
+      Assertions.assertThat(hash).isNotNull();
+
+      String[] parts = hash.split("/");
+      Assertions.assertThat(parts).as("Hash must have exactly 4 
segments").hasSize(4);
+
+      Assertions.assertThat(parts[0])
+          .as("First segment must be 4 chars")
+          .hasSize(4)
+          .matches("[01]+");
+
+      Assertions.assertThat(parts[1])
+          .as("Second segment must be 4 chars")
+          .hasSize(4)
+          .matches("[01]+");
+
+      Assertions.assertThat(parts[2])
+          .as("Third segment must be 4 chars")
+          .hasSize(4)
+          .matches("[01]+");
+
+      Assertions.assertThat(parts[3])
+          .as("Fourth segment must be 8 chars")
+          .hasSize(8)
+          .matches("[01]+");
+    }
+  }
+
+  @Test
+  public void testStableHashes() {
+    
Assertions.assertThat(LocationUtils.computeHash("foo")).isEqualTo("0101/1100/0100/00100000");
+    
Assertions.assertThat(LocationUtils.computeHash("foo")).isEqualTo("0101/1100/0100/00100000");
+    Assertions.assertThat(LocationUtils.computeHash("foo "))
+        .isNotEqualTo("0101/1100/0100/00100000");
+    Assertions.assertThat(LocationUtils.computeHash(" foo"))
+        .isNotEqualTo("0101/1100/0100/00100000");
+
+    Assertions.assertThat(LocationUtils.computeHash("/some/path.txt"))
+        .isEqualTo("1101/0101/1110/10001001");
+    Assertions.assertThat(LocationUtils.computeHash("/other/path.txt"))
+        .isEqualTo("1010/0010/1101/11011100");
+    Assertions.assertThat(LocationUtils.computeHash("/some/path.txt/"))
+        .isEqualTo("1110/1011/1111/11111010");
+  }
+}

Reply via email to