This is an automated email from the ASF dual-hosted git repository.

yufei pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new 17f28a921 Implement PolicyCatalog Stage 1: CRUD + ListPolicies (#1294)
17f28a921 is described below

commit 17f28a92144e2ec00fb8151ac17a41796ebdf901
Author: Honah (Jonas) J. <[email protected]>
AuthorDate: Fri Apr 4 11:55:01 2025 -0500

    Implement PolicyCatalog Stage 1: CRUD + ListPolicies (#1294)
---
 api/polaris-catalog-service/build.gradle.kts       |   5 +-
 .../polaris/service/types/PolicyIdentifier.java    | 140 ++++++
 .../core/catalog/PolarisCatalogHelpers.java        |  10 +-
 .../policy/exceptions/NoSuchPolicyException.java   |  32 ++
 .../exceptions/PolicyVersionMismatchException.java |  31 ++
 .../service/quarkus/catalog/PolicyCatalogTest.java | 496 +++++++++++++++++++++
 service/common/build.gradle.kts                    |   1 +
 .../service/catalog/policy/PolicyCatalog.java      | 285 ++++++++++++
 .../service/exception/PolarisExceptionMapper.java  |   6 +
 .../catalog/PolarisPassthroughResolutionView.java  |  18 +
 10 files changed, 1020 insertions(+), 4 deletions(-)

diff --git a/api/polaris-catalog-service/build.gradle.kts b/api/polaris-catalog-service/build.gradle.kts
index 2fd861d80..5f0a8134f 100644
--- a/api/polaris-catalog-service/build.gradle.kts
+++ b/api/polaris-catalog-service/build.gradle.kts
@@ -36,7 +36,6 @@ val policyManagementModels =
     "CatalogIdentifier",
     "CreatePolicyRequest",
     "LoadPolicyResponse",
-    "PolicyIdentifier",
     "Policy",
     "PolicyAttachmentTarget",
     "AttachPolicyRequest",
@@ -59,6 +58,7 @@ dependencies {
   implementation(libs.jakarta.inject.api)
   implementation(libs.jakarta.validation.api)
   implementation(libs.swagger.annotations)
+  implementation(libs.guava)
 
   implementation(libs.jakarta.servlet.api)
   implementation(libs.jakarta.ws.rs.api)
@@ -103,6 +103,9 @@ openApiGenerate {
       "ErrorModel" to "org.apache.iceberg.rest.responses.ErrorResponse",
       "IcebergErrorResponse" to "org.apache.iceberg.rest.responses.ErrorResponse",
       "TableIdentifier" to "org.apache.iceberg.catalog.TableIdentifier",
+
+      // Custom types defined below
+      "PolicyIdentifier" to "org.apache.polaris.service.types.PolicyIdentifier",
     )
 }
 
diff --git a/api/polaris-catalog-service/src/main/java/org/apache/polaris/service/types/PolicyIdentifier.java b/api/polaris-catalog-service/src/main/java/org/apache/polaris/service/types/PolicyIdentifier.java
new file mode 100644
index 000000000..59dfd3024
--- /dev/null
+++ b/api/polaris-catalog-service/src/main/java/org/apache/polaris/service/types/PolicyIdentifier.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.types;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import io.swagger.annotations.ApiModelProperty;
+import java.util.Objects;
+import org.apache.iceberg.catalog.Namespace;
+
+/**
+ * Represents a modified version of the PolicyIdentifier that is different from the one generated by
+ * the OpenAPI generator
+ *
+ * <p>the open api generation inlines the namespace definition, generates a {@code List<String>}
+ * directly, instead of generating a Namespace class. This version uses {@link
+ * org.apache.iceberg.catalog.Namespace} for namespace field.
+ *
+ * <p>TODO: make code generation use {@link org.apache.iceberg.catalog.Namespace} directly
+ */
+public class PolicyIdentifier {
+
+  private final Namespace namespace;
+  private final String name;
+
+  /** Reference to one or more levels of a namespace */
+  @ApiModelProperty(
+      example = "[\"accounting\",\"tax\"]",
+      required = true,
+      value = "Reference to one or more levels of a namespace")
+  @JsonProperty(value = "namespace", required = true)
+  public Namespace getNamespace() {
+    return namespace;
+  }
+
+  /** */
+  @ApiModelProperty(required = true, value = "")
+  @JsonProperty(value = "name", required = true)
+  public String getName() {
+    return name;
+  }
+
+  @JsonCreator
+  public PolicyIdentifier(
+      @JsonProperty(value = "namespace", required = true) Namespace namespace,
+      @JsonProperty(value = "name", required = true) String name) {
+    this.namespace = Objects.requireNonNullElse(namespace, Namespace.empty());
+    this.name = name;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static Builder builder(Namespace namespace, String name) {
+    return new Builder(namespace, name);
+  }
+
+  public static final class Builder {
+    private Namespace namespace;
+    private String name;
+
+    private Builder() {}
+
+    private Builder(Namespace namespace, String name) {
+      this.namespace = Objects.requireNonNullElse(namespace, Namespace.empty());
+      this.name = name;
+    }
+
+    public Builder setNamespace(Namespace namespace) {
+      this.namespace = namespace;
+      return this;
+    }
+
+    public Builder setName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public PolicyIdentifier build() {
+      PolicyIdentifier inst = new PolicyIdentifier(namespace, name);
+      return inst;
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PolicyIdentifier policyIdentifier = (PolicyIdentifier) o;
+    return Objects.equals(this.namespace, policyIdentifier.namespace)
+        && Objects.equals(this.name, policyIdentifier.name);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(namespace, name);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("class PolicyIdentifier {\n");
+
+    sb.append("    namespace: ").append(toIndentedString(namespace)).append("\n");
+    sb.append("    name: ").append(toIndentedString(name)).append("\n");
+    sb.append("}");
+    return sb.toString();
+  }
+
+  /**
+   * Convert the given object to string with each line indented by 4 spaces (except the first line).
+   */
+  private String toIndentedString(Object o) {
+    if (o == null) {
+      return "null";
+    }
+    return o.toString().replace("\n", "\n    ");
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
index ce0345bf3..e10c24f2f 100644
--- a/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
+++ b/polaris-core/src/main/java/org/apache/polaris/core/catalog/PolarisCatalogHelpers.java
@@ -36,10 +36,14 @@ public class PolarisCatalogHelpers {
   private PolarisCatalogHelpers() {}
 
   public static List<String> tableIdentifierToList(TableIdentifier identifier) {
+    return identifierToList(identifier.namespace(), identifier.name());
+  }
+
+  public static List<String> identifierToList(Namespace namespace, String name) {
     ImmutableList.Builder<String> fullList =
-        ImmutableList.builderWithExpectedSize(identifier.namespace().length() + 1);
-    fullList.addAll(Arrays.asList(identifier.namespace().levels()));
-    fullList.add(identifier.name());
+        ImmutableList.builderWithExpectedSize(namespace.length() + 1);
+    fullList.addAll(Arrays.asList(namespace.levels()));
+    fullList.add(name);
     return fullList.build();
   }
 
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/NoSuchPolicyException.java b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/NoSuchPolicyException.java
new file mode 100644
index 000000000..642bd36ad
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/NoSuchPolicyException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.policy.exceptions;
+
+import org.apache.polaris.core.exceptions.PolarisException;
+
+public class NoSuchPolicyException extends PolarisException {
+
+  public NoSuchPolicyException(String message) {
+    super(message);
+  }
+
+  public NoSuchPolicyException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyVersionMismatchException.java b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyVersionMismatchException.java
new file mode 100644
index 000000000..c2f8f8f78
--- /dev/null
+++ b/polaris-core/src/main/java/org/apache/polaris/core/policy/exceptions/PolicyVersionMismatchException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.core.policy.exceptions;
+
+import org.apache.polaris.core.exceptions.PolarisException;
+
+public class PolicyVersionMismatchException extends PolarisException {
+  public PolicyVersionMismatchException(String message) {
+    super(message);
+  }
+
+  public PolicyVersionMismatchException(String message, Throwable cause) {
+    super(message, cause);
+  }
+}
diff --git a/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
new file mode 100644
index 000000000..83bdecaaa
--- /dev/null
+++ b/quarkus/service/src/test/java/org/apache/polaris/service/quarkus/catalog/PolicyCatalogTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.quarkus.catalog;
+
+import static org.mockito.ArgumentMatchers.isA;
+import static org.mockito.Mockito.when;
+
+import com.google.common.collect.ImmutableMap;
+import io.quarkus.test.junit.QuarkusMock;
+import io.quarkus.test.junit.QuarkusTest;
+import io.quarkus.test.junit.QuarkusTestProfile;
+import io.quarkus.test.junit.TestProfile;
+import jakarta.inject.Inject;
+import jakarta.ws.rs.core.SecurityContext;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.time.Clock;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.NotImplementedException;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.polaris.core.PolarisCallContext;
+import org.apache.polaris.core.PolarisDiagnostics;
+import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
+import org.apache.polaris.core.admin.model.StorageConfigInfo;
+import org.apache.polaris.core.auth.AuthenticatedPolarisPrincipal;
+import org.apache.polaris.core.auth.PolarisAuthorizerImpl;
+import org.apache.polaris.core.config.FeatureConfiguration;
+import org.apache.polaris.core.config.PolarisConfigurationStore;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.context.RealmContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.entity.PrincipalEntity;
+import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
+import org.apache.polaris.core.persistence.PolarisEntityManager;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet;
+import org.apache.polaris.core.persistence.cache.EntityCache;
+import org.apache.polaris.core.persistence.dao.entity.BaseResult;
+import org.apache.polaris.core.persistence.dao.entity.PrincipalSecretsResult;
+import org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
+import org.apache.polaris.core.policy.PredefinedPolicyTypes;
+import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
+import org.apache.polaris.core.policy.validator.InvalidPolicyException;
+import org.apache.polaris.core.storage.PolarisStorageIntegration;
+import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
+import org.apache.polaris.core.storage.aws.AwsCredentialsStorageIntegration;
+import org.apache.polaris.core.storage.aws.AwsStorageConfigurationInfo;
+import org.apache.polaris.core.storage.cache.StorageCredentialCache;
+import org.apache.polaris.service.admin.PolarisAdminService;
+import org.apache.polaris.service.catalog.PolarisPassthroughResolutionView;
+import org.apache.polaris.service.catalog.iceberg.IcebergCatalog;
+import org.apache.polaris.service.catalog.io.DefaultFileIOFactory;
+import org.apache.polaris.service.catalog.io.FileIOFactory;
+import org.apache.polaris.service.catalog.policy.PolicyCatalog;
+import org.apache.polaris.service.config.RealmEntityManagerFactory;
+import org.apache.polaris.service.storage.PolarisStorageIntegrationProviderImpl;
+import org.apache.polaris.service.task.TaskExecutor;
+import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyIdentifier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.mockito.Mockito;
+import software.amazon.awssdk.services.sts.StsClient;
+import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
+import software.amazon.awssdk.services.sts.model.AssumeRoleResponse;
+import software.amazon.awssdk.services.sts.model.Credentials;
+
+@QuarkusTest
+@TestProfile(PolicyCatalogTest.Profile.class)
+public class PolicyCatalogTest {
+  public static class Profile implements QuarkusTestProfile {
+
+    @Override
+    public Map<String, String> getConfigOverrides() {
+      return Map.of(
+          "polaris.features.defaults.\"ALLOW_SPECIFYING_FILE_IO_IMPL\"",
+          "true",
+          "polaris.features.defaults.\"INITIALIZE_DEFAULT_CATALOG_FILEIO_FOR_TEST\"",
+          "true",
+          "polaris.features.defaults.\"SUPPORTED_CATALOG_STORAGE_TYPES\"",
+          "[\"FILE\"]");
+    }
+  }
+
+  protected static final Namespace NS = Namespace.of("newdb");
+  protected static final TableIdentifier TABLE = TableIdentifier.of(NS, "table");
+  public static final String CATALOG_NAME = "polaris-catalog";
+  public static final String TEST_ACCESS_KEY = "test_access_key";
+  public static final String SECRET_ACCESS_KEY = "secret_access_key";
+  public static final String SESSION_TOKEN = "session_token";
+
+  private static final Namespace NS1 = Namespace.of("ns1");
+  private static final PolicyIdentifier POLICY1 = new PolicyIdentifier(NS1, "p1");
+  private static final PolicyIdentifier POLICY2 = new PolicyIdentifier(NS1, "p2");
+  private static final PolicyIdentifier POLICY3 = new PolicyIdentifier(NS1, "p3");
+  private static final PolicyIdentifier POLICY4 = new PolicyIdentifier(NS1, "p4");
+
+  @Inject MetaStoreManagerFactory managerFactory;
+  @Inject PolarisConfigurationStore configurationStore;
+  @Inject PolarisStorageIntegrationProvider storageIntegrationProvider;
+  @Inject PolarisDiagnostics diagServices;
+
+  private PolicyCatalog policyCatalog;
+  private IcebergCatalog icebergCatalog;
+  private CallContext callContext;
+  private AwsStorageConfigInfo storageConfigModel;
+  private String realmName;
+  private PolarisMetaStoreManager metaStoreManager;
+  private PolarisCallContext polarisContext;
+  private PolarisAdminService adminService;
+  private PolarisEntityManager entityManager;
+  private FileIOFactory fileIOFactory;
+  private AuthenticatedPolarisPrincipal authenticatedRoot;
+  private PolarisEntity catalogEntity;
+  private SecurityContext securityContext;
+
+  @BeforeAll
+  public static void setUpMocks() {
+    PolarisStorageIntegrationProviderImpl mock =
+        Mockito.mock(PolarisStorageIntegrationProviderImpl.class);
+    QuarkusMock.installMockForType(mock, PolarisStorageIntegrationProviderImpl.class);
+  }
+
+  @BeforeEach
+  @SuppressWarnings("unchecked")
+  public void before(TestInfo testInfo) {
+    realmName =
+        "realm_%s_%s"
+            .formatted(
+                testInfo.getTestMethod().map(Method::getName).orElse("test"), System.nanoTime());
+    RealmContext realmContext = () -> realmName;
+    metaStoreManager = managerFactory.getOrCreateMetaStoreManager(realmContext);
+    polarisContext =
+        new PolarisCallContext(
+            managerFactory.getOrCreateSessionSupplier(realmContext).get(),
+            diagServices,
+            configurationStore,
+            Clock.systemDefaultZone());
+    entityManager =
+        new PolarisEntityManager(
+            metaStoreManager, new StorageCredentialCache(), new EntityCache(metaStoreManager));
+
+    callContext = CallContext.of(realmContext, polarisContext);
+
+    PrincipalEntity rootEntity =
+        new PrincipalEntity(
+            PolarisEntity.of(
+                metaStoreManager
+                    .readEntityByName(
+                        polarisContext,
+                        null,
+                        PolarisEntityType.PRINCIPAL,
+                        PolarisEntitySubType.NULL_SUBTYPE,
+                        "root")
+                    .getEntity()));
+
+    authenticatedRoot = new AuthenticatedPolarisPrincipal(rootEntity, Set.of());
+
+    securityContext = Mockito.mock(SecurityContext.class);
+    when(securityContext.getUserPrincipal()).thenReturn(authenticatedRoot);
+    when(securityContext.isUserInRole(isA(String.class))).thenReturn(true);
+    adminService =
+        new PolarisAdminService(
+            callContext,
+            entityManager,
+            metaStoreManager,
+            securityContext,
+            new PolarisAuthorizerImpl(new PolarisConfigurationStore() {}));
+
+    String storageLocation = "s3://my-bucket/path/to/data";
+    storageConfigModel =
+        AwsStorageConfigInfo.builder()
+            .setRoleArn("arn:aws:iam::012345678901:role/jdoe")
+            .setExternalId("externalId")
+            .setUserArn("aws::a:user:arn")
+            .setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
+            .setAllowedLocations(List.of(storageLocation, "s3://externally-owned-bucket"))
+            .build();
+    catalogEntity =
+        adminService.createCatalog(
+            new CatalogEntity.Builder()
+                .setName(CATALOG_NAME)
+                .setDefaultBaseLocation(storageLocation)
+                .setReplaceNewLocationPrefixWithCatalogDefault("file:")
+                .addProperty(
+                    FeatureConfiguration.ALLOW_EXTERNAL_TABLE_LOCATION.catalogConfig(), "true")
+                .addProperty(
+                    FeatureConfiguration.ALLOW_UNSTRUCTURED_TABLE_LOCATION.catalogConfig(), "true")
+                .setStorageConfigurationInfo(storageConfigModel, storageLocation)
+                .build());
+
+    PolarisPassthroughResolutionView passthroughView =
+        new PolarisPassthroughResolutionView(
+            callContext, entityManager, securityContext, CATALOG_NAME);
+    TaskExecutor taskExecutor = Mockito.mock();
+    RealmEntityManagerFactory realmEntityManagerFactory =
+        new RealmEntityManagerFactory(createMockMetaStoreManagerFactory());
+    this.fileIOFactory =
+        new DefaultFileIOFactory(realmEntityManagerFactory, managerFactory, configurationStore);
+
+    StsClient stsClient = Mockito.mock(StsClient.class);
+    when(stsClient.assumeRole(isA(AssumeRoleRequest.class)))
+        .thenReturn(
+            AssumeRoleResponse.builder()
+                .credentials(
+                    Credentials.builder()
+                        .accessKeyId(TEST_ACCESS_KEY)
+                        .secretAccessKey(SECRET_ACCESS_KEY)
+                        .sessionToken(SESSION_TOKEN)
+                        .build())
+                .build());
+    PolarisStorageIntegration<AwsStorageConfigurationInfo> storageIntegration =
+        new AwsCredentialsStorageIntegration(stsClient);
+    when(storageIntegrationProvider.getStorageIntegrationForConfig(
+            isA(AwsStorageConfigurationInfo.class)))
+        .thenReturn((PolarisStorageIntegration) storageIntegration);
+
+    this.policyCatalog = new PolicyCatalog(metaStoreManager, callContext, passthroughView);
+    this.icebergCatalog =
+        new IcebergCatalog(
+            entityManager,
+            metaStoreManager,
+            callContext,
+            passthroughView,
+            securityContext,
+            taskExecutor,
+            fileIOFactory);
+    this.icebergCatalog.initialize(
+        CATALOG_NAME,
+        ImmutableMap.of(
+            CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO"));
+  }
+
+  @AfterEach
+  public void after() throws IOException {
+    metaStoreManager.purge(polarisContext);
+  }
+
+  private MetaStoreManagerFactory createMockMetaStoreManagerFactory() {
+    return new MetaStoreManagerFactory() {
+      @Override
+      public PolarisMetaStoreManager getOrCreateMetaStoreManager(RealmContext realmContext) {
+        return metaStoreManager;
+      }
+
+      @Override
+      public Supplier<TransactionalPersistence> getOrCreateSessionSupplier(
+          RealmContext realmContext) {
+        return () -> ((TransactionalPersistence) polarisContext.getMetaStore());
+      }
+
+      @Override
+      public StorageCredentialCache getOrCreateStorageCredentialCache(RealmContext realmContext) {
+        return new StorageCredentialCache();
+      }
+
+      @Override
+      public EntityCache getOrCreateEntityCache(RealmContext realmContext) {
+        return new EntityCache(metaStoreManager);
+      }
+
+      @Override
+      public Map<String, PrincipalSecretsResult> bootstrapRealms(
+          Iterable<String> realms, RootCredentialsSet rootCredentialsSet) {
+        throw new NotImplementedException("Bootstrapping realms is not supported");
+      }
+
+      @Override
+      public Map<String, BaseResult> purgeRealms(Iterable<String> realms) {
+        throw new NotImplementedException("Purging realms is not supported");
+      }
+    };
+  }
+
+  @Test
+  public void testCreatePolicyDoesNotThrow() {
+    icebergCatalog.createNamespace(NS1);
+    Assertions.assertThatCode(
+            () ->
+                policyCatalog.createPolicy(
+                    POLICY1,
+                    PredefinedPolicyTypes.DATA_COMPACTION.getName(),
+                    "test",
+                    "{\"enable\": false}"))
+        .doesNotThrowAnyException();
+  }
+
+  @Test
+  public void testCreatePolicyAlreadyExists() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    Assertions.assertThatThrownBy(
+            () ->
+                policyCatalog.createPolicy(
+                    POLICY1,
+                    PredefinedPolicyTypes.DATA_COMPACTION.getName(),
+                    "test",
+                    "{\"enable\": false}"))
+        .isInstanceOf(AlreadyExistsException.class);
+
+    Assertions.assertThatThrownBy(
+            () ->
+                policyCatalog.createPolicy(
+                    POLICY1,
+                    PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(),
+                    "test",
+                    "{\"enable\": false}"))
+        .isInstanceOf(AlreadyExistsException.class);
+  }
+
+  @Test
+  public void testListPolicies() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    policyCatalog.createPolicy(
+        POLICY2,
+        PredefinedPolicyTypes.METADATA_COMPACTION.getName(),
+        "test",
+        "{\"enable\": false}");
+
+    policyCatalog.createPolicy(
+        POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test", "{\"enable\": false}");
+
+    policyCatalog.createPolicy(
+        POLICY4,
+        PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(),
+        "test",
+        "{\"enable\": false}");
+
+    List<PolicyIdentifier> listResult = policyCatalog.listPolicies(NS1, null);
+    Assertions.assertThat(listResult).hasSize(4);
+    Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY1, POLICY2, POLICY3, POLICY4);
+  }
+
+  @Test
+  public void testListPoliciesFilterByPolicyType() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    policyCatalog.createPolicy(
+        POLICY2,
+        PredefinedPolicyTypes.METADATA_COMPACTION.getName(),
+        "test",
+        "{\"enable\": false}");
+
+    policyCatalog.createPolicy(
+        POLICY3, PredefinedPolicyTypes.SNAPSHOT_RETENTION.getName(), "test", "{\"enable\": false}");
+
+    policyCatalog.createPolicy(
+        POLICY4,
+        PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL.getName(),
+        "test",
+        "{\"enable\": false}");
+
+    List<PolicyIdentifier> listResult =
+        policyCatalog.listPolicies(NS1, PredefinedPolicyTypes.ORPHAN_FILE_REMOVAL);
+    Assertions.assertThat(listResult).hasSize(1);
+    Assertions.assertThat(listResult).containsExactlyInAnyOrder(POLICY4);
+  }
+
+  @Test
+  public void testLoadPolicy() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    Policy policy = policyCatalog.loadPolicy(POLICY1);
+    Assertions.assertThat(policy.getVersion()).isEqualTo(0);
+    Assertions.assertThat(policy.getPolicyType())
+        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+    Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": false}");
+    Assertions.assertThat(policy.getName()).isEqualTo("p1");
+    Assertions.assertThat(policy.getDescription()).isEqualTo("test");
+  }
+
+  @Test
+  public void testCreatePolicyWithInvalidContent() {
+    icebergCatalog.createNamespace(NS1);
+
+    Assertions.assertThatThrownBy(
+            () ->
+                policyCatalog.createPolicy(
+                    POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "invalid"))
+        .isInstanceOf(InvalidPolicyException.class);
+  }
+
+  @Test
+  public void testLoadPolicyNotExist() {
+    icebergCatalog.createNamespace(NS1);
+
+    Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
+        .isInstanceOf(NoSuchPolicyException.class);
+  }
+
+  @Test
+  public void testUpdatePolicy() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 0);
+
+    Policy policy = policyCatalog.loadPolicy(POLICY1);
+    Assertions.assertThat(policy.getVersion()).isEqualTo(1);
+    Assertions.assertThat(policy.getPolicyType())
+        .isEqualTo(PredefinedPolicyTypes.DATA_COMPACTION.getName());
+    Assertions.assertThat(policy.getContent()).isEqualTo("{\"enable\": true}");
+    Assertions.assertThat(policy.getName()).isEqualTo("p1");
+    Assertions.assertThat(policy.getDescription()).isEqualTo("updated");
+  }
+
+  @Test
+  public void testUpdatePolicyWithWrongVersion() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    Assertions.assertThatThrownBy(
+            () -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 1))
+        .isInstanceOf(PolicyVersionMismatchException.class);
+  }
+
+  @Test
+  public void testUpdatePolicyWithInvalidContent() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    Assertions.assertThatThrownBy(
+            () -> policyCatalog.updatePolicy(POLICY1, "updated", "invalid", 0))
+        .isInstanceOf(InvalidPolicyException.class);
+  }
+
+  @Test
+  public void testUpdatePolicyNotExist() {
+    icebergCatalog.createNamespace(NS1);
+
+    Assertions.assertThatThrownBy(
+            () -> policyCatalog.updatePolicy(POLICY1, "updated", "{\"enable\": true}", 0))
+        .isInstanceOf(NoSuchPolicyException.class);
+  }
+
+  @Test
+  public void testDropPolicy() {
+    icebergCatalog.createNamespace(NS1);
+    policyCatalog.createPolicy(
+        POLICY1, PredefinedPolicyTypes.DATA_COMPACTION.getName(), "test", "{\"enable\": false}");
+
+    Assertions.assertThat(policyCatalog.dropPolicy(POLICY1, false)).isTrue();
+    Assertions.assertThatThrownBy(() -> policyCatalog.loadPolicy(POLICY1))
+        .isInstanceOf(NoSuchPolicyException.class);
+  }
+
+  @Test
+  public void testDropPolicyNotExist() {
+    icebergCatalog.createNamespace(NS1);
+
+    Assertions.assertThatThrownBy(() -> policyCatalog.dropPolicy(POLICY1, false))
+        .isInstanceOf(NoSuchPolicyException.class);
+  }
+}
diff --git a/service/common/build.gradle.kts b/service/common/build.gradle.kts
index fec8f12c7..ca00893e0 100644
--- a/service/common/build.gradle.kts
+++ b/service/common/build.gradle.kts
@@ -102,6 +102,7 @@ dependencies {
   testFixturesImplementation(project(":polaris-api-management-model"))
   testFixturesImplementation(project(":polaris-api-management-service"))
   testFixturesImplementation(project(":polaris-api-iceberg-service"))
+  testFixturesImplementation(project(":polaris-api-catalog-service"))
 
   testFixturesImplementation(libs.jakarta.enterprise.cdi.api)
   testFixturesImplementation(libs.jakarta.annotation.api)
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
new file mode 100644
index 000000000..a4fb23da7
--- /dev/null
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/policy/PolicyCatalog.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.polaris.service.catalog.policy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.exceptions.AlreadyExistsException;
+import org.apache.iceberg.exceptions.BadRequestException;
+import org.apache.polaris.core.context.CallContext;
+import org.apache.polaris.core.entity.CatalogEntity;
+import org.apache.polaris.core.entity.PolarisEntity;
+import org.apache.polaris.core.entity.PolarisEntitySubType;
+import org.apache.polaris.core.entity.PolarisEntityType;
+import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
+import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
+import org.apache.polaris.core.persistence.dao.entity.DropEntityResult;
+import org.apache.polaris.core.persistence.dao.entity.EntityResult;
+import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
+import org.apache.polaris.core.policy.PolicyEntity;
+import org.apache.polaris.core.policy.PolicyType;
+import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
+import org.apache.polaris.core.policy.validator.PolicyValidators;
+import org.apache.polaris.service.types.Policy;
+import org.apache.polaris.service.types.PolicyIdentifier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PolicyCatalog {
+  private static final Logger LOGGER = LoggerFactory.getLogger(PolicyCatalog.class);
+
+  // All state is assigned exactly once in the constructor, so every field is final.
+  private final CallContext callContext;
+  private final PolarisResolutionManifestCatalogView resolvedEntityView;
+  private final CatalogEntity catalogEntity;
+  private final long catalogId;
+  private final PolarisMetaStoreManager metaStoreManager;
+
+  /**
+   * Creates a policy catalog backed by the given metastore manager and resolution view.
+   *
+   * <p>The catalog entity, and from it the catalog id, is taken from the raw leaf entity of the
+   * view's resolved reference catalog.
+   *
+   * @param metaStoreManager metastore used to create, list, load, update and drop policy entities
+   * @param callContext call context whose Polaris call context is passed to every metastore call
+   * @param resolvedEntityView view used to resolve namespaces and policy identifiers
+   */
+  public PolicyCatalog(
+      PolarisMetaStoreManager metaStoreManager,
+      CallContext callContext,
+      PolarisResolutionManifestCatalogView resolvedEntityView) {
+    this.callContext = callContext;
+    this.resolvedEntityView = resolvedEntityView;
+    this.catalogEntity =
+        CatalogEntity.of(resolvedEntityView.getResolvedReferenceCatalogEntity().getRawLeafEntity());
+    this.catalogId = catalogEntity.getId();
+    this.metaStoreManager = metaStoreManager;
+  }
+
+  public Policy createPolicy(
+      PolicyIdentifier policyIdentifier, String type, String description, 
String content) {
+    PolarisResolvedPathWrapper resolvedParent =
+        resolvedEntityView.getResolvedPath(policyIdentifier.getNamespace());
+    if (resolvedParent == null) {
+      // Illegal state because the namespace should've already been in the 
static resolution set.
+      throw new IllegalStateException(
+          String.format("Failed to fetch resolved parent for Policy '%s'", 
policyIdentifier));
+    }
+
+    List<PolarisEntity> catalogPath = resolvedParent.getRawFullPath();
+
+    PolarisResolvedPathWrapper resolvedPolicyEntities =
+        resolvedEntityView.getPassthroughResolvedPath(
+            policyIdentifier, PolarisEntityType.POLICY, 
PolarisEntitySubType.NULL_SUBTYPE);
+
+    PolicyEntity entity =
+        PolicyEntity.of(
+            resolvedPolicyEntities == null ? null : 
resolvedPolicyEntities.getRawLeafEntity());
+
+    if (entity != null) {
+      throw new AlreadyExistsException("Policy already exists %s", 
policyIdentifier);
+    }
+
+    PolicyType policyType = PolicyType.fromName(type);
+    if (policyType == null) {
+      throw new BadRequestException("Unknown policy type: %s", type);
+    }
+
+    entity =
+        new PolicyEntity.Builder(
+                policyIdentifier.getNamespace(), policyIdentifier.getName(), 
policyType)
+            .setCatalogId(catalogId)
+            .setParentId(resolvedParent.getRawLeafEntity().getId())
+            .setDescription(description)
+            .setContent(content)
+            .setId(
+                
metaStoreManager.generateNewEntityId(callContext.getPolarisCallContext()).getId())
+            .setCreateTimestamp(System.currentTimeMillis())
+            .build();
+
+    PolicyValidators.validate(entity);
+
+    EntityResult res =
+        metaStoreManager.createEntityIfNotExists(
+            callContext.getPolarisCallContext(), 
PolarisEntity.toCoreList(catalogPath), entity);
+
+    if (!res.isSuccess()) {
+
+      switch (res.getReturnStatus()) {
+        case ENTITY_ALREADY_EXISTS:
+          throw new AlreadyExistsException("Policy already exists %s", 
policyIdentifier);
+
+        default:
+          throw new IllegalStateException(
+              String.format(
+                  "Unknown error status for identifier %s: %s with extraInfo: 
%s",
+                  policyIdentifier, res.getReturnStatus(), 
res.getExtraInformation()));
+      }
+    }
+
+    PolicyEntity resultEntity = PolicyEntity.of(res.getEntity());
+    LOGGER.debug(
+        "Created Policy entity {} with PolicyIdentifier {}", resultEntity, 
policyIdentifier);
+    return constructPolicy(resultEntity);
+  }
+
+  /**
+   * Lists the identifiers of the policies stored directly under {@code namespace}.
+   *
+   * <p>Every listed record is loaded individually from the metastore so that its policy type can
+   * be compared against {@code policyType}; this costs one load per policy in the namespace.
+   *
+   * @param namespace namespace whose policies are listed
+   * @param policyType if non-null, only policies of this type are returned; if null, all policies
+   *     in the namespace are returned
+   * @return identifiers of the matching policies, each using {@code namespace} as its namespace
+   * @throws IllegalStateException if {@code namespace} cannot be resolved
+   */
+  public List<PolicyIdentifier> listPolicies(Namespace namespace, PolicyType policyType) {
+    PolarisResolvedPathWrapper resolvedEntities = resolvedEntityView.getResolvedPath(namespace);
+    if (resolvedEntities == null) {
+      throw new IllegalStateException(
+          String.format("Failed to fetch resolved namespace '%s'", namespace));
+    }
+
+    List<PolarisEntity> catalogPath = resolvedEntities.getRawFullPath();
+    return metaStoreManager
+        .listEntities(
+            callContext.getPolarisCallContext(),
+            PolarisEntity.toCoreList(catalogPath),
+            PolarisEntityType.POLICY,
+            PolarisEntitySubType.NULL_SUBTYPE)
+        .getEntities()
+        .stream()
+        .map(
+            activeRecord ->
+                PolicyEntity.of(
+                    metaStoreManager
+                        .loadEntity(
+                            callContext.getPolarisCallContext(),
+                            activeRecord.getCatalogId(),
+                            activeRecord.getId(),
+                            activeRecord.getType())
+                        .getEntity()))
+        // A record whose entity could not be loaded (for example, dropped between the list and
+        // the load) comes back as null; skip it rather than failing the listing with an NPE.
+        .filter(Objects::nonNull)
+        // equals() instead of == so the match does not depend on PolicyType instance identity.
+        .filter(
+            policyEntity -> policyType == null || policyType.equals(policyEntity.getPolicyType()))
+        .map(
+            policyEntity ->
+                PolicyIdentifier.builder()
+                    .setNamespace(namespace)
+                    .setName(policyEntity.getName())
+                    .build())
+        .toList();
+  }
+
+  /**
+   * Loads the policy named by {@code policyIdentifier}.
+   *
+   * @param policyIdentifier namespace and name of the policy to load
+   * @return the policy built from the resolved policy entity
+   * @throws NoSuchPolicyException if the identifier does not resolve to a policy entity
+   */
+  public Policy loadPolicy(PolicyIdentifier policyIdentifier) {
+    PolarisResolvedPathWrapper policyPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE);
+
+    PolarisEntity leaf = null;
+    if (policyPath != null) {
+      leaf = policyPath.getRawLeafEntity();
+    }
+
+    PolicyEntity found = PolicyEntity.of(leaf);
+    if (found != null) {
+      return constructPolicy(found);
+    }
+    throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier));
+  }
+
+  /**
+   * Updates the description and content of an existing policy, guarded by a version check.
+   *
+   * <p>If both the new description and the new content equal the stored values, nothing is
+   * written and the current policy is returned unchanged. Otherwise the policy version is
+   * incremented by one, the new entity is passed to {@code PolicyValidators.validate}, and it is
+   * written through the metastore.
+   *
+   * @param policyIdentifier namespace and name of the policy to update
+   * @param newDescription description to store; compared null-safely against the current one.
+   *     NOTE(review): a null value is handed to the entity builder as-is; confirm the builder
+   *     accepts it
+   * @param newContent content to store; compared null-safely against the current one
+   * @param currentPolicyVersion version the caller believes the policy currently has
+   * @return the policy built from the updated entity, or from the current entity if nothing changed
+   * @throws NoSuchPolicyException if the identifier does not resolve to a policy entity
+   * @throws PolicyVersionMismatchException if {@code currentPolicyVersion} differs from the stored
+   *     policy version
+   * @throws IllegalStateException if the metastore does not return an updated entity
+   */
+  public Policy updatePolicy(
+      PolicyIdentifier policyIdentifier,
+      String newDescription,
+      String newContent,
+      int currentPolicyVersion) {
+    PolarisResolvedPathWrapper resolvedEntities =
+        resolvedEntityView.getPassthroughResolvedPath(
+            policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE);
+
+    PolicyEntity policy =
+        PolicyEntity.of(resolvedEntities == null ? null : resolvedEntities.getRawLeafEntity());
+
+    if (policy == null) {
+      throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier));
+    }
+
+    // Verify that the current version of the policy matches the version that the user is trying
+    // to update
+    int policyVersion = policy.getPolicyVersion();
+    if (currentPolicyVersion != policyVersion) {
+      throw new PolicyVersionMismatchException(
+          String.format(
+              "Policy version mismatch. Given version is %d, current version is %d",
+              currentPolicyVersion, policyVersion));
+    }
+
+    // Objects.equals keeps a null description or content from raising a NullPointerException.
+    if (Objects.equals(newDescription, policy.getDescription())
+        && Objects.equals(newContent, policy.getContent())) {
+      // No need to update the policy if the new description and content are the same as the
+      // current
+      return constructPolicy(policy);
+    }
+
+    PolicyEntity.Builder newPolicyBuilder = new PolicyEntity.Builder(policy);
+    newPolicyBuilder.setContent(newContent);
+    newPolicyBuilder.setDescription(newDescription);
+    newPolicyBuilder.setPolicyVersion(policyVersion + 1);
+    PolicyEntity newPolicyEntity = newPolicyBuilder.build();
+
+    PolicyValidators.validate(newPolicyEntity);
+
+    List<PolarisEntity> catalogPath = resolvedEntities.getRawParentPath();
+    EntityResult updateResult =
+        metaStoreManager.updateEntityPropertiesIfNotChanged(
+            callContext.getPolarisCallContext(),
+            PolarisEntity.toCoreList(catalogPath),
+            newPolicyEntity);
+    PolicyEntity updatedPolicy =
+        Optional.ofNullable(updateResult.getEntity()).map(PolicyEntity::of).orElse(null);
+
+    if (updatedPolicy == null) {
+      // Report the metastore status, as createPolicy does, so the cause of the failure is visible.
+      throw new IllegalStateException(
+          String.format(
+              "Failed to update policy %s: %s with extraInfo: %s",
+              policyIdentifier,
+              updateResult.getReturnStatus(),
+              updateResult.getExtraInformation()));
+    }
+
+    return constructPolicy(updatedPolicy);
+  }
+
+  /**
+   * Drops the policy named by {@code policyIdentifier}.
+   *
+   * @param policyIdentifier namespace and name of the policy to drop
+   * @param detachAll currently unused; see the TODO below
+   * @return whether the metastore reported the drop as successful
+   * @throws NoSuchPolicyException if the identifier does not resolve
+   */
+  public boolean dropPolicy(PolicyIdentifier policyIdentifier, boolean detachAll) {
+    // TODO: Implement detachAll when we support attach/detach policy
+    PolarisResolvedPathWrapper policyPath =
+        resolvedEntityView.getPassthroughResolvedPath(
+            policyIdentifier, PolarisEntityType.POLICY, PolarisEntitySubType.NULL_SUBTYPE);
+    if (policyPath == null) {
+      throw new NoSuchPolicyException(String.format("Policy does not exist: %s", policyIdentifier));
+    }
+
+    List<PolarisEntity> parentPath = policyPath.getRawParentPath();
+    PolarisEntity policyLeaf = policyPath.getRawLeafEntity();
+
+    DropEntityResult outcome =
+        metaStoreManager.dropEntityIfExists(
+            callContext.getPolarisCallContext(),
+            PolarisEntity.toCoreList(parentPath),
+            policyLeaf,
+            Map.of(),
+            false);
+    return outcome.isSuccess();
+  }
+
+  /**
+   * Converts a policy entity into the {@link Policy} type returned to callers.
+   *
+   * @param policyEntity entity to convert; must not be null
+   * @return a policy carrying the entity's type name, inheritable flag, name, description, content
+   *     and version
+   */
+  private static Policy constructPolicy(PolicyEntity policyEntity) {
+    PolicyType entityType = policyEntity.getPolicyType();
+    return Policy.builder()
+        .setPolicyType(entityType.getName())
+        .setInheritable(entityType.isInheritable())
+        .setName(policyEntity.getName())
+        .setDescription(policyEntity.getDescription())
+        .setContent(policyEntity.getContent())
+        .setVersion(policyEntity.getPolicyVersion())
+        .build();
+  }
+}
diff --git a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
index 0636f5b97..22b9e187d 100644
--- a/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
+++ b/service/common/src/main/java/org/apache/polaris/service/exception/PolarisExceptionMapper.java
@@ -25,6 +25,8 @@ import jakarta.ws.rs.ext.Provider;
 import org.apache.iceberg.rest.responses.ErrorResponse;
 import org.apache.polaris.core.exceptions.AlreadyExistsException;
 import org.apache.polaris.core.exceptions.PolarisException;
+import org.apache.polaris.core.policy.exceptions.NoSuchPolicyException;
+import 
org.apache.polaris.core.policy.exceptions.PolicyVersionMismatchException;
 import org.apache.polaris.core.policy.validator.InvalidPolicyException;
 import org.apache.polaris.service.context.UnresolvableRealmContextException;
 import org.slf4j.Logger;
@@ -47,6 +49,10 @@ public class PolarisExceptionMapper implements 
ExceptionMapper<PolarisException>
       return Response.Status.NOT_FOUND;
     } else if (exception instanceof InvalidPolicyException) {
       return Response.Status.BAD_REQUEST;
+    } else if (exception instanceof NoSuchPolicyException) {
+      return Response.Status.NOT_FOUND;
+    } else if (exception instanceof PolicyVersionMismatchException) {
+      return Response.Status.CONFLICT;
     } else {
       return Response.Status.INTERNAL_SERVER_ERROR;
     }
diff --git a/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
index 8c98e910b..8919ef5f0 100644
--- a/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
+++ b/service/common/src/testFixtures/java/org/apache/polaris/service/catalog/PolarisPassthroughResolutionView.java
@@ -31,6 +31,7 @@ import 
org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
 import org.apache.polaris.core.persistence.resolver.PolarisResolutionManifest;
 import 
org.apache.polaris.core.persistence.resolver.PolarisResolutionManifestCatalogView;
 import org.apache.polaris.core.persistence.resolver.ResolverPath;
+import org.apache.polaris.service.types.PolicyIdentifier;
 
 /**
  * For test purposes or for elevated-privilege scenarios where entity 
resolution is allowed to
@@ -94,6 +95,15 @@ public class PolarisPassthroughResolutionView implements 
PolarisResolutionManife
           identifier);
       manifest.resolveAll();
       return manifest.getResolvedPath(identifier, entityType, subType);
+    } else if (key instanceof PolicyIdentifier policyIdentifier) {
+      manifest.addPath(
+          new ResolverPath(
+              PolarisCatalogHelpers.identifierToList(
+                  policyIdentifier.getNamespace(), policyIdentifier.getName()),
+              entityType),
+          policyIdentifier);
+      manifest.resolveAll();
+      return manifest.getResolvedPath(policyIdentifier, entityType, subType);
     } else {
       throw new IllegalStateException(
           String.format(
@@ -130,6 +140,14 @@ public class PolarisPassthroughResolutionView implements 
PolarisResolutionManife
           new 
ResolverPath(PolarisCatalogHelpers.tableIdentifierToList(identifier), 
entityType),
           identifier);
       return manifest.getPassthroughResolvedPath(identifier, entityType, 
subType);
+    } else if (key instanceof PolicyIdentifier policyIdentifier) {
+      manifest.addPassthroughPath(
+          new ResolverPath(
+              PolarisCatalogHelpers.identifierToList(
+                  policyIdentifier.getNamespace(), policyIdentifier.getName()),
+              entityType),
+          policyIdentifier);
+      return manifest.getPassthroughResolvedPath(policyIdentifier, entityType, 
subType);
     } else {
       throw new IllegalStateException(
           String.format(

Reply via email to