This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 393450d205 [rest] Add tag rest api definition (#6792)
393450d205 is described below

commit 393450d20537b4c24382315876d27ffc65dda704
Author: kevin <[email protected]>
AuthorDate: Fri Dec 12 16:49:03 2025 +0800

    [rest] Add tag rest api definition (#6792)
---
 docs/static/rest-catalog-open-api.yaml             | 255 +++++++++++++++++++++
 .../main/java/org/apache/paimon/rest/RESTApi.java  |  92 ++++++++
 .../java/org/apache/paimon/rest/ResourcePaths.java |  24 ++
 .../paimon/rest/requests/CreateTagRequest.java     |  75 ++++++
 .../paimon/rest/responses/GetTagResponse.java      |  87 +++++++
 .../paimon/rest/responses/ListTagsResponse.java    |  63 +++++
 .../org/apache/paimon/catalog/AbstractCatalog.java |  32 +++
 .../java/org/apache/paimon/catalog/Catalog.java    |  98 ++++++++
 .../org/apache/paimon/catalog/DelegateCatalog.java |  32 +++
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 118 ++++++++++
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 185 +++++++++++++++
 .../org/apache/paimon/rest/RESTCatalogTest.java    |  95 ++++++++
 12 files changed, 1156 insertions(+)

diff --git a/docs/static/rest-catalog-open-api.yaml 
b/docs/static/rest-catalog-open-api.yaml
index 9bfc7cbdfb..97f5789ef0 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -1116,6 +1116,176 @@ paths:
           $ref: '#/components/responses/BranchNotExistErrorResponse'
         "500":
           $ref: '#/components/responses/ServerErrorResponse'
+  /v1/{prefix}/databases/{database}/tables/{table}/tags:
+    get:
+      tags:
+        - tag
+      summary: List tags
+      operationId: listTags
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: maxResults
+          in: query
+          schema:
+            type: integer
+            format: int32
+        - name: pageToken
+          in: query
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ListTagsResponse'
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        "404":
+          $ref: '#/components/responses/TableNotExistErrorResponse'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
+    post:
+      tags:
+        - tag
+      summary: Create tag
+      operationId: createTag
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateTagRequest'
+      responses:
+        "200":
+          description: Success, no content
+        "400":
+          $ref: '#/components/responses/BadRequestErrorResponse'
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        "404":
+          description:
+            Not Found
+            - TableNotExistException, table does not exist
+            - SnapshotNotExistException, the requested snapshot does not exist
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/responses/ResourceNotExistErrorResponse'
+              examples:
+                TableNotExist:
+                  $ref: '#/components/examples/TableNotExistError'
+                SnapshotNotExist:
+                  $ref: '#/components/examples/SnapshotNotExistError'
+        "409":
+          $ref: '#/components/responses/TagAlreadyExistErrorResponse'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
+  /v1/{prefix}/databases/{database}/tables/{table}/tags/{tag}:
+    get:
+      tags:
+        - tag
+      summary: Get tag
+      operationId: getTag
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: tag
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetTagResponse'
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        "404":
+          $ref: '#/components/responses/TagNotExistErrorResponse'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
+    delete:
+      tags:
+        - tag
+      summary: Delete tag
+      operationId: deleteTag
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: tag
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: Success, no content
+        "401":
+          $ref: '#/components/responses/UnauthorizedErrorResponse'
+        "404":
+          $ref: '#/components/responses/TagNotExistErrorResponse'
+        "500":
+          $ref: '#/components/responses/ServerErrorResponse'
   /v1/{prefix}/databases/{database}/views:
     get:
       tags:
@@ -1790,6 +1960,20 @@ components:
               "resourceName": "branch",
               "code": 404
             }
+    TagNotExistErrorResponse:
+      description:
+        Not Found - TagNotExistException, the tag does not exist
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/responses/ResourceNotExistErrorResponse'
+          example:
+            {
+              "message": "The given tag does not exist",
+              "resourceType": "TAG",
+              "resourceName": "tag1",
+              "code": 404
+            }
     ViewNotExistErrorResponse:
       description:
         Not Found - ViewNotExistException, the view does not exist
@@ -1870,6 +2054,19 @@ components:
               "resourceName": "branch",
               "code": 409
             }
+    TagAlreadyExistErrorResponse:
+      description: Conflict - The tag already exists
+      content:
+        application/json:
+          schema:
+            $ref: '#/components/responses/ResourceAlreadyExistErrorResponse'
+          example:
+            {
+              "message": "The given tag already exists",
+              "resourceType": "TAG",
+              "resourceName": "tag1",
+              "code": 409
+            }
     ViewAlreadyExistErrorResponse:
       description: Conflict - The view already exists
       content:
@@ -2844,6 +3041,64 @@ components:
           type: array
           items:
             type: string
+    CreateTagRequest:
+      type: object
+      required:
+        - tagName
+      properties:
+        tagName:
+          type: string
+        snapshotId:
+          type: integer
+          format: int64
+          nullable: true
+          description: Optional snapshot id, if not provided uses latest 
snapshot
+        timeRetained:
+          type: string
+          nullable: true
+          description: Optional time retained as string (e.g., "1d", "12h", 
"30m")
+        ignoreIfExists:
+          type: boolean
+          default: false
+          description: If true, ignore if tag already exists
+    GetTagResponse:
+      type: object
+      properties:
+        tagName:
+          type: string
+        snapshot:
+          $ref: '#/components/schemas/Snapshot'
+        tagCreateTime:
+          type: integer
+          format: int64
+          nullable: true
+        tagTimeRetained:
+          type: string
+          nullable: true
+    ListTagsResponse:
+      type: object
+      properties:
+        tags:
+          type: array
+          items:
+            type: string
+        nextPageToken:
+          type: string
+    TagInfo:
+      type: object
+      properties:
+        tagName:
+          type: string
+        snapshotId:
+          type: integer
+          format: int64
+        tagCreateTime:
+          type: integer
+          format: int64
+          nullable: true
+        tagTimeRetained:
+          type: string
+          nullable: true
     GetViewResponse:
       type: object
       properties:
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
index defcd8d3f3..648c395513 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/RESTApi.java
@@ -42,6 +42,7 @@ import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateFunctionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.CreateTagRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
 import org.apache.paimon.rest.requests.ForwardBranchRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
@@ -58,6 +59,7 @@ import org.apache.paimon.rest.responses.GetFunctionResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListBranchesResponse;
@@ -70,6 +72,7 @@ import org.apache.paimon.rest.responses.ListSnapshotsResponse;
 import org.apache.paimon.rest.responses.ListTableDetailsResponse;
 import org.apache.paimon.rest.responses.ListTablesGloballyResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.ListTagsResponse;
 import org.apache.paimon.rest.responses.ListViewDetailsResponse;
 import org.apache.paimon.rest.responses.ListViewsGloballyResponse;
 import org.apache.paimon.rest.responses.ListViewsResponse;
@@ -846,6 +849,95 @@ public class RESTApi {
         return response.branches();
     }
 
+    /**
+     * Get tag for table.
+     *
+     * @param identifier database name and table name.
+     * @param tagName tag name
+     * @return {@link GetTagResponse}
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
tag not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public GetTagResponse getTag(Identifier identifier, String tagName) {
+        return client.get(
+                resourcePaths.tag(
+                        identifier.getDatabaseName(), 
identifier.getObjectName(), tagName),
+                GetTagResponse.class,
+                restAuthFunction);
+    }
+
+    /**
+     * Create tag for table.
+     *
+     * @param identifier database name and table name.
+     * @param tagName tag name
+     * @param snapshotId optional snapshot id, if not provided uses latest 
snapshot
+     * @param timeRetained optional time retained as string (e.g., "1d", 
"12h", "30m")
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table or snapshot not
+     *     exists
+     * @throws AlreadyExistsException Exception thrown on HTTP 409 means the 
tag already exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public void createTag(
+            Identifier identifier,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable String timeRetained) {
+        CreateTagRequest request = new CreateTagRequest(tagName, snapshotId, 
timeRetained);
+        client.post(
+                resourcePaths.tags(identifier.getDatabaseName(), 
identifier.getObjectName()),
+                request,
+                restAuthFunction);
+    }
+
+    /**
+     * Get paged list names of tags under this table. An empty list is 
returned if none tag exists.
+     *
+     * @param identifier database name and table name.
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token 
allows list to be start
+     *     from a specific point.
+     * @return {@link PagedList}: elements and nextPageToken.
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
table not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public PagedList<String> listTagsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken) {
+        ListTagsResponse response =
+                client.get(
+                        resourcePaths.tags(
+                                identifier.getDatabaseName(), 
identifier.getObjectName()),
+                        buildPagedQueryParams(maxResults, pageToken),
+                        ListTagsResponse.class,
+                        restAuthFunction);
+        List<String> tags = response.tags();
+        if (tags == null) {
+            return new PagedList<>(emptyList(), null);
+        }
+        return new PagedList<>(tags, response.getNextPageToken());
+    }
+
+    /**
+     * Delete tag for table.
+     *
+     * @param identifier database name and table name.
+     * @param tagName tag name
+     * @throws NoSuchResourceException Exception thrown on HTTP 404 means the 
tag not exists
+     * @throws ForbiddenException Exception thrown on HTTP 403 means don't 
have the permission for
+     *     this table
+     */
+    public void deleteTag(Identifier identifier, String tagName) {
+        client.delete(
+                resourcePaths.tag(
+                        identifier.getDatabaseName(), 
identifier.getObjectName(), tagName),
+                restAuthFunction);
+    }
+
     /**
      * List functions for database.
      *
diff --git a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 7bf67eba40..8d288dc205 100644
--- a/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-api/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -32,6 +32,7 @@ public class ResourcePaths {
     protected static final String TABLES = "tables";
     protected static final String PARTITIONS = "partitions";
     protected static final String BRANCHES = "branches";
+    protected static final String TAGS = "tags";
     protected static final String SNAPSHOTS = "snapshots";
     protected static final String VIEWS = "views";
     protected static final String TABLE_DETAILS = "table-details";
@@ -232,6 +233,29 @@ public class ResourcePaths {
                 "forward");
     }
 
+    public String tags(String databaseName, String objectName) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                TAGS);
+    }
+
+    public String tag(String databaseName, String objectName, String tagName) {
+        return SLASH.join(
+                V1,
+                prefix,
+                DATABASES,
+                encodeString(databaseName),
+                TABLES,
+                encodeString(objectName),
+                TAGS,
+                encodeString(tagName));
+    }
+
     public String views(String databaseName) {
         return SLASH.join(V1, prefix, DATABASES, encodeString(databaseName), 
VIEWS);
     }
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/requests/CreateTagRequest.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/CreateTagRequest.java
new file mode 100644
index 0000000000..49cc0f20b7
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/requests/CreateTagRequest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/** Request for creating tag. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CreateTagRequest implements RESTRequest {
+
+    private static final String FIELD_TAG_NAME = "tagName";
+    private static final String FIELD_SNAPSHOT_ID = "snapshotId";
+    private static final String FIELD_TIME_RETAINED = "timeRetained";
+
+    @JsonProperty(FIELD_TAG_NAME)
+    private final String tagName;
+
+    @Nullable
+    @JsonProperty(FIELD_SNAPSHOT_ID)
+    private final Long snapshotId;
+
+    @Nullable
+    @JsonProperty(FIELD_TIME_RETAINED)
+    private final String timeRetained;
+
+    @JsonCreator
+    public CreateTagRequest(
+            @JsonProperty(FIELD_TAG_NAME) String tagName,
+            @Nullable @JsonProperty(FIELD_SNAPSHOT_ID) Long snapshotId,
+            @Nullable @JsonProperty(FIELD_TIME_RETAINED) String timeRetained) {
+        this.tagName = tagName;
+        this.snapshotId = snapshotId;
+        this.timeRetained = timeRetained;
+    }
+
+    @JsonGetter(FIELD_TAG_NAME)
+    public String tagName() {
+        return tagName;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_SNAPSHOT_ID)
+    public Long snapshotId() {
+        return snapshotId;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_TIME_RETAINED)
+    public String timeRetained() {
+        return timeRetained;
+    }
+}
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetTagResponse.java 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetTagResponse.java
new file mode 100644
index 0000000000..28a7c31f71
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/GetTagResponse.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.rest.RESTResponse;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/** Response for getting tag. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class GetTagResponse implements RESTResponse {
+
+    private static final String FIELD_TAG_NAME = "tagName";
+    private static final String FIELD_SNAPSHOT = "snapshot";
+    private static final String FIELD_TAG_CREATE_TIME = "tagCreateTime";
+    private static final String FIELD_TAG_TIME_RETAINED = "tagTimeRetained";
+
+    @JsonProperty(FIELD_TAG_NAME)
+    private final String tagName;
+
+    @JsonProperty(FIELD_SNAPSHOT)
+    private final Snapshot snapshot;
+
+    @Nullable
+    @JsonProperty(FIELD_TAG_CREATE_TIME)
+    private final Long tagCreateTime;
+
+    @Nullable
+    @JsonProperty(FIELD_TAG_TIME_RETAINED)
+    private final String tagTimeRetained;
+
+    @JsonCreator
+    public GetTagResponse(
+            @JsonProperty(FIELD_TAG_NAME) String tagName,
+            @JsonProperty(FIELD_SNAPSHOT) Snapshot snapshot,
+            @Nullable @JsonProperty(FIELD_TAG_CREATE_TIME) Long tagCreateTime,
+            @Nullable @JsonProperty(FIELD_TAG_TIME_RETAINED) String 
tagTimeRetained) {
+        this.tagName = tagName;
+        this.snapshot = snapshot;
+        this.tagCreateTime = tagCreateTime;
+        this.tagTimeRetained = tagTimeRetained;
+    }
+
+    @JsonGetter(FIELD_TAG_NAME)
+    public String tagName() {
+        return tagName;
+    }
+
+    @JsonGetter(FIELD_SNAPSHOT)
+    public Snapshot snapshot() {
+        return snapshot;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_TAG_CREATE_TIME)
+    public Long tagCreateTime() {
+        return tagCreateTime;
+    }
+
+    @Nullable
+    @JsonGetter(FIELD_TAG_TIME_RETAINED)
+    public String tagTimeRetained() {
+        return tagTimeRetained;
+    }
+}
diff --git 
a/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListTagsResponse.java
 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListTagsResponse.java
new file mode 100644
index 0000000000..ad02e6a4b2
--- /dev/null
+++ 
b/paimon-api/src/main/java/org/apache/paimon/rest/responses/ListTagsResponse.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for listing tags. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListTagsResponse implements PagedResponse<String> {
+
+    private static final String FIELD_TAGS = "tags";
+    private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
+
+    @JsonProperty(FIELD_TAGS)
+    private final List<String> tags;
+
+    @JsonProperty(FIELD_NEXT_PAGE_TOKEN)
+    private final String nextPageToken;
+
+    @JsonCreator
+    public ListTagsResponse(
+            @JsonProperty(FIELD_TAGS) List<String> tags,
+            @JsonProperty(FIELD_NEXT_PAGE_TOKEN) String nextPageToken) {
+        this.tags = tags;
+        this.nextPageToken = nextPageToken;
+    }
+
+    @JsonGetter(FIELD_TAGS)
+    public List<String> tags() {
+        return this.tags;
+    }
+
+    @JsonGetter(FIELD_NEXT_PAGE_TOKEN)
+    public String getNextPageToken() {
+        return this.nextPageToken;
+    }
+
+    @Override
+    public List<String> data() {
+        return tags();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 48933b83d8..97e8436a75 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -30,6 +30,7 @@ import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -41,6 +42,7 @@ import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.SnapshotNotExistException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -500,6 +502,36 @@ public abstract class AbstractCatalog implements Catalog {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public GetTagResponse getTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void createTag(
+            Identifier identifier,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable String timeRetained,
+            boolean ignoreIfExists)
+            throws TableNotExistException, SnapshotNotExistException, 
TagAlreadyExistException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public PagedList<String> listTagsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void deleteTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException {
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public boolean commitSnapshot(
             Identifier identifier,
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index a3510777d7..c21c64b928 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -26,11 +26,13 @@ import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewChange;
 
@@ -762,6 +764,75 @@ public interface Catalog extends AutoCloseable {
      */
     List<String> listBranches(Identifier identifier) throws 
TableNotExistException;
 
+    /**
+     * Get tag for table.
+     *
+     * @param identifier path of the table, cannot be system name.
+     * @param tagName tag name
+     * @return {@link GetTagResponse} containing tag information
+     * @throws TableNotExistException if the table does not exist
+     * @throws TagNotExistException if the tag does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    GetTagResponse getTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException;
+
+    /**
+     * Create tag for table.
+     *
+     * @param identifier path of the table, cannot be system name.
+     * @param tagName tag name
+     * @param snapshotId optional snapshot id, if not provided uses latest 
snapshot
+     * @param timeRetained optional time retained as string (e.g., "1d", 
"12h", "30m")
+     * @param ignoreIfExists if true, ignore if tag already exists
+     * @throws TableNotExistException if the table does not exist
+     * @throws SnapshotNotExistException if the snapshot does not exist
+     * @throws TagAlreadyExistException if the tag already exists and 
ignoreIfExists is false
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    void createTag(
+            Identifier identifier,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable String timeRetained,
+            boolean ignoreIfExists)
+            throws TableNotExistException, SnapshotNotExistException, 
TagAlreadyExistException;
+
+    /**
+     * Get paged list names of tags under this table. An empty list is 
returned if none tag exists.
+     *
+     * @param identifier path of the table, cannot be system name.
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token 
allows list to be start
+     *     from a specific point.
+     * @return a list of the names of tags with provided page size in this 
table and next page
+     *     token, or a list of the names of all tags in this table if the 
catalog does not {@link
+     *     #supportsListObjectsPaged()}.
+     * @throws TableNotExistException if the table does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()} or it does not {@link 
#supportsListByPattern()}
+     */
+    PagedList<String> listTagsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException;
+
+    /**
+     * Delete tag for table.
+     *
+     * @param identifier path of the table, cannot be system name.
+     * @param tagName tag name
+     * @throws TableNotExistException if the table does not exist
+     * @throws TagNotExistException if the tag does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    void deleteTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException;
+
     // ==================== Partition Modifications ==========================
 
     /**
@@ -1349,6 +1420,33 @@ public interface Catalog extends AutoCloseable {
         }
     }
 
+    /** Exception for trying to create a tag that already exists. */
+    class TagAlreadyExistException extends Exception {
+
+        private static final String MSG = "Tag %s in table %s already exists.";
+
+        private final Identifier identifier;
+        private final String tag;
+
+        public TagAlreadyExistException(Identifier identifier, String tag) {
+            this(identifier, tag, null);
+        }
+
+        public TagAlreadyExistException(Identifier identifier, String tag, 
Throwable cause) {
+            super(String.format(MSG, tag, identifier.getFullName()), cause);
+            this.identifier = identifier;
+            this.tag = tag;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+
+        public String tag() {
+            return tag;
+        }
+    }
+
     /** Exception for trying to update dialect that doesn't exist. */
     class DialectNotExistException extends Exception {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index a4e90e5049..9fc057aa90 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -24,11 +24,13 @@ import org.apache.paimon.function.Function;
 import org.apache.paimon.function.FunctionChange;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.partition.PartitionStatistics;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewChange;
 
@@ -231,6 +233,36 @@ public abstract class DelegateCatalog implements Catalog {
         return wrapped.listBranches(identifier);
     }
 
+    @Override
+    public GetTagResponse getTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException {
+        return wrapped.getTag(identifier, tagName);
+    }
+
+    @Override
+    public void createTag(
+            Identifier identifier,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable String timeRetained,
+            boolean ignoreIfExists)
+            throws TableNotExistException, SnapshotNotExistException, 
TagAlreadyExistException {
+        wrapped.createTag(identifier, tagName, snapshotId, timeRetained, 
ignoreIfExists);
+    }
+
+    @Override
+    public PagedList<String> listTagsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        return wrapped.listTagsPaged(identifier, maxResults, pageToken);
+    }
+
+    @Override
+    public void deleteTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException {
+        wrapped.deleteTag(identifier, tagName);
+    }
+
     @Override
     public boolean commitSnapshot(
             Identifier identifier,
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index a35e9682c8..cff58bf713 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -47,6 +47,7 @@ import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetFunctionResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -58,6 +59,7 @@ import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewChange;
 import org.apache.paimon.view.ViewImpl;
@@ -969,6 +971,122 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    /**
+     * Get tag for table.
+     *
+     * @param identifier database name and table name.
+     * @param tagName tag name
+     * @return {@link GetTagResponse}
+     * @throws TableNotExistException if the table does not exist
+     * @throws TagNotExistException if the tag does not exist
+     * @throws TableNoPermissionException if don't have the permission for 
this table
+     */
+    @Override
+    public GetTagResponse getTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException {
+        try {
+            return api.getTag(identifier, tagName);
+        } catch (NoSuchResourceException e) {
+            if (StringUtils.equals(e.resourceType(), 
ErrorResponse.RESOURCE_TYPE_TAG)) {
+                throw new TagNotExistException(identifier, tagName);
+            }
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    /**
+     * Create tag for table.
+     *
+     * @param identifier database name and table name.
+     * @param tagName tag name
+     * @param snapshotId optional snapshot id, if not provided uses latest 
snapshot
+     * @param timeRetained optional time retained as string (e.g., "1d", 
"12h", "30m")
+     * @param ignoreIfExists if true, ignore if tag already exists
+     * @throws TableNotExistException if the table does not exist
+     * @throws SnapshotNotExistException if the snapshot does not exist
+     * @throws TagAlreadyExistException if the tag already exists and 
ignoreIfExists is false
+     * @throws TableNoPermissionException if don't have the permission for 
this table
+     */
+    @Override
+    public void createTag(
+            Identifier identifier,
+            String tagName,
+            @Nullable Long snapshotId,
+            @Nullable String timeRetained,
+            boolean ignoreIfExists)
+            throws TableNotExistException, SnapshotNotExistException, 
TagAlreadyExistException {
+        try {
+            api.createTag(identifier, tagName, snapshotId, timeRetained);
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TagAlreadyExistException(identifier, tagName);
+            }
+        } catch (NoSuchResourceException e) {
+            if (StringUtils.equals(e.resourceType(), 
ErrorResponse.RESOURCE_TYPE_SNAPSHOT)) {
+                throw new SnapshotNotExistException(
+                        String.format(
+                                "Snapshot %s in table %s doesn't exist.",
+                                e.resourceName(), identifier.getFullName()));
+            }
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        } catch (BadRequestException e) {
+            throw new IllegalArgumentException(e.getMessage());
+        }
+    }
+
+    /**
+     * Get paged list names of tags under this table. An empty list is 
returned if none tag exists.
+     *
+     * @param identifier database name and table name.
+     * @param maxResults Optional parameter indicating the maximum number of 
results to include in
+     *     the result. If maxResults is not specified or set to 0, will return 
the default number of
+     *     max results.
+     * @param pageToken Optional parameter indicating the next page token 
allows list to be start
+     *     from a specific point.
+     * @return a list of the names of tags with provided page size in this 
table and next page token
+     * @throws TableNotExistException if the table does not exist
+     * @throws TableNoPermissionException if don't have the permission for 
this table
+     */
+    @Override
+    public PagedList<String> listTagsPaged(
+            Identifier identifier, @Nullable Integer maxResults, @Nullable 
String pageToken)
+            throws TableNotExistException {
+        try {
+            return api.listTagsPaged(identifier, maxResults, pageToken);
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    /**
+     * Delete tag for table.
+     *
+     * @param identifier database name and table name.
+     * @param tagName tag name
+     * @throws TableNotExistException if the table does not exist
+     * @throws TagNotExistException if the tag does not exist
+     * @throws TableNoPermissionException if don't have the permission for 
this table
+     */
+    public void deleteTag(Identifier identifier, String tagName)
+            throws TableNotExistException, TagNotExistException {
+        try {
+            api.deleteTag(identifier, tagName);
+        } catch (NoSuchResourceException e) {
+            if (StringUtils.equals(e.resourceType(), 
ErrorResponse.RESOURCE_TYPE_TAG)) {
+                throw new TagNotExistException(identifier, tagName);
+            }
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
     @Override
     public boolean caseSensitive() {
         return context.options().getOptional(CASE_SENSITIVE).orElse(true);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 08fc1c9778..f7ede176c2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -53,6 +53,7 @@ import org.apache.paimon.rest.requests.CreateBranchRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateFunctionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.CreateTagRequest;
 import org.apache.paimon.rest.requests.CreateViewRequest;
 import org.apache.paimon.rest.requests.MarkDonePartitionsRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
@@ -67,6 +68,7 @@ import org.apache.paimon.rest.responses.GetFunctionResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.GetTableSnapshotResponse;
 import org.apache.paimon.rest.responses.GetTableTokenResponse;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.rest.responses.GetVersionSnapshotResponse;
 import org.apache.paimon.rest.responses.GetViewResponse;
 import org.apache.paimon.rest.responses.ListBranchesResponse;
@@ -79,6 +81,7 @@ import org.apache.paimon.rest.responses.ListSnapshotsResponse;
 import org.apache.paimon.rest.responses.ListTableDetailsResponse;
 import org.apache.paimon.rest.responses.ListTablesGloballyResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.ListTagsResponse;
 import org.apache.paimon.rest.responses.ListViewDetailsResponse;
 import org.apache.paimon.rest.responses.ListViewsGloballyResponse;
 import org.apache.paimon.rest.responses.ListViewsResponse;
@@ -97,6 +100,8 @@ import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.LazyField;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+import org.apache.paimon.utils.TimeUtils;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewChange;
 import org.apache.paimon.view.ViewImpl;
@@ -113,8 +118,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -389,6 +397,14 @@ public class RESTCatalogServer {
                                 resources.length >= 4
                                         && 
ResourcePaths.TABLES.equals(resources[1])
                                         && "branches".equals(resources[3]);
+                        boolean isTags =
+                                resources.length >= 4
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && 
resources[3].startsWith(ResourcePaths.TAGS);
+                        boolean isTag =
+                                resources.length >= 5
+                                        && 
ResourcePaths.TABLES.equals(resources[1])
+                                        && 
ResourcePaths.TAGS.equals(resources[3]);
                         Identifier identifier =
                                 resources.length >= 3
                                                 && 
!"rename".equals(resources[2])
@@ -430,6 +446,13 @@ public class RESTCatalogServer {
                                     restAuthParameter.method(),
                                     restAuthParameter.data(),
                                     identifier);
+                        } else if (isTags || isTag) {
+                            return tagApiHandle(
+                                    resources,
+                                    restAuthParameter.method(),
+                                    restAuthParameter.data(),
+                                    parameters,
+                                    identifier);
                         } else if (isTableToken) {
                             return getDataTokenHandle(identifier);
                         } else if (isTableSnapshot) {
@@ -1683,6 +1706,168 @@ public class RESTCatalogServer {
         return new MockResponse().setResponseCode(404);
     }
 
+    private MockResponse tagApiHandle(
+            String[] resources,
+            String method,
+            String data,
+            Map<String, String> parameters,
+            Identifier identifier)
+            throws Exception {
+        RESTResponse response;
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        TagManager tagManager = table.tagManager();
+        String tagName = "";
+        try {
+            switch (method) {
+                case "GET":
+                    if (resources.length == 4) {
+                        // GET 
/v1/{prefix}/databases/{database}/tables/{table}/tags
+                        // Page list tags
+                        List<String> tags = new 
ArrayList<>(tagManager.allTagNames());
+                        if (tags.isEmpty()) {
+                            response = new 
ListTagsResponse(Collections.emptyList(), null);
+                            return mockResponse(response, 200);
+                        }
+                        int maxResults;
+                        try {
+                            maxResults = getMaxResults(parameters);
+                        } catch (NumberFormatException e) {
+                            return handleInvalidMaxResults(parameters);
+                        }
+                        String pageToken = parameters.getOrDefault(PAGE_TOKEN, 
null);
+                        PagedList<String> pagedTags =
+                                buildPagedEntities(tags, maxResults, 
pageToken);
+                        response =
+                                new ListTagsResponse(
+                                        pagedTags.getElements(), 
pagedTags.getNextPageToken());
+                        return mockResponse(response, 200);
+                    } else {
+                        // GET 
/v1/{prefix}/databases/{database}/tables/{table}/tags/{tag}
+                        tagName = RESTUtil.decodeString(resources[4]);
+                        Optional<Tag> tag = tagManager.get(tagName);
+                        if (!tag.isPresent()) {
+                            response =
+                                    new ErrorResponse(
+                                            ErrorResponse.RESOURCE_TYPE_TAG,
+                                            tagName,
+                                            String.format(
+                                                    "Tag %s in table %s 
doesn't exist.",
+                                                    tagName, 
identifier.getFullName()),
+                                            404);
+                            return mockResponse(response, 404);
+                        }
+                        Tag tagObj = tag.get();
+                        Long tagCreateTimeMillis =
+                                tagObj.getTagCreateTime() != null
+                                        ? tagObj.getTagCreateTime()
+                                                .atZone(ZoneId.systemDefault())
+                                                .toInstant()
+                                                .toEpochMilli()
+                                        : null;
+                        String timeRetainedStr =
+                                tagObj.getTagTimeRetained() != null
+                                        ? 
tagObj.getTagTimeRetained().toString()
+                                        : null;
+                        response =
+                                new GetTagResponse(
+                                        tagName,
+                                        tagObj.trimToSnapshot(),
+                                        tagCreateTimeMillis,
+                                        timeRetainedStr);
+                        return mockResponse(response, 200);
+                    }
+                case "DELETE":
+                    // DELETE 
/v1/{prefix}/databases/{database}/tables/{table}/tags/{tag}
+                    tagName = RESTUtil.decodeString(resources[4]);
+                    Optional<Tag> tagToDelete = tagManager.get(tagName);
+                    if (!tagToDelete.isPresent()) {
+                        response =
+                                new ErrorResponse(
+                                        ErrorResponse.RESOURCE_TYPE_TAG,
+                                        tagName,
+                                        String.format(
+                                                "Tag %s in table %s doesn't 
exist.",
+                                                tagName, 
identifier.getFullName()),
+                                        404);
+                        return mockResponse(response, 404);
+                    }
+                    table.deleteTag(tagName);
+                    return new MockResponse().setResponseCode(200);
+                case "POST":
+                    // POST 
/v1/{prefix}/databases/{database}/tables/{table}/tags
+                    CreateTagRequest requestBody = RESTApi.fromJson(data, 
CreateTagRequest.class);
+                    tagName = requestBody.tagName();
+
+                    Snapshot snapshot;
+                    SnapshotManager snapshotManager = table.snapshotManager();
+                    if (requestBody.snapshotId() != null) {
+                        try {
+                            snapshot = 
snapshotManager.tryGetSnapshot(requestBody.snapshotId());
+                        } catch (FileNotFoundException e) {
+                            snapshot = null;
+                        }
+                    } else {
+                        // Use latest snapshot
+                        snapshot = snapshotManager.latestSnapshot();
+                    }
+                    if (snapshot == null) {
+                        response =
+                                new ErrorResponse(
+                                        ErrorResponse.RESOURCE_TYPE_SNAPSHOT,
+                                        
String.valueOf(requestBody.snapshotId()),
+                                        String.format(
+                                                "Snapshot %s in table %s 
doesn't exist.",
+                                                requestBody.snapshotId(), 
identifier.getFullName()),
+                                        404);
+                        return mockResponse(response, 404);
+                    }
+
+                    // Parse timeRetained
+                    Duration timeRetained = null;
+                    if (requestBody.timeRetained() != null) {
+                        try {
+                            timeRetained = 
TimeUtils.parseDuration(requestBody.timeRetained());
+                        } catch (Exception e) {
+                            response =
+                                    new ErrorResponse(
+                                            null,
+                                            null,
+                                            "Invalid timeRetained format: "
+                                                    + 
requestBody.timeRetained(),
+                                            400);
+                            return mockResponse(response, 400);
+                        }
+                    }
+
+                    // Create tag
+                    table.createTag(tagName, snapshot.id(), timeRetained);
+                    return new MockResponse().setResponseCode(200);
+                default:
+                    return new MockResponse().setResponseCode(404);
+            }
+        } catch (Exception e) {
+            if (e.getMessage().contains("Tag") && 
e.getMessage().contains("already exists")) {
+                response =
+                        new ErrorResponse(
+                                ErrorResponse.RESOURCE_TYPE_TAG, tagName, 
e.getMessage(), 409);
+                return mockResponse(response, 409);
+            }
+            if (e.getMessage().contains("Tag") && 
e.getMessage().contains("doesn't exist")) {
+                response =
+                        new ErrorResponse(
+                                ErrorResponse.RESOURCE_TYPE_TAG, tagName, 
e.getMessage(), 404);
+                return mockResponse(response, 404);
+            }
+            if (e.getMessage().contains("Snapshot") && 
e.getMessage().contains("doesn't exist")) {
+                response =
+                        new ErrorResponse(
+                                ErrorResponse.RESOURCE_TYPE_SNAPSHOT, tagName, 
e.getMessage(), 404);
+                return mockResponse(response, 404);
+            }
+            throw e;
+        }
+    }
+
     private MockResponse generateFinalListPartitionsResponse(
             Map<String, String> parameters, List<Partition> partitions) {
         RESTResponse response;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 9520f75c1f..c4ac5a6a57 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -45,6 +45,7 @@ import org.apache.paimon.rest.auth.DLFToken;
 import org.apache.paimon.rest.exceptions.BadRequestException;
 import org.apache.paimon.rest.exceptions.ForbiddenException;
 import org.apache.paimon.rest.responses.ConfigResponse;
+import org.apache.paimon.rest.responses.GetTagResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.SchemaManager;
@@ -64,6 +65,7 @@ import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.SnapshotNotExistException;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewChange;
 
@@ -1888,6 +1890,99 @@ public abstract class RESTCatalogTest extends 
CatalogTestBase {
         assertThat(restCatalog.listBranches(identifier)).isEmpty();
     }
 
+    @Test
+    void testTags() throws Exception {
+        String databaseName = "testTagTable";
+        catalog.dropDatabase(databaseName, true, true);
+        catalog.createDatabase(databaseName, true);
+        Identifier identifier = Identifier.create(databaseName, "table");
+
+        // Test table not exist
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () -> restCatalog.createTag(identifier, "my_tag", null, null, 
false));
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () -> restCatalog.getTag(identifier, "my_tag"));
+
+        // Create table
+        catalog.createTable(
+                identifier, Schema.newBuilder().column("col", 
DataTypes.INT()).build(), true);
+
+        // Test tag not exist
+        assertThrows(
+                Catalog.TagNotExistException.class,
+                () -> restCatalog.getTag(identifier, "non_exist_tag"));
+
+        // Create snapshot by writing data
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        batchWrite(table, Lists.newArrayList(1, 2, 3));
+
+        // Get latest snapshot
+        SnapshotManager snapshotManager = table.snapshotManager();
+        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+        assertThat(latestSnapshot).isNotNull();
+
+        // Create tag from latest snapshot
+        restCatalog.createTag(identifier, "my_tag", null, null, false);
+
+        // Get tag and verify
+        GetTagResponse tagResponse = restCatalog.getTag(identifier, "my_tag");
+        assertThat(tagResponse.tagName()).isEqualTo("my_tag");
+        assertThat(tagResponse.snapshot().id()).isEqualTo(latestSnapshot.id());
+        assertThat(tagResponse.snapshot()).isEqualTo(latestSnapshot);
+
+        // Create another snapshot
+        batchWrite(table, Lists.newArrayList(4, 5, 6));
+        Snapshot newSnapshot = snapshotManager.latestSnapshot();
+        // Create tag from specific snapshot
+        restCatalog.createTag(identifier, "my_tag_v2", newSnapshot.id(), null, 
false);
+
+        // Get tag and verify
+        GetTagResponse tagResponse2 = restCatalog.getTag(identifier, 
"my_tag_v2");
+        assertThat(tagResponse2.tagName()).isEqualTo("my_tag_v2");
+        assertThat(tagResponse2.snapshot().id()).isEqualTo(newSnapshot.id());
+        assertThat(tagResponse2.snapshot()).isEqualTo(newSnapshot);
+
+        // Test tag already exists
+        assertThrows(
+                Catalog.TagAlreadyExistException.class,
+                () -> restCatalog.createTag(identifier, "my_tag", null, null, 
false));
+
+        // Test create tag with ignoreIfExists = true
+        assertDoesNotThrow(() -> restCatalog.createTag(identifier, "my_tag", 
null, null, true));
+
+        // Test snapshot not exist
+        assertThrows(
+                SnapshotNotExistException.class,
+                () -> restCatalog.createTag(identifier, "my_tag_v3", 99999L, 
null, false));
+
+        // Test listTags
+        PagedList<String> tags = restCatalog.listTagsPaged(identifier, null, 
"my_tag");
+        assertThat(tags.getElements()).containsExactlyInAnyOrder("my_tag_v2");
+        tags = restCatalog.listTagsPaged(identifier, null, null);
+        assertThat(tags.getElements()).containsExactlyInAnyOrder("my_tag", 
"my_tag_v2");
+
+        // Test deleteTag
+        restCatalog.deleteTag(identifier, "my_tag");
+        tags = restCatalog.listTagsPaged(identifier, null, null);
+        assertThat(tags.getElements()).containsExactlyInAnyOrder("my_tag_v2");
+
+        // Test deleteTag with non-existent tag
+        assertThrows(
+                Catalog.TagNotExistException.class,
+                () -> restCatalog.deleteTag(identifier, "non_exist_tag"));
+
+        // Verify tag is deleted
+        assertThrows(
+                Catalog.TagNotExistException.class, () -> 
restCatalog.getTag(identifier, "my_tag"));
+
+        // Delete remaining tag
+        restCatalog.deleteTag(identifier, "my_tag_v2");
+        tags = restCatalog.listTagsPaged(identifier, null, null);
+        assertThat(tags.getElements()).isEmpty();
+    }
+
     @Test
     void testListDataFromPageApiWhenLastPageTokenIsNull() {
         List<Integer> testData = ImmutableList.of(1, 2, 3, 4, 5, 6, 7);

Reply via email to