This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f3150590 Implement PolicyCatalogHandler and Add Policy Privileges 
Stage 2: AttachPolicy + DetachPolicy (#1416)
7f3150590 is described below

commit 7f3150590db7c8517908d2b5495c93c58ce58d56
Author: Honah (Jonas) J. <[email protected]>
AuthorDate: Tue Apr 22 09:50:19 2025 -0700

    Implement PolicyCatalogHandler and Add Policy Privileges Stage 2: 
AttachPolicy + DetachPolicy (#1416)
    
    * add auth test for attach/detach
    
    * apply formatter
    
    * refactor authorizePolicyAttachmentOperation
    
    * address comment
    
    * better naming
---
 .../core/auth/PolarisAuthorizableOperation.java    |  18 +
 .../polaris/core/auth/PolarisAuthorizerImpl.java   |  22 ++
 .../polaris/core/entity/PolarisPrivilege.java      |   8 +
 .../quarkus/admin/PolarisAuthzTestBase.java        |  52 ++-
 .../catalog/PolicyCatalogHandlerAuthzTest.java     | 421 +++++++++++++++++++++
 .../catalog/policy/PolicyCatalogHandler.java       | 122 ++++++
 .../service/catalog/policy/PolicyCatalogUtils.java |  62 +++
 7 files changed, 689 insertions(+), 16 deletions(-)

diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java
index 0957a4e9d..1eaea6c3f 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizableOperation.java
@@ -18,7 +18,9 @@
  */
 package org.apache.polaris.core.auth;
 
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ATTACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_CREATE;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_DETACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_DROP;
 import static org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_LIST;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_LIST_GRANTS;
@@ -33,14 +35,18 @@ import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ROLE_MANAG
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ROLE_READ_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ROLE_WRITE_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_WRITE_PROPERTIES;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_ATTACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_CREATE;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_DETACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_DROP;
 import static org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_LIST;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_LIST_GRANTS;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_MANAGE_GRANTS_ON_SECURABLE;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_READ_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_WRITE_PROPERTIES;
+import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_ATTACH;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_CREATE;
+import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_DETACH;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_DROP;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_LIST;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_READ;
@@ -64,7 +70,9 @@ import static 
org.apache.polaris.core.entity.PolarisPrivilege.PRINCIPAL_ROLE_WRI
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.PRINCIPAL_ROTATE_CREDENTIALS;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.PRINCIPAL_WRITE_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.SERVICE_MANAGE_ACCESS;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_ATTACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_CREATE;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_DETACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_DROP;
 import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_LIST;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_LIST_GRANTS;
@@ -192,6 +200,16 @@ public enum PolarisAuthorizableOperation {
   DROP_POLICY(POLICY_DROP),
   UPDATE_POLICY(POLICY_WRITE),
   LIST_POLICY(POLICY_LIST),
+  ATTACH_POLICY_TO_CATALOG(POLICY_ATTACH, CATALOG_ATTACH_POLICY),
+  ATTACH_POLICY_TO_NAMESPACE(POLICY_ATTACH, NAMESPACE_ATTACH_POLICY),
+  ATTACH_POLICY_TO_TABLE(POLICY_ATTACH, TABLE_ATTACH_POLICY),
+  DETACH_POLICY_FROM_CATALOG(POLICY_DETACH, CATALOG_DETACH_POLICY),
+  DETACH_POLICY_FROM_NAMESPACE(POLICY_DETACH, NAMESPACE_DETACH_POLICY),
+  DETACH_POLICY_FROM_TABLE(POLICY_DETACH, TABLE_DETACH_POLICY),
+  GET_APPLICABLE_POLICIES_ON_CATALOG(CATALOG_READ_PROPERTIES),
+  GET_APPLICABLE_POLICIES_ON_NAMESPACE(NAMESPACE_READ_PROPERTIES),
+  GET_APPLICABLE_POLICIES_ON_TABLE(TABLE_READ_PROPERTIES),
+  GET_APPLICABLE_POLICIES_ON_VIEW(VIEW_READ_PROPERTIES),
   ;
 
   private final EnumSet<PolarisPrivilege> privilegesOnTarget;
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java
index abeefa2b6..ae606ff96 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/auth/PolarisAuthorizerImpl.java
@@ -18,7 +18,9 @@
  */
 package org.apache.polaris.core.auth;
 
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ATTACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_CREATE;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_DETACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_DROP;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_FULL_METADATA;
 import static org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_LIST;
@@ -39,7 +41,9 @@ import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ROLE_READ_
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ROLE_USAGE;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_ROLE_WRITE_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.CATALOG_WRITE_PROPERTIES;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_ATTACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_CREATE;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_DETACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_DROP;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_FULL_METADATA;
 import static org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_LIST;
@@ -47,7 +51,9 @@ import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_LIST_GRA
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_MANAGE_GRANTS_ON_SECURABLE;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_READ_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.NAMESPACE_WRITE_PROPERTIES;
+import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_ATTACH;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_CREATE;
+import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_DETACH;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_DROP;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.POLICY_FULL_METADATA;
 import static org.apache.polaris.core.entity.PolarisPrivilege.POLICY_LIST;
@@ -75,7 +81,9 @@ import static 
org.apache.polaris.core.entity.PolarisPrivilege.PRINCIPAL_ROLE_WRI
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.PRINCIPAL_ROTATE_CREDENTIALS;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.PRINCIPAL_WRITE_PROPERTIES;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.SERVICE_MANAGE_ACCESS;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_ATTACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_CREATE;
+import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_DETACH_POLICY;
 import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_DROP;
 import static 
org.apache.polaris.core.entity.PolarisPrivilege.TABLE_FULL_METADATA;
 import static org.apache.polaris.core.entity.PolarisPrivilege.TABLE_LIST;
@@ -495,6 +503,20 @@ public class PolarisAuthorizerImpl implements 
PolarisAuthorizer {
             POLICY_FULL_METADATA,
             CATALOG_MANAGE_METADATA,
             CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(POLICY_ATTACH, List.of(POLICY_ATTACH, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(POLICY_DETACH, List.of(POLICY_DETACH, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(
+        CATALOG_ATTACH_POLICY, List.of(CATALOG_ATTACH_POLICY, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(
+        NAMESPACE_ATTACH_POLICY, List.of(NAMESPACE_ATTACH_POLICY, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(
+        TABLE_ATTACH_POLICY, List.of(TABLE_ATTACH_POLICY, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(
+        CATALOG_DETACH_POLICY, List.of(CATALOG_DETACH_POLICY, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(
+        NAMESPACE_DETACH_POLICY, List.of(NAMESPACE_DETACH_POLICY, 
CATALOG_MANAGE_CONTENT));
+    SUPER_PRIVILEGES.putAll(
+        TABLE_DETACH_POLICY, List.of(TABLE_DETACH_POLICY, 
CATALOG_MANAGE_CONTENT));
   }
 
   private final PolarisConfigurationStore featureConfig;
diff --git 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java
 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java
index 03585790b..71b7b0df8 100644
--- 
a/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java
+++ 
b/polaris-core/src/main/java/org/apache/polaris/core/entity/PolarisPrivilege.java
@@ -142,6 +142,14 @@ public enum PolarisPrivilege {
   POLICY_WRITE(73, PolarisEntityType.POLICY),
   POLICY_LIST(74, PolarisEntityType.NAMESPACE),
   POLICY_FULL_METADATA(75, PolarisEntityType.POLICY),
+  POLICY_ATTACH(76, PolarisEntityType.POLICY),
+  POLICY_DETACH(77, PolarisEntityType.POLICY),
+  CATALOG_ATTACH_POLICY(78, PolarisEntityType.CATALOG),
+  NAMESPACE_ATTACH_POLICY(79, PolarisEntityType.NAMESPACE),
+  TABLE_ATTACH_POLICY(80, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE),
+  CATALOG_DETACH_POLICY(81, PolarisEntityType.CATALOG),
+  NAMESPACE_DETACH_POLICY(82, PolarisEntityType.NAMESPACE),
+  TABLE_DETACH_POLICY(83, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE),
   ;
 
   /**
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
index 10972b602..6f3498d31 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/admin/PolarisAuthzTestBase.java
@@ -627,24 +627,44 @@ public abstract class PolarisAuthzTestBase {
       Runnable action,
       Function<PolarisPrivilege, Boolean> grantAction,
       Function<PolarisPrivilege, Boolean> revokeAction) {
-    for (PolarisPrivilege privilege : insufficientPrivileges) {
-      // Grant the single privilege at a catalog level to cascade to all 
objects.
-      Assertions.assertThat(grantAction.apply(privilege)).isTrue();
+    doTestInsufficientPrivilegeSets(
+        insufficientPrivileges.stream().map(priv -> Set.of(priv)).toList(),
+        principalName,
+        action,
+        grantAction,
+        revokeAction);
+  }
 
-      // Should be insufficient
-      try {
-        Assertions.assertThatThrownBy(() -> action.run())
-            .isInstanceOf(ForbiddenException.class)
-            .hasMessageContaining(principalName)
-            .hasMessageContaining("is not authorized");
-      } catch (Throwable t) {
-        Assertions.fail(
-            String.format("Expected failure with insufficientPrivilege '%s'", 
privilege), t);
-      }
+  /**
+   * Tests each "insufficient" privilege individually using CATALOG_ROLE1 by 
granting at the
+   * CATALOG_NAME level, ensuring the action fails, then revoking after each 
test case.
+   */
+  protected void doTestInsufficientPrivilegeSets(
+      List<Set<PolarisPrivilege>> insufficientPrivilegeSets,
+      String principalName,
+      Runnable action,
+      Function<PolarisPrivilege, Boolean> grantAction,
+      Function<PolarisPrivilege, Boolean> revokeAction) {
+    for (Set<PolarisPrivilege> privilegeSet : insufficientPrivilegeSets) {
+      for (PolarisPrivilege privilege : privilegeSet) {
+        // Grant the single privilege at a catalog level to cascade to all 
objects.
+        Assertions.assertThat(grantAction.apply(privilege)).isTrue();
 
-      // Revoking only matters in case there are some multi-privilege actions 
being tested with
-      // only granting individual privileges in isolation.
-      Assertions.assertThat(revokeAction.apply(privilege)).isTrue();
+        // Should be insufficient
+        try {
+          Assertions.assertThatThrownBy(() -> action.run())
+              .isInstanceOf(ForbiddenException.class)
+              .hasMessageContaining(principalName)
+              .hasMessageContaining("is not authorized");
+        } catch (Throwable t) {
+          Assertions.fail(
+              String.format("Expected failure with insufficientPrivilege 
'%s'", privilege), t);
+        }
+
+        // Revoking only matters in case there are some multi-privilege 
actions being tested with
+        // only granting individual privileges in isolation.
+        Assertions.assertThat(revokeAction.apply(privilege)).isTrue();
+      }
     }
   }
 }
diff --git 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogHandlerAuthzTest.java
 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogHandlerAuthzTest.java
index 74a54b807..b821be31e 100644
--- 
a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogHandlerAuthzTest.java
+++ 
b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogHandlerAuthzTest.java
@@ -19,14 +19,19 @@
 package org.apache.polaris.service.quarkus.catalog;
 
 import io.quarkus.test.junit.QuarkusTest;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Set;
 import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
 import org.apache.polaris.core.entity.PolarisPrivilege;
 import org.apache.polaris.core.policy.PredefinedPolicyTypes;
 import org.apache.polaris.service.catalog.policy.PolicyCatalogHandler;
 import org.apache.polaris.service.quarkus.admin.PolarisAuthzTestBase;
+import org.apache.polaris.service.types.AttachPolicyRequest;
 import org.apache.polaris.service.types.CreatePolicyRequest;
+import org.apache.polaris.service.types.DetachPolicyRequest;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
 import org.apache.polaris.service.types.PolicyIdentifier;
 import org.apache.polaris.service.types.UpdatePolicyRequest;
 import org.assertj.core.api.Assertions;
@@ -77,6 +82,17 @@ public class PolicyCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
         PRINCIPAL_NAME);
   }
 
+  /**
+   * @param sufficientPrivileges each set of concurrent privileges expected to 
be sufficient
+   *     together.
+   * @param action
+   * @param cleanupAction
+   */
+  private void doTestSufficientPrivilegeSets(
+      List<Set<PolarisPrivilege>> sufficientPrivileges, Runnable action, 
Runnable cleanupAction) {
+    doTestSufficientPrivilegeSets(sufficientPrivileges, action, cleanupAction, 
PRINCIPAL_NAME);
+  }
+
   /**
    * @param sufficientPrivileges each set of concurrent privileges expected to 
be sufficient
    *     together.
@@ -123,6 +139,11 @@ public class PolicyCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
     doTestInsufficientPrivileges(insufficientPrivileges, PRINCIPAL_NAME, 
action);
   }
 
+  private void doTestInsufficientPrivilegeSets(
+      List<Set<PolarisPrivilege>> insufficientPrivilegesSets, Runnable action) 
{
+    doTestInsufficientPrivilegeSets(insufficientPrivilegesSets, 
PRINCIPAL_NAME, action);
+  }
+
   /**
    * Tests each "insufficient" privilege individually using CATALOG_ROLE1 by 
granting at the
    * CATALOG_NAME level, ensuring the action fails, then revoking after each 
test case.
@@ -139,6 +160,20 @@ public class PolicyCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             adminService.revokePrivilegeOnCatalogFromRole(CATALOG_NAME, 
CATALOG_ROLE1, privilege));
   }
 
+  private void doTestInsufficientPrivilegeSets(
+      List<Set<PolarisPrivilege>> insufficientPrivilegeSets,
+      String principalName,
+      Runnable action) {
+    doTestInsufficientPrivilegeSets(
+        insufficientPrivilegeSets,
+        principalName,
+        action,
+        (privilege) ->
+            adminService.grantPrivilegeOnCatalogToRole(CATALOG_NAME, 
CATALOG_ROLE1, privilege),
+        (privilege) ->
+            adminService.revokePrivilegeOnCatalogFromRole(CATALOG_NAME, 
CATALOG_ROLE1, privilege));
+  }
+
   @Test
   public void testListPoliciesAllSufficientPrivileges() {
     doTestSufficientPrivileges(
@@ -301,4 +336,390 @@ public class PolicyCatalogHandlerAuthzTest extends 
PolarisAuthzTestBase {
             PolarisPrivilege.POLICY_WRITE),
         () -> newWrapper().dropPolicy(POLICY_NS1_1, true));
   }
+
+  @Test
+  public void testAttachPolicyToCatalogSufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.CATALOG_DETACH_POLICY))
+        .isTrue();
+    PolicyAttachmentTarget namespaceTarget =
+        
PolicyAttachmentTarget.builder().setType(PolicyAttachmentTarget.TypeEnum.CATALOG).build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(namespaceTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(namespaceTarget).build();
+
+    doTestSufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.CATALOG_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest),
+        PRINCIPAL_NAME);
+  }
+
+  @Test
+  public void testAttachPolicyToCatalogInsufficientPrivileges() {
+    PolicyAttachmentTarget namespaceTarget =
+        
PolicyAttachmentTarget.builder().setType(PolicyAttachmentTarget.TypeEnum.CATALOG).build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(namespaceTarget).build();
+
+    doTestInsufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.NAMESPACE_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.TABLE_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_ATTACH),
+            Set.of(PolarisPrivilege.CATALOG_ATTACH_POLICY)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest));
+  }
+
+  @Test
+  public void testAttachPolicyToNamespaceSufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.NAMESPACE_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget namespaceTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.NAMESPACE)
+            .setPath(Arrays.asList(NS2.levels()))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(namespaceTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(namespaceTarget).build();
+
+    doTestSufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.NAMESPACE_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest));
+  }
+
+  @Test
+  public void testAttachPolicyToNamespaceInsufficientPrivileges() {
+    PolicyAttachmentTarget namespaceTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.NAMESPACE)
+            .setPath(Arrays.asList(NS2.levels()))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(namespaceTarget).build();
+
+    doTestInsufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.CATALOG_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.TABLE_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_ATTACH),
+            Set.of(PolarisPrivilege.NAMESPACE_ATTACH_POLICY)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest));
+  }
+
+  @Test
+  public void testAttachPolicyToTableSufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.TABLE_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget tableTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.TABLE_LIKE)
+            .setPath(PolarisCatalogHelpers.tableIdentifierToList(TABLE_NS2_1))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(tableTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(tableTarget).build();
+
+    doTestSufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.TABLE_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest));
+  }
+
+  @Test
+  public void testAttachPolicyToTableInsufficientPrivileges() {
+    PolicyAttachmentTarget tableTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.TABLE_LIKE)
+            .setPath(PolarisCatalogHelpers.tableIdentifierToList(TABLE_NS2_1))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(tableTarget).build();
+
+    doTestInsufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.CATALOG_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_ATTACH, 
PolarisPrivilege.NAMESPACE_ATTACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_ATTACH),
+            Set.of(PolarisPrivilege.TABLE_ATTACH_POLICY)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest));
+  }
+
+  @Test
+  public void testDetachPolicyFromCatalogSufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_ATTACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.CATALOG_ATTACH_POLICY))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.CATALOG_DETACH_POLICY))
+        .isTrue();
+    PolicyAttachmentTarget catalogTarget =
+        
PolicyAttachmentTarget.builder().setType(PolicyAttachmentTarget.TypeEnum.CATALOG).build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(catalogTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(catalogTarget).build();
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest);
+
+    doTestSufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.CATALOG_DETACH_POLICY),
+            Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest),
+        () ->
+            newWrapper(Set.of(PRINCIPAL_ROLE2))
+                .attachPolicy(POLICY_NS1_1, attachPolicyRequest) /* 
cleanupAction */);
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest);
+  }
+
+  @Test
+  public void testDetachPolicyFromCatalogInsufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_ATTACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.CATALOG_ATTACH_POLICY))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.CATALOG_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget catalogTarget =
+        
PolicyAttachmentTarget.builder().setType(PolicyAttachmentTarget.TypeEnum.CATALOG).build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(catalogTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(catalogTarget).build();
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest);
+
+    doTestInsufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.NAMESPACE_DETACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.TABLE_DETACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_DETACH),
+            Set.of(PolarisPrivilege.CATALOG_DETACH_POLICY)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest));
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest);
+  }
+
+  @Test
+  public void testDetachPolicyFromNamespaceSufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_ATTACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.NAMESPACE_ATTACH_POLICY))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.NAMESPACE_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget namespaceTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.NAMESPACE)
+            .setPath(Arrays.asList(NS2.levels()))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(namespaceTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(namespaceTarget).build();
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest);
+
+    doTestSufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.NAMESPACE_DETACH_POLICY),
+            Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest),
+        () ->
+            newWrapper(Set.of(PRINCIPAL_ROLE2))
+                .attachPolicy(POLICY_NS1_1, attachPolicyRequest) /* 
cleanupAction */);
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest);
+  }
+
+  @Test
+  public void testDetachPolicyFromNamespaceInsufficientPrivilege() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_ATTACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.NAMESPACE_ATTACH_POLICY))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.NAMESPACE_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget namespaceTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.NAMESPACE)
+            .setPath(Arrays.asList(NS2.levels()))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(namespaceTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(namespaceTarget).build();
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest);
+
+    doTestInsufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.CATALOG_DETACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.TABLE_DETACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_DETACH),
+            Set.of(PolarisPrivilege.NAMESPACE_DETACH_POLICY)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest));
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest);
+  }
+
+  @Test
+  public void testDetachPolicyFromTableSufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_ATTACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.TABLE_ATTACH_POLICY))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.TABLE_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget tableTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.TABLE_LIKE)
+            .setPath(PolarisCatalogHelpers.tableIdentifierToList(TABLE_NS2_1))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(tableTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(tableTarget).build();
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest);
+
+    doTestSufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.TABLE_DETACH_POLICY),
+            Set.of(PolarisPrivilege.CATALOG_MANAGE_CONTENT)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest),
+        () ->
+            newWrapper(Set.of(PRINCIPAL_ROLE2))
+                .attachPolicy(POLICY_NS1_1, attachPolicyRequest) /* 
cleanupAction */);
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest);
+  }
+
+  @Test
+  public void testDetachFromPolicyInsufficientPrivileges() {
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_ATTACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.TABLE_ATTACH_POLICY))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, PolarisPrivilege.POLICY_DETACH))
+        .isTrue();
+    Assertions.assertThat(
+            adminService.grantPrivilegeOnCatalogToRole(
+                CATALOG_NAME, CATALOG_ROLE2, 
PolarisPrivilege.TABLE_DETACH_POLICY))
+        .isTrue();
+
+    PolicyAttachmentTarget tableTarget =
+        PolicyAttachmentTarget.builder()
+            .setType(PolicyAttachmentTarget.TypeEnum.TABLE_LIKE)
+            .setPath(PolarisCatalogHelpers.tableIdentifierToList(TABLE_NS2_1))
+            .build();
+    AttachPolicyRequest attachPolicyRequest =
+        AttachPolicyRequest.builder().setTarget(tableTarget).build();
+    DetachPolicyRequest detachPolicyRequest =
+        DetachPolicyRequest.builder().setTarget(tableTarget).build();
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).attachPolicy(POLICY_NS1_1, 
attachPolicyRequest);
+
+    doTestInsufficientPrivilegeSets(
+        List.of(
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.CATALOG_DETACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_DETACH, 
PolarisPrivilege.NAMESPACE_DETACH_POLICY),
+            Set.of(PolarisPrivilege.POLICY_DETACH),
+            Set.of(PolarisPrivilege.TABLE_DETACH_POLICY)),
+        () -> newWrapper(Set.of(PRINCIPAL_ROLE1)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest));
+
+    newWrapper(Set.of(PRINCIPAL_ROLE2)).detachPolicy(POLICY_NS1_1, 
detachPolicyRequest);
+  }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
index 1eceb362d..fb0c71f2e 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogHandler.java
@@ -19,24 +19,33 @@
 package org.apache.polaris.service.catalog.policy;
 
 import jakarta.ws.rs.core.SecurityContext;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.polaris.core.auth.PolarisAuthorizableOperation;
 import org.apache.polaris.core.auth.PolarisAuthorizer;
 import org.apache.polaris.core.catalog.PolarisCatalogHelpers;
 import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
 import org.apache.polaris.core.entity.PolarisEntityType;
 import org.apache.polaris.core.persistence.PolarisEntityManager;
 import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
 import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.persistence.resolver.ResolverPath;
+import org.apache.polaris.core.persistence.resolver.ResolverStatus;
 import org.apache.polaris.core.policy.PolicyType;
 import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
 import org.apache.polaris.service.catalog.common.CatalogHandler;
+import org.apache.polaris.service.types.AttachPolicyRequest;
 import org.apache.polaris.service.types.CreatePolicyRequest;
+import org.apache.polaris.service.types.DetachPolicyRequest;
 import org.apache.polaris.service.types.ListPoliciesResponse;
 import org.apache.polaris.service.types.LoadPolicyResponse;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
 import org.apache.polaris.service.types.PolicyIdentifier;
 import org.apache.polaris.service.types.UpdatePolicyRequest;
 
@@ -115,6 +124,16 @@ public class PolicyCatalogHandler extends CatalogHandler {
     return policyCatalog.dropPolicy(identifier, detachAll);
   }
 
+  public boolean attachPolicy(PolicyIdentifier identifier, AttachPolicyRequest 
request) {
+    authorizePolicyMappingOperationOrThrow(identifier, request.getTarget(), 
true);
+    return policyCatalog.attachPolicy(identifier, request.getTarget(), 
request.getParameters());
+  }
+
+  public boolean detachPolicy(PolicyIdentifier identifier, DetachPolicyRequest 
request) {
+    authorizePolicyMappingOperationOrThrow(identifier, request.getTarget(), 
false);
+    return policyCatalog.detachPolicy(identifier, request.getTarget());
+  }
+
   private void authorizeBasicPolicyOperationOrThrow(
       PolarisAuthorizableOperation op, PolicyIdentifier identifier) {
     resolutionManifest =
@@ -141,4 +160,107 @@ public class PolicyCatalogHandler extends CatalogHandler {
 
     initializeCatalog();
   }
+
+  private void authorizePolicyMappingOperationOrThrow(
+      PolicyIdentifier identifier, PolicyAttachmentTarget target, boolean 
isAttach) {
+    resolutionManifest =
+        entityManager.prepareResolutionManifest(callContext, securityContext, 
catalogName);
+    resolutionManifest.addPassthroughPath(
+        new ResolverPath(
+            PolarisCatalogHelpers.identifierToList(identifier.getNamespace(), 
identifier.getName()),
+            PolarisEntityType.POLICY,
+            true /* optional */),
+        identifier);
+
+    switch (target.getType()) {
+      case CATALOG -> {}
+      case NAMESPACE -> {
+        Namespace targetNamespace = Namespace.of(target.getPath().toArray(new 
String[0]));
+        resolutionManifest.addPath(
+            new ResolverPath(Arrays.asList(targetNamespace.levels()), 
PolarisEntityType.NAMESPACE),
+            targetNamespace);
+      }
+      case TABLE_LIKE -> {
+        TableIdentifier targetIdentifier =
+            TableIdentifier.of(target.getPath().toArray(new String[0]));
+        resolutionManifest.addPath(
+            new ResolverPath(
+                PolarisCatalogHelpers.tableIdentifierToList(targetIdentifier),
+                PolarisEntityType.TABLE_LIKE),
+            targetIdentifier);
+      }
+      default -> throw new IllegalArgumentException("Unsupported target type: 
" + target.getType());
+    }
+
+    ResolverStatus status = resolutionManifest.resolveAll();
+
+    throwNotFoundExceptionIfFailToResolve(status, identifier);
+
+    PolarisResolvedPathWrapper policyWrapper =
+        resolutionManifest.getPassthroughResolvedPath(
+            identifier, PolarisEntityType.POLICY, 
PolarisEntitySubType.NULL_SUBTYPE);
+    if (policyWrapper == null) {
+      throw new NoSuchPolicyException(String.format("Policy does not exist: 
%s", identifier));
+    }
+
+    PolarisResolvedPathWrapper targetWrapper =
+        PolicyCatalogUtils.getResolvedPathWrapper(resolutionManifest, target);
+
+    PolarisAuthorizableOperation op =
+        determinePolicyMappingOperation(target, targetWrapper, isAttach);
+
+    authorizer.authorizeOrThrow(
+        authenticatedPrincipal,
+        resolutionManifest.getAllActivatedCatalogRoleAndPrincipalRoles(),
+        op,
+        policyWrapper,
+        targetWrapper);
+
+    initializeCatalog();
+  }
+
+  private PolarisAuthorizableOperation determinePolicyMappingOperation(
+      PolicyAttachmentTarget target, PolarisResolvedPathWrapper targetWrapper, 
boolean isAttach) {
+    return switch (targetWrapper.getRawLeafEntity().getType()) {
+      case CATALOG ->
+          isAttach
+              ? PolarisAuthorizableOperation.ATTACH_POLICY_TO_CATALOG
+              : PolarisAuthorizableOperation.DETACH_POLICY_FROM_CATALOG;
+      case NAMESPACE ->
+          isAttach
+              ? PolarisAuthorizableOperation.ATTACH_POLICY_TO_NAMESPACE
+              : PolarisAuthorizableOperation.DETACH_POLICY_FROM_NAMESPACE;
+      case TABLE_LIKE -> {
+        PolarisEntitySubType subType = 
targetWrapper.getRawLeafEntity().getSubType();
+        if (subType == PolarisEntitySubType.ICEBERG_TABLE) {
+          yield isAttach
+              ? PolarisAuthorizableOperation.ATTACH_POLICY_TO_TABLE
+              : PolarisAuthorizableOperation.DETACH_POLICY_FROM_TABLE;
+        }
+        throw new IllegalArgumentException("Unsupported table-like subtype: " 
+ subType);
+      }
+      default -> throw new IllegalArgumentException("Unsupported target type: 
" + target.getType());
+    };
+  }
+
+  private void throwNotFoundExceptionIfFailToResolve(
+      ResolverStatus status, PolicyIdentifier identifier) {
+    if ((status.getStatus() == 
ResolverStatus.StatusEnum.PATH_COULD_NOT_BE_FULLY_RESOLVED)) {
+      switch (status.getFailedToResolvePath().getLastEntityType()) {
+        case PolarisEntityType.TABLE_LIKE ->
+            throw new NoSuchTableException(
+                "Table or view does not exist: %s",
+                PolarisCatalogHelpers.listToTableIdentifier(
+                    status.getFailedToResolvePath().getEntityNames()));
+        case PolarisEntityType.NAMESPACE ->
+            throw new NoSuchNamespaceException(
+                "Namespace does not exist: %s",
+                Namespace.of(
+                    
status.getFailedToResolvePath().getEntityNames().toArray(new String[0])));
+        case PolarisEntityType.POLICY ->
+            throw new NoSuchPolicyException(String.format("Policy does not 
exist: %s", identifier));
+        default -> throw new IllegalStateException("Cannot resolve");
+      }
+    }
+  }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogUtils.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogUtils.java
new file mode 100644
index 000000000..42364a3e4
--- /dev/null
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalogUtils.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.policy;
+
+import jakarta.annotation.Nonnull;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
+import org.apache.polaris.service.types.PolicyAttachmentTarget;
+
+public class PolicyCatalogUtils {
+
+  public static PolarisResolvedPathWrapper getResolvedPathWrapper(
+      @Nonnull PolarisResolutionManifest resolutionManifest,
+      @Nonnull PolicyAttachmentTarget target) {
+    return switch (target.getType()) {
+      // get the current catalog entity, since policy cannot apply across 
catalog at this moment
+      case CATALOG -> resolutionManifest.getResolvedReferenceCatalogEntity();
+      case NAMESPACE -> {
+        var namespace = Namespace.of(target.getPath().toArray(new String[0]));
+        var resolvedTargetEntity = 
resolutionManifest.getResolvedPath(namespace);
+        if (resolvedTargetEntity == null) {
+          throw new NoSuchNamespaceException("Namespace does not exist: %s", 
namespace);
+        }
+        yield resolvedTargetEntity;
+      }
+      case TABLE_LIKE -> {
+        var tableIdentifier = TableIdentifier.of(target.getPath().toArray(new 
String[0]));
+        // only Iceberg tables are supported
+        var resolvedTableEntity =
+            resolutionManifest.getResolvedPath(
+                tableIdentifier, PolarisEntityType.TABLE_LIKE, 
PolarisEntitySubType.ICEBERG_TABLE);
+        if (resolvedTableEntity == null) {
+          throw new NoSuchTableException("Iceberg Table does not exist: %s", 
tableIdentifier);
+        }
+        yield resolvedTableEntity;
+      }
+      default -> throw new IllegalArgumentException("Unsupported target type: 
" + target.getType());
+    };
+  }
+}


Reply via email to