This is an automated email from the ASF dual-hosted git repository.
yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new c39c26325 Core/Service: Implement PolicyCatalog Stage 2:
detach/attach/getApplicablePolicies (#1314)
c39c26325 is described below
commit c39c2632599956d2a3a0224a9be9e228b161f380
Author: Yufei Gu <[email protected]>
AuthorDate: Wed Apr 9 13:14:09 2025 -0700
Core/Service: Implement PolicyCatalog Stage 2:
detach/attach/getApplicablePolicies (#1314)
---
.../PolicyMappingAlreadyExistsException.java | 9 +-
.../exceptions/PolicyAttachException.java} | 27 +-
.../core/policy/validator/PolicyValidators.java | 9 +
.../service/quarkus/catalog/PolicyCatalogTest.java | 259 +++++++++++++-----
.../service/catalog/policy/PolicyCatalog.java | 290 ++++++++++++++++++---
.../service/exception/PolarisExceptionMapper.java | 6 +
6 files changed, 481 insertions(+), 119 deletions(-)
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
index 2cd714f25..a31e500b0 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
@@ -18,13 +18,15 @@
*/
package org.apache.polaris.core.persistence;
+import com.google.errorprone.annotations.FormatMethod;
+import org.apache.polaris.core.exceptions.PolarisException;
import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
/**
* Exception raised when an existing policy mapping prevents the attempted
creation of a new policy
* mapping record.
*/
-public class PolicyMappingAlreadyExistsException extends RuntimeException {
+public class PolicyMappingAlreadyExistsException extends PolarisException {
private PolarisPolicyMappingRecord existingRecord;
/**
@@ -35,6 +37,11 @@ public class PolicyMappingAlreadyExistsException extends
RuntimeException {
this.existingRecord = existingRecord;
}
+ @FormatMethod
+ public PolicyMappingAlreadyExistsException(String message, Object... arg) {
+ super(String.format(message, arg));
+ }
+
public PolarisPolicyMappingRecord getExistingRecord() {
return this.existingRecord;
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyAttachException.java
similarity index 52%
copy from
polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
copy to
polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyAttachException.java
index 2cd714f25..e47d978d0 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyAttachException.java
@@ -16,26 +16,19 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.polaris.core.persistence;
+package org.apache.polaris.core.policy.exceptions;
-import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
+import com.google.errorprone.annotations.FormatMethod;
+import org.apache.polaris.core.exceptions.PolarisException;
-/**
- * Exception raised when an existing policy mapping prevents the attempted
creation of a new policy
- * mapping record.
- */
-public class PolicyMappingAlreadyExistsException extends RuntimeException {
- private PolarisPolicyMappingRecord existingRecord;
-
- /**
- * @param existingRecord The conflicting record that caused creation to fail.
- */
- public PolicyMappingAlreadyExistsException(PolarisPolicyMappingRecord
existingRecord) {
- super("Existing Policy Mapping Record: " + existingRecord);
- this.existingRecord = existingRecord;
+public class PolicyAttachException extends PolarisException {
+ @FormatMethod
+ public PolicyAttachException(String message, Object... args) {
+ super(String.format(message, args));
}
- public PolarisPolicyMappingRecord getExistingRecord() {
- return this.existingRecord;
+ @FormatMethod
+ public PolicyAttachException(Throwable cause, String message, Object...
args) {
+ super(String.format(message, args), cause);
}
}
diff --git
a/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
b/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
index 3ccb2f6d2..4e1b4e288 100644
---
a/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
+++
b/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
@@ -26,6 +26,7 @@ import
org.apache.polaris.core.policy.content.maintenance.DataCompactionPolicyCo
import
org.apache.polaris.core.policy.content.maintenance.MetadataCompactionPolicyContent;
import
org.apache.polaris.core.policy.content.maintenance.OrphanFileRemovalPolicyContent;
import
org.apache.polaris.core.policy.content.maintenance.SnapshotRetentionPolicyContent;
+import org.apache.polaris.core.policy.exceptions.PolicyAttachException;
import
org.apache.polaris.core.policy.validator.maintenance.BaseMaintenancePolicyValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -102,4 +103,12 @@ public class PolicyValidators {
return false;
}
}
+
+ public static void validateAttach(PolicyEntity policy, PolarisEntity
targetEntity) {
+ if (!canAttach(policy, targetEntity)) {
+ throw new PolicyAttachException(
+ "Cannot attach policy '%s' to target entity '%s'",
+ policy.getName(), targetEntity.getName());
+ }
+ }
}
diff --git
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
index 83bdecaaa..9e4d4b38a 100644
---
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
+++
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
@@ -18,6 +18,13 @@
*/
package org.apache.polaris.service.quarkus.catalog;
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static
org.apache.polaris.core.policy.PredefinedPolicyTypes.DATA_COMPACTION;
+import static
org.apache.polaris.core.policy.PredefinedPolicyTypes.METADATA_COMPACTION;
+import static
org.apache.polaris.core.policy.PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.when;
@@ -37,9 +44,11 @@ import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.types.Types;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -58,6 +67,7 @@ import org.apache.polaris.core.entity.PrincipalEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet;
import org.apache.polaris.core.persistence.cache.EntityCache;
import org.apache.polaris.core.persistence.dao.entity.BaseResult;
@@ -82,8 +92,8 @@ import
org.apache.polaris.service.config.RealmEntityManagerFactory;
import
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
import org.apache.polaris.service.task.TaskExecutor;
import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
import org.apache.polaris.service.types.PolicyIdentifier;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -112,18 +122,26 @@ public class PolicyCatalogTest {
}
}
- protected static final Namespace NS = Namespace.of("newdb");
- protected static final TableIdentifier TABLE = TableIdentifier.of(NS,
"table");
- public static final String CATALOG_NAME = "polaris-catalog";
- public static final String TEST_ACCESS_KEY = "test_access_key";
- public static final String SECRET_ACCESS_KEY = "secret_access_key";
- public static final String SESSION_TOKEN = "session_token";
-
- private static final Namespace NS1 = Namespace.of("ns1");
- private static final PolicyIdentifier POLICY1 = new PolicyIdentifier(NS1,
"p1");
- private static final PolicyIdentifier POLICY2 = new PolicyIdentifier(NS1,
"p2");
- private static final PolicyIdentifier POLICY3 = new PolicyIdentifier(NS1,
"p3");
- private static final PolicyIdentifier POLICY4 = new PolicyIdentifier(NS1,
"p4");
+ private static final Namespace NS = Namespace.of("ns1");
+ private static final TableIdentifier TABLE = TableIdentifier.of(NS, "table");
+ private static final String CATALOG_NAME = "polaris-catalog";
+ private static final String TEST_ACCESS_KEY = "test_access_key";
+ private static final String SECRET_ACCESS_KEY = "secret_access_key";
+ private static final String SESSION_TOKEN = "session_token";
+ private static final Schema SCHEMA =
+ new Schema(
+ required(3, "id", Types.IntegerType.get(), "unique ID"),
+ required(4, "data", Types.StringType.get()));
+
+ private static final PolicyIdentifier POLICY1 = new PolicyIdentifier(NS,
"p1");
+ private static final PolicyIdentifier POLICY2 = new PolicyIdentifier(NS,
"p2");
+ private static final PolicyIdentifier POLICY3 = new PolicyIdentifier(NS,
"p3");
+ private static final PolicyIdentifier POLICY4 = new PolicyIdentifier(NS,
"p4");
+ private static final PolicyAttachmentTarget POLICY_ATTACH_TARGET_NS =
+ new PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.NAMESPACE,
List.of(NS.levels()));
+ private static final PolicyAttachmentTarget POLICY_ATTACH_TARGET_TBL =
+ new PolicyAttachmentTarget(
+ PolicyAttachmentTarget.TypeEnum.TABLE_LIKE,
List.of(TABLE.toString().split("\\.")));
@Inject MetaStoreManagerFactory managerFactory;
@Inject PolarisConfigurationStore configurationStore;
@@ -304,8 +322,8 @@ public class PolicyCatalogTest {
@Test
public void testCreatePolicyDoesNotThrow() {
- icebergCatalog.createNamespace(NS1);
- Assertions.assertThatCode(
+ icebergCatalog.createNamespace(NS);
+ assertThatCode(
() ->
policyCatalog.createPolicy(
POLICY1,
@@ -317,11 +335,10 @@ public class PolicyCatalogTest {
@Test
public void testCreatePolicyAlreadyExists() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
-
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
policyCatalog.createPolicy(
POLICY1,
@@ -330,7 +347,7 @@ public class PolicyCatalogTest {
"{\"enable\": false}"))
.isInstanceOf(AlreadyExistsException.class);
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
policyCatalog.createPolicy(
POLICY1,
@@ -342,7 +359,7 @@ public class PolicyCatalogTest {
@Test
public void testListPolicies() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
@@ -356,19 +373,16 @@ public class PolicyCatalogTest {
POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test",
"{\"enable\": false}");
policyCatalog.createPolicy(
- POLICY4,
- PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(),
- "test",
- "{\"enable\": false}");
+ POLICY4, ORPHAN_FILE_REMOVAL.getName(), "test", "{\"enable\": false}");
- List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS1, null);
- Assertions.assertThat(listResult).hasSize(4);
- Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY1,
POLICY2, POLICY3, POLICY4);
+ List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS, null);
+ assertThat(listResult).hasSize(4);
+ assertThat(listResult).containsExactlyInAnyOrder(POLICY1, POLICY2,
POLICY3, POLICY4);
}
@Test
public void testListPoliciesFilterByPolicyType() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
@@ -382,37 +396,32 @@ public class PolicyCatalogTest {
POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test",
"{\"enable\": false}");
policyCatalog.createPolicy(
- POLICY4,
- PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(),
- "test",
- "{\"enable\": false}");
+ POLICY4, ORPHAN_FILE_REMOVAL.getName(), "test", "{\"enable\": false}");
- List<PolicyIdentifier> listResult =
- policyCatalog.listPolicies(NS1,
PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL);
- Assertions.assertThat(listResult).hasSize(1);
- Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY4);
+ List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS,
ORPHAN_FILE_REMOVAL);
+ assertThat(listResult).hasSize(1);
+ assertThat(listResult).containsExactlyInAnyOrder(POLICY4);
}
@Test
public void testLoadPolicy() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
Policy policy = policyCatalog.loadPolicy(POLICY1);
- Assertions.assertThat(policy.getVersion()).isEqualTo(0);
- Assertions.assertThat(policy.getPolicyType())
- .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
- Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\":
false}");
- Assertions.assertThat(policy.getName()).isEqualTo("p1");
- Assertions.assertThat(policy.getDescription()).isEqualTo("test");
+ assertThat(policy.getVersion()).isEqualTo(0);
+
assertThat(policy.getPolicyType()).isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+ assertThat(policy.getContent()).isEqualTo("{\"enable\": false}");
+ assertThat(policy.getName()).isEqualTo("p1");
+ assertThat(policy.getDescription()).isEqualTo("test");
}
@Test
public void testCreatePolicyWithInvalidContent() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() ->
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(),
"test", "invalid"))
@@ -421,76 +430,192 @@ public class PolicyCatalogTest {
@Test
public void testLoadPolicyNotExist() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
- Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
+ assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
.isInstanceOf(NoSuchPolicyException.class);
}
@Test
public void testUpdatePolicy() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
-
policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 0);
Policy policy = policyCatalog.loadPolicy(POLICY1);
- Assertions.assertThat(policy.getVersion()).isEqualTo(1);
- Assertions.assertThat(policy.getPolicyType())
- .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
- Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": true}");
- Assertions.assertThat(policy.getName()).isEqualTo("p1");
- Assertions.assertThat(policy.getDescription()).isEqualTo("updated");
+ assertThat(policy.getVersion()).isEqualTo(1);
+
assertThat(policy.getPolicyType()).isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+ assertThat(policy.getContent()).isEqualTo("{\"enable\": true}");
+ assertThat(policy.getName()).isEqualTo("p1");
+ assertThat(policy.getDescription()).isEqualTo("updated");
}
@Test
public void testUpdatePolicyWithWrongVersion() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\":
true}", 1))
.isInstanceOf(PolicyVersionMismatchException.class);
}
@Test
public void testUpdatePolicyWithInvalidContent() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
- Assertions.assertThatThrownBy(
- () -> policyCatalog.updatePolicy(POLICY1, "updated", "invalid", 0))
+ assertThatThrownBy(() -> policyCatalog.updatePolicy(POLICY1, "updated",
"invalid", 0))
.isInstanceOf(InvalidPolicyException.class);
}
@Test
public void testUpdatePolicyNotExist() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
- Assertions.assertThatThrownBy(
+ assertThatThrownBy(
() -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\":
true}", 0))
.isInstanceOf(NoSuchPolicyException.class);
}
@Test
public void testDropPolicy() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
policyCatalog.createPolicy(
POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
- Assertions.assertThat(policyCatalog.dropPolicy(POLICY1, false)).isTrue();
- Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
+ policyCatalog.dropPolicy(POLICY1, false);
+ assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
.isInstanceOf(NoSuchPolicyException.class);
}
@Test
public void testDropPolicyNotExist() {
- icebergCatalog.createNamespace(NS1);
+ icebergCatalog.createNamespace(NS);
- Assertions.assertThatThrownBy(() -> policyCatalog.dropPolicy(POLICY1,
false))
+ assertThatThrownBy(() -> policyCatalog.dropPolicy(POLICY1, false))
.isInstanceOf(NoSuchPolicyException.class);
}
+
+ @Test
+ public void testAttachPolicy() {
+ icebergCatalog.createNamespace(NS);
+ policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
+
+ var target = new
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+ policyCatalog.attachPolicy(POLICY1, target, null);
+ assertThat(policyCatalog.getApplicablePolicies(null, null,
null).size()).isEqualTo(1);
+ }
+
+ @Test
+ public void testAttachPolicyConflict() {
+ icebergCatalog.createNamespace(NS);
+ policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
+ policyCatalog.createPolicy(POLICY2, DATA_COMPACTION.getName(), "test",
"{\"enable\": true}");
+
+ var target = new
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+ policyCatalog.attachPolicy(POLICY1, target, null);
+ // Attempt to attach a conflicting second policy and expect an exception
+ assertThatThrownBy(() -> policyCatalog.attachPolicy(POLICY2, target, null))
+ .isInstanceOf(PolicyMappingAlreadyExistsException.class)
+ .hasMessage(
+ String.format(
+ "The policy mapping of same type (%s) for %s already exists",
+ DATA_COMPACTION.getName(), CATALOG_NAME));
+ }
+
+ @Test
+ public void testDetachPolicy() {
+ icebergCatalog.createNamespace(NS);
+ policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
+
+ policyCatalog.attachPolicy(POLICY1, POLICY_ATTACH_TARGET_NS, null);
+ assertThat(policyCatalog.getApplicablePolicies(NS, null,
null).size()).isEqualTo(1);
+ policyCatalog.detachPolicy(POLICY1, POLICY_ATTACH_TARGET_NS);
+ assertThat(policyCatalog.getApplicablePolicies(NS, null,
null).size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testPolicyOverwrite() {
+ icebergCatalog.createNamespace(NS);
+ policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test",
"{\"enable\": false}");
+ policyCatalog.createPolicy(POLICY2, DATA_COMPACTION.getName(), "test",
"{\"enable\": true}");
+
+ // attach to catalog
+ var target = new
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+ policyCatalog.attachPolicy(POLICY1, target, null);
+
+ // attach to namespace
+ policyCatalog.attachPolicy(POLICY2, POLICY_ATTACH_TARGET_NS, null);
+ var applicablePolicies = policyCatalog.getApplicablePolicies(NS, null,
null);
+ assertThat(applicablePolicies.size()).isEqualTo(1);
+ assertThat(applicablePolicies.getFirst().getName())
+ .isEqualTo(POLICY2.getName())
+ .as("Namespace level policy overwrite the catalog level policy with
the same type");
+ }
+
+ @Test
+ public void testPolicyInheritance() {
+ icebergCatalog.createNamespace(NS);
+ var p1 =
+ policyCatalog.createPolicy(
+ POLICY1, METADATA_COMPACTION.getName(), "test", "{\"enable\":
false}");
+ var p2 =
+ policyCatalog.createPolicy(
+ POLICY2, DATA_COMPACTION.getName(), "test", "{\"enable\": true}");
+
+ // attach a policy to catalog
+ var target = new
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+ policyCatalog.attachPolicy(POLICY1, target, null);
+
+ // attach a different type of policy to namespace
+ policyCatalog.attachPolicy(POLICY2, POLICY_ATTACH_TARGET_NS, null);
+ var applicablePolicies = policyCatalog.getApplicablePolicies(NS, null,
null);
+ assertThat(applicablePolicies.size()).isEqualTo(2);
+ assertThat(applicablePolicies.contains(p1)).isTrue();
+ assertThat(applicablePolicies.contains(p2)).isTrue();
+
+ // create a table; it inherits the catalog- and namespace-level policies
+ icebergCatalog.createTable(TABLE, SCHEMA);
+ applicablePolicies = policyCatalog.getApplicablePolicies(NS, TABLE.name(),
null);
+ assertThat(applicablePolicies.size()).isEqualTo(2);
+ // attach a third type of policy to a table
+ policyCatalog.createPolicy(
+ POLICY3, ORPHAN_FILE_REMOVAL.getName(), "test", "{\"enable\": false}");
+ policyCatalog.attachPolicy(POLICY3, POLICY_ATTACH_TARGET_TBL, null);
+ applicablePolicies = policyCatalog.getApplicablePolicies(NS, TABLE.name(),
null);
+ assertThat(applicablePolicies.size()).isEqualTo(3);
+ // create policy 4 with one of types from its parent
+ var p4 =
+ policyCatalog.createPolicy(
+ POLICY4, DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+ policyCatalog.attachPolicy(POLICY4, POLICY_ATTACH_TARGET_TBL, null);
+ applicablePolicies = policyCatalog.getApplicablePolicies(NS, TABLE.name(),
null);
+ // p2 should be overwritten by p4, as they are the same type
+ assertThat(applicablePolicies.contains(p4)).isTrue();
+ assertThat(applicablePolicies.contains(p2)).isFalse();
+ }
+
+ @Test
+ public void testGetApplicablePoliciesFilterOnType() {
+ icebergCatalog.createNamespace(NS);
+ policyCatalog.createPolicy(
+ POLICY1, METADATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+ var p2 =
+ policyCatalog.createPolicy(
+ POLICY2, DATA_COMPACTION.getName(), "test", "{\"enable\": true}");
+
+ // attach a policy to catalog
+ var target = new
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+ policyCatalog.attachPolicy(POLICY1, target, null);
+
+ // attach a different type of policy to namespace
+ policyCatalog.attachPolicy(POLICY2, POLICY_ATTACH_TARGET_NS, null);
+ var applicablePolicies = policyCatalog.getApplicablePolicies(NS, null,
DATA_COMPACTION);
+ // only p2 has the requested policy type
+ assertThat(applicablePolicies.contains(p2)).isTrue();
+ }
}
diff --git
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
index a4fb23da7..718b4ef9e 100644
---
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
+++
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -18,12 +18,22 @@
*/
package org.apache.polaris.service.catalog.policy;
+import static
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_MAPPING_OF_SAME_TYPE_ALREADY_EXISTS;
+import static
org.apache.polaris.service.types.PolicyAttachmentTarget.TypeEnum.CATALOG;
+
+import com.google.common.base.Strings;
+import jakarta.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.CatalogEntity;
import org.apache.polaris.core.entity.PolarisEntity;
@@ -31,15 +41,18 @@ import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
-import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
+import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
import
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
import org.apache.polaris.core.policy.PolicyEntity;
import org.apache.polaris.core.policy.PolicyType;
import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import org.apache.polaris.core.policy.exceptions.PolicyAttachException;
import
org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
import org.apache.polaris.core.policy.validator.PolicyValidators;
import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
import org.apache.polaris.service.types.PolicyIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -177,16 +190,8 @@ public class PolicyCatalog {
}
public Policy loadPolicy(PolicyIdentifier policyIdentifier) {
- PolarisResolvedPathWrapper resolvedEntities =
- resolvedEntityView.getPassthroughResolvedPath(
- policyIdentifier, PolarisEntityType.POLICY,
PolarisEntitySubType.NULL_SUBTYPE);
-
- PolicyEntity policy =
- PolicyEntity.of(resolvedEntities == null ? null :
resolvedEntities.getRawLeafEntity());
-
- if (policy == null) {
- throw new NoSuchPolicyException(String.format("Policy does not exist:
%s", policyIdentifier));
- }
+ var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+ var policy = PolicyEntity.of(resolvedPolicyPath.getRawLeafEntity());
return constructPolicy(policy);
}
@@ -195,16 +200,8 @@ public class PolicyCatalog {
String newDescription,
String newContent,
int currentPolicyVersion) {
- PolarisResolvedPathWrapper resolvedEntities =
- resolvedEntityView.getPassthroughResolvedPath(
- policyIdentifier, PolarisEntityType.POLICY,
PolarisEntitySubType.NULL_SUBTYPE);
-
- PolicyEntity policy =
- PolicyEntity.of(resolvedEntities == null ? null :
resolvedEntities.getRawLeafEntity());
-
- if (policy == null) {
- throw new NoSuchPolicyException(String.format("Policy does not exist:
%s", policyIdentifier));
- }
+ var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+ var policy = PolicyEntity.of(resolvedPolicyPath.getRawLeafEntity());
// Verify that the current version of the policy matches the version that
the user is trying to
// update
@@ -229,7 +226,7 @@ public class PolicyCatalog {
PolicyValidators.validate(newPolicyEntity);
- List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
+ List<PolarisEntity> catalogPath = resolvedPolicyPath.getRawParentPath();
newPolicyEntity =
Optional.ofNullable(
metaStoreManager
@@ -251,25 +248,250 @@ public class PolicyCatalog {
public boolean dropPolicy(PolicyIdentifier policyIdentifier, boolean
detachAll) {
// TODO: Implement detachAll when we support attach/detach policy
- PolarisResolvedPathWrapper resolvedEntities =
- resolvedEntityView.getPassthroughResolvedPath(
- policyIdentifier, PolarisEntityType.POLICY,
PolarisEntitySubType.NULL_SUBTYPE);
- if (resolvedEntities == null) {
- throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier));
- }
-
- List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
- PolarisEntity leafEntity = resolvedEntities.getRawLeafEntity();
+ var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+ var catalogPath = resolvedPolicyPath.getRawParentPath();
+ var policyEntity = resolvedPolicyPath.getRawLeafEntity();
- DropEntityResult dropEntityResult =
+ var result =
metaStoreManager.dropEntityIfExists(
callContext.getPolarisCallContext(),
PolarisEntity.toCoreList(catalogPath),
- leafEntity,
+ policyEntity,
Map.of(),
false);
- return dropEntityResult.isSuccess();
+ if (!result.isSuccess()) {
+ throw new IllegalStateException(
+ String.format(
+ "Failed to drop policy %s error status: %s with extraInfo: %s",
+ policyIdentifier, result.getReturnStatus(),
result.getExtraInformation()));
+ }
+
+ return true;
+ }
+
+ public boolean attachPolicy(
+ PolicyIdentifier policyIdentifier,
+ PolicyAttachmentTarget target,
+ Map<String, String> parameters) {
+
+ var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+ var policyCatalogPath =
PolarisEntity.toCoreList(resolvedPolicyPath.getRawParentPath());
+ var policyEntity = PolicyEntity.of(resolvedPolicyPath.getRawLeafEntity());
+
+ var resolvedTargetPath = getResolvedPathWrapper(target);
+ var targetCatalogPath =
PolarisEntity.toCoreList(resolvedTargetPath.getRawParentPath());
+ var targetEntity = resolvedTargetPath.getRawLeafEntity();
+
+ PolicyValidators.validateAttach(policyEntity, targetEntity);
+
+ var result =
+ metaStoreManager.attachPolicyToEntity(
+ callContext.getPolarisCallContext(),
+ targetCatalogPath,
+ targetEntity,
+ policyCatalogPath,
+ policyEntity,
+ parameters);
+
+ if (!result.isSuccess()) {
+ var targetId = getIdentifier(target);
+ if (result.getReturnStatus() ==
POLICY_MAPPING_OF_SAME_TYPE_ALREADY_EXISTS) {
+ throw new PolicyMappingAlreadyExistsException(
+ "The policy mapping of same type (%s) for %s already exists",
+ policyEntity.getPolicyType().getName(), targetId);
+ }
+
+ throw new PolicyAttachException(
+ "Failed to attach policy %s to %s: %s with extraInfo: %s",
+ policyIdentifier, targetId, result.getReturnStatus(),
result.getExtraInformation());
+ }
+
+ return true;
+ }
+
+ public boolean detachPolicy(PolicyIdentifier policyIdentifier,
PolicyAttachmentTarget target) {
+ var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+ var policyCatalogPath =
PolarisEntity.toCoreList(resolvedPolicyPath.getRawParentPath());
+ var policyEntity = PolicyEntity.of(resolvedPolicyPath.getRawLeafEntity());
+
+ var resolvedTargetPath = getResolvedPathWrapper(target);
+ var targetCatalogPath =
PolarisEntity.toCoreList(resolvedTargetPath.getRawParentPath());
+ var targetEntity = resolvedTargetPath.getRawLeafEntity();
+
+ var result =
+ metaStoreManager.detachPolicyFromEntity(
+ callContext.getPolarisCallContext(),
+ targetCatalogPath,
+ targetEntity,
+ policyCatalogPath,
+ policyEntity);
+
+ if (!result.isSuccess()) {
+ throw new IllegalStateException(
+ String.format(
+ "Failed to detach policy %s from %s error status: %s with extraInfo: %s",
+ policyIdentifier,
+ getIdentifier(target),
+ result.getReturnStatus(),
+ result.getExtraInformation()));
+ }
+
+ return true;
+ }
+
+ public List<Policy> getApplicablePolicies(
+ Namespace namespace, String targetName, PolicyType policyType) {
+ var targetFullPath = getFullPath(namespace, targetName);
+ return getEffectivePolicies(targetFullPath, policyType);
+ }
+
+ /**
+ * Returns the effective policies for a given hierarchical path and policy
type.
+ *
+ * <p>Potential Performance Improvements:
+ *
+ * <ul>
+ * <li>Range Query Optimization: Enhance the query mechanism to fetch
policies for all entities
+ * in a single range query, reducing the number of individual queries
against the mapping
+ * table.
+ * <li>Filtering on Inheritable: Improve the filtering process by applying
the inheritable
+ * condition at the data retrieval level, so that only the relevant
policies for non-leaf
+ * nodes are processed.
+ * <li>Caching: Implement caching for up-level policies to avoid redundant
calculations and
+ * lookups, especially for frequently accessed paths.
+ * </ul>
+ *
+ * @param path the list of entities representing the hierarchical path
+ * @param policyType the type of policy to filter on
+ * @return a list of effective policies, combining inherited policies from
upper levels and
+ * non-inheritable policies from the final entity
+ */
+ private List<Policy> getEffectivePolicies(List<PolarisEntity> path,
PolicyType policyType) {
+ if (path == null || path.isEmpty()) {
+ return List.of();
+ }
+
+ Map<String, PolicyEntity> inheritedPolicies = new LinkedHashMap<>();
+ // Final list of effective policies (inheritable + last-entity non-inheritable)
+ List<PolicyEntity> finalPolicies = new ArrayList<>();
+
+ // Process all entities except the last one
+ for (int i = 0; i < path.size() - 1; i++) {
+ PolarisEntity entity = path.get(i);
+ var currentPolicies = getPolicies(entity, policyType);
+
+ for (var policy : currentPolicies) {
+ // For non-last entities, we only carry forward inheritable policies
+ if (policy.getPolicyType().isInheritable()) {
+ // Put in map; overwrites by policyType if encountered again
+ inheritedPolicies.put(policy.getPolicyType().getName(), policy);
+ }
+ }
+ }
+
+ // Now handle the last entity's policies
+ List<PolicyEntity> lastPolicies = getPolicies(path.getLast(), policyType);
+
+ for (var policy : lastPolicies) {
+ if (policy.getPolicyType().isInheritable()) {
+ // Overwrite anything by the same policyType in the inherited map
+ inheritedPolicies.put(policy.getPolicyType().getName(), policy);
+ } else {
+ // Non-inheritable => goes directly to final list
+ finalPolicies.add(policy);
+ }
+ }
+
+ // Append all inherited policies at the end, preserving insertion order
+ finalPolicies.addAll(inheritedPolicies.values());
+
+ return finalPolicies.stream().map(PolicyCatalog::constructPolicy).toList();
+ }
+
+ private List<PolicyEntity> getPolicies(PolarisEntity target, PolicyType
policyType) {
+ LoadPolicyMappingsResult result;
+ if (policyType == null) {
+ result =
metaStoreManager.loadPoliciesOnEntity(callContext.getPolarisCallContext(),
target);
+ } else {
+ result =
+ metaStoreManager.loadPoliciesOnEntityByType(
+ callContext.getPolarisCallContext(), target, policyType);
+ }
+
+ return result.getEntities().stream().map(PolicyEntity::of).toList();
+ }
+
+ private List<PolarisEntity> getFullPath(Namespace namespace, String
targetName) {
+ if (namespace == null || namespace.isEmpty()) {
+ // catalog
+ return List.of(catalogEntity);
+ } else if (Strings.isNullOrEmpty(targetName)) {
+ // namespace
+ var resolvedTargetEntity = resolvedEntityView.getResolvedPath(namespace);
+ if (resolvedTargetEntity == null) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
+ return resolvedTargetEntity.getRawFullPath();
+ } else {
+ // table
+ var tableIdentifier = TableIdentifier.of(namespace, targetName);
+ // only Iceberg tables are supported
+ var resolvedTableEntity =
+ resolvedEntityView.getResolvedPath(
+ tableIdentifier, PolarisEntityType.TABLE_LIKE,
PolarisEntitySubType.ICEBERG_TABLE);
+ if (resolvedTableEntity == null) {
+ throw new NoSuchTableException("Iceberg Table does not exist: %s",
tableIdentifier);
+ }
+ return resolvedTableEntity.getRawFullPath();
+ }
+ }
+
+ private String getIdentifier(PolicyAttachmentTarget target) {
+ String identifier = catalogEntity.getName();
+ // If the target is not of type CATALOG, append the additional path segments.
+ if (target.getType() != CATALOG) {
+ identifier += "." + String.join(".", target.getPath());
+ }
+ return identifier;
+ }
+
+ private PolarisResolvedPathWrapper getResolvedPathWrapper(PolicyIdentifier
policyIdentifier) {
+ var resolvedEntities =
+ resolvedEntityView.getPassthroughResolvedPath(
+ policyIdentifier, PolarisEntityType.POLICY,
PolarisEntitySubType.NULL_SUBTYPE);
+ if (resolvedEntities == null || resolvedEntities.getResolvedLeafEntity()
== null) {
+ throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier));
+ }
+ return resolvedEntities;
+ }
+
+ private PolarisResolvedPathWrapper getResolvedPathWrapper(
+ @Nonnull PolicyAttachmentTarget target) {
+ return switch (target.getType()) {
+ // get the current catalog entity, since policy cannot apply across catalog at this moment
+ case CATALOG -> resolvedEntityView.getResolvedReferenceCatalogEntity();
+ case NAMESPACE -> {
+ var namespace = Namespace.of(target.getPath().toArray(new String[0]));
+ var resolvedTargetEntity =
resolvedEntityView.getResolvedPath(namespace);
+ if (resolvedTargetEntity == null) {
+ throw new NoSuchNamespaceException("Namespace does not exist: %s",
namespace);
+ }
+ yield resolvedTargetEntity;
+ }
+ case TABLE_LIKE -> {
+ var tableIdentifier = TableIdentifier.of(target.getPath().toArray(new
String[0]));
+ // only Iceberg tables are supported
+ var resolvedTableEntity =
+ resolvedEntityView.getResolvedPath(
+ tableIdentifier, PolarisEntityType.TABLE_LIKE,
PolarisEntitySubType.ICEBERG_TABLE);
+ if (resolvedTableEntity == null) {
+ throw new NoSuchTableException("Iceberg Table does not exist: %s",
tableIdentifier);
+ }
+ yield resolvedTableEntity;
+ }
+ default -> throw new IllegalArgumentException("Unsupported target type: " + target.getType());
+ };
}
private static Policy constructPolicy(PolicyEntity policyEntity) {
diff --git
a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
index 22b9e187d..57df2cbc5 100644
---
a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
+++
b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
@@ -25,7 +25,9 @@ import jakarta.ws.rs.ext.Provider;
import org.apache.iceberg.rest.responses.ErrorResponse;
import org.apache.polaris.core.exceptions.AlreadyExistsException;
import org.apache.polaris.core.exceptions.PolarisException;
+import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import org.apache.polaris.core.policy.exceptions.PolicyAttachException;
import org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
import org.apache.polaris.core.policy.validator.InvalidPolicyException;
import org.apache.polaris.service.context.UnresolvableRealmContextException;
@@ -49,10 +51,14 @@ public class PolarisExceptionMapper implements ExceptionMapper<PolarisException>
return Response.Status.NOT_FOUND;
} else if (exception instanceof InvalidPolicyException) {
return Response.Status.BAD_REQUEST;
+ } else if (exception instanceof PolicyAttachException) {
+ return Response.Status.BAD_REQUEST;
} else if (exception instanceof NoSuchPolicyException) {
return Response.Status.NOT_FOUND;
} else if (exception instanceof PolicyVersionMismatchException) {
return Response.Status.CONFLICT;
+ } else if (exception instanceof PolicyMappingAlreadyExistsException) {
+ return Response.Status.CONFLICT;
} else {
return Response.Status.INTERNAL_SERVER_ERROR;
}