This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new a385268a3 Added support for `s3a` scheme (#1932)
a385268a3 is described below

commit a385268a3770eff69ca0a0907e778af3ee7f5c48
Author: Pavan Lanka <[email protected]>
AuthorDate: Mon Jun 30 16:44:58 2025 -0700

    Added support for `s3a` scheme (#1932)
---
 .../storage/PolarisStorageConfigurationInfo.java   |   2 +-
 .../polaris/core/storage/StorageLocation.java      |   3 +
 .../polaris/core/storage/aws/S3Location.java       |  70 ++++++
 .../storage/InMemoryStorageIntegrationTest.java    |  35 +--
 .../polaris/core/storage/StorageUtilTest.java      |   2 +-
 .../polaris/core/storage/aws/S3LocationTest.java   |  52 ++++
 .../aws/AwsCredentialsStorageIntegrationTest.java  |  12 +-
 .../admin/PolarisOverlappingCatalogTest.java       |  69 +++++-
 .../admin/PolarisS3InteroperabilityTest.java       | 266 +++++++++++++++++++++
 .../service/quarkus/entity/CatalogEntityTest.java  |  52 ++--
 .../service/catalog/io/FileIOFactoryTest.java      |  23 +-
 11 files changed, 537 insertions(+), 49 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index 1d5fc8098..4d54599e4 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -233,7 +233,7 @@ public abstract class PolarisStorageConfigurationInfo {
 
   /** Polaris' storage type, each has a fixed prefix for its location */
   public enum StorageType {
-    S3("s3://"),
+    S3(List.of("s3://", "s3a://")),
     AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
     GCS("gs://"),
     FILE("file://"),
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
index 7bc768b6d..536b1bf8a 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
@@ -22,6 +22,7 @@ import jakarta.annotation.Nonnull;
 import java.net.URI;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.polaris.core.storage.aws.S3Location;
 import org.apache.polaris.core.storage.azure.AzureLocation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ public class StorageLocation {
     // TODO implement StorageLocation for all supported file systems and add isValidLocation
     if (AzureLocation.isAzureLocation(location)) {
       return new AzureLocation(location);
+    } else if (S3Location.isS3Location(location)) {
+      return new S3Location(location);
     } else {
       return new StorageLocation(location);
     }
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/S3Location.java b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/S3Location.java
new file mode 100644
index 000000000..2146a5aee
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/S3Location.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.storage.aws;
+
+import jakarta.annotation.Nonnull;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.storage.StorageLocation;
+
+public class S3Location extends StorageLocation {
+  private static final Pattern URI_PATTERN = Pattern.compile("^(s3a?):(.+)$");
+  private final String scheme;
+  private final String locationWithoutScheme;
+
+  public S3Location(@Nonnull String location) {
+    super(location);
+    Matcher matcher = URI_PATTERN.matcher(location);
+    if (!matcher.matches()) {
+      throw new IllegalArgumentException("Invalid S3 location uri " + 
location);
+    }
+    this.scheme = matcher.group(1);
+    this.locationWithoutScheme = matcher.group(2);
+  }
+
+  public static boolean isS3Location(String location) {
+    if (location == null) {
+      return false;
+    }
+    Matcher matcher = URI_PATTERN.matcher(location);
+    return matcher.matches();
+  }
+
+  @Override
+  public boolean isChildOf(StorageLocation potentialParent) {
+    if (potentialParent instanceof S3Location) {
+      S3Location that = (S3Location) potentialParent;
+      // Given that S3 and S3A are to be treated similarly, the parent check ignores the prefix
+      String slashTerminatedObjectKey = 
ensureTrailingSlash(this.locationWithoutScheme);
+      String slashTerminatedObjectKeyThat = 
ensureTrailingSlash(that.locationWithoutScheme);
+      return slashTerminatedObjectKey.startsWith(slashTerminatedObjectKeyThat);
+    }
+    return false;
+  }
+
+  public String getScheme() {
+    return scheme;
+  }
+
+  @Override
+  public String withoutScheme() {
+    return locationWithoutScheme;
+  }
+}
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
index e679d3e32..d0bf9de8a 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
@@ -33,42 +33,46 @@ import org.apache.polaris.core.context.RealmContext;
 import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 
 class InMemoryStorageIntegrationTest {
 
-  @Test
-  public void testValidateAccessToLocations() {
+  @ParameterizedTest
+  @CsvSource({"s3,s3", "s3,s3a", "s3a,s3", "s3a,s3a"})
+  public void testValidateAccessToLocations(String allowedScheme, String 
locationScheme) {
     MockInMemoryStorageIntegration storage = new 
MockInMemoryStorageIntegration();
     Map<String, Map<PolarisStorageActions, 
PolarisStorageIntegration.ValidationResult>> result =
         storage.validateAccessToLocations(
             new AwsStorageConfigurationInfo(
                 PolarisStorageConfigurationInfo.StorageType.S3,
                 List.of(
-                    "s3://bucket/path/to/warehouse",
-                    "s3://bucket/anotherpath/to/warehouse",
-                    "s3://bucket2/warehouse/"),
+                    allowedScheme + "://bucket/path/to/warehouse",
+                    allowedScheme + "://bucket/anotherpath/to/warehouse",
+                    allowedScheme + "://bucket2/warehouse/"),
                 "arn:aws:iam::012345678901:role/jdoe",
                 "us-east-2"),
             Set.of(PolarisStorageActions.READ),
             Set.of(
-                "s3://bucket/path/to/warehouse/namespace/table",
-                "s3://bucket2/warehouse",
-                "s3://arandombucket/path/to/warehouse/namespace/table"));
+                locationScheme + "://bucket/path/to/warehouse/namespace/table",
+                locationScheme + "://bucket2/warehouse",
+                locationScheme + 
"://arandombucket/path/to/warehouse/namespace/table"));
     Assertions.assertThat(result)
         .hasSize(3)
         .containsEntry(
-            "s3://bucket/path/to/warehouse/namespace/table",
+            locationScheme + "://bucket/path/to/warehouse/namespace/table",
             Map.of(
                 PolarisStorageActions.READ,
                 new PolarisStorageIntegration.ValidationResult(true, "")))
         .containsEntry(
-            "s3://bucket2/warehouse",
+            locationScheme + "://bucket2/warehouse",
             Map.of(
                 PolarisStorageActions.READ,
                 new PolarisStorageIntegration.ValidationResult(true, "")))
         .containsEntry(
-            "s3://arandombucket/path/to/warehouse/namespace/table",
+            locationScheme + 
"://arandombucket/path/to/warehouse/namespace/table",
             Map.of(
                 PolarisStorageActions.READ,
                 new PolarisStorageIntegration.ValidationResult(false, "")));
@@ -89,8 +93,9 @@ class InMemoryStorageIntegrationTest {
     Assertions.assertThat(actualAccountId).isEqualTo(expectedAccountId);
   }
 
-  @Test
-  public void testValidateAccessToLocationsWithWildcard() {
+  @ParameterizedTest
+  @ValueSource(strings = {"s3", "s3a"})
+  public void testValidateAccessToLocationsWithWildcard(String s3Scheme) {
     MockInMemoryStorageIntegration storage = new 
MockInMemoryStorageIntegration();
     Map<String, Boolean> config = Map.of("ALLOW_WILDCARD_LOCATION", true);
     PolarisCallContext polarisCallContext =
@@ -113,13 +118,13 @@ class InMemoryStorageIntegrationTest {
             new FileStorageConfigurationInfo(List.of("file://", "*")),
             Set.of(PolarisStorageActions.READ),
             Set.of(
-                "s3://bucket/path/to/warehouse/namespace/table",
+                s3Scheme + "://bucket/path/to/warehouse/namespace/table",
                 "file:///etc/passwd",
                 "a/relative/subdirectory"));
     Assertions.assertThat(result)
         .hasSize(3)
         .hasEntrySatisfying(
-            "s3://bucket/path/to/warehouse/namespace/table",
+            s3Scheme + "://bucket/path/to/warehouse/namespace/table",
             val ->
                 Assertions.assertThat(val)
                     .hasSize(1)
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
index 99a80d75f..d11d3b927 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
@@ -31,7 +31,7 @@ public class StorageUtilTest {
   }
 
   @ParameterizedTest
-  @ValueSource(strings = {"s3", "gcs", "abfs", "wasb", "file"})
+  @ValueSource(strings = {"s3", "s3a", "gcs", "abfs", "wasb", "file"})
   public void testAbsolutePaths(String scheme) {
     Assertions.assertThat(StorageUtil.getBucket(scheme + 
"://bucket/path/file.txt"))
         .isEqualTo("bucket");
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/S3LocationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/S3LocationTest.java
new file mode 100644
index 000000000..d89720b0d
--- /dev/null
+++ 
b/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/S3LocationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.storage.aws;
+
+import org.apache.polaris.core.storage.StorageLocation;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class S3LocationTest {
+  @ParameterizedTest
+  @ValueSource(strings = {"s3a", "s3"})
+  public void testLocation(String scheme) {
+    String locInput = scheme + "://bucket/schema1/table1";
+    StorageLocation loc = StorageLocation.of(locInput);
+    Assertions.assertThat(loc).isInstanceOf(S3Location.class);
+    S3Location s3Loc = (S3Location) loc;
+    Assertions.assertThat(s3Loc.getScheme()).isEqualTo(scheme);
+    
Assertions.assertThat(s3Loc.withoutScheme()).isEqualTo("//bucket/schema1/table1");
+    Assertions.assertThat(s3Loc.withoutScheme()).doesNotStartWith(scheme);
+    Assertions.assertThat(scheme + ":" + 
s3Loc.withoutScheme()).isEqualTo(locInput);
+  }
+
+  @ParameterizedTest
+  @CsvSource({"s3,s3a", "s3a,s3"})
+  public void testPrefixValidationIgnoresScheme(String parentScheme, String 
childScheme) {
+    StorageLocation loc1 = StorageLocation.of(childScheme + 
"://bucket/schema1/table1");
+    StorageLocation loc2 = StorageLocation.of(parentScheme + 
"://bucket/schema1");
+    Assertions.assertThat(loc1.isChildOf(loc2)).isTrue();
+
+    StorageLocation loc3 = StorageLocation.of(childScheme + 
"://bucket/schema1");
+    Assertions.assertThat(loc2.equals(loc3)).isFalse();
+  }
+}
diff --git 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
index 93f96b358..7dee37ebb 100644
--- 
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
+++ 
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
@@ -64,8 +64,9 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
           .build();
   public static final String AWS_PARTITION = "aws";
 
-  @Test
-  public void testGetSubscopedCreds() {
+  @ParameterizedTest
+  @ValueSource(strings = {"s3a", "s3"})
+  public void testGetSubscopedCreds(String scheme) {
     StsClient stsClient = Mockito.mock(StsClient.class);
     String roleARN = "arn:aws:iam::012345678901:role/jdoe";
     String externalId = "externalId";
@@ -76,10 +77,13 @@ class AwsCredentialsStorageIntegrationTest extends 
BaseStorageIntegrationTest {
                   .isInstanceOf(AssumeRoleRequest.class)
                   
.asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
                   .returns(externalId, AssumeRoleRequest::externalId)
-                  .returns(roleARN, AssumeRoleRequest::roleArn);
+                  .returns(roleARN, AssumeRoleRequest::roleArn)
+                  // ensure that the policy content does not refer to S3A
+                  .extracting(AssumeRoleRequest::policy)
+                  .doesNotMatch(s -> s.contains("s3a"));
               return ASSUME_ROLE_RESPONSE;
             });
-    String warehouseDir = "s3://bucket/path/to/warehouse";
+    String warehouseDir = scheme + "://bucket/path/to/warehouse";
     EnumMap<StorageAccessProperty, String> credentials =
         new AwsCredentialsStorageIntegration(stsClient)
             .getSubscopedCreds(
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
index a1d308b00..dec374259 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
@@ -43,10 +43,25 @@ public class PolarisOverlappingCatalogTest {
       TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS", 
"false")).build();
 
   private Response createCatalog(String prefix, String defaultBaseLocation, 
boolean isExternal) {
-    return createCatalog(prefix, defaultBaseLocation, isExternal, new 
ArrayList<String>());
+    return createCatalog("s3", prefix, defaultBaseLocation, isExternal, new 
ArrayList<String>());
   }
 
   private Response createCatalog(
+      String s3Scheme, String prefix, String defaultBaseLocation, boolean 
isExternal) {
+    return createCatalog(
+        s3Scheme, prefix, defaultBaseLocation, isExternal, new 
ArrayList<String>());
+  }
+
+  private Response createCatalog(
+      String prefix,
+      String defaultBaseLocation,
+      boolean isExternal,
+      List<String> allowedLocations) {
+    return createCatalog("s3", prefix, defaultBaseLocation, isExternal, 
allowedLocations);
+  }
+
+  private Response createCatalog(
+      String s3Scheme,
       String prefix,
       String defaultBaseLocation,
       boolean isExternal,
@@ -62,7 +77,7 @@ public class PolarisOverlappingCatalogTest {
                 allowedLocations.stream()
                     .map(
                         l -> {
-                          return String.format("s3://bucket/%s/%s", prefix, l);
+                          return String.format(s3Scheme + "://bucket/%s/%s", 
prefix, l);
                         })
                     .toList())
             .build();
@@ -70,7 +85,8 @@ public class PolarisOverlappingCatalogTest {
         new Catalog(
             isExternal ? Catalog.TypeEnum.EXTERNAL : Catalog.TypeEnum.INTERNAL,
             String.format("overlap_catalog_%s", uuid),
-            new CatalogProperties(String.format("s3://bucket/%s/%s", prefix, 
defaultBaseLocation)),
+            new CatalogProperties(
+                String.format(s3Scheme + "://bucket/%s/%s", prefix, 
defaultBaseLocation)),
             System.currentTimeMillis(),
             System.currentTimeMillis(),
             1,
@@ -112,6 +128,20 @@ public class PolarisOverlappingCatalogTest {
         .hasMessageContaining("One or more of its locations overlaps with an 
existing catalog");
   }
 
+  @ParameterizedTest
+  @CsvSource({"s3,s3a", "s3a,s3"})
+  public void testBasicOverlappingCatalogWSchemeChange(String rootScheme, 
String overlapScheme) {
+    String prefix = UUID.randomUUID().toString();
+
+    assertThat(createCatalog(rootScheme, prefix, "root", false))
+        .returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);
+
+    // - inside `root` but using different scheme
+    assertThatThrownBy(() -> createCatalog(overlapScheme, prefix, 
"root/child", false))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("One or more of its locations overlaps with an 
existing catalog");
+  }
+
   @ParameterizedTest
   @CsvSource({"true, true", "true, false", "false, true", "false, false"})
   public void testAllowedLocationOverlappingCatalogs(
@@ -146,4 +176,37 @@ public class PolarisOverlappingCatalogTest {
         .isInstanceOf(ValidationException.class)
         .hasMessageContaining("One or more of its locations overlaps with an 
existing catalog");
   }
+
+  @ParameterizedTest
+  @CsvSource({"s3,s3a", "s3a,s3"})
+  public void testAllowedLocationOverlappingCatalogsWSchemeChange(
+      String rootScheme, String overlapScheme) {
+    String prefix = UUID.randomUUID().toString();
+
+    assertThat(createCatalog(rootScheme, prefix, "animals", false, 
Arrays.asList("dogs", "cats")))
+        .returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);
+
+    // This DBL overlaps with initial AL
+    assertThatThrownBy(
+            () ->
+                createCatalog(
+                    overlapScheme, prefix, "dogs", false, 
Arrays.asList("huskies", "labs")))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("One or more of its locations overlaps with an 
existing catalog");
+
+    // This AL overlaps with initial DBL
+    assertThatThrownBy(
+            () ->
+                createCatalog(
+                    overlapScheme, prefix, "kingdoms", false, 
Arrays.asList("plants", "animals")))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("One or more of its locations overlaps with an 
existing catalog");
+
+    // This AL overlaps with an initial AL
+    assertThatThrownBy(
+            () ->
+                createCatalog(overlapScheme, prefix, "plays", false, 
Arrays.asList("rent", "cats")))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("One or more of its locations overlaps with an 
existing catalog");
+  }
 }
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisS3InteroperabilityTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisS3InteroperabilityTest.java
new file mode 100644
index 000000000..4a3312292
--- /dev/null
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisS3InteroperabilityTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.admin;
+
+import static 
org.apache.polaris.core.entity.CatalogEntity.DEFAULT_BASE_LOCATION_KEY;
+import static 
org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION;
+import static 
org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase.SCHEMA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import jakarta.ws.rs.core.Response;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class PolarisS3InteroperabilityTest {
+
+  private static final String BASE_LOCATION = "://bucket1/base";
+  private static final Map<String, Object> SERVER_CONFIG =
+      Map.of(
+          "ALLOW_UNSTRUCTURED_TABLE_LOCATION",
+          "false",
+          "ALLOW_TABLE_LOCATION_OVERLAP",
+          "false",
+          "ALLOW_INSECURE_STORAGE_TYPES",
+          "true",
+          "SUPPORTED_CATALOG_STORAGE_TYPES",
+          List.of("FILE", "S3"));
+  private static final FileIO fileIO = new InMemoryFileIO();
+
+  private final TestServices services;
+
+  private static String makeNamespaceLocation(String catalogName, String 
namespace, String scheme) {
+    return "%s%s/%s/%s".formatted(scheme, BASE_LOCATION, catalogName, 
namespace);
+  }
+
+  private static String makeTableLocation(
+      String catalogName, String namespace, String tableName, String scheme) {
+    return "%s%s/%s/%s/%s".formatted(scheme, BASE_LOCATION, catalogName, 
namespace, tableName);
+  }
+
+  public PolarisS3InteroperabilityTest() {
+    TestServices.FileIOFactorySupplier fileIOFactorySupplier =
+        (entityManagerFactory, metaStoreManagerFactory, configurationStore) ->
+            (FileIOFactory)
+                (callContext,
+                    ioImplClassName,
+                    properties,
+                    identifier,
+                    tableLocations,
+                    storageActions,
+                    resolvedEntityPath) -> new InMemoryFileIO();
+    services =
+        TestServices.builder()
+            .config(SERVER_CONFIG)
+            .fileIOFactorySupplier(fileIOFactorySupplier)
+            .build();
+  }
+
+  private PolarisCatalog createCatalog(String catalogName, String scheme) {
+    CatalogProperties.Builder propertiesBuilder =
+        CatalogProperties.builder()
+            .setDefaultBaseLocation(String.format("%s%s/%s", scheme, 
BASE_LOCATION, catalogName))
+            .putAll(Map.of());
+
+    StorageConfigInfo config =
+        AwsStorageConfigInfo.builder()
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setRoleArn("arn:aws:iam::123456789012:role/catalog_role")
+            .build();
+    Catalog catalogObject =
+        new Catalog(
+            Catalog.TypeEnum.INTERNAL,
+            catalogName,
+            propertiesBuilder.build(),
+            1725487592064L,
+            1725487592064L,
+            1,
+            config);
+    try (Response response =
+        services
+            .catalogsApi()
+            .createCatalog(
+                new CreateCatalogRequest(catalogObject),
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
+    }
+    try (Response response =
+        services
+            .catalogsApi()
+            .getCatalog(catalogName, services.realmContext(), 
services.securityContext())) {
+      return response.readEntity(PolarisCatalog.class);
+    }
+  }
+
+  private GetNamespaceResponse createNamespace(
+      String catalogName, String namespace, String scheme) {
+    Map<String, String> properties = new HashMap<>();
+    properties.put(ENTITY_BASE_LOCATION, makeNamespaceLocation(catalogName, 
namespace, scheme));
+    CreateNamespaceRequest createNamespaceRequest =
+        CreateNamespaceRequest.builder()
+            .withNamespace(Namespace.of(namespace))
+            .setProperties(properties)
+            .build();
+    try (Response response =
+        services
+            .restApi()
+            .createNamespace(
+                catalogName,
+                createNamespaceRequest,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+    }
+    try (Response response =
+        services
+            .restApi()
+            .loadNamespaceMetadata(
+                catalogName, namespace, services.realmContext(), 
services.securityContext())) {
+      return response.readEntity(GetNamespaceResponse.class);
+    }
+  }
+
+  private LoadTableResponse createTable(
+      String catalogName, String namespace, String tableName, String scheme, 
String location) {
+    String tableLocation =
+        location == null ? makeTableLocation(catalogName, namespace, 
tableName, scheme) : location;
+    CreateTableRequest createTableRequest =
+        CreateTableRequest.builder()
+            .withName(tableName)
+            .withLocation(tableLocation)
+            .withSchema(SCHEMA)
+            .build();
+    try (Response response =
+        services
+            .restApi()
+            .createTable(
+                catalogName,
+                namespace,
+                createTableRequest,
+                null,
+                services.realmContext(),
+                services.securityContext())) {
+      
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+    }
+    try (Response response =
+        services
+            .restApi()
+            .loadTable(
+                catalogName,
+                namespace,
+                tableName,
+                null,
+                null,
+                "ALL",
+                services.realmContext(),
+                services.securityContext())) {
+      return response.readEntity(LoadTableResponse.class);
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(strings = {"s3", "s3a"})
+  void testCatalog(String scheme) {
+    PolarisCatalog catalogEntity = createCatalog("cat_" + scheme, scheme);
+    assertThat(catalogEntity).isNotNull();
+    assertThat(catalogEntity.getName()).isEqualTo("cat_" + scheme);
+    assertThat(catalogEntity.getProperties())
+        .extractingByKey(DEFAULT_BASE_LOCATION_KEY)
+        .asString()
+        .startsWith(scheme);
+  }
+
+  @ParameterizedTest
+  @CsvSource({"s3,s3a", "s3a,s3"})
+  public void testNamespace(String catalogScheme, String namespaceScheme) {
+    String catalogName = "cat_n_" + catalogScheme;
+    createCatalog(catalogName, catalogScheme);
+    GetNamespaceResponse namespaceEntity = createNamespace(catalogName, "ns1", 
namespaceScheme);
+    assertThat(namespaceEntity).isNotNull();
+    assertThat(namespaceEntity.properties())
+        .extractingByKey("location")
+        .asString()
+        .startsWith(namespaceScheme);
+  }
+
+  @ParameterizedTest
+  @CsvSource({"s3,s3a", "s3a,s3"})
+  public void testTable(String catalogScheme, String tableScheme) {
+    String catalogName = "cat_nt_" + catalogScheme;
+    createCatalog(catalogName, catalogScheme);
+    createNamespace(catalogName, "ns1", catalogScheme);
+    LoadTableResponse tableEntity1 = createTable(catalogName, "ns1", "tbl1", 
tableScheme, null);
+    assertThat(tableEntity1).isNotNull();
+    assertThat(tableEntity1.metadataLocation()).startsWith(tableScheme);
+    
assertThat(tableEntity1.tableMetadata().location()).startsWith(tableScheme);
+
+    LoadTableResponse tableEntity2 = createTable(catalogName, "ns1", "tbl2", 
catalogScheme, null);
+    assertThat(tableEntity2).isNotNull();
+    assertThat(tableEntity2.metadataLocation()).startsWith(catalogScheme);
+    
assertThat(tableEntity2.tableMetadata().location()).startsWith(catalogScheme);
+  }
+
+  @ParameterizedTest
+  @CsvSource({"s3,s3a", "s3a,s3"})
+  public void testTableOverlap(String table1Scheme, String table2Scheme) {
+    String catalogName = "cat_nt";
+    createCatalog(catalogName, "s3");
+    createNamespace(catalogName, "ns1", "s3");
+    createTable(
+        catalogName,
+        "ns1",
+        "tbl1",
+        table1Scheme,
+        makeTableLocation(catalogName, "ns1", "tbl1", table1Scheme));
+    ForbiddenException ex =
+        assertThrows(
+            ForbiddenException.class,
+            () ->
+                createTable(
+                    catalogName,
+                    "ns1",
+                    "tbl2",
+                    table2Scheme,
+                    makeTableLocation(catalogName, "ns1", "tbl1", 
table2Scheme)));
+    assertThat(ex.getMessage()).contains("Unable to create table at location");
+    assertThat(ex.getMessage()).contains(table1Scheme);
+    assertThat(ex.getMessage()).contains(table2Scheme);
+  }
+}
diff --git 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
index ed44510a9..aa477d0ac 100644
--- 
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
+++ 
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
@@ -56,8 +56,9 @@ public class CatalogEntityTest {
     CallContext.setCurrentContext(polarisCallContext);
   }
 
-  @Test
-  public void testInvalidAllowedLocationPrefix() {
+  @ParameterizedTest
+  @ValueSource(strings = {"s3", "s3a"})
+  public void testInvalidAllowedLocationPrefixS3(String scheme) {
     String storageLocation = "unsupportPrefix://mybucket/path";
     AwsStorageConfigInfo awsStorageConfigModel =
         AwsStorageConfigInfo.builder()
@@ -65,20 +66,25 @@ public class CatalogEntityTest {
             .setExternalId("externalId")
             .setUserArn("aws::a:user:arn")
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
-            .setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket"))
+            .setAllowedLocations(List.of(storageLocation, scheme + "://externally-owned-bucket"))
             .build();
-    CatalogProperties prop = new CatalogProperties(storageLocation);
+    CatalogProperties props = new CatalogProperties(storageLocation);
     Catalog awsCatalog =
         PolarisCatalog.builder()
             .setType(Catalog.TypeEnum.INTERNAL)
             .setName("name")
-            .setProperties(prop)
+            .setProperties(props)
             .setStorageConfigInfo(awsStorageConfigModel)
             .build();
     Assertions.assertThatThrownBy(() -> CatalogEntity.fromCatalog(callContext, awsCatalog))
         .isInstanceOf(IllegalArgumentException.class)
         .hasMessageContaining(
             "Location prefix not allowed: 'unsupportPrefix://mybucket/path', expected prefixes");
+  }
+
+  @Test
+  public void testInvalidAllowedLocationPrefix() {
+    String storageLocation = "unsupportPrefix://mybucket/path";
 
     // Invalid azure prefix
     AzureStorageConfigInfo azureStorageConfigModel =
@@ -119,9 +125,10 @@ public class CatalogEntityTest {
             "Location prefix not allowed: 'unsupportPrefix://mybucket/path', expected prefixes");
   }
 
-  @Test
-  public void testExceedMaxAllowedLocations() {
-    String storageLocation = "s3://mybucket/path/";
+  @ParameterizedTest
+  @ValueSource(strings = {"s3", "s3a"})
+  public void testExceedMaxAllowedLocations(String scheme) {
+    String storageLocation = scheme + "://mybucket/path/";
     AwsStorageConfigInfo awsStorageConfigModel =
         AwsStorageConfigInfo.builder()
             .setRoleArn("arn:aws:iam::012345678901:role/jdoe")
@@ -149,19 +156,20 @@ public class CatalogEntityTest {
         .doesNotThrowAnyException();
   }
 
-  @Test
-  public void testValidAllowedLocationPrefix() {
-    String basedLocation = "s3://externally-owned-bucket";
+  @ParameterizedTest
+  @ValueSource(strings = {"s3", "s3a"})
+  public void testValidAllowedLocationPrefixS3(String scheme) {
+    String baseLocation = scheme + "://externally-owned-bucket";
     AwsStorageConfigInfo awsStorageConfigModel =
         AwsStorageConfigInfo.builder()
             .setRoleArn("arn:aws:iam::012345678901:role/jdoe")
             .setExternalId("externalId")
             .setUserArn("aws::a:user:arn")
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
-            .setAllowedLocations(List.of(basedLocation))
+            .setAllowedLocations(List.of(baseLocation))
             .build();
 
-    CatalogProperties prop = new CatalogProperties(basedLocation);
+    CatalogProperties prop = new CatalogProperties(baseLocation);
     Catalog awsCatalog =
         PolarisCatalog.builder()
             .setType(Catalog.TypeEnum.INTERNAL)
@@ -171,15 +179,29 @@ public class CatalogEntityTest {
             .build();
     Assertions.assertThatNoException()
         .isThrownBy(() -> CatalogEntity.fromCatalog(callContext, awsCatalog));
+  }
 
-    basedLocation = "abfs://[email protected]/path";
-    prop.put(CatalogEntity.DEFAULT_BASE_LOCATION_KEY, basedLocation);
+  @Test
+  public void testValidAllowedLocationPrefix() {
+    String basedLocation = "abfs://[email protected]/path";
     AzureStorageConfigInfo azureStorageConfigModel =
         AzureStorageConfigInfo.builder()
             .setAllowedLocations(List.of(basedLocation))
             .setStorageType(StorageConfigInfo.StorageTypeEnum.AZURE)
             .setTenantId("tenantId")
             .build();
+    CatalogProperties prop = new CatalogProperties(basedLocation);
+    Catalog awsCatalog =
+        PolarisCatalog.builder()
+            .setType(Catalog.TypeEnum.INTERNAL)
+            .setName("name")
+            .setProperties(prop)
+            .setStorageConfigInfo(azureStorageConfigModel)
+            .build();
+    Assertions.assertThatNoException()
+        .isThrownBy(() -> CatalogEntity.fromCatalog(callContext, awsCatalog));
+    prop.put(CatalogEntity.DEFAULT_BASE_LOCATION_KEY, basedLocation);
+
     Catalog azureCatalog =
         PolarisCatalog.builder()
             .setType(Catalog.TypeEnum.INTERNAL)
diff --git a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index 9928e00d1..1158632eb 100644
--- a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++ b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -54,8 +54,9 @@ import org.apache.polaris.service.task.TaskFileIOSupplier;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.mockito.Mockito;
 import software.amazon.awssdk.services.sts.StsClient;
 import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -148,9 +149,10 @@ public class FileIOFactoryTest {
   @AfterEach
   public void after() {}
 
-  @Test
-  public void testLoadFileIOForTableLike() {
-    IcebergCatalog catalog = createCatalog(testServices);
+  @ParameterizedTest
+  @ValueSource(strings = {"s3a", "s3"})
+  public void testLoadFileIOForTableLike(String scheme) {
+    IcebergCatalog catalog = createCatalog(testServices, scheme);
     catalog.createNamespace(NS);
     catalog.createTable(TABLE, SCHEMA);
 
@@ -166,9 +168,10 @@ public class FileIOFactoryTest {
             Mockito.any());
   }
 
-  @Test
-  public void testLoadFileIOForCleanupTask() {
-    IcebergCatalog catalog = createCatalog(testServices);
+  @ParameterizedTest
+  @ValueSource(strings = {"s3a", "s3"})
+  public void testLoadFileIOForCleanupTask(String scheme) {
+    IcebergCatalog catalog = createCatalog(testServices, scheme);
     catalog.createNamespace(NS);
     catalog.createTable(TABLE, SCHEMA);
     catalog.dropTable(TABLE, true);
@@ -201,8 +204,8 @@ public class FileIOFactoryTest {
             Mockito.any());
   }
 
-  IcebergCatalog createCatalog(TestServices services) {
-    String storageLocation = "s3://my-bucket/path/to/data";
+  IcebergCatalog createCatalog(TestServices services, String scheme) {
+    String storageLocation = scheme + "://my-bucket/path/to/data";
     AwsStorageConfigInfo awsStorageConfigInfo =
         AwsStorageConfigInfo.builder()
             .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
@@ -215,7 +218,7 @@ public class FileIOFactoryTest {
         PolarisCatalog.builder()
             .setType(Catalog.TypeEnum.INTERNAL)
             .setName(CATALOG_NAME)
-            .setProperties(new CatalogProperties("s3://tmp/path/to/data"))
+            .setProperties(new CatalogProperties(scheme + "://tmp/path/to/data"))
             .setStorageConfigInfo(awsStorageConfigInfo)
             .build();
     services


Reply via email to