This is an automated email from the ASF dual-hosted git repository.

emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git


The following commit(s) were added to refs/heads/main by this push:
     new fdb59b0a2 Support snapshots=refs (#1405)
fdb59b0a2 is described below

commit fdb59b0a2a8e2ec18f61d85ca12a44a910129100
Author: Eric Maynard <[email protected]>
AuthorDate: Tue Apr 22 09:46:44 2025 -0700

    Support snapshots=refs (#1405)
    
    * initial commit
    
    * autolint
    
    * small revert
    
    * rebase
    
    * autolint
    
    * simpler
    
    * autolint
    
    * tests
    
    * autolint
    
    * stable
    
    * fix leak
    
    * ready for review
    
    * improved test
    
    * autolint
    
    * logic flip again
    
    * Update 
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
    
    Co-authored-by: Alexandre Dutra <[email protected]>
    
    * Update 
integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
    
    Co-authored-by: Alexandre Dutra <[email protected]>
    
    * adjustments for committed suggestions
    
    * autolint
    
    ---------
    
    Co-authored-by: Alexandre Dutra <[email protected]>
---
 .../apache/polaris/service/it/env/CatalogApi.java  | 22 ++++++++
 .../it/test/PolarisRestCatalogIntegrationTest.java | 63 +++++++++++++++++++++-
 .../catalog/iceberg/IcebergCatalogHandler.java     | 43 +++++++++++++--
 3 files changed, 121 insertions(+), 7 deletions(-)

diff --git 
a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
 
b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
index 8312e488b..7be67f194 100644
--- 
a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
+++ 
b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
@@ -34,10 +34,14 @@ import java.util.List;
 import java.util.Map;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.rest.ErrorHandler;
+import org.apache.iceberg.rest.ErrorHandlers;
 import org.apache.iceberg.rest.RESTUtil;
 import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
 import org.apache.iceberg.rest.responses.ListNamespacesResponse;
 import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
 import org.apache.iceberg.rest.responses.OAuthTokenResponse;
 
 /**
@@ -149,6 +153,24 @@ public class CatalogApi extends RestApi {
     }
   }
 
+  public LoadTableResponse loadTable(String catalog, TableIdentifier id, 
String snapshots) {
+    String ns = RESTUtil.encodeNamespace(id.namespace());
+    try (Response res =
+        request(
+                "v1/{cat}/namespaces/" + ns + "/tables/{table}",
+                Map.of("cat", catalog, "table", id.name()),
+                snapshots == null ? Map.of() : Map.of("snapshots", snapshots))
+            .get()) {
+      if (res.getStatus() == Response.Status.OK.getStatusCode()) {
+        return res.readEntity(LoadTableResponse.class);
+      }
+      throw new RESTException(
+          "Unhandled error: %s",
+          ((ErrorHandler) ErrorHandlers.defaultErrorHandler())
+              .parseResponse(res.getStatus(), res.readEntity(String.class)));
+    }
+  }
+
   public List<TableIdentifier> listViews(String catalog, Namespace namespace) {
     String ns = RESTUtil.encodeNamespace(namespace);
     try (Response res =
diff --git 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
index 33bc77954..7b02a9baa 100644
--- 
a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
+++ 
b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
@@ -19,8 +19,10 @@
 package org.apache.polaris.service.it.test;
 
 import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
 import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import com.google.common.collect.ImmutableMap;
@@ -58,6 +60,7 @@ import org.apache.iceberg.catalog.TableCommit;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.CommitFailedException;
 import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.exceptions.RESTException;
 import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.io.ResolvingFileIO;
 import org.apache.iceberg.rest.RESTCatalog;
@@ -1267,8 +1270,7 @@ public class PolarisRestCatalogIntegrationTest extends 
CatalogTests<RESTCatalog>
 
     genericTableApi.dropGenericTable(currentCatalogName, tableIdentifier);
 
-    Assertions.assertThatCode(
-            () -> genericTableApi.getGenericTable(currentCatalogName, 
tableIdentifier))
+    assertThatCode(() -> genericTableApi.getGenericTable(currentCatalogName, 
tableIdentifier))
         .isInstanceOf(ProcessingException.class);
 
     genericTableApi.purge(currentCatalogName, namespace);
@@ -1343,4 +1345,61 @@ public class PolarisRestCatalogIntegrationTest extends 
CatalogTests<RESTCatalog>
 
     genericTableApi.purge(currentCatalogName, namespace);
   }
+
+  @Test
+  public void testLoadTableWithSnapshots() {
+    Namespace namespace = Namespace.of("ns1");
+    restCatalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
+    restCatalog.createTable(tableIdentifier, SCHEMA);
+
+    assertThatCode(() -> catalogApi.loadTable(currentCatalogName, 
tableIdentifier, "ALL"))
+        .doesNotThrowAnyException();
+    assertThatCode(() -> catalogApi.loadTable(currentCatalogName, 
tableIdentifier, "all"))
+        .doesNotThrowAnyException();
+    assertThatCode(() -> catalogApi.loadTable(currentCatalogName, 
tableIdentifier, "refs"))
+        .doesNotThrowAnyException();
+    assertThatCode(() -> catalogApi.loadTable(currentCatalogName, 
tableIdentifier, "REFS"))
+        .doesNotThrowAnyException();
+    assertThatCode(() -> catalogApi.loadTable(currentCatalogName, 
tableIdentifier, "not-real"))
+        .isInstanceOf(RESTException.class)
+        .hasMessageContaining("Unrecognized snapshots")
+        .hasMessageContaining("code=" + BAD_REQUEST.getStatusCode());
+
+    catalogApi.purge(currentCatalogName, namespace);
+  }
+
+  @Test
+  public void testLoadTableWithRefFiltering() {
+    Namespace namespace = Namespace.of("ns1");
+    restCatalog.createNamespace(namespace);
+    TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
+
+    restCatalog.createTable(tableIdentifier, SCHEMA);
+
+    Table table = restCatalog.loadTable(tableIdentifier);
+
+    // Create an orphaned snapshot:
+    table.newAppend().appendFile(FILE_A).commit();
+    long snapshotIdA = table.currentSnapshot().snapshotId();
+    table.newAppend().appendFile(FILE_B).commit();
+    table.manageSnapshots().setCurrentSnapshot(snapshotIdA).commit();
+
+    var allSnapshots =
+        catalogApi
+            .loadTable(currentCatalogName, tableIdentifier, "ALL")
+            .tableMetadata()
+            .snapshots();
+    assertThat(allSnapshots).hasSize(2);
+
+    var refsSnapshots =
+        catalogApi
+            .loadTable(currentCatalogName, tableIdentifier, "REFS")
+            .tableMetadata()
+            .snapshots();
+    assertThat(refsSnapshots).hasSize(1);
+    assertThat(refsSnapshots.getFirst().snapshotId()).isEqualTo(snapshotIdA);
+
+    catalogApi.purge(currentCatalogName, namespace);
+  }
 }
diff --git 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index c4105ddba..2785e004c 100644
--- 
a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ 
b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.catalog.iceberg;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.Maps;
+import jakarta.annotation.Nonnull;
 import jakarta.ws.rs.core.SecurityContext;
 import java.io.Closeable;
 import java.time.OffsetDateTime;
@@ -36,6 +37,7 @@ import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.MetadataUpdate;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableMetadata;
@@ -127,6 +129,9 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
   protected SupportsNamespaces namespaceCatalog = null;
   protected ViewCatalog viewCatalog = null;
 
+  public static final String SNAPSHOTS_ALL = "all";
+  public static final String SNAPSHOTS_REFS = "refs";
+
   public IcebergCatalogHandler(
       CallContext callContext,
       PolarisEntityManager entityManager,
@@ -380,7 +385,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
               Set.of(
                   PolarisStorageActions.READ,
                   PolarisStorageActions.WRITE,
-                  PolarisStorageActions.LIST))
+                  PolarisStorageActions.LIST),
+              SNAPSHOTS_ALL)
           .build();
     } else if (table instanceof BaseMetadataTable) {
       // metadata tables are loaded on the client side, return 
NoSuchTableException for now
@@ -471,7 +477,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
     TableMetadata metadata = stageTableCreateHelper(namespace, request);
 
     return buildLoadTableResponseWithDelegationCredentials(
-            ident, metadata, Set.of(PolarisStorageActions.ALL))
+            ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL)
         .build();
   }
 
@@ -580,7 +586,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       }
     }
 
-    return Optional.of(CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier));
+    LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier);
+    return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
   }
 
   public LoadTableResponse loadTableWithAccessDelegation(
@@ -679,7 +686,7 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
       TableMetadata tableMetadata = baseTable.operations().current();
       return Optional.of(
           buildLoadTableResponseWithDelegationCredentials(
-                  tableIdentifier, tableMetadata, actionsRequested)
+                  tableIdentifier, tableMetadata, actionsRequested, snapshots)
               .build());
     } else if (table instanceof BaseMetadataTable) {
       // metadata tables are loaded on the client side, return 
NoSuchTableException for now
@@ -692,7 +699,8 @@ public class IcebergCatalogHandler extends CatalogHandler 
implements AutoCloseab
   private LoadTableResponse.Builder 
buildLoadTableResponseWithDelegationCredentials(
       TableIdentifier tableIdentifier,
       TableMetadata tableMetadata,
-      Set<PolarisStorageActions> actions) {
+      Set<PolarisStorageActions> actions,
+      String snapshots) {
     LoadTableResponse.Builder responseBuilder =
         LoadTableResponse.builder().withTableMetadata(tableMetadata);
     if (baseCatalog instanceof SupportsCredentialDelegation 
credentialDelegation) {
@@ -1010,6 +1018,31 @@ public class IcebergCatalogHandler extends 
CatalogHandler implements AutoCloseab
     CatalogHandlers.renameView(viewCatalog, request);
   }
 
+  private @Nonnull LoadTableResponse filterResponseToSnapshots(
+      LoadTableResponse loadTableResponse, String snapshots) {
+    if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) {
+      return loadTableResponse;
+    } else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) {
+      TableMetadata metadata = loadTableResponse.tableMetadata();
+
+      Set<Long> referencedSnapshotIds =
+          metadata.refs().values().stream()
+              .map(SnapshotRef::snapshotId)
+              .collect(Collectors.toSet());
+
+      TableMetadata filteredMetadata =
+          metadata.removeSnapshotsIf(s -> 
!referencedSnapshotIds.contains(s.snapshotId()));
+
+      return LoadTableResponse.builder()
+          .withTableMetadata(filteredMetadata)
+          .addAllConfig(loadTableResponse.config())
+          .addAllCredentials(loadTableResponse.credentials())
+          .build();
+    } else {
+      throw new IllegalArgumentException("Unrecognized snapshots: " + 
snapshots);
+    }
+  }
+
   @Override
   public void close() throws Exception {
     if (baseCatalog instanceof Closeable closeable) {

Reply via email to