This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cfb5ccd85 [core] Add load version snapshot and list snapshots to 
Catalog (#5660)
9cfb5ccd85 is described below

commit 9cfb5ccd859da7b3380be9e32678f3de1443674b
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 26 17:51:56 2025 +0800

    [core] Add load version snapshot and list snapshots to Catalog (#5660)
---
 docs/static/rest-catalog-open-api.yaml             | 110 +++++++++++++++++++++
 .../main/java/org/apache/paimon/rest/RESTApi.java  |  64 +++++++++++-
 .../java/org/apache/paimon/rest/ResourcePaths.java |  24 +++++
 .../rest/responses/GetVersionSnapshotResponse.java |  47 +++++++++
 .../rest/responses/ListSnapshotsResponse.java      |  69 +++++++++++++
 .../org/apache/paimon/catalog/AbstractCatalog.java |  11 +++
 .../java/org/apache/paimon/catalog/Catalog.java    |  45 ++++++++-
 .../org/apache/paimon/catalog/DelegateCatalog.java |  13 +++
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  28 ++++++
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  60 +++++++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  52 +++++++++-
 11 files changed, 515 insertions(+), 8 deletions(-)

diff --git a/docs/static/rest-catalog-open-api.yaml 
b/docs/static/rest-catalog-open-api.yaml
index 7e99e46392..00878178c8 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -732,6 +732,102 @@ paths:
                   $ref: '#/components/examples/SnapshotNotExistError'
         "500":
           $ref: '#/components/responses/ServerErrorResponse'
+  /v1/{prefix}/databases/{database}/tables/{table}/snapshots/{version}:
+    get:
+      tags:
+        - table
+      summary: Get version snapshot
+      operationId: getVersionSnapshot
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: version
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetVersionSnapshotResponse'
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        404:
+          description:
+            Not Found
+            - TableNotExistException, table does not exist
+            - SnapshotNotExistException, the requested snapshot does not exist
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/responses/ResourceNotExistErrorResponse'
+              examples:
+                TableNotExist:
+                  $ref: '#/components/examples/TableNotExistError'
+                SnapshotNotExist:
+                  $ref: '#/components/examples/SnapshotNotExistError'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
+  /v1/{prefix}/databases/{database}/tables/{table}/snapshots:
+    get:
+      tags:
+        - table
+      summary: List snapshots
+      operationId: listSnapshots
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: maxResults
+          in: query
+          schema:
+            type: integer
+            format: int32
+        - name: pageToken
+          in: query
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ListSnapshotsResponse'
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        "404":
+          $ref: '#/components/responses/TableNotExistErrorResponse'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
   /v1/{prefix}/databases/{database}/tables/{table}/partitions:
     get:
       tags:
@@ -2428,6 +2524,20 @@ components:
       properties:
         snapshot:
           $ref: '#/components/schemas/TableSnapshot'
+    GetVersionSnapshotResponse:
+      type: object
+      properties:
+        snapshot:
+          $ref: '#/components/schemas/Snapshot'
+    ListSnapshotsResponse:
+      type: object
+      properties:
+        snapshots:
+          type: array
+          items:
+            $ref: '#/components/schemas/Snapshot'
+        nextPageToken:
+          type: string
     AuthTableQueryRequest:
       type: object
       properties:
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index 48d197d881..9fb78cd76b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -56,11 +56,13 @@ import org.apache.paimon.rest.responses.GetFunctionResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListBranchesResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListFunctionsResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
+import org.apache.paimon.rest.responses.ListSnapshotsResponse;
 import org.apache.paimon.rest.responses.ListTableDetailsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
 import org.apache.paimon.rest.responses.ListViewDetailsResponse;
@@ -462,7 +464,7 @@ public class RESTApi {
      * Load latest snapshot for table.
      *
      * @param identifier database name and table name.
-     * @return {@link TableSnapshot} Optional snapshot.
+     * @return {@link TableSnapshot} snapshot with statistics.
      * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table or the latest
      *     snapshot not exists
      * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
@@ -478,6 +480,66 @@ public class RESTApi {
         return response.getSnapshot();
     }
 
+    /**
+     * Return the snapshot of table for given version. Version parsing order 
is:
+     *
+     * <ul>
+     *   <li>1. If it is 'EARLIEST', get the earliest snapshot.
+     *   <li>2. If it is 'LATEST', get the latest snapshot.
+     *   <li>3. If it is a number, get snapshot by snapshot id.
+     *   <li>4. Else try to get snapshot from Tag name.
+     * </ul>
+     *
+     * @param identifier database name and table name.
+     * @param version version to snapshot
+     * @return Optional snapshot.
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table or the snapshot
+     *     not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public Snapshot loadSnapshot(Identifier identifier, String version) {
+        GetVersionSnapshotResponse response =
+                client.get(
+                        resourcePaths.tableSnapshot(
+                                identifier.getDatabaseName(), 
identifier.getObjectName(), version),
+                        GetVersionSnapshotResponse.class,
+                        restAuthFunction);
+        return response.getSnapshot();
+    }
+
+    /**
+     * Get paged snapshot list of the table, the snapshot list will be 
returned in descending order.
+     *
+     * @param identifier path of the table to list partitions
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token 
allows list to be start
+     *     from a specific point.
+     * @return a list of the snapshots with provided page size(@param 
maxResults) in this table and
+     *     next page token.
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table or the latest
+     *     snapshot not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public PagedList<Snapshot> listSnapshotsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken) {
+        ListSnapshotsResponse response =
+                client.get(
+                        resourcePaths.snapshots(
+                                identifier.getDatabaseName(), 
identifier.getObjectName()),
+                        buildPagedQueryParams(maxResults, pageToken),
+                        ListSnapshotsResponse.class,
+                        restAuthFunction);
+        List<Snapshot> snapshots = response.getSnapshots();
+        if (snapshots == null) {
+            return new PagedList<>(emptyList(), null);
+        }
+        return new PagedList<>(snapshots, response.getNextPageToken());
+    }
+
     /**
      * Commit snapshot for table.
      *
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 70048e2a16..99020f65d2 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -32,6 +32,7 @@ public class ResourcePaths {
     protected static final String TABLES = "tables";
     protected static final String PARTITIONS = "partitions";
     protected static final String BRANCHES = "branches";
+    protected static final String SNAPSHOTS = "snapshots";
     protected static final String VIEWS = "views";
     protected static final String TABLE_DETAILS = "table-details";
     protected static final String VIEW_DETAILS = "view-details";
@@ -132,6 +133,29 @@ public class ResourcePaths {
                 "snapshot");
     }
 
+    public String tableSnapshot(String databaseName, String objectName, String 
version) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                SNAPSHOTS,
+                version);
+    }
+
+    public String snapshots(String databaseName, String objectName) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                SNAPSHOTS);
+    }
+
     public String authTable(String databaseName, String objectName) {
         return SLASH.join(
                 V1,
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetVersionSnapshotResponse.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetVersionSnapshotResponse.java
new file mode 100644
index 0000000000..854f275426
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetVersionSnapshotResponse.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.rest.RESTResponse;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+/** Response for table snapshot by a version. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GetVersionSnapshotResponse implements RESTResponse {
+
+    private static final String FIELD_SNAPSHOT = "snapshot";
+
+    @JsonProperty(FIELD_SNAPSHOT)
+    private final Snapshot snapshot;
+
+    @JsonCreator
+    public GetVersionSnapshotResponse(@JsonProperty(FIELD_SNAPSHOT) Snapshot 
snapshot) {
+        this.snapshot = snapshot;
+    }
+
+    @JsonGetter(FIELD_SNAPSHOT)
+    public Snapshot getSnapshot() {
+        return snapshot;
+    }
+}
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListSnapshotsResponse.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListSnapshotsResponse.java
new file mode 100644
index 0000000000..5f084a89bb
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListSnapshotsResponse.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for list snapshots. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListSnapshotsResponse implements PagedResponse<Snapshot> {
+
+    private static final String FIELD_SNAPSHOTS = "snapshots";
+    private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
+
+    @JsonProperty(FIELD_SNAPSHOTS)
+    private final List<Snapshot> snapshots;
+
+    @JsonProperty(FIELD_NEXT_PAGE_TOKEN)
+    private final String nextPageToken;
+
+    public ListSnapshotsResponse(@JsonProperty(FIELD_SNAPSHOTS) List<Snapshot> 
snapshots) {
+        this(snapshots, null);
+    }
+
+    @JsonCreator
+    public ListSnapshotsResponse(
+            @JsonProperty(FIELD_SNAPSHOTS) List<Snapshot> snapshots,
+            @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) {
+        this.snapshots = snapshots;
+        this.nextPageToken = nextPageToken;
+    }
+
+    @JsonGetter(FIELD_SNAPSHOTS)
+    public List<Snapshot> getSnapshots() {
+        return this.snapshots;
+    }
+
+    @Override
+    public List<Snapshot> data() {
+        return snapshots;
+    }
+
+    @JsonGetter(FIELD_NEXT_PAGE_TOKEN)
+    public String getNextPageToken() {
+        return this.nextPageToken;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index c292fd69b8..eca8c68675 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -486,6 +486,17 @@ public abstract class AbstractCatalog implements Catalog {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public Optional<Snapshot> loadSnapshot(Identifier identifier, String 
version) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PagedList<Snapshot> listSnapshotsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken) {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public void rollbackTo(Identifier identifier, Instant instant)
             throws Catalog.TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 9fc8fc0065..4e5fd10046 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -352,7 +352,8 @@ public interface Catalog extends AutoCloseable {
     List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException;
 
     /**
-     * Get paged partitioned list of the table.
+     * Get paged partition list of the table, the partition list will be 
returned in descending
+     * order.
      *
      * @param identifier path of the table to list partitions
      * @param maxResults Optional parameter indicating the maximum number of 
results to include in
@@ -361,7 +362,7 @@ public interface Catalog extends AutoCloseable {
      * @param pageToken Optional parameter indicating the next page token 
allows list to be start
      *     from a specific point.
      * @param partitionNamePattern A sql LIKE pattern (%) for partition names. 
All partitions will
-     *     be * returned if not set or empty. Currently, only prefix matching 
is supported.
+     *     be returned if not set or empty. Currently, only prefix matching is 
supported.
      * @return a list of the partitions with provided page size(@param 
maxResults) in this table and
      *     next page token, or a list of all partitions of the table if the 
catalog does not {@link
      *     #supportsListObjectsPaged()}.
@@ -639,6 +640,46 @@ public interface Catalog extends AutoCloseable {
     Optional<TableSnapshot> loadSnapshot(Identifier identifier)
             throws Catalog.TableNotExistException;
 
+    /**
+     * Return the snapshot of table for given version. Version parsing order 
is:
+     *
+     * <ul>
+     *   <li>1. If it is 'EARLIEST', get the earliest snapshot.
+     *   <li>2. If it is 'LATEST', get the latest snapshot.
+     *   <li>3. If it is a number, get snapshot by snapshot id.
+     *   <li>4. Else try to get snapshot from Tag name.
+     * </ul>
+     *
+     * @param identifier Path of the table
+     * @param version version to snapshot
+     * @return The requested snapshot
+     * @throws Catalog.TableNotExistException if the target does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    Optional<Snapshot> loadSnapshot(Identifier identifier, String version)
+            throws Catalog.TableNotExistException;
+
+    /**
+     * Get paged snapshot list of the table, the snapshot list will be 
returned in descending order.
+     *
+     * @param identifier path of the table to list partitions
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token 
allows list to be start
+     *     from a specific point.
+     * @return a list of the snapshots with provided page size(@param 
maxResults) in this table and
+     *     next page token, or a list of all snapshots of the table if the 
catalog does not {@link
+     *     #supportsListObjectsPaged()}.
+     * @throws TableNotExistException if the table does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    PagedList<Snapshot> listSnapshotsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException;
+
     /**
      * rollback table by the given {@link Identifier} and instant.
      *
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index d494dc56ba..96470c9b9b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -170,6 +170,19 @@ public abstract class DelegateCatalog implements Catalog {
         return wrapped.loadSnapshot(identifier);
     }
 
+    @Override
+    public Optional<Snapshot> loadSnapshot(Identifier identifier, String 
version)
+            throws TableNotExistException {
+        return wrapped.loadSnapshot(identifier, version);
+    }
+
+    @Override
+    public PagedList<Snapshot> listSnapshotsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        return wrapped.listSnapshotsPaged(identifier, maxResults, pageToken);
+    }
+
     @Override
     public void rollbackTo(Identifier identifier, Instant instant)
             throws Catalog.TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index e006828045..72e4373efb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -286,6 +286,34 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    @Override
+    public Optional<Snapshot> loadSnapshot(Identifier identifier, String 
version)
+            throws TableNotExistException {
+        try {
+            return Optional.ofNullable(api.loadSnapshot(identifier, version));
+        } catch (NoSuchResourceException e) {
+            if (StringUtils.equals(e.resourceType(), 
ErrorResponse.RESOURCE_TYPE_SNAPSHOT)) {
+                return Optional.empty();
+            }
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    @Override
+    public PagedList<Snapshot> listSnapshotsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        try {
+            return api.listSnapshotsPaged(identifier, maxResults, pageToken);
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
     @Override
     public boolean supportsListObjectsPaged() {
         return true;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 441834dfd9..3b88e76d90 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -67,11 +67,13 @@ import org.apache.paimon.rest.responses.GetFunctionResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListBranchesResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
 import org.apache.paimon.rest.responses.ListFunctionsResponse;
 import org.apache.paimon.rest.responses.ListPartitionsResponse;
+import org.apache.paimon.rest.responses.ListSnapshotsResponse;
 import org.apache.paimon.rest.responses.ListTableDetailsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
 import org.apache.paimon.rest.responses.ListViewDetailsResponse;
@@ -86,6 +88,7 @@ import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewChange;
 import org.apache.paimon.view.ViewImpl;
@@ -109,6 +112,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -325,6 +329,14 @@ public class RESTCatalogServer {
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
                                         && "snapshot".equals(resources[3]);
+                        boolean isListSnapshots =
+                                resources.length == 4
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && 
ResourcePaths.SNAPSHOTS.equals(resources[3]);
+                        boolean isLoadSnapshot =
+                                resources.length == 5
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && 
ResourcePaths.SNAPSHOTS.equals(resources[3]);
                         boolean isTableAuth =
                                 resources.length == 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
@@ -397,6 +409,10 @@ public class RESTCatalogServer {
                             return getDataTokenHandle(identifier);
                         } else if (isTableSnapshot) {
                             return snapshotHandle(identifier);
+                        } else if (isListSnapshots) {
+                            return listSnapshots(identifier);
+                        } else if (isLoadSnapshot) {
+                            return loadSnapshot(identifier, resources[4]);
                         } else if (isTableAuth) {
                             return authTable(identifier, 
restAuthParameter.data());
                         } else if (isCommitSnapshot) {
@@ -658,6 +674,50 @@ public class RESTCatalogServer {
                 .setBody(RESTApi.toJson(getTableSnapshotResponse));
     }
 
+    private MockResponse listSnapshots(Identifier identifier) throws Exception 
{
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        Iterator<Snapshot> snapshots = table.snapshotManager().snapshots();
+        List<Snapshot> snapshotList = new ArrayList<>();
+        while (snapshots.hasNext()) {
+            snapshotList.add(snapshots.next());
+        }
+        ListSnapshotsResponse response = new 
ListSnapshotsResponse(snapshotList, null);
+        return new 
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
+    }
+
+    private MockResponse loadSnapshot(Identifier identifier, String version) 
throws Exception {
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot snapshot = null;
+        try {
+            if (version.equals("EARLIEST")) {
+                snapshot = snapshotManager.earliestSnapshot();
+            } else if (version.equals("LATEST")) {
+                snapshot = snapshotManager.latestSnapshot();
+            } else {
+                try {
+                    long snapshotId = Long.parseLong(version);
+                    snapshot = snapshotManager.snapshot(snapshotId);
+                } catch (NumberFormatException e) {
+                    snapshot = 
table.tagManager().get(version).get().trimToSnapshot();
+                }
+            }
+        } catch (Exception ignored) {
+        }
+
+        if (snapshot == null) {
+            RESTResponse response =
+                    new ErrorResponse(
+                            ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+                            identifier.getDatabaseName(),
+                            "No Snapshot",
+                            404);
+            return mockResponse(response, 404);
+        }
+        GetVersionSnapshotResponse response = new 
GetVersionSnapshotResponse(snapshot);
+        return new 
MockResponse().setResponseCode(200).setBody(RESTApi.toJson(response));
+    }
+
     private Optional<MockResponse> checkTablePartitioned(Identifier 
identifier) {
         if (tableMetadataStore.containsKey(identifier.getFullName())) {
             TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 316af2e585..44f8a3a63e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -82,6 +82,7 @@ import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static java.util.Collections.emptyMap;
 import static java.util.Collections.singletonList;
 import static java.util.Collections.singletonMap;
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
@@ -1262,7 +1263,7 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
                         restCatalog.commitSnapshot(
                                 hasSnapshotTableIdentifier,
                                 createSnapshotWithMillis(1L, 
System.currentTimeMillis()),
-                                new ArrayList<PartitionStatistics>()));
+                                new ArrayList<>()));
 
         createTable(hasSnapshotTableIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
         long id = 10086;
@@ -1685,6 +1686,47 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         table.newReadBuilder().withProjection(new int[] {1}).newScan().plan();
     }
 
+    @Test
+    void testSnapshotMethods() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"snapshots_table");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        catalog.createTable(
+                identifier,
+                new Schema(
+                        Lists.newArrayList(new DataField(0, "col", 
DataTypes.INT())),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        emptyMap(),
+                        ""),
+                true);
+        Table table = catalog.getTable(identifier);
+        batchWrite(table, singletonList(1));
+        batchWrite(table, singletonList(2));
+        batchWrite(table, singletonList(3));
+        batchWrite(table, singletonList(4));
+
+        assertThat(catalog.listSnapshotsPaged(identifier, null, 
null).getElements())
+                .containsExactlyInAnyOrder(
+                        table.snapshot(1), table.snapshot(2), 
table.snapshot(3), table.snapshot(4));
+
+        assertThat(catalog.loadSnapshot(identifier, "3"))
+                .isPresent()
+                .get()
+                .isEqualTo(table.snapshot(3));
+        assertThat(catalog.loadSnapshot(identifier, "EARLIEST"))
+                .isPresent()
+                .get()
+                .isEqualTo(table.snapshot(1));
+
+        table.createTag("MY_TAG", 2);
+        assertThat(catalog.loadSnapshot(identifier, "MY_TAG"))
+                .isPresent()
+                .get()
+                .isEqualTo(table.snapshot(2));
+
+        assertThat(catalog.loadSnapshot(identifier, "15")).isEmpty();
+    }
+
     private TestPagedResponse generateTestPagedResponse(
             Map<String, String> queryParams,
             List<Integer> testData,
@@ -1778,8 +1820,8 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
             long fileCount,
             long lastFileCreationTime);
 
-    protected void batchWrite(FileStoreTable tableTestWrite, List<Integer> 
data) throws Exception {
-        BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
+    protected void batchWrite(Table table, List<Integer> data) throws 
Exception {
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         BatchTableWrite write = writeBuilder.newWrite();
         for (Integer i : data) {
             GenericRow record = GenericRow.of(i);
@@ -1792,8 +1834,8 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         commit.close();
     }
 
-    protected List<String> batchRead(FileStoreTable tableTestWrite) throws 
IOException {
-        ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
+    protected List<String> batchRead(Table table) throws IOException {
+        ReadBuilder readBuilder = table.newReadBuilder();
         List<Split> splits = readBuilder.newScan().plan().splits();
         TableRead read = readBuilder.newRead();
         RecordReader<InternalRow> reader = read.createReader(splits);


Reply via email to