This is an automated email from the ASF dual-hosted git repository.
emaynard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/polaris.git
The following commit(s) were added to refs/heads/main by this push:
new fdb59b0a2 Support snapshots=refs (#1405)
fdb59b0a2 is described below
commit fdb59b0a2a8e2ec18f61d85ca12a44a910129100
Author: Eric Maynard <[email protected]>
AuthorDate: Tue Apr 22 09:46:44 2025 -0700
Support snapshots=refs (#1405)
* initial commit
* autolint
* small revert
* rebase
* autolint
* simpler
* autolint
* tests
* autolint
* stable
* fix leak
* ready for review
* improved test
* autolint
* logic flip again
* Update service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
Co-authored-by: Alexandre Dutra <[email protected]>
* Update integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
Co-authored-by: Alexandre Dutra <[email protected]>
* adjustments for committed suggestions
* autolint
---------
Co-authored-by: Alexandre Dutra <[email protected]>
---
.../apache/polaris/service/it/env/CatalogApi.java | 22 ++++++++
.../it/test/PolarisRestCatalogIntegrationTest.java | 63 +++++++++++++++++++++-
.../catalog/iceberg/IcebergCatalogHandler.java | 43 +++++++++++++--
3 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
index 8312e488b..7be67f194 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/env/CatalogApi.java
@@ -34,10 +34,14 @@ import java.util.List;
import java.util.Map;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.exceptions.RESTException;
+import org.apache.iceberg.rest.ErrorHandler;
+import org.apache.iceberg.rest.ErrorHandlers;
import org.apache.iceberg.rest.RESTUtil;
import org.apache.iceberg.rest.requests.CreateNamespaceRequest;
import org.apache.iceberg.rest.responses.ListNamespacesResponse;
import org.apache.iceberg.rest.responses.ListTablesResponse;
+import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
/**
@@ -149,6 +153,24 @@ public class CatalogApi extends RestApi {
}
}
+ public LoadTableResponse loadTable(String catalog, TableIdentifier id,
String snapshots) {
+ String ns = RESTUtil.encodeNamespace(id.namespace());
+ try (Response res =
+ request(
+ "v1/{cat}/namespaces/" + ns + "/tables/{table}",
+ Map.of("cat", catalog, "table", id.name()),
+ snapshots == null ? Map.of() : Map.of("snapshots", snapshots))
+ .get()) {
+ if (res.getStatus() == Response.Status.OK.getStatusCode()) {
+ return res.readEntity(LoadTableResponse.class);
+ }
+ throw new RESTException(
+ "Unhandled error: %s",
+ ((ErrorHandler) ErrorHandlers.defaultErrorHandler())
+ .parseResponse(res.getStatus(), res.readEntity(String.class)));
+ }
+ }
+
public List<TableIdentifier> listViews(String catalog, Namespace namespace) {
String ns = RESTUtil.encodeNamespace(namespace);
try (Response res =
diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
index 33bc77954..7b02a9baa 100644
--- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
+++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/PolarisRestCatalogIntegrationTest.java
@@ -19,8 +19,10 @@
package org.apache.polaris.service.it.test;
import static jakarta.ws.rs.core.Response.Status.NOT_FOUND;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import com.google.common.collect.ImmutableMap;
@@ -58,6 +60,7 @@ import org.apache.iceberg.catalog.TableCommit;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.exceptions.ForbiddenException;
+import org.apache.iceberg.exceptions.RESTException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.ResolvingFileIO;
import org.apache.iceberg.rest.RESTCatalog;
@@ -1267,8 +1270,7 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests<RESTCatalog>
genericTableApi.dropGenericTable(currentCatalogName, tableIdentifier);
- Assertions.assertThatCode(
- () -> genericTableApi.getGenericTable(currentCatalogName,
tableIdentifier))
+ assertThatCode(() -> genericTableApi.getGenericTable(currentCatalogName,
tableIdentifier))
.isInstanceOf(ProcessingException.class);
genericTableApi.purge(currentCatalogName, namespace);
@@ -1343,4 +1345,61 @@ public class PolarisRestCatalogIntegrationTest extends CatalogTests<RESTCatalog>
genericTableApi.purge(currentCatalogName, namespace);
}
+
+ @Test
+ public void testLoadTableWithSnapshots() {
+ Namespace namespace = Namespace.of("ns1");
+ restCatalog.createNamespace(namespace);
+ TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
+ restCatalog.createTable(tableIdentifier, SCHEMA);
+
+ assertThatCode(() -> catalogApi.loadTable(currentCatalogName,
tableIdentifier, "ALL"))
+ .doesNotThrowAnyException();
+ assertThatCode(() -> catalogApi.loadTable(currentCatalogName,
tableIdentifier, "all"))
+ .doesNotThrowAnyException();
+ assertThatCode(() -> catalogApi.loadTable(currentCatalogName,
tableIdentifier, "refs"))
+ .doesNotThrowAnyException();
+ assertThatCode(() -> catalogApi.loadTable(currentCatalogName,
tableIdentifier, "REFS"))
+ .doesNotThrowAnyException();
+ assertThatCode(() -> catalogApi.loadTable(currentCatalogName,
tableIdentifier, "not-real"))
+ .isInstanceOf(RESTException.class)
+ .hasMessageContaining("Unrecognized snapshots")
+ .hasMessageContaining("code=" + BAD_REQUEST.getStatusCode());
+
+ catalogApi.purge(currentCatalogName, namespace);
+ }
+
+ @Test
+ public void testLoadTableWithRefFiltering() {
+ Namespace namespace = Namespace.of("ns1");
+ restCatalog.createNamespace(namespace);
+ TableIdentifier tableIdentifier = TableIdentifier.of(namespace, "tbl1");
+
+ restCatalog.createTable(tableIdentifier, SCHEMA);
+
+ Table table = restCatalog.loadTable(tableIdentifier);
+
+ // Create an orphaned snapshot:
+ table.newAppend().appendFile(FILE_A).commit();
+ long snapshotIdA = table.currentSnapshot().snapshotId();
+ table.newAppend().appendFile(FILE_B).commit();
+ table.manageSnapshots().setCurrentSnapshot(snapshotIdA).commit();
+
+ var allSnapshots =
+ catalogApi
+ .loadTable(currentCatalogName, tableIdentifier, "ALL")
+ .tableMetadata()
+ .snapshots();
+ assertThat(allSnapshots).hasSize(2);
+
+ var refsSnapshots =
+ catalogApi
+ .loadTable(currentCatalogName, tableIdentifier, "REFS")
+ .tableMetadata()
+ .snapshots();
+ assertThat(refsSnapshots).hasSize(1);
+ assertThat(refsSnapshots.getFirst().snapshotId()).isEqualTo(snapshotIdA);
+
+ catalogApi.purge(currentCatalogName, namespace);
+ }
}
diff --git a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
index c4105ddba..2785e004c 100644
--- a/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
+++ b/service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java
@@ -20,6 +20,7 @@ package org.apache.polaris.service.catalog.iceberg;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
+import jakarta.annotation.Nonnull;
import jakarta.ws.rs.core.SecurityContext;
import java.io.Closeable;
import java.time.OffsetDateTime;
@@ -36,6 +37,7 @@ import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.MetadataUpdate;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
@@ -127,6 +129,9 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
protected SupportsNamespaces namespaceCatalog = null;
protected ViewCatalog viewCatalog = null;
+ public static final String SNAPSHOTS_ALL = "all";
+ public static final String SNAPSHOTS_REFS = "refs";
+
public IcebergCatalogHandler(
CallContext callContext,
PolarisEntityManager entityManager,
@@ -380,7 +385,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
Set.of(
PolarisStorageActions.READ,
PolarisStorageActions.WRITE,
- PolarisStorageActions.LIST))
+ PolarisStorageActions.LIST),
+ SNAPSHOTS_ALL)
.build();
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return
NoSuchTableException for now
@@ -471,7 +477,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
TableMetadata metadata = stageTableCreateHelper(namespace, request);
return buildLoadTableResponseWithDelegationCredentials(
- ident, metadata, Set.of(PolarisStorageActions.ALL))
+ ident, metadata, Set.of(PolarisStorageActions.ALL), SNAPSHOTS_ALL)
.build();
}
@@ -580,7 +586,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
}
}
- return Optional.of(CatalogHandlers.loadTable(baseCatalog,
tableIdentifier));
+ LoadTableResponse rawResponse = CatalogHandlers.loadTable(baseCatalog,
tableIdentifier);
+ return Optional.of(filterResponseToSnapshots(rawResponse, snapshots));
}
public LoadTableResponse loadTableWithAccessDelegation(
@@ -679,7 +686,7 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
TableMetadata tableMetadata = baseTable.operations().current();
return Optional.of(
buildLoadTableResponseWithDelegationCredentials(
- tableIdentifier, tableMetadata, actionsRequested)
+ tableIdentifier, tableMetadata, actionsRequested, snapshots)
.build());
} else if (table instanceof BaseMetadataTable) {
// metadata tables are loaded on the client side, return
NoSuchTableException for now
@@ -692,7 +699,8 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
private LoadTableResponse.Builder
buildLoadTableResponseWithDelegationCredentials(
TableIdentifier tableIdentifier,
TableMetadata tableMetadata,
- Set<PolarisStorageActions> actions) {
+ Set<PolarisStorageActions> actions,
+ String snapshots) {
LoadTableResponse.Builder responseBuilder =
LoadTableResponse.builder().withTableMetadata(tableMetadata);
if (baseCatalog instanceof SupportsCredentialDelegation
credentialDelegation) {
@@ -1010,6 +1018,31 @@ public class IcebergCatalogHandler extends CatalogHandler implements AutoCloseab
CatalogHandlers.renameView(viewCatalog, request);
}
+ private @Nonnull LoadTableResponse filterResponseToSnapshots(
+ LoadTableResponse loadTableResponse, String snapshots) {
+ if (snapshots == null || snapshots.equalsIgnoreCase(SNAPSHOTS_ALL)) {
+ return loadTableResponse;
+ } else if (snapshots.equalsIgnoreCase(SNAPSHOTS_REFS)) {
+ TableMetadata metadata = loadTableResponse.tableMetadata();
+
+ Set<Long> referencedSnapshotIds =
+ metadata.refs().values().stream()
+ .map(SnapshotRef::snapshotId)
+ .collect(Collectors.toSet());
+
+ TableMetadata filteredMetadata =
+ metadata.removeSnapshotsIf(s ->
!referencedSnapshotIds.contains(s.snapshotId()));
+
+ return LoadTableResponse.builder()
+ .withTableMetadata(filteredMetadata)
+ .addAllConfig(loadTableResponse.config())
+ .addAllCredentials(loadTableResponse.credentials())
+ .build();
+ } else {
+ throw new IllegalArgumentException("Unrecognized snapshots: " +
snapshots);
+ }
+ }
+
@Override
public void close() throws Exception {
if (baseCatalog instanceof Closeable closeable) {