This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new a385268a3 Added support for `s3a` scheme (#1932)
a385268a3 is described below
commit a385268a3770eff69ca0a0907e778af3ee7f5c48
Author: Pavan Lanka <[email protected]>
AuthorDate: Mon Jun 30 16:44:58 2025 -0700
Added support for `s3a` scheme (#1932)
---
.../storage/PolarisStorageConfigurationInfo.java | 2 +-
.../polaris/core/storage/StorageLocation.java | 3 +
.../polaris/core/storage/aws/S3Location.java | 70 ++++++
.../storage/InMemoryStorageIntegrationTest.java | 35 +--
.../polaris/core/storage/StorageUtilTest.java | 2 +-
.../polaris/core/storage/aws/S3LocationTest.java | 52 ++++
.../aws/AwsCredentialsStorageIntegrationTest.java | 12 +-
.../admin/PolarisOverlappingCatalogTest.java | 69 +++++-
.../admin/PolarisS3InteroperabilityTest.java | 266 +++++++++++++++++++++
.../service/quarkus/entity/CatalogEntityTest.java | 52 ++--
.../service/catalog/io/FileIOFactoryTest.java | 23 +-
11 files changed, 537 insertions(+), 49 deletions(-)
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
index 1d5fc8098..4d54599e4 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/storage/PolarisStorageConfigurationInfo.java
@@ -233,7 +233,7 @@ public abstract class PolarisStorageConfigurationInfo {
/** Polaris' storage type, each has a fixed prefix for its location */
public enum StorageType {
- S3("s3://"),
+ S3(List.of("s3://", "s3a://")),
AZURE(List.of("abfs://", "wasb://", "abfss://", "wasbs://")),
GCS("gs://"),
FILE("file://"),
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
index 7bc768b6d..536b1bf8a 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/storage/StorageLocation.java
@@ -22,6 +22,7 @@ import jakarta.annotation.Nonnull;
import java.net.URI;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.polaris.core.storage.aws.S3Location;
import org.apache.polaris.core.storage.azure.AzureLocation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,6 +41,8 @@ public class StorageLocation {
// TODO implement StorageLocation for all supported file systems and add isValidLocation
if (AzureLocation.isAzureLocation(location)) {
return new AzureLocation(location);
+ } else if (S3Location.isS3Location(location)) {
+ return new S3Location(location);
} else {
return new StorageLocation(location);
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/S3Location.java
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/S3Location.java
new file mode 100644
index 000000000..2146a5aee
--- /dev/null
+++
b/polaris-core/src/main/java/org/apache/polaris/core/storage/aws/S3Location.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.storage.aws;
+
+import jakarta.annotation.Nonnull;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.polaris.core.storage.StorageLocation;
+
+public class S3Location extends StorageLocation {
+ private static final Pattern URI_PATTERN = Pattern.compile("^(s3a?):(.+)$");
+ private final String scheme;
+ private final String locationWithoutScheme;
+
+ public S3Location(@Nonnull String location) {
+ super(location);
+ Matcher matcher = URI_PATTERN.matcher(location);
+ if (!matcher.matches()) {
+ throw new IllegalArgumentException("Invalid S3 location uri " +
location);
+ }
+ this.scheme = matcher.group(1);
+ this.locationWithoutScheme = matcher.group(2);
+ }
+
+ public static boolean isS3Location(String location) {
+ if (location == null) {
+ return false;
+ }
+ Matcher matcher = URI_PATTERN.matcher(location);
+ return matcher.matches();
+ }
+
+ @Override
+ public boolean isChildOf(StorageLocation potentialParent) {
+ if (potentialParent instanceof S3Location) {
+ S3Location that = (S3Location) potentialParent;
+ // Given that S3 and S3A are to be treated similarly, the parent check ignores the prefix
+ String slashTerminatedObjectKey =
ensureTrailingSlash(this.locationWithoutScheme);
+ String slashTerminatedObjectKeyThat =
ensureTrailingSlash(that.locationWithoutScheme);
+ return slashTerminatedObjectKey.startsWith(slashTerminatedObjectKeyThat);
+ }
+ return false;
+ }
+
+ public String getScheme() {
+ return scheme;
+ }
+
+ @Override
+ public String withoutScheme() {
+ return locationWithoutScheme;
+ }
+}
diff --git
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
index e679d3e32..d0bf9de8a 100644
---
a/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
+++
b/polaris-core/src/test/java/org/apache/polaris/core/storage/InMemoryStorageIntegrationTest.java
@@ -33,42 +33,46 @@ import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
class InMemoryStorageIntegrationTest {
- @Test
- public void testValidateAccessToLocations() {
+ @ParameterizedTest
+ @CsvSource({"s3,s3", "s3,s3a", "s3a,s3", "s3a,s3a"})
+ public void testValidateAccessToLocations(String allowedScheme, String
locationScheme) {
MockInMemoryStorageIntegration storage = new
MockInMemoryStorageIntegration();
Map<String, Map<PolarisStorageActions,
PolarisStorageIntegration.ValidationResult>> result =
storage.validateAccessToLocations(
new AwsStorageConfigurationInfo(
PolarisStorageConfigurationInfo.StorageType.S3,
List.of(
- "s3://bucket/path/to/warehouse",
- "s3://bucket/anotherpath/to/warehouse",
- "s3://bucket2/warehouse/"),
+ allowedScheme + "://bucket/path/to/warehouse",
+ allowedScheme + "://bucket/anotherpath/to/warehouse",
+ allowedScheme + "://bucket2/warehouse/"),
"arn:aws:iam::012345678901:role/jdoe",
"us-east-2"),
Set.of(PolarisStorageActions.READ),
Set.of(
- "s3://bucket/path/to/warehouse/namespace/table",
- "s3://bucket2/warehouse",
- "s3://arandombucket/path/to/warehouse/namespace/table"));
+ locationScheme + "://bucket/path/to/warehouse/namespace/table",
+ locationScheme + "://bucket2/warehouse",
+ locationScheme +
"://arandombucket/path/to/warehouse/namespace/table"));
Assertions.assertThat(result)
.hasSize(3)
.containsEntry(
- "s3://bucket/path/to/warehouse/namespace/table",
+ locationScheme + "://bucket/path/to/warehouse/namespace/table",
Map.of(
PolarisStorageActions.READ,
new PolarisStorageIntegration.ValidationResult(true, "")))
.containsEntry(
- "s3://bucket2/warehouse",
+ locationScheme + "://bucket2/warehouse",
Map.of(
PolarisStorageActions.READ,
new PolarisStorageIntegration.ValidationResult(true, "")))
.containsEntry(
- "s3://arandombucket/path/to/warehouse/namespace/table",
+ locationScheme +
"://arandombucket/path/to/warehouse/namespace/table",
Map.of(
PolarisStorageActions.READ,
new PolarisStorageIntegration.ValidationResult(false, "")));
@@ -89,8 +93,9 @@ class InMemoryStorageIntegrationTest {
Assertions.assertThat(actualAccountId).isEqualTo(expectedAccountId);
}
- @Test
- public void testValidateAccessToLocationsWithWildcard() {
+ @ParameterizedTest
+ @ValueSource(strings = {"s3", "s3a"})
+ public void testValidateAccessToLocationsWithWildcard(String s3Scheme) {
MockInMemoryStorageIntegration storage = new
MockInMemoryStorageIntegration();
Map<String, Boolean> config = Map.of("ALLOW_WILDCARD_LOCATION", true);
PolarisCallContext polarisCallContext =
@@ -113,13 +118,13 @@ class InMemoryStorageIntegrationTest {
new FileStorageConfigurationInfo(List.of("file://", "*")),
Set.of(PolarisStorageActions.READ),
Set.of(
- "s3://bucket/path/to/warehouse/namespace/table",
+ s3Scheme + "://bucket/path/to/warehouse/namespace/table",
"file:///etc/passwd",
"a/relative/subdirectory"));
Assertions.assertThat(result)
.hasSize(3)
.hasEntrySatisfying(
- "s3://bucket/path/to/warehouse/namespace/table",
+ s3Scheme + "://bucket/path/to/warehouse/namespace/table",
val ->
Assertions.assertThat(val)
.hasSize(1)
diff --git
a/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
b/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
index 99a80d75f..d11d3b927 100644
---
a/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
+++
b/polaris-core/src/test/java/org/apache/polaris/core/storage/StorageUtilTest.java
@@ -31,7 +31,7 @@ public class StorageUtilTest {
}
@ParameterizedTest
- @ValueSource(strings = {"s3", "gcs", "abfs", "wasb", "file"})
+ @ValueSource(strings = {"s3", "s3a", "gcs", "abfs", "wasb", "file"})
public void testAbsolutePaths(String scheme) {
Assertions.assertThat(StorageUtil.getBucket(scheme +
"://bucket/path/file.txt"))
.isEqualTo("bucket");
diff --git
a/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/S3LocationTest.java
b/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/S3LocationTest.java
new file mode 100644
index 000000000..d89720b0d
--- /dev/null
+++
b/polaris-core/src/test/java/org/apache/polaris/core/storage/aws/S3LocationTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.polaris.core.storage.aws;
+
+import org.apache.polaris.core.storage.StorageLocation;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class S3LocationTest {
+ @ParameterizedTest
+ @ValueSource(strings = {"s3a", "s3"})
+ public void testLocation(String scheme) {
+ String locInput = scheme + "://bucket/schema1/table1";
+ StorageLocation loc = StorageLocation.of(locInput);
+ Assertions.assertThat(loc).isInstanceOf(S3Location.class);
+ S3Location s3Loc = (S3Location) loc;
+ Assertions.assertThat(s3Loc.getScheme()).isEqualTo(scheme);
+
Assertions.assertThat(s3Loc.withoutScheme()).isEqualTo("//bucket/schema1/table1");
+ Assertions.assertThat(s3Loc.withoutScheme()).doesNotStartWith(scheme);
+ Assertions.assertThat(scheme + ":" +
s3Loc.withoutScheme()).isEqualTo(locInput);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"s3,s3a", "s3a,s3"})
+ public void testPrefixValidationIgnoresScheme(String parentScheme, String
childScheme) {
+ StorageLocation loc1 = StorageLocation.of(childScheme +
"://bucket/schema1/table1");
+ StorageLocation loc2 = StorageLocation.of(parentScheme +
"://bucket/schema1");
+ Assertions.assertThat(loc1.isChildOf(loc2)).isTrue();
+
+ StorageLocation loc3 = StorageLocation.of(childScheme +
"://bucket/schema1");
+ Assertions.assertThat(loc2.equals(loc3)).isFalse();
+ }
+}
diff --git
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
index 93f96b358..7dee37ebb 100644
---
a/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
+++
b/polaris-core/src/test/java/org/apache/polaris/service/storage/aws/AwsCredentialsStorageIntegrationTest.java
@@ -64,8 +64,9 @@ class AwsCredentialsStorageIntegrationTest extends
BaseStorageIntegrationTest {
.build();
public static final String AWS_PARTITION = "aws";
- @Test
- public void testGetSubscopedCreds() {
+ @ParameterizedTest
+ @ValueSource(strings = {"s3a", "s3"})
+ public void testGetSubscopedCreds(String scheme) {
StsClient stsClient = Mockito.mock(StsClient.class);
String roleARN = "arn:aws:iam::012345678901:role/jdoe";
String externalId = "externalId";
@@ -76,10 +77,13 @@ class AwsCredentialsStorageIntegrationTest extends
BaseStorageIntegrationTest {
.isInstanceOf(AssumeRoleRequest.class)
.asInstanceOf(InstanceOfAssertFactories.type(AssumeRoleRequest.class))
.returns(externalId, AssumeRoleRequest::externalId)
- .returns(roleARN, AssumeRoleRequest::roleArn);
+ .returns(roleARN, AssumeRoleRequest::roleArn)
+ // ensure that the policy content does not refer to S3A
+ .extracting(AssumeRoleRequest::policy)
+ .doesNotMatch(s -> s.contains("s3a"));
return ASSUME_ROLE_RESPONSE;
});
- String warehouseDir = "s3://bucket/path/to/warehouse";
+ String warehouseDir = scheme + "://bucket/path/to/warehouse";
EnumMap<StorageAccessProperty, String> credentials =
new AwsCredentialsStorageIntegration(stsClient)
.getSubscopedCreds(
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
index a1d308b00..dec374259 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisOverlappingCatalogTest.java
@@ -43,10 +43,25 @@ public class PolarisOverlappingCatalogTest {
TestServices.builder().config(Map.of("ALLOW_OVERLAPPING_CATALOG_URLS",
"false")).build();
private Response createCatalog(String prefix, String defaultBaseLocation,
boolean isExternal) {
- return createCatalog(prefix, defaultBaseLocation, isExternal, new
ArrayList<String>());
+ return createCatalog("s3", prefix, defaultBaseLocation, isExternal, new
ArrayList<String>());
}
private Response createCatalog(
+ String s3Scheme, String prefix, String defaultBaseLocation, boolean
isExternal) {
+ return createCatalog(
+ s3Scheme, prefix, defaultBaseLocation, isExternal, new
ArrayList<String>());
+ }
+
+ private Response createCatalog(
+ String prefix,
+ String defaultBaseLocation,
+ boolean isExternal,
+ List<String> allowedLocations) {
+ return createCatalog("s3", prefix, defaultBaseLocation, isExternal,
allowedLocations);
+ }
+
+ private Response createCatalog(
+ String s3Scheme,
String prefix,
String defaultBaseLocation,
boolean isExternal,
@@ -62,7 +77,7 @@ public class PolarisOverlappingCatalogTest {
allowedLocations.stream()
.map(
l -> {
- return String.format("s3://bucket/%s/%s", prefix, l);
+ return String.format(s3Scheme + "://bucket/%s/%s",
prefix, l);
})
.toList())
.build();
@@ -70,7 +85,8 @@ public class PolarisOverlappingCatalogTest {
new Catalog(
isExternal ? Catalog.TypeEnum.EXTERNAL : Catalog.TypeEnum.INTERNAL,
String.format("overlap_catalog_%s", uuid),
- new CatalogProperties(String.format("s3://bucket/%s/%s", prefix,
defaultBaseLocation)),
+ new CatalogProperties(
+ String.format(s3Scheme + "://bucket/%s/%s", prefix,
defaultBaseLocation)),
System.currentTimeMillis(),
System.currentTimeMillis(),
1,
@@ -112,6 +128,20 @@ public class PolarisOverlappingCatalogTest {
.hasMessageContaining("One or more of its locations overlaps with an
existing catalog");
}
+ @ParameterizedTest
+ @CsvSource({"s3,s3a", "s3a,s3"})
+ public void testBasicOverlappingCatalogWSchemeChange(String rootScheme,
String overlapScheme) {
+ String prefix = UUID.randomUUID().toString();
+
+ assertThat(createCatalog(rootScheme, prefix, "root", false))
+ .returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);
+
+ // - inside `root` but using different scheme
+ assertThatThrownBy(() -> createCatalog(overlapScheme, prefix,
"root/child", false))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("One or more of its locations overlaps with an
existing catalog");
+ }
+
@ParameterizedTest
@CsvSource({"true, true", "true, false", "false, true", "false, false"})
public void testAllowedLocationOverlappingCatalogs(
@@ -146,4 +176,37 @@ public class PolarisOverlappingCatalogTest {
.isInstanceOf(ValidationException.class)
.hasMessageContaining("One or more of its locations overlaps with an
existing catalog");
}
+
+ @ParameterizedTest
+ @CsvSource({"s3,s3a", "s3a,s3"})
+ public void testAllowedLocationOverlappingCatalogsWSchemeChange(
+ String rootScheme, String overlapScheme) {
+ String prefix = UUID.randomUUID().toString();
+
+ assertThat(createCatalog(rootScheme, prefix, "animals", false,
Arrays.asList("dogs", "cats")))
+ .returns(Response.Status.CREATED.getStatusCode(), Response::getStatus);
+
+ // This DBL overlaps with initial AL
+ assertThatThrownBy(
+ () ->
+ createCatalog(
+ overlapScheme, prefix, "dogs", false,
Arrays.asList("huskies", "labs")))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("One or more of its locations overlaps with an
existing catalog");
+
+ // This AL overlaps with initial DBL
+ assertThatThrownBy(
+ () ->
+ createCatalog(
+ overlapScheme, prefix, "kingdoms", false,
Arrays.asList("plants", "animals")))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("One or more of its locations overlaps with an
existing catalog");
+
+ // This AL overlaps with an initial AL
+ assertThatThrownBy(
+ () ->
+ createCatalog(overlapScheme, prefix, "plays", false,
Arrays.asList("rent", "cats")))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("One or more of its locations overlaps with an
existing catalog");
+ }
}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisS3InteroperabilityTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisS3InteroperabilityTest.java
new file mode 100644
index 000000000..4a3312292
--- /dev/null
+++
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisS3InteroperabilityTest.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.admin;
+
+import static
org.apache.polaris.core.entity.CatalogEntity.DEFAULT_BASE_LOCATION_KEY;
+import static
org.apache.polaris.core.entity.PolarisEntityConstants.ENTITY_BASE_LOCATION;
+import static
org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase.SCHEMA;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import jakarta.ws.rs.core.Response;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.inmemory.InMemoryFileIO;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
+import org.apache.iceberg.rest.requests.CreateTableRequest;
+import org.apache.iceberg.rest.responses.GetNamespaceResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.Catalog;
+import org.apache.polaris.core.admin.model.CatalogProperties;
+import org.apache.polaris.core.admin.model.CreateCatalogRequest;
+import org.apache.polaris.core.admin.model.PolarisCatalog;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.service.TestServices;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+public class PolarisS3InteroperabilityTest {
+
+ private static final String BASE_LOCATION = "://bucket1/base";
+ private static final Map<String, Object> SERVER_CONFIG =
+ Map.of(
+ "ALLOW_UNSTRUCTURED_TABLE_LOCATION",
+ "false",
+ "ALLOW_TABLE_LOCATION_OVERLAP",
+ "false",
+ "ALLOW_INSECURE_STORAGE_TYPES",
+ "true",
+ "SUPPORTED_CATALOG_STORAGE_TYPES",
+ List.of("FILE", "S3"));
+ private static final FileIO fileIO = new InMemoryFileIO();
+
+ private final TestServices services;
+
+ private static String makeNamespaceLocation(String catalogName, String
namespace, String scheme) {
+ return "%s%s/%s/%s".formatted(scheme, BASE_LOCATION, catalogName,
namespace);
+ }
+
+ private static String makeTableLocation(
+ String catalogName, String namespace, String tableName, String scheme) {
+ return "%s%s/%s/%s/%s".formatted(scheme, BASE_LOCATION, catalogName,
namespace, tableName);
+ }
+
+ public PolarisS3InteroperabilityTest() {
+ TestServices.FileIOFactorySupplier fileIOFactorySupplier =
+ (entityManagerFactory, metaStoreManagerFactory, configurationStore) ->
+ (FileIOFactory)
+ (callContext,
+ ioImplClassName,
+ properties,
+ identifier,
+ tableLocations,
+ storageActions,
+ resolvedEntityPath) -> new InMemoryFileIO();
+ services =
+ TestServices.builder()
+ .config(SERVER_CONFIG)
+ .fileIOFactorySupplier(fileIOFactorySupplier)
+ .build();
+ }
+
+ private PolarisCatalog createCatalog(String catalogName, String scheme) {
+ CatalogProperties.Builder propertiesBuilder =
+ CatalogProperties.builder()
+ .setDefaultBaseLocation(String.format("%s%s/%s", scheme,
BASE_LOCATION, catalogName))
+ .putAll(Map.of());
+
+ StorageConfigInfo config =
+ AwsStorageConfigInfo.builder()
+ .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+ .setRoleArn("arn:aws:iam::123456789012:role/catalog_role")
+ .build();
+ Catalog catalogObject =
+ new Catalog(
+ Catalog.TypeEnum.INTERNAL,
+ catalogName,
+ propertiesBuilder.build(),
+ 1725487592064L,
+ 1725487592064L,
+ 1,
+ config);
+ try (Response response =
+ services
+ .catalogsApi()
+ .createCatalog(
+ new CreateCatalogRequest(catalogObject),
+ services.realmContext(),
+ services.securityContext())) {
+
assertThat(response.getStatus()).isEqualTo(Response.Status.CREATED.getStatusCode());
+ }
+ try (Response response =
+ services
+ .catalogsApi()
+ .getCatalog(catalogName, services.realmContext(),
services.securityContext())) {
+ return response.readEntity(PolarisCatalog.class);
+ }
+ }
+
+ private GetNamespaceResponse createNamespace(
+ String catalogName, String namespace, String scheme) {
+ Map<String, String> properties = new HashMap<>();
+ properties.put(ENTITY_BASE_LOCATION, makeNamespaceLocation(catalogName,
namespace, scheme));
+ CreateNamespaceRequest createNamespaceRequest =
+ CreateNamespaceRequest.builder()
+ .withNamespace(Namespace.of(namespace))
+ .setProperties(properties)
+ .build();
+ try (Response response =
+ services
+ .restApi()
+ .createNamespace(
+ catalogName,
+ createNamespaceRequest,
+ services.realmContext(),
+ services.securityContext())) {
+
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+ }
+ try (Response response =
+ services
+ .restApi()
+ .loadNamespaceMetadata(
+ catalogName, namespace, services.realmContext(),
services.securityContext())) {
+ return response.readEntity(GetNamespaceResponse.class);
+ }
+ }
+
+ private LoadTableResponse createTable(
+ String catalogName, String namespace, String tableName, String scheme,
String location) {
+ String tableLocation =
+ location == null ? makeTableLocation(catalogName, namespace,
tableName, scheme) : location;
+ CreateTableRequest createTableRequest =
+ CreateTableRequest.builder()
+ .withName(tableName)
+ .withLocation(tableLocation)
+ .withSchema(SCHEMA)
+ .build();
+ try (Response response =
+ services
+ .restApi()
+ .createTable(
+ catalogName,
+ namespace,
+ createTableRequest,
+ null,
+ services.realmContext(),
+ services.securityContext())) {
+
assertThat(response.getStatus()).isEqualTo(Response.Status.OK.getStatusCode());
+ }
+ try (Response response =
+ services
+ .restApi()
+ .loadTable(
+ catalogName,
+ namespace,
+ tableName,
+ null,
+ null,
+ "ALL",
+ services.realmContext(),
+ services.securityContext())) {
+ return response.readEntity(LoadTableResponse.class);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"s3", "s3a"})
+ void testCatalog(String scheme) {
+ PolarisCatalog catalogEntity = createCatalog("cat_" + scheme, scheme);
+ assertThat(catalogEntity).isNotNull();
+ assertThat(catalogEntity.getName()).isEqualTo("cat_" + scheme);
+ assertThat(catalogEntity.getProperties())
+ .extractingByKey(DEFAULT_BASE_LOCATION_KEY)
+ .asString()
+ .startsWith(scheme);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"s3,s3a", "s3a,s3"})
+ public void testNamespace(String catalogScheme, String namespaceScheme) {
+ String catalogName = "cat_n_" + catalogScheme;
+ createCatalog(catalogName, catalogScheme);
+ GetNamespaceResponse namespaceEntity = createNamespace(catalogName, "ns1",
namespaceScheme);
+ assertThat(namespaceEntity).isNotNull();
+ assertThat(namespaceEntity.properties())
+ .extractingByKey("location")
+ .asString()
+ .startsWith(namespaceScheme);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"s3,s3a", "s3a,s3"})
+ public void testTable(String catalogScheme, String tableScheme) {
+ String catalogName = "cat_nt_" + catalogScheme;
+ createCatalog(catalogName, catalogScheme);
+ createNamespace(catalogName, "ns1", catalogScheme);
+ LoadTableResponse tableEntity1 = createTable(catalogName, "ns1", "tbl1",
tableScheme, null);
+ assertThat(tableEntity1).isNotNull();
+ assertThat(tableEntity1.metadataLocation()).startsWith(tableScheme);
+
assertThat(tableEntity1.tableMetadata().location()).startsWith(tableScheme);
+
+ LoadTableResponse tableEntity2 = createTable(catalogName, "ns1", "tbl2",
catalogScheme, null);
+ assertThat(tableEntity2).isNotNull();
+ assertThat(tableEntity2.metadataLocation()).startsWith(catalogScheme);
+
assertThat(tableEntity2.tableMetadata().location()).startsWith(catalogScheme);
+ }
+
+ @ParameterizedTest
+ @CsvSource({"s3,s3a", "s3a,s3"})
+ public void testTableOverlap(String table1Scheme, String table2Scheme) {
+ String catalogName = "cat_nt";
+ createCatalog(catalogName, "s3");
+ createNamespace(catalogName, "ns1", "s3");
+ createTable(
+ catalogName,
+ "ns1",
+ "tbl1",
+ table1Scheme,
+ makeTableLocation(catalogName, "ns1", "tbl1", table1Scheme));
+ ForbiddenException ex =
+ assertThrows(
+ ForbiddenException.class,
+ () ->
+ createTable(
+ catalogName,
+ "ns1",
+ "tbl2",
+ table2Scheme,
+ makeTableLocation(catalogName, "ns1", "tbl1",
table2Scheme)));
+ assertThat(ex.getMessage()).contains("Unable to create table at location");
+ assertThat(ex.getMessage()).contains(table1Scheme);
+ assertThat(ex.getMessage()).contains(table2Scheme);
+ }
+}
diff --git
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
index ed44510a9..aa477d0ac 100644
---
a/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
+++
b/runtime/service/src/test/java/org/apache/polaris/service/quarkus/entity/CatalogEntityTest.java
@@ -56,8 +56,9 @@ public class CatalogEntityTest {
CallContext.setCurrentContext(polarisCallContext);
}
- @Test
- public void testInvalidAllowedLocationPrefix() {
+ @ParameterizedTest
+ @ValueSource(strings = {"s3", "s3a"})
+ public void testInvalidAllowedLocationPrefixS3(String scheme) {
String storageLocation = "unsupportPrefix://mybucket/path";
AwsStorageConfigInfo awsStorageConfigModel =
AwsStorageConfigInfo.builder()
@@ -65,20 +66,25 @@ public class CatalogEntityTest {
.setExternalId("externalId")
.setUserArn("aws::a:user:arn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
- .setAllowedLocations(List.of(storageLocation,
"s3://externally-owned-bucket"))
+ .setAllowedLocations(List.of(storageLocation, scheme +
"://externally-owned-bucket"))
.build();
- CatalogProperties prop = new CatalogProperties(storageLocation);
+ CatalogProperties props = new CatalogProperties(storageLocation);
Catalog awsCatalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
.setName("name")
- .setProperties(prop)
+ .setProperties(props)
.setStorageConfigInfo(awsStorageConfigModel)
.build();
Assertions.assertThatThrownBy(() -> CatalogEntity.fromCatalog(callContext,
awsCatalog))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining(
"Location prefix not allowed: 'unsupportPrefix://mybucket/path',
expected prefixes");
+ }
+
+ @Test
+ public void testInvalidAllowedLocationPrefix() {
+ String storageLocation = "unsupportPrefix://mybucket/path";
// Invalid azure prefix
AzureStorageConfigInfo azureStorageConfigModel =
@@ -119,9 +125,10 @@ public class CatalogEntityTest {
"Location prefix not allowed: 'unsupportPrefix://mybucket/path',
expected prefixes");
}
- @Test
- public void testExceedMaxAllowedLocations() {
- String storageLocation = "s3://mybucket/path/";
+ @ParameterizedTest
+ @ValueSource(strings = {"s3", "s3a"})
+ public void testExceedMaxAllowedLocations(String scheme) {
+ String storageLocation = scheme + "://mybucket/path/";
AwsStorageConfigInfo awsStorageConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::012345678901:role/jdoe")
@@ -149,19 +156,20 @@ public class CatalogEntityTest {
.doesNotThrowAnyException();
}
- @Test
- public void testValidAllowedLocationPrefix() {
- String basedLocation = "s3://externally-owned-bucket";
+ @ParameterizedTest
+ @ValueSource(strings = {"s3", "s3a"})
+ public void testValidAllowedLocationPrefixS3(String scheme) {
+ String baseLocation = scheme + "://externally-owned-bucket";
AwsStorageConfigInfo awsStorageConfigModel =
AwsStorageConfigInfo.builder()
.setRoleArn("arn:aws:iam::012345678901:role/jdoe")
.setExternalId("externalId")
.setUserArn("aws::a:user:arn")
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
- .setAllowedLocations(List.of(basedLocation))
+ .setAllowedLocations(List.of(baseLocation))
.build();
- CatalogProperties prop = new CatalogProperties(basedLocation);
+ CatalogProperties prop = new CatalogProperties(baseLocation);
Catalog awsCatalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
@@ -171,15 +179,29 @@ public class CatalogEntityTest {
.build();
Assertions.assertThatNoException()
.isThrownBy(() -> CatalogEntity.fromCatalog(callContext, awsCatalog));
+ }
- basedLocation = "abfs://[email protected]/path";
- prop.put(CatalogEntity.DEFAULT_BASE_LOCATION_KEY, basedLocation);
+ @Test
+ public void testValidAllowedLocationPrefix() {
+ String basedLocation =
"abfs://[email protected]/path";
AzureStorageConfigInfo azureStorageConfigModel =
AzureStorageConfigInfo.builder()
.setAllowedLocations(List.of(basedLocation))
.setStorageType(StorageConfigInfo.StorageTypeEnum.AZURE)
.setTenantId("tenantId")
.build();
+ CatalogProperties prop = new CatalogProperties(basedLocation);
+ Catalog awsCatalog =
+ PolarisCatalog.builder()
+ .setType(Catalog.TypeEnum.INTERNAL)
+ .setName("name")
+ .setProperties(prop)
+ .setStorageConfigInfo(azureStorageConfigModel)
+ .build();
+ Assertions.assertThatNoException()
+ .isThrownBy(() -> CatalogEntity.fromCatalog(callContext, awsCatalog));
+ prop.put(CatalogEntity.DEFAULT_BASE_LOCATION_KEY, basedLocation);
+
Catalog azureCatalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
diff --git
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
index 9928e00d1..1158632eb 100644
---
a/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
+++
b/service/common/src/test/java/org/apache/polaris/service/catalog/io/FileIOFactoryTest.java
@@ -54,8 +54,9 @@ import org.apache.polaris.service.task.TaskFileIOSupplier;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
@@ -148,9 +149,10 @@ public class FileIOFactoryTest {
@AfterEach
public void after() {}
- @Test
- public void testLoadFileIOForTableLike() {
- IcebergCatalog catalog = createCatalog(testServices);
+ @ParameterizedTest
+ @ValueSource(strings = {"s3a", "s3"})
+ public void testLoadFileIOForTableLike(String scheme) {
+ IcebergCatalog catalog = createCatalog(testServices, scheme);
catalog.createNamespace(NS);
catalog.createTable(TABLE, SCHEMA);
@@ -166,9 +168,10 @@ public class FileIOFactoryTest {
Mockito.any());
}
- @Test
- public void testLoadFileIOForCleanupTask() {
- IcebergCatalog catalog = createCatalog(testServices);
+ @ParameterizedTest
+ @ValueSource(strings = {"s3a", "s3"})
+ public void testLoadFileIOForCleanupTask(String scheme) {
+ IcebergCatalog catalog = createCatalog(testServices, scheme);
catalog.createNamespace(NS);
catalog.createTable(TABLE, SCHEMA);
catalog.dropTable(TABLE, true);
@@ -201,8 +204,8 @@ public class FileIOFactoryTest {
Mockito.any());
}
- IcebergCatalog createCatalog(TestServices services) {
- String storageLocation = "s3://my-bucket/path/to/data";
+ IcebergCatalog createCatalog(TestServices services, String scheme) {
+ String storageLocation = scheme + "://my-bucket/path/to/data";
AwsStorageConfigInfo awsStorageConfigInfo =
AwsStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
@@ -215,7 +218,7 @@ public class FileIOFactoryTest {
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
.setName(CATALOG_NAME)
- .setProperties(new CatalogProperties("s3://tmp/path/to/data"))
+ .setProperties(new CatalogProperties(scheme +
"://tmp/path/to/data"))
.setStorageConfigInfo(awsStorageConfigInfo)
.build();
services