This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new c39c26325 Core/Service: Implement PolicyCatalog Stage 2: detach/attach/getApplicablePolicies (#1314)
c39c26325 is described below

commit c39c2632599956d2a3a0224a9be9e228b161f380
Author: Yufei Gu <[email protected]>
AuthorDate: Wed Apr 9 13:14:09 2025 -0700

    Core/Service: Implement PolicyCatalog Stage 2: detach/attach/getApplicablePolicies (#1314)
---
 .../PolicyMappingAlreadyExistsException.java       |   9 +-
 .../exceptions/PolicyAttachException.java}         |  27 +-
 .../core/policy/validator/PolicyValidators.java    |   9 +
 .../service/quarkus/catalog/PolicyCatalogTest.java | 259 +++++++++++++-----
 .../service/catalog/policy/PolicyCatalog.java      | 290 ++++++++++++++++++---
 .../service/exception/PolarisExceptionMapper.java  |   6 +
 6 files changed, 481 insertions(+), 119 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
index 2cd714f25..a31e500b0 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
@@ -18,13 +18,15 @@
  */
 package org.apache.polaris.core.persistence;
 
+import com.google.errorprone.annotations.FormatMethod;
+import org.apache.polaris.core.exceptions.PolarisException;
 import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
 
 /**
  * Exception raised when an existing policy mapping preveents the attempted 
creation of a new policy
  * mapping record.
  */
-public class PolicyMappingAlreadyExistsException extends RuntimeException {
+public class PolicyMappingAlreadyExistsException extends PolarisException {
   private PolarisPolicyMappingRecord existingRecord;
 
   /**
@@ -35,6 +37,11 @@ public class PolicyMappingAlreadyExistsException extends 
RuntimeException {
     this.existingRecord = existingRecord;
   }
 
+  @FormatMethod
+  public PolicyMappingAlreadyExistsException(String message, Object... arg) {
+    super(String.format(message, arg));
+  }
+
   public PolarisPolicyMappingRecord getExistingRecord() {
     return this.existingRecord;
   }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyAttachException.java
similarity index 52%
copy from 
polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
copy to 
polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyAttachException.java
index 2cd714f25..e47d978d0 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/persistence/PolicyMappingAlreadyExistsException.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyAttachException.java
@@ -16,26 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.polaris.core.persistence;
+package org.apache.polaris.core.policy.exceptions;
 
-import org.apache.polaris.core.policy.PolarisPolicyMappingRecord;
+import com.google.errorprone.annotations.FormatMethod;
+import org.apache.polaris.core.exceptions.PolarisException;
 
-/**
- * Exception raised when an existing policy mapping preveents the attempted 
creation of a new policy
- * mapping record.
- */
-public class PolicyMappingAlreadyExistsException extends RuntimeException {
-  private PolarisPolicyMappingRecord existingRecord;
-
-  /**
-   * @param existingRecord The conflicting record that caused creation to fail.
-   */
-  public PolicyMappingAlreadyExistsException(PolarisPolicyMappingRecord 
existingRecord) {
-    super("Existing Policy Mapping Record: " + existingRecord);
-    this.existingRecord = existingRecord;
+public class PolicyAttachException extends PolarisException {
+  @FormatMethod
+  public PolicyAttachException(String message, Object... args) {
+    super(String.format(message, args));
   }
 
-  public PolarisPolicyMappingRecord getExistingRecord() {
-    return this.existingRecord;
+  @FormatMethod
+  public PolicyAttachException(Throwable cause, String message, Object... 
args) {
+    super(String.format(message, args), cause);
   }
 }
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
index 3ccb2f6d2..4e1b4e288 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/policy/validator/PolicyValidators.java
@@ -26,6 +26,7 @@ import 
org.apache.polaris.core.policy.content.maintenance.DataCompactionPolicyCo
 import 
org.apache.polaris.core.policy.content.maintenance.MetadataCompactionPolicyContent;
 import 
org.apache.polaris.core.policy.content.maintenance.OrphanFileRemovalPolicyContent;
 import 
org.apache.polaris.core.policy.content.maintenance.SnapshotRetentionPolicyContent;
+import org.apache.polaris.core.policy.exceptions.PolicyAttachException;
 import 
org.apache.polaris.core.policy.validator.maintenance.BaseMaintenancePolicyValidator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,4 +103,12 @@ public class PolicyValidators {
         return false;
     }
   }
+
+  public static void validateAttach(PolicyEntity policy, PolarisEntity 
targetEntity) {
+    if (!canAttach(policy, targetEntity)) {
+      throw new PolicyAttachException(
+          "Cannot attach policy '%s' to target entity '%s'",
+          policy.getName(), targetEntity.getName());
+    }
+  }
 }
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
index 83bdecaaa..9e4d4b38a 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
@@ -18,6 +18,13 @@
  */
 package org.apache.polaris.service.quarkus.catalog;
 
+import static org.apache.iceberg.types.Types.NestedField.required;
+import static 
org.apache.polaris.core.policy.PredefinedPolicyTypes.DATA_COMPACTION;
+import static 
org.apache.polaris.core.policy.PredefinedPolicyTypes.METADATA_COMPACTION;
+import static 
org.apache.polaris.core.policy.PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.when;
 
@@ -37,9 +44,11 @@ import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.types.Types;
 import org.apache.polaris.core.PolarisCallContext;
 import org.apache.polaris.core.PolarisDiagnostics;
 import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
@@ -58,6 +67,7 @@ import org.apache.polaris.core.entity.PrincipalEntity;
 import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
 import org.apache.polaris.core.persistence.PolarisEntityManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
 import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet;
 import org.apache.polaris.core.persistence.cache.EntityCache;
 import org.apache.polaris.core.persistence.dao.entity.BaseResult;
@@ -82,8 +92,8 @@ import 
org.apache.polaris.service.config.RealmEntityManagerFactory;
 import 
org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
 import org.apache.polaris.service.task.TaskExecutor;
 import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
 import org.apache.polaris.service.types.PolicyIdentifier;
-import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
@@ -112,18 +122,26 @@ public class PolicyCatalogTest {
     }
   }
 
-  protected static final Namespace NS = Namespace.of("newdb");
-  protected static final TableIdentifier TABLE = TableIdentifier.of(NS, 
"table");
-  public static final String CATALOG_NAME = "polaris-catalog";
-  public static final String TEST_ACCESS_KEY = "test_access_key";
-  public static final String SECRET_ACCESS_KEY = "secret_access_key";
-  public static final String SESSION_TOKEN = "session_token";
-
-  private static final Namespace NS1 = Namespace.of("ns1");
-  private static final PolicyIdentifier POLICY1 = new PolicyIdentifier(NS1, 
"p1");
-  private static final PolicyIdentifier POLICY2 = new PolicyIdentifier(NS1, 
"p2");
-  private static final PolicyIdentifier POLICY3 = new PolicyIdentifier(NS1, 
"p3");
-  private static final PolicyIdentifier POLICY4 = new PolicyIdentifier(NS1, 
"p4");
+  private static final Namespace NS = Namespace.of("ns1");
+  private static final TableIdentifier TABLE = TableIdentifier.of(NS, "table");
+  private static final String CATALOG_NAME = "polaris-catalog";
+  private static final String TEST_ACCESS_KEY = "test_access_key";
+  private static final String SECRET_ACCESS_KEY = "secret_access_key";
+  private static final String SESSION_TOKEN = "session_token";
+  private static final Schema SCHEMA =
+      new Schema(
+          required(3, "id", Types.IntegerType.get(), "unique ID"),
+          required(4, "data", Types.StringType.get()));
+
+  private static final PolicyIdentifier POLICY1 = new PolicyIdentifier(NS, 
"p1");
+  private static final PolicyIdentifier POLICY2 = new PolicyIdentifier(NS, 
"p2");
+  private static final PolicyIdentifier POLICY3 = new PolicyIdentifier(NS, 
"p3");
+  private static final PolicyIdentifier POLICY4 = new PolicyIdentifier(NS, 
"p4");
+  private static final PolicyAttachmentTarget POLICY_ATTACH_TARGET_NS =
+      new PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.NAMESPACE, 
List.of(NS.levels()));
+  private static final PolicyAttachmentTarget POLICY_ATTACH_TARGET_TBL =
+      new PolicyAttachmentTarget(
+          PolicyAttachmentTarget.TypeEnum.TABLE_LIKE, 
List.of(TABLE.toString().split("\\.")));
 
   @Inject MetaStoreManagerFactory managerFactory;
   @Inject PolarisConfigurationStore configurationStore;
@@ -304,8 +322,8 @@ public class PolicyCatalogTest {
 
   @Test
   public void testCreatePolicyDoesNotThrow() {
-    icebergCatalog.createNamespace(NS1);
-    Assertions.assertThatCode(
+    icebergCatalog.createNamespace(NS);
+    assertThatCode(
             () ->
                 policyCatalog.createPolicy(
                     POLICY1,
@@ -317,11 +335,10 @@ public class PolicyCatalogTest {
 
   @Test
   public void testCreatePolicyAlreadyExists() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
-
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () ->
                 policyCatalog.createPolicy(
                     POLICY1,
@@ -330,7 +347,7 @@ public class PolicyCatalogTest {
                     "{\"enable\": false}"))
         .isInstanceOf(AlreadyExistsException.class);
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () ->
                 policyCatalog.createPolicy(
                     POLICY1,
@@ -342,7 +359,7 @@ public class PolicyCatalogTest {
 
   @Test
   public void testListPolicies() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
 
@@ -356,19 +373,16 @@ public class PolicyCatalogTest {
         POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test", 
"{\"enable\": false}");
 
     policyCatalog.createPolicy(
-        POLICY4,
-        PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(),
-        "test",
-        "{\"enable\": false}");
+        POLICY4, ORPHAN_FILE_REMOVAL.getName(), "test", "{\"enable\": false}");
 
-    List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS1, null);
-    Assertions.assertThat(listResult).hasSize(4);
-    Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY1, 
POLICY2, POLICY3, POLICY4);
+    List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS, null);
+    assertThat(listResult).hasSize(4);
+    assertThat(listResult).containsExactlyInAnyOrder(POLICY1, POLICY2, 
POLICY3, POLICY4);
   }
 
   @Test
   public void testListPoliciesFilterByPolicyType() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
 
@@ -382,37 +396,32 @@ public class PolicyCatalogTest {
         POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test", 
"{\"enable\": false}");
 
     policyCatalog.createPolicy(
-        POLICY4,
-        PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(),
-        "test",
-        "{\"enable\": false}");
+        POLICY4, ORPHAN_FILE_REMOVAL.getName(), "test", "{\"enable\": false}");
 
-    List<PolicyIdentifier> listResult =
-        policyCatalog.listPolicies(NS1, 
PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL);
-    Assertions.assertThat(listResult).hasSize(1);
-    Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY4);
+    List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS, 
ORPHAN_FILE_REMOVAL);
+    assertThat(listResult).hasSize(1);
+    assertThat(listResult).containsExactlyInAnyOrder(POLICY4);
   }
 
   @Test
   public void testLoadPolicy() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
 
     Policy policy = policyCatalog.loadPolicy(POLICY1);
-    Assertions.assertThat(policy.getVersion()).isEqualTo(0);
-    Assertions.assertThat(policy.getPolicyType())
-        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
-    Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": 
false}");
-    Assertions.assertThat(policy.getName()).isEqualTo("p1");
-    Assertions.assertThat(policy.getDescription()).isEqualTo("test");
+    assertThat(policy.getVersion()).isEqualTo(0);
+    
assertThat(policy.getPolicyType()).isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+    assertThat(policy.getContent()).isEqualTo("{\"enable\": false}");
+    assertThat(policy.getName()).isEqualTo("p1");
+    assertThat(policy.getDescription()).isEqualTo("test");
   }
 
   @Test
   public void testCreatePolicyWithInvalidContent() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () ->
                 policyCatalog.createPolicy(
                     POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), 
"test", "invalid"))
@@ -421,76 +430,192 @@ public class PolicyCatalogTest {
 
   @Test
   public void testLoadPolicyNotExist() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
 
-    Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
+    assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
         .isInstanceOf(NoSuchPolicyException.class);
   }
 
   @Test
   public void testUpdatePolicy() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
-
     policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 0);
 
     Policy policy = policyCatalog.loadPolicy(POLICY1);
-    Assertions.assertThat(policy.getVersion()).isEqualTo(1);
-    Assertions.assertThat(policy.getPolicyType())
-        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
-    Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": true}");
-    Assertions.assertThat(policy.getName()).isEqualTo("p1");
-    Assertions.assertThat(policy.getDescription()).isEqualTo("updated");
+    assertThat(policy.getVersion()).isEqualTo(1);
+    
assertThat(policy.getPolicyType()).isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+    assertThat(policy.getContent()).isEqualTo("{\"enable\": true}");
+    assertThat(policy.getName()).isEqualTo("p1");
+    assertThat(policy.getDescription()).isEqualTo("updated");
   }
 
   @Test
   public void testUpdatePolicyWithWrongVersion() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": 
true}", 1))
         .isInstanceOf(PolicyVersionMismatchException.class);
   }
 
   @Test
   public void testUpdatePolicyWithInvalidContent() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
 
-    Assertions.assertThatThrownBy(
-            () -> policyCatalog.updatePolicy(POLICY1, "updated", "invalid", 0))
+    assertThatThrownBy(() -> policyCatalog.updatePolicy(POLICY1, "updated", 
"invalid", 0))
         .isInstanceOf(InvalidPolicyException.class);
   }
 
   @Test
   public void testUpdatePolicyNotExist() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
 
-    Assertions.assertThatThrownBy(
+    assertThatThrownBy(
             () -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": 
true}", 0))
         .isInstanceOf(NoSuchPolicyException.class);
   }
 
   @Test
   public void testDropPolicy() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
     policyCatalog.createPolicy(
         POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
 
-    Assertions.assertThat(policyCatalog.dropPolicy(POLICY1, false)).isTrue();
-    Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
+    policyCatalog.dropPolicy(POLICY1, false);
+    assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
         .isInstanceOf(NoSuchPolicyException.class);
   }
 
   @Test
   public void testDropPolicyNotExist() {
-    icebergCatalog.createNamespace(NS1);
+    icebergCatalog.createNamespace(NS);
 
-    Assertions.assertThatThrownBy(() -> policyCatalog.dropPolicy(POLICY1, 
false))
+    assertThatThrownBy(() -> policyCatalog.dropPolicy(POLICY1, false))
         .isInstanceOf(NoSuchPolicyException.class);
   }
+
+  @Test
+  public void testAttachPolicy() {
+    icebergCatalog.createNamespace(NS);
+    policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    var target = new PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+    policyCatalog.attachPolicy(POLICY1, target, null);
+    assertThat(policyCatalog.getApplicablePolicies(null, null, null)).hasSize(1);
+  }
+
+  @Test
+  public void testAttachPolicyConflict() {
+    icebergCatalog.createNamespace(NS);
+    policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test", 
"{\"enable\": false}");
+    policyCatalog.createPolicy(POLICY2, DATA_COMPACTION.getName(), "test", 
"{\"enable\": true}");
+
+    var target = new 
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+    policyCatalog.attachPolicy(POLICY1, target, null);
+    // Attempt to attach a conflicting second policy and expect an exception
+    assertThatThrownBy(() -> policyCatalog.attachPolicy(POLICY2, target, null))
+        .isInstanceOf(PolicyMappingAlreadyExistsException.class)
+        .hasMessage(
+            String.format(
+                "The policy mapping of same type (%s) for %s already exists",
+                DATA_COMPACTION.getName(), CATALOG_NAME));
+  }
+
+  @Test
+  public void testDetachPolicy() {
+    icebergCatalog.createNamespace(NS);
+    policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    policyCatalog.attachPolicy(POLICY1, POLICY_ATTACH_TARGET_NS, null);
+    assertThat(policyCatalog.getApplicablePolicies(NS, null, null)).hasSize(1);
+    policyCatalog.detachPolicy(POLICY1, POLICY_ATTACH_TARGET_NS);
+    assertThat(policyCatalog.getApplicablePolicies(NS, null, null)).isEmpty();
+  }
+
+  @Test
+  public void testPolicyOverwrite() {
+    icebergCatalog.createNamespace(NS);
+    policyCatalog.createPolicy(POLICY1, DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+    policyCatalog.createPolicy(POLICY2, DATA_COMPACTION.getName(), "test", "{\"enable\": true}");
+
+    // attach POLICY1 to the catalog itself
+    var target = new PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+    policyCatalog.attachPolicy(POLICY1, target, null);
+
+    // attach POLICY2, which has the same policy type, to the namespace
+    policyCatalog.attachPolicy(POLICY2, POLICY_ATTACH_TARGET_NS, null);
+    var applicablePolicies = policyCatalog.getApplicablePolicies(NS, null, null);
+    assertThat(applicablePolicies.size()).isEqualTo(1);
+    assertThat(applicablePolicies.getFirst().getName())
+        .as("Namespace level policy overwrite the catalog level policy with the same type")
+        .isEqualTo(POLICY2.getName());
+  }
+
+  @Test
+  public void testPolicyInheritance() {
+    icebergCatalog.createNamespace(NS);
+    var p1 =
+        policyCatalog.createPolicy(
+            POLICY1, METADATA_COMPACTION.getName(), "test", "{\"enable\": 
false}");
+    var p2 =
+        policyCatalog.createPolicy(
+            POLICY2, DATA_COMPACTION.getName(), "test", "{\"enable\": true}");
+
+    // attach a policy to catalog
+    var target = new 
PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+    policyCatalog.attachPolicy(POLICY1, target, null);
+
+    // attach a different type of policy to namespace
+    policyCatalog.attachPolicy(POLICY2, POLICY_ATTACH_TARGET_NS, null);
+    var applicablePolicies = policyCatalog.getApplicablePolicies(NS, null, 
null);
+    assertThat(applicablePolicies.size()).isEqualTo(2);
+    assertThat(applicablePolicies.contains(p1)).isTrue();
+    assertThat(applicablePolicies.contains(p2)).isTrue();
+
+    // create a table; with nothing attached to it yet, it inherits both parent policies
+    icebergCatalog.createTable(TABLE, SCHEMA);
+    applicablePolicies = policyCatalog.getApplicablePolicies(NS, TABLE.name(), 
null);
+    assertThat(applicablePolicies.size()).isEqualTo(2);
+    // attach a third type of policy to a table
+    policyCatalog.createPolicy(
+        POLICY3, ORPHAN_FILE_REMOVAL.getName(), "test", "{\"enable\": false}");
+    policyCatalog.attachPolicy(POLICY3, POLICY_ATTACH_TARGET_TBL, null);
+    applicablePolicies = policyCatalog.getApplicablePolicies(NS, TABLE.name(), 
null);
+    assertThat(applicablePolicies.size()).isEqualTo(3);
+    // create policy 4 with one of types from its parent
+    var p4 =
+        policyCatalog.createPolicy(
+            POLICY4, DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+    policyCatalog.attachPolicy(POLICY4, POLICY_ATTACH_TARGET_TBL, null);
+    applicablePolicies = policyCatalog.getApplicablePolicies(NS, TABLE.name(), 
null);
+    // p2 should be overwritten by p4, as they are the same type
+    assertThat(applicablePolicies.contains(p4)).isTrue();
+    assertThat(applicablePolicies.contains(p2)).isFalse();
+  }
+
+  @Test
+  public void testGetApplicablePoliciesFilterOnType() {
+    icebergCatalog.createNamespace(NS);
+    policyCatalog.createPolicy(
+        POLICY1, METADATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+    var p2 =
+        policyCatalog.createPolicy(
+            POLICY2, DATA_COMPACTION.getName(), "test", "{\"enable\": true}");
+
+    // attach a policy to catalog
+    var target = new PolicyAttachmentTarget(PolicyAttachmentTarget.TypeEnum.CATALOG, List.of());
+    policyCatalog.attachPolicy(POLICY1, target, null);
+
+    // attach a different type of policy to namespace
+    policyCatalog.attachPolicy(POLICY2, POLICY_ATTACH_TARGET_NS, null);
+    var applicablePolicies = policyCatalog.getApplicablePolicies(NS, null, DATA_COMPACTION);
+    // only p2 is of the requested type, so the catalog-level POLICY1 must be filtered out
+    assertThat(applicablePolicies).containsExactly(p2);
+  }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
index a4fb23da7..718b4ef9e 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -18,12 +18,22 @@
  */
 package org.apache.polaris.service.catalog.policy;
 
+import static 
org.apache.polaris.core.persistence.dao.entity.BaseResult.ReturnStatus.POLICY_MAPPING_OF_SAME_TYPE_ALREADY_EXISTS;
+import static 
org.apache.polaris.service.types.PolicyAttachmentTarget.TypeEnum.CATALOG;
+
+import com.google.common.base.Strings;
+import jakarta.annotation.Nonnull;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.polaris.core.context.CallContext;
 import org.apache.polaris.core.entity.CatalogEntity;
 import org.apache.polaris.core.entity.PolarisEntity;
@@ -31,15 +41,18 @@ import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
-import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
+import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
 import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.dao.entity.LoadPolicyMappingsResult;
 import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
 import org.apache.polaris.core.policy.PolicyEntity;
 import org.apache.polaris.core.policy.PolicyType;
 import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import org.apache.polaris.core.policy.exceptions.PolicyAttachException;
 import 
org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
 import org.apache.polaris.core.policy.validator.PolicyValidators;
 import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
 import org.apache.polaris.service.types.PolicyIdentifier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -177,16 +190,8 @@ public class PolicyCatalog {
   }
 
   public Policy loadPolicy(PolicyIdentifier policyIdentifier) {
-    PolarisResolvedPathWrapper resolvedEntities =
-        resolvedEntityView.getPassthroughResolvedPath(
-            policyIdentifier, PolarisEntityType.POLICY, 
PolarisEntitySubType.NULL_SUBTYPE);
-
-    PolicyEntity policy =
-        PolicyEntity.of(resolvedEntities == null ? null : 
resolvedEntities.getRawLeafEntity());
-
-    if (policy == null) {
-      throw new NoSuchPolicyException(String.format("Policy does not exist: 
%s", policyIdentifier));
-    }
+    var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+    var policy = PolicyEntity.of(resolvedPolicyPath.getRawLeafEntity());
     return constructPolicy(policy);
   }
 
@@ -195,16 +200,8 @@ public class PolicyCatalog {
       String newDescription,
       String newContent,
       int currentPolicyVersion) {
-    PolarisResolvedPathWrapper resolvedEntities =
-        resolvedEntityView.getPassthroughResolvedPath(
-            policyIdentifier, PolarisEntityType.POLICY, 
PolarisEntitySubType.NULL_SUBTYPE);
-
-    PolicyEntity policy =
-        PolicyEntity.of(resolvedEntities == null ? null : 
resolvedEntities.getRawLeafEntity());
-
-    if (policy == null) {
-      throw new NoSuchPolicyException(String.format("Policy does not exist: 
%s", policyIdentifier));
-    }
+    var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+    var policy = PolicyEntity.of(resolvedPolicyPath.getRawLeafEntity());
 
     // Verify that the current version of the policy matches the version that 
the user is trying to
     // update
@@ -229,7 +226,7 @@ public class PolicyCatalog {
 
     PolicyValidators.validate(newPolicyEntity);
 
-    List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
+    List<PolarisEntity> catalogPath = resolvedPolicyPath.getRawParentPath();
     newPolicyEntity =
         Optional.ofNullable(
                 metaStoreManager
@@ -251,25 +248,250 @@ public class PolicyCatalog {
 
+  /**
+   * Drops the policy identified by {@code policyIdentifier}.
+   *
+   * @param policyIdentifier identifier of the policy to drop
+   * @param detachAll not read by this method body yet (see the TODO below)
+   * @return {@code true} when the drop succeeded; this method never returns {@code false}
+   * @throws NoSuchPolicyException if the identifier cannot be resolved to a policy (raised by
+   *     {@code getResolvedPathWrapper})
+   * @throws IllegalStateException if the metastore reports an unsuccessful drop
+   */
   public boolean dropPolicy(PolicyIdentifier policyIdentifier, boolean 
detachAll) {
     // TODO: Implement detachAll when we support attach/detach policy
-    PolarisResolvedPathWrapper resolvedEntities =
-        resolvedEntityView.getPassthroughResolvedPath(
-            policyIdentifier, PolarisEntityType.POLICY, 
PolarisEntitySubType.NULL_SUBTYPE);
-    if (resolvedEntities == null) {
-      throw new NoSuchPolicyException(String.format("Policy does not exist: 
%s", policyIdentifier));
-    }
-
-    List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
-    PolarisEntity leafEntity = resolvedEntities.getRawLeafEntity();
+    var resolvedPolicyPath = getResolvedPathWrapper(policyIdentifier);
+    var catalogPath = resolvedPolicyPath.getRawParentPath();
+    var policyEntity = resolvedPolicyPath.getRawLeafEntity();
 
-    DropEntityResult dropEntityResult =
+    var result =
         metaStoreManager.dropEntityIfExists(
             callContext.getPolarisCallContext(),
             PolarisEntity.toCoreList(catalogPath),
-            leafEntity,
+            policyEntity,
             Map.of(),
             false);
 
-    return dropEntityResult.isSuccess();
+    // A failed drop is reported with its status and extra information instead of a bare false.
+    if (!result.isSuccess()) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to drop policy %s error status: %s with extraInfo: %s",
+              policyIdentifier, result.getReturnStatus(), 
result.getExtraInformation()));
+    }
+
+    return true;
+  }
+
+  /**
+   * Attaches a policy to a catalog, namespace, or table-like target.
+   *
+   * @param policyIdentifier identifier of the policy to attach
+   * @param target the entity the policy is attached to
+   * @param parameters attachment parameters, handed to the metastore unchanged
+   * @return {@code true} when the attachment succeeded; failures are reported by exception
+   * @throws PolicyMappingAlreadyExistsException if the metastore reports that a mapping of the
+   *     same policy type already exists for the target
+   * @throws PolicyAttachException for any other unsuccessful attach status
+   */
+  public boolean attachPolicy(
+      PolicyIdentifier policyIdentifier,
+      PolicyAttachmentTarget target,
+      Map<String, String> parameters) {
+    PolarisResolvedPathWrapper policyPath = getResolvedPathWrapper(policyIdentifier);
+    var policyParents = PolarisEntity.toCoreList(policyPath.getRawParentPath());
+    PolicyEntity policy = PolicyEntity.of(policyPath.getRawLeafEntity());
+
+    PolarisResolvedPathWrapper targetPath = getResolvedPathWrapper(target);
+    var targetParents = PolarisEntity.toCoreList(targetPath.getRawParentPath());
+    PolarisEntity attachee = targetPath.getRawLeafEntity();
+
+    // Validation runs before the metastore is asked to persist the mapping.
+    PolicyValidators.validateAttach(policy, attachee);
+
+    var outcome =
+        metaStoreManager.attachPolicyToEntity(
+            callContext.getPolarisCallContext(),
+            targetParents,
+            attachee,
+            policyParents,
+            policy,
+            parameters);
+    if (outcome.isSuccess()) {
+      return true;
+    }
+
+    // Failure path: the duplicate-type status gets its own exception type.
+    String targetId = getIdentifier(target);
+    if (outcome.getReturnStatus() == POLICY_MAPPING_OF_SAME_TYPE_ALREADY_EXISTS) {
+      throw new PolicyMappingAlreadyExistsException(
+          "The policy mapping of same type (%s) for %s already exists",
+          policy.getPolicyType().getName(), targetId);
+    }
+    throw new PolicyAttachException(
+        "Failed to attach policy %s to %s: %s with extraInfo: %s",
+        policyIdentifier, targetId, outcome.getReturnStatus(), outcome.getExtraInformation());
+  }
+
+  /**
+   * Detaches a policy from a catalog, namespace, or table-like target.
+   *
+   * @param policyIdentifier identifier of the policy to detach
+   * @param target the entity the policy is detached from
+   * @return {@code true} when the detach succeeded; failures are reported by exception
+   * @throws IllegalStateException if the metastore reports an unsuccessful detach
+   */
+  public boolean detachPolicy(PolicyIdentifier policyIdentifier, PolicyAttachmentTarget target) {
+    PolarisResolvedPathWrapper policyPath = getResolvedPathWrapper(policyIdentifier);
+    var policyParents = PolarisEntity.toCoreList(policyPath.getRawParentPath());
+    PolicyEntity policy = PolicyEntity.of(policyPath.getRawLeafEntity());
+
+    PolarisResolvedPathWrapper targetPath = getResolvedPathWrapper(target);
+    var targetParents = PolarisEntity.toCoreList(targetPath.getRawParentPath());
+    PolarisEntity detachee = targetPath.getRawLeafEntity();
+
+    var outcome =
+        metaStoreManager.detachPolicyFromEntity(
+            callContext.getPolarisCallContext(),
+            targetParents,
+            detachee,
+            policyParents,
+            policy);
+    if (outcome.isSuccess()) {
+      return true;
+    }
+
+    throw new IllegalStateException(
+        String.format(
+            "Failed to detach policy %s from %s error status: %s with extraInfo: %s",
+            policyIdentifier,
+            getIdentifier(target),
+            outcome.getReturnStatus(),
+            outcome.getExtraInformation()));
+  }
+
+  /**
+   * Lists the policies that apply to a catalog, namespace, or Iceberg table.
+   *
+   * <p>The target is selected by the arguments: a null or empty {@code namespace} means the
+   * catalog itself, a null or empty {@code targetName} means the namespace, and otherwise the
+   * Iceberg table {@code targetName} inside {@code namespace}.
+   *
+   * @param namespace namespace of the target, or null/empty for the catalog
+   * @param targetName table name, or null/empty to address the namespace
+   * @param policyType policy type to filter on; {@code null} loads policies of every type
+   * @return the effective policies for the resolved target
+   */
+  public List<Policy> getApplicablePolicies(
+      Namespace namespace, String targetName, PolicyType policyType) {
+    return getEffectivePolicies(getFullPath(namespace, targetName), policyType);
+  }
+
+  /**
+   * Computes the effective policies for the last entity of a hierarchical path.
+   *
+   * <p>Every level of the path contributes its inheritable policies; when the same policy type
+   * shows up again further down the path, the lower level replaces the earlier entry.
+   * Non-inheritable policies are taken from the last entity only.
+   *
+   * <p>Potential performance improvements:
+   *
+   * <ul>
+   *   <li>Range query: fetch the mappings of all path entities in one range query instead of one
+   *       query per entity.
+   *   <li>Filtering on inheritable: apply the inheritable condition at the data retrieval level so
+   *       that only relevant policies are loaded for non-leaf entities.
+   *   <li>Caching: cache the policies of upper levels for frequently accessed paths.
+   * </ul>
+   *
+   * @param path the entities of the hierarchical path, outermost first
+   * @param policyType the policy type to filter on, passed through to {@code getPolicies}
+   * @return the non-inheritable policies of the last entity followed by the inheritable policies
+   *     in the order their type was first seen; empty when {@code path} is null or empty
+   */
+  private List<Policy> getEffectivePolicies(List<PolarisEntity> path, PolicyType policyType) {
+    if (path == null || path.isEmpty()) {
+      return List.of();
+    }
+
+    // Keyed by policy type name. Re-putting a key replaces the value but keeps its position.
+    Map<String, PolicyEntity> inheritableByType = new LinkedHashMap<>();
+    List<PolicyEntity> effective = new ArrayList<>();
+
+    int leafIndex = path.size() - 1;
+    for (int level = 0; level <= leafIndex; level++) {
+      for (PolicyEntity attached : getPolicies(path.get(level), policyType)) {
+        if (attached.getPolicyType().isInheritable()) {
+          inheritableByType.put(attached.getPolicyType().getName(), attached);
+        } else if (level == leafIndex) {
+          // Non-inheritable policies only count when attached to the last entity itself.
+          effective.add(attached);
+        }
+      }
+    }
+
+    // Inheritable policies go after the last entity's non-inheritable ones.
+    effective.addAll(inheritableByType.values());
+    return effective.stream().map(PolicyCatalog::constructPolicy).toList();
+  }
+
+  /**
+   * Loads the policies attached directly to {@code target}.
+   *
+   * @param target the entity whose policy mappings are loaded
+   * @param policyType the policy type to load, or {@code null} to load policies of every type
+   * @return the attached policies as {@link PolicyEntity} instances
+   * @throws IllegalStateException if the metastore reports an unsuccessful load
+   */
+  private List<PolicyEntity> getPolicies(PolarisEntity target, PolicyType policyType) {
+    LoadPolicyMappingsResult result;
+    if (policyType == null) {
+      result = metaStoreManager.loadPoliciesOnEntity(callContext.getPolarisCallContext(), target);
+    } else {
+      result =
+          metaStoreManager.loadPoliciesOnEntityByType(
+              callContext.getPolarisCallContext(), target, policyType);
+    }
+
+    // Check the status like the drop/attach/detach paths do, instead of reading the entity list
+    // of a failed load.
+    if (!result.isSuccess()) {
+      throw new IllegalStateException(
+          String.format(
+              "Failed to load policies on %s error status: %s with extraInfo: %s",
+              target.getName(), result.getReturnStatus(), result.getExtraInformation()));
+    }
+
+    return result.getEntities().stream().map(PolicyEntity::of).toList();
+  }
+
+  /**
+   * Resolves the full entity path of a catalog, namespace, or Iceberg table target.
+   *
+   * @param namespace namespace of the target; null or empty selects the catalog itself
+   * @param targetName table name; null or empty selects the namespace
+   * @return the catalog entity alone for a catalog target, otherwise the raw full path of the
+   *     resolved namespace or table
+   * @throws NoSuchNamespaceException if the namespace cannot be resolved
+   * @throws NoSuchTableException if the Iceberg table cannot be resolved
+   */
+  private List<PolarisEntity> getFullPath(Namespace namespace, String targetName) {
+    // Catalog target.
+    if (namespace == null || namespace.isEmpty()) {
+      return List.of(catalogEntity);
+    }
+
+    // Namespace target.
+    if (Strings.isNullOrEmpty(targetName)) {
+      PolarisResolvedPathWrapper namespacePath = resolvedEntityView.getResolvedPath(namespace);
+      if (namespacePath == null) {
+        throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+      }
+      return namespacePath.getRawFullPath();
+    }
+
+    // Table target; only Iceberg tables are supported.
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, targetName);
+    PolarisResolvedPathWrapper tablePath =
+        resolvedEntityView.getResolvedPath(
+            tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE);
+    if (tablePath == null) {
+      throw new NoSuchTableException("Iceberg Table does not exist: %s", tableIdentifier);
+    }
+    return tablePath.getRawFullPath();
+  }
+
+  /**
+   * Builds a dotted name for an attachment target, used in error messages.
+   *
+   * @param target the attachment target to describe
+   * @return the catalog name, followed by the dot-joined target path unless the target is the
+   *     catalog itself
+   */
+  private String getIdentifier(PolicyAttachmentTarget target) {
+    String catalogName = catalogEntity.getName();
+    return target.getType() == CATALOG
+        ? catalogName
+        : catalogName + "." + String.join(".", target.getPath());
+  }
+
+  /**
+   * Resolves a policy identifier to its resolved path.
+   *
+   * @param policyIdentifier identifier of the policy to resolve
+   * @return the resolved path, never {@code null} and always with a resolved leaf entity
+   * @throws NoSuchPolicyException if the path or its leaf entity cannot be resolved
+   */
+  private PolarisResolvedPathWrapper getResolvedPathWrapper(PolicyIdentifier policyIdentifier) {
+    PolarisResolvedPathWrapper policyPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE);
+
+    boolean policyFound = policyPath != null && policyPath.getResolvedLeafEntity() != null;
+    if (!policyFound) {
+      throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier));
+    }
+    return policyPath;
+  }
+
+  /**
+   * Resolves an attachment target to its resolved path.
+   *
+   * @param target the catalog, namespace, or table-like target to resolve
+   * @return the resolved path of the target
+   * @throws NoSuchNamespaceException if a namespace target cannot be resolved
+   * @throws NoSuchTableException if a table-like target cannot be resolved as an Iceberg table
+   * @throws IllegalArgumentException if the target type is not one of the supported types
+   */
+  private PolarisResolvedPathWrapper getResolvedPathWrapper(
+      @Nonnull PolicyAttachmentTarget target) {
+    switch (target.getType()) {
+      case CATALOG -> {
+        // Always the current catalog: policies cannot apply across catalogs at this moment.
+        return resolvedEntityView.getResolvedReferenceCatalogEntity();
+      }
+      case NAMESPACE -> {
+        Namespace namespace = Namespace.of(target.getPath().toArray(new String[0]));
+        PolarisResolvedPathWrapper namespacePath = resolvedEntityView.getResolvedPath(namespace);
+        if (namespacePath == null) {
+          throw new NoSuchNamespaceException("Namespace does not exist: %s", namespace);
+        }
+        return namespacePath;
+      }
+      case TABLE_LIKE -> {
+        TableIdentifier tableIdentifier =
+            TableIdentifier.of(target.getPath().toArray(new String[0]));
+        // Only Iceberg tables are supported.
+        PolarisResolvedPathWrapper tablePath =
+            resolvedEntityView.getResolvedPath(
+                tableIdentifier, PolarisEntityType.TABLE_LIKE, PolarisEntitySubType.ICEBERG_TABLE);
+        if (tablePath == null) {
+          throw new NoSuchTableException("Iceberg Table does not exist: %s", tableIdentifier);
+        }
+        return tablePath;
+      }
+      default -> throw new IllegalArgumentException("Unsupported target type: " + target.getType());
+    }
   }
 
   private static Policy constructPolicy(PolicyEntity policyEntity) {
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
 
b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
index 22b9e187d..57df2cbc5 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
@@ -25,7 +25,9 @@ import jakarta.ws.rs.ext.Provider;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.apache.polaris.core.exceptions.AlreadyExistsException;
 import org.apache.polaris.core.exceptions.PolarisException;
+import org.apache.polaris.core.persistence.PolicyMappingAlreadyExistsException;
 import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import org.apache.polaris.core.policy.exceptions.PolicyAttachException;
 import 
org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
 import org.apache.polaris.core.policy.validator.InvalidPolicyException;
 import org.apache.polaris.service.context.UnresolvableRealmContextException;
@@ -49,10 +51,14 @@ public class PolarisExceptionMapper implements 
ExceptionMapper<PolarisException>
       return Response.Status.NOT_FOUND;
     } else if (exception instanceof InvalidPolicyException) {
       return Response.Status.BAD_REQUEST;
+    } else if (exception instanceof PolicyAttachException) {
+      return Response.Status.BAD_REQUEST;
     } else if (exception instanceof NoSuchPolicyException) {
       return Response.Status.NOT_FOUND;
     } else if (exception instanceof PolicyVersionMismatchException) {
       return Response.Status.CONFLICT;
+    } else if (exception instanceof PolicyMappingAlreadyExistsException) {
+      return Response.Status.CONFLICT;
     } else {
       return Response.Status.INTERNAL_SERVER_ERROR;
     }


Reply via email to