This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9cfb5ccd85 [core] Add load version snapshot and list snapshots to
Catalog (#5660)
9cfb5ccd85 is described below
commit 9cfb5ccd859da7b3380be9e32678f3de1443674b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 26 17:51:56 2025 +0800
[core] Add load version snapshot and list snapshots to Catalog (#5660)
---
docs/static/rest-catalog-open-api.yaml | 110 +++++++++++++++++++++
.../main/java/org/apache/paimon/rest/RESTApi.java | 64 +++++++++++-
.../java/org/apache/paimon/rest/ResourcePaths.java | 24 +++++
.../rest/responses/GetVersionSnapshotResponse.java | 47 +++++++++
.../rest/responses/ListSnapshotsResponse.java | 69 +++++++++++++
.../org/apache/paimon/catalog/AbstractCatalog.java | 11 +++
.../java/org/apache/paimon/catalog/Catalog.java | 45 ++++++++-
.../org/apache/paimon/catalog/DelegateCatalog.java | 13 +++
.../java/org/apache/paimon/rest/RESTCatalog.java | 28 ++++++
.../org/apache/paimon/rest/RESTCatalogServer.java | 60 +++++++++++
.../org/apache/paimon/rest/RESTCatalogTest.java | 52 +++++++++-
11 files changed, 515 insertions(+), 8 deletions(-)
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index 7e99e46392..00878178c8 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -732,6 +732,102 @@ paths:
$ref: '#/components/examples/SnapshotNotExistError'
"500":
$ref: '#/components/responses/ServerErrorResponse'
+ /v1/{prefix}/databases/{database}/tables/{table}/snapshots/{version}:
+ get:
+ tags:
+ - table
+ summary: Get version snapshot
+ operationId: getVersionSnapshot
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: version
+ in: path
+ required: true
+ schema:
+ type: string
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GetVersionSnapshotResponse'
+ "401":
+ $ref: '#/components/responses/UnauthorizedErrorResponse'
+ 404:
+ description:
+ Not Found
+ - TableNotExistException, table does not exist
+ - SnapshotNotExistException, the requested snapshot does not exist
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/responses/ResourceNotExistErrorResponse'
+ examples:
+ TableNotExist:
+ $ref: '#/components/examples/TableNotExistError'
+ SnapshotNotExist:
+ $ref: '#/components/examples/SnapshotNotExistError'
+ "500":
+ $ref: '#/components/responses/ServerErrorResponse'
+ /v1/{prefix}/databases/{database}/tables/{table}/snapshots:
+ get:
+ tags:
+ - table
+ summary: List snapshots
+ operationId: listSnapshots
+ parameters:
+ - name: prefix
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: database
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: table
+ in: path
+ required: true
+ schema:
+ type: string
+ - name: maxResults
+ in: query
+ schema:
+ type: integer
+ format: int32
+ - name: pageToken
+ in: query
+ schema:
+ type: string
+ responses:
+ "200":
+ description: OK
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ListSnapshotsResponse'
+ "401":
+ $ref: '#/components/responses/UnauthorizedErrorResponse'
+ "404":
+ $ref: '#/components/responses/TableNotExistErrorResponse'
+ "500":
+ $ref: '#/components/responses/ServerErrorResponse'
/v1/{prefix}/databases/{database}/tables/{table}/partitions:
get:
tags:
@@ -2428,6 +2524,20 @@ components:
properties:
snapshot:
$ref: '#/components/schemas/TableSnapshot'
+ GetVersionSnapshotResponse:
+ type: object
+ properties:
+ snapshot:
+ $ref: '#/components/schemas/Snapshot'
+ ListSnapshotsResponse:
+ type: object
+ properties:
+ snapshots:
+ type: array
+ items:
+ $ref: '#/components/schemas/Snapshot'
+ nextPageToken:
+ type: string
AuthTableQueryRequest:
type: object
properties:
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 48d197d881..9fb78cd76b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -56,11 +56,13 @@ import org.apache.paimon.rest.responses.GetFunctionResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListBranchesResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListFunctionsResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
+import org.apache.paimon.rest.responses.ListSnapshotsResponse;
import org.apache.paimon.rest.responses.ListTableDetailsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
import org.apache.paimon.rest.responses.ListViewDetailsResponse;
@@ -462,7 +464,7 @@ public class RESTApi {
* Load latest snapshot for table.
*
* @param identifier database name and table name.
- * @return {@link TableSnapshot} Optional snapshot.
+ * @return {@link TableSnapshot} snapshot with statistics.
* @throws NoSuchResourceException Exception thrown on HTTP 404 means the
table or the latest
* snapshot not exists
* @throws ForbiddenException Exception thrown on HTTP 403 means don't
have the permission for
@@ -478,6 +480,66 @@ public class RESTApi {
return response.getSnapshot();
}
+ /**
+ * Return the snapshot of table for given version. Version parsing order
is:
+ *
+ * <ul>
+ * <li>1. If it is 'EARLIEST', get the earliest snapshot.
+ * <li>2. If it is 'LATEST', get the latest snapshot.
+ * <li>3. If it is a number, get snapshot by snapshot id.
+ * <li>4. Else try to get snapshot from Tag name.
+ * </ul>
+ *
+ * @param identifier database name and table name.
+ * @param version version to snapshot
+ * @return Optional snapshot.
+ * @throws NoSuchResourceException Exception thrown on HTTP 404 means the
table or the snapshot
+ * not exists
+ * @throws ForbiddenException Exception thrown on HTTP 403 means don't
have the permission for
+ * this table
+ */
+ public Snapshot loadSnapshot(Identifier identifier, String version) {
+ GetVersionSnapshotResponse response =
+ client.get(
+ resourcePaths.tableSnapshot(
+ identifier.getDatabaseName(),
identifier.getObjectName(), version),
+ GetVersionSnapshotResponse.class,
+ restAuthFunction);
+ return response.getSnapshot();
+ }
+
+ /**
+ * Get paged snapshot list of the table, the snapshot list will be
returned in descending order.
+ *
+ * @param identifier path of the table to list partitions
+ * @param maxResults Optional parameter indicating the maximum number of
results to include in
+ * the result. If maxResults is not specified or set to 0, will return
the default number of
+ * max results.
+ * @param pageToken Optional parameter indicating the next page token
allows list to be start
+ * from a specific point.
+ * @return a list of the snapshots with provided page size(@param
maxResults) in this table and
+ * next page token.
+ * @throws NoSuchResourceException Exception thrown on HTTP 404 means the
table or the latest
+ * snapshot not exists
+ * @throws ForbiddenException Exception thrown on HTTP 403 means don't
have the permission for
+ * this table
+ */
+ public PagedList<Snapshot> listSnapshotsPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken) {
+ ListSnapshotsResponse response =
+ client.get(
+ resourcePaths.snapshots(
+ identifier.getDatabaseName(),
identifier.getObjectName()),
+ buildPagedQueryParams(maxResults, pageToken),
+ ListSnapshotsResponse.class,
+ restAuthFunction);
+ List<Snapshot> snapshots = response.getSnapshots();
+ if (snapshots == null) {
+ return new PagedList<>(emptyList(), null);
+ }
+ return new PagedList<>(snapshots, response.getNextPageToken());
+ }
+
/**
* Commit snapshot for table.
*
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 70048e2a16..99020f65d2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -32,6 +32,7 @@ public class ResourcePaths {
protected static final String TABLES = "tables";
protected static final String PARTITIONS = "partitions";
protected static final String BRANCHES = "branches";
+ protected static final String SNAPSHOTS = "snapshots";
protected static final String VIEWS = "views";
protected static final String TABLE_DETAILS = "table-details";
protected static final String VIEW_DETAILS = "view-details";
@@ -132,6 +133,29 @@ public class ResourcePaths {
"snapshot");
}
+ public String tableSnapshot(String databaseName, String objectName, String
version) {
+ return SLASH.join(
+ V1,
+ prefix,
+ DATABASES,
+ encodeString(databaseName),
+ TABLES,
+ encodeString(objectName),
+ SNAPSHOTS,
+ version);
+ }
+
+ public String snapshots(String databaseName, String objectName) {
+ return SLASH.join(
+ V1,
+ prefix,
+ DATABASES,
+ encodeString(databaseName),
+ TABLES,
+ encodeString(objectName),
+ SNAPSHOTS);
+ }
+
public String authTable(String databaseName, String objectName) {
return SLASH.join(
V1,
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetVersionSnapshotResponse.java
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetVersionSnapshotResponse.java
new file mode 100644
index 0000000000..854f275426
--- /dev/null
+++
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetVersionSnapshotResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.rest.RESTResponse;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response for table snapshot by a version. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GetVersionSnapshotResponse implements RESTResponse {
+
+ private static final String FIELD_SNAPSHOT = "snapshot";
+
+ @JsonProperty(FIELD_SNAPSHOT)
+ private final Snapshot snapshot;
+
+ @JsonCreator
+ public GetVersionSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) Snapshot
snapshot) {
+ this.snapshot = snapshot;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOT)
+ public Snapshot getSnapshot() {
+ return snapshot;
+ }
+}
diff --git
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListSnapshotsResponse.java
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListSnapshotsResponse.java
new file mode 100644
index 0000000000..5f084a89bb
--- /dev/null
+++
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListSnapshotsResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for list snapshots. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListSnapshotsResponse implements PagedResponse<Snapshot> {
+
+ private static final String FIELD_SNAPSHOTS = "snapshots";
+ private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
+
+ @JsonProperty(FIELD_SNAPSHOTS)
+ private final List<Snapshot> snapshots;
+
+ @JsonProperty(FIELD_NEXT_PAGE_TOKEN)
+ private final String nextPageToken;
+
+ public ListSnapshotsResponse(@JsonProperty(FIELD_SNAPSHOTS) List<Snapshot>
snapshots) {
+ this(snapshots, null);
+ }
+
+ @JsonCreator
+ public ListSnapshotsResponse(
+ @JsonProperty(FIELD_SNAPSHOTS) List<Snapshot> snapshots,
+ @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) {
+ this.snapshots = snapshots;
+ this.nextPageToken = nextPageToken;
+ }
+
+ @JsonGetter(FIELD_SNAPSHOTS)
+ public List<Snapshot> getSnapshots() {
+ return this.snapshots;
+ }
+
+ @Override
+ public List<Snapshot> data() {
+ return snapshots;
+ }
+
+ @JsonGetter(FIELD_NEXT_PAGE_TOKEN)
+ public String getNextPageToken() {
+ return this.nextPageToken;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c292fd69b8..eca8c68675 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -486,6 +486,17 @@ public abstract class AbstractCatalog implements Catalog {
throw new UnsupportedOperationException();
}
+ @Override
+ public Optional<Snapshot> loadSnapshot(Identifier identifier, String
version) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public PagedList<Snapshot> listSnapshotsPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken) {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void rollbackTo(Identifier identifier, Instant instant)
throws Catalog.TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 9fc8fc0065..4e5fd10046 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -352,7 +352,8 @@ public interface Catalog extends AutoCloseable {
List<Partition> listPartitions(Identifier identifier) throws
TableNotExistException;
/**
- * Get paged partitioned list of the table.
+ * Get paged partition list of the table, the partition list will be
returned in descending
+ * order.
*
* @param identifier path of the table to list partitions
* @param maxResults Optional parameter indicating the maximum number of
results to include in
@@ -361,7 +362,7 @@ public interface Catalog extends AutoCloseable {
* @param pageToken Optional parameter indicating the next page token
allows list to be start
* from a specific point.
* @param partitionNamePattern A sql LIKE pattern (%) for partition names.
All partitions will
- * be * returned if not set or empty. Currently, only prefix matching
is supported.
+ * be returned if not set or empty. Currently, only prefix matching is
supported.
* @return a list of the partitions with provided page size(@param
maxResults) in this table and
* next page token, or a list of all partitions of the table if the
catalog does not {@link
* #supportsListObjectsPaged()}.
@@ -639,6 +640,46 @@ public interface Catalog extends AutoCloseable {
Optional<TableSnapshot> loadSnapshot(Identifier identifier)
throws Catalog.TableNotExistException;
+ /**
+ * Return the snapshot of table for given version. Version parsing order
is:
+ *
+ * <ul>
+ * <li>1. If it is 'EARLIEST', get the earliest snapshot.
+ * <li>2. If it is 'LATEST', get the latest snapshot.
+ * <li>3. If it is a number, get snapshot by snapshot id.
+ * <li>4. Else try to get snapshot from Tag name.
+ * </ul>
+ *
+ * @param identifier Path of the table
+ * @param version version to snapshot
+ * @return The requested snapshot
+ * @throws Catalog.TableNotExistException if the target does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ Optional<Snapshot> loadSnapshot(Identifier identifier, String version)
+ throws Catalog.TableNotExistException;
+
+ /**
+ * Get paged snapshot list of the table, the snapshot list will be
returned in descending order.
+ *
+ * @param identifier path of the table to list partitions
+ * @param maxResults Optional parameter indicating the maximum number of
results to include in
+ * the result. If maxResults is not specified or set to 0, will return
the default number of
+ * max results.
+ * @param pageToken Optional parameter indicating the next page token
allows list to be start
+ * from a specific point.
+ * @return a list of the snapshots with provided page size(@param
maxResults) in this table and
+ * next page token, or a list of all snapshots of the table if the
catalog does not {@link
+ * #supportsListObjectsPaged()}.
+ * @throws TableNotExistException if the table does not exist
+ * @throws UnsupportedOperationException if the catalog does not {@link
+ * #supportsVersionManagement()}
+ */
+ PagedList<Snapshot> listSnapshotsPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
+ throws TableNotExistException;
+
/**
* rollback table by the given {@link Identifier} and instant.
*
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index d494dc56ba..96470c9b9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -170,6 +170,19 @@ public abstract class DelegateCatalog implements Catalog {
return wrapped.loadSnapshot(identifier);
}
+ @Override
+ public Optional<Snapshot> loadSnapshot(Identifier identifier, String
version)
+ throws TableNotExistException {
+ return wrapped.loadSnapshot(identifier, version);
+ }
+
+ @Override
+ public PagedList<Snapshot> listSnapshotsPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
+ throws TableNotExistException {
+ return wrapped.listSnapshotsPaged(identifier, maxResults, pageToken);
+ }
+
@Override
public void rollbackTo(Identifier identifier, Instant instant)
throws Catalog.TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index e006828045..72e4373efb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -286,6 +286,34 @@ public class RESTCatalog implements Catalog {
}
}
+ @Override
+ public Optional<Snapshot> loadSnapshot(Identifier identifier, String
version)
+ throws TableNotExistException {
+ try {
+ return Optional.ofNullable(api.loadSnapshot(identifier, version));
+ } catch (NoSuchResourceException e) {
+ if (StringUtils.equals(e.resourceType(),
ErrorResponse.RESOURCE_TYPE_SNAPSHOT)) {
+ return Optional.empty();
+ }
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
+ @Override
+ public PagedList<Snapshot> listSnapshotsPaged(
+ Identifier identifier, @Nullable Integer maxResults, @Nullable
String pageToken)
+ throws TableNotExistException {
+ try {
+ return api.listSnapshotsPaged(identifier, maxResults, pageToken);
+ } catch (NoSuchResourceException e) {
+ throw new TableNotExistException(identifier);
+ } catch (ForbiddenException e) {
+ throw new TableNoPermissionException(identifier, e);
+ }
+ }
+
@Override
public boolean supportsListObjectsPaged() {
return true;
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 441834dfd9..3b88e76d90 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -67,11 +67,13 @@ import org.apache.paimon.rest.responses.GetFunctionResponse;
import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.rest.responses.ListBranchesResponse;
import org.apache.paimon.rest.responses.ListDatabasesResponse;
import org.apache.paimon.rest.responses.ListFunctionsResponse;
import org.apache.paimon.rest.responses.ListPartitionsResponse;
+import org.apache.paimon.rest.responses.ListSnapshotsResponse;
import org.apache.paimon.rest.responses.ListTableDetailsResponse;
import org.apache.paimon.rest.responses.ListTablesResponse;
import org.apache.paimon.rest.responses.ListViewDetailsResponse;
@@ -86,6 +88,7 @@ import org.apache.paimon.table.Instant;
import org.apache.paimon.table.TableSnapshot;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewChange;
import org.apache.paimon.view.ViewImpl;
@@ -109,6 +112,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -325,6 +329,14 @@ public class RESTCatalogServer {
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
&& "snapshot".equals(resources[3]);
+ boolean isListSnapshots =
+ resources.length == 4
+ &&
ResourcePaths.TABLES.equals(resources[1])
+ &&
ResourcePaths.SNAPSHOTS.equals(resources[3]);
+ boolean isLoadSnapshot =
+ resources.length == 5
+ &&
ResourcePaths.TABLES.equals(resources[1])
+ &&
ResourcePaths.SNAPSHOTS.equals(resources[3]);
boolean isTableAuth =
resources.length == 4
&&
ResourcePaths.TABLES.equals(resources[1])
@@ -397,6 +409,10 @@ public class RESTCatalogServer {
return getDataTokenHandle(identifier);
} else if (isTableSnapshot) {
return snapshotHandle(identifier);
+ } else if (isListSnapshots) {
+ return listSnapshots(identifier);
+ } else if (isLoadSnapshot) {
+ return loadSnapshot(identifier, resources[4]);
} else if (isTableAuth) {
return authTable(identifier,
restAuthParameter.data());
} else if (isCommitSnapshot) {
@@ -658,6 +674,50 @@ public class RESTCatalogServer {
.setBody(RESTApi.toJson(getTableSnapshotResponse));
}
+ private MockResponse listSnapshots(Identifier identifier) throws Exception
{
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ Iterator<Snapshot> snapshots = table.snapshotManager().snapshots();
+ List<Snapshot> snapshotList = new ArrayList<>();
+ while (snapshots.hasNext()) {
+ snapshotList.add(snapshots.next());
+ }
+ ListSnapshotsResponse response = new
ListSnapshotsResponse(snapshotList, null);
+ return new
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
+ }
+
+ private MockResponse loadSnapshot(Identifier identifier, String version)
throws Exception {
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ SnapshotManager snapshotManager = table.snapshotManager();
+ Snapshot snapshot = null;
+ try {
+ if (version.equals("EARLIEST")) {
+ snapshot = snapshotManager.earliestSnapshot();
+ } else if (version.equals("LATEST")) {
+ snapshot = snapshotManager.latestSnapshot();
+ } else {
+ try {
+ long snapshotId = Long.parseLong(version);
+ snapshot = snapshotManager.snapshot(snapshotId);
+ } catch (NumberFormatException e) {
+ snapshot =
table.tagManager().get(version).get().trimToSnapshot();
+ }
+ }
+ } catch (Exception ignored) {
+ }
+
+ if (snapshot == null) {
+ RESTResponse response =
+ new ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+ identifier.getDatabaseName(),
+ "No Snapshot",
+ 404);
+ return mockResponse(response, 404);
+ }
+ GetVersionSnapshotResponse response = new
GetVersionSnapshotResponse(snapshot);
+ return new
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
+ }
+
private Optional<MockResponse> checkTablePartitioned(Identifier
identifier) {
if (tableMetadataStore.containsKey(identifier.getFullName())) {
TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 316af2e585..44f8a3a63e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -82,6 +82,7 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
+import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
@@ -1262,7 +1263,7 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
restCatalog.commitSnapshot(
hasSnapshotTableIdentifier,
createSnapshotWithMillis(1L,
System.currentTimeMillis()),
- new ArrayList<PartitionStatistics>()));
+ new ArrayList<>()));
createTable(hasSnapshotTableIdentifier, Maps.newHashMap(),
Lists.newArrayList("col1"));
long id = 10086;
@@ -1685,6 +1686,47 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
table.newReadBuilder().withProjection(new int[] {1}).newScan().plan();
}
+ @Test
+ void testSnapshotMethods() throws Exception {
+ Identifier identifier = Identifier.create("test_table_db",
"snapshots_table");
+ catalog.createDatabase(identifier.getDatabaseName(), true);
+ catalog.createTable(
+ identifier,
+ new Schema(
+ Lists.newArrayList(new DataField(0, "col",
DataTypes.INT())),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ emptyMap(),
+ ""),
+ true);
+ Table table = catalog.getTable(identifier);
+ batchWrite(table, singletonList(1));
+ batchWrite(table, singletonList(2));
+ batchWrite(table, singletonList(3));
+ batchWrite(table, singletonList(4));
+
+ assertThat(catalog.listSnapshotsPaged(identifier, null,
null).getElements())
+ .containsExactlyInAnyOrder(
+ table.snapshot(1), table.snapshot(2),
table.snapshot(3), table.snapshot(4));
+
+ assertThat(catalog.loadSnapshot(identifier, "3"))
+ .isPresent()
+ .get()
+ .isEqualTo(table.snapshot(3));
+ assertThat(catalog.loadSnapshot(identifier, "EARLIEST"))
+ .isPresent()
+ .get()
+ .isEqualTo(table.snapshot(1));
+
+ table.createTag("MY_TAG", 2);
+ assertThat(catalog.loadSnapshot(identifier, "MY_TAG"))
+ .isPresent()
+ .get()
+ .isEqualTo(table.snapshot(2));
+
+ assertThat(catalog.loadSnapshot(identifier, "15")).isEmpty();
+ }
+
private TestPagedResponse generateTestPagedResponse(
Map<String, String> queryParams,
List<Integer> testData,
@@ -1778,8 +1820,8 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
long fileCount,
long lastFileCreationTime);
- protected void batchWrite(FileStoreTable tableTestWrite, List<Integer>
data) throws Exception {
- BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
+ protected void batchWrite(Table table, List<Integer> data) throws
Exception {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
BatchTableWrite write = writeBuilder.newWrite();
for (Integer i : data) {
GenericRow record = GenericRow.of(i);
@@ -1792,8 +1834,8 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
commit.close();
}
- protected List<String> batchRead(FileStoreTable tableTestWrite) throws
IOException {
- ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
+ protected List<String> batchRead(Table table) throws IOException {
+ ReadBuilder readBuilder = table.newReadBuilder();
List<Split> splits = readBuilder.newScan().plan().splits();
TableRead read = readBuilder.newRead();
RecordReader<InternalRow> reader = read.createReader(splits);