This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new de7a50dc91 [core] Support partition API (#4786)
de7a50dc91 is described below

commit de7a50dc9150bddddfcdabc5c6cd4427745273ae
Author: jerry <[email protected]>
AuthorDate: Mon Dec 30 19:16:10 2024 +0800

    [core] Support partition API (#4786)
---
 .../paimon/utils/InternalRowPartitionComputer.java |   2 +-
 .../java/org/apache/paimon/rest/HttpClient.java    |  17 ++
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 148 +++++++++++--
 .../java/org/apache/paimon/rest/RESTClient.java    |   2 +
 .../java/org/apache/paimon/rest/ResourcePaths.java |  11 +
 .../rest/requests/CreatePartitionRequest.java      |  61 +++++
 .../paimon/rest/requests/DropPartitionRequest.java |  49 +++++
 .../rest/responses/ListPartitionsResponse.java     |  49 +++++
 .../paimon/rest/responses/PartitionResponse.java   |  93 ++++++++
 .../org/apache/paimon/rest/MockRESTMessage.java    |  43 +++-
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 213 +++++++++++++-----
 .../apache/paimon/rest/RESTObjectMapperTest.java   |  45 ++++
 paimon-open-api/rest-catalog-open-api.yaml         | 245 +++++++++++++++++----
 .../paimon/open/api/RESTCatalogController.java     |  80 +++++++
 14 files changed, 931 insertions(+), 127 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 6bb26d7613..f4aad8f03f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -111,7 +111,7 @@ public class InternalRowPartitionComputer {
         List<String> fieldNames = partType.getFieldNames();
         for (Map.Entry<String, String> entry : spec.entrySet()) {
             Object value =
-                    defaultPartValue.equals(entry.getValue())
+                    defaultPartValue != null && 
defaultPartValue.equals(entry.getValue())
                             ? null
                             : castFromString(
                                     entry.getValue(), 
partType.getField(entry.getKey()).type());
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
index 87f3fad9b2..d92cab5102 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/HttpClient.java
@@ -94,6 +94,23 @@ public class HttpClient implements RESTClient {
         return exec(request, null);
     }
 
+    @Override
+    public <T extends RESTResponse> T delete(
+            String path, RESTRequest body, Map<String, String> headers) {
+        try {
+            RequestBody requestBody = buildRequestBody(body);
+            Request request =
+                    new Request.Builder()
+                            .url(uri + path)
+                            .delete(requestBody)
+                            .headers(Headers.of(headers))
+                            .build();
+            return exec(request, null);
+        } catch (JsonProcessingException e) {
+            throw new RESTException(e, "build request failed.");
+        }
+    }
+
     @Override
     public void close() throws IOException {
         okHttpClient.dispatcher().cancelAll();
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index e0b3dccdd5..c430e303b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -26,9 +26,12 @@ import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -41,7 +44,9 @@ import org.apache.paimon.rest.exceptions.NoSuchResourceException;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
@@ -49,7 +54,9 @@ import org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
@@ -58,10 +65,11 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
 
-import 
org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -71,15 +79,20 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
 import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
+import static 
org.apache.paimon.utils.InternalRowPartitionComputer.convertSpecToInternalRow;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
@@ -132,7 +145,8 @@ public class RESTCatalog implements Catalog {
         Map<String, String> initHeaders =
                 RESTUtil.merge(
                         configHeaders(catalogOptions.toMap()), 
this.catalogAuth.getHeaders());
-        Options options = new Options(fetchOptionsFromServer(initHeaders, 
initHeaders));
+        Options options =
+                new Options(fetchOptionsFromServer(initHeaders, 
catalogContext.options().toMap()));
         this.context =
                 CatalogContext.create(
                         options, catalogContext.preferIO(), 
catalogContext.fallbackIO());
@@ -141,20 +155,6 @@ public class RESTCatalog implements Catalog {
         this.fileIO = getFileIOFromOptions(context);
     }
 
-    private static FileIO getFileIOFromOptions(CatalogContext context) {
-        try {
-            Options options = context.options();
-            String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
-            Path warehousePath = new Path(warehouseStr);
-            CatalogContext contextWithNewOptions =
-                    CatalogContext.create(options, context.preferIO(), 
context.fallbackIO());
-            return FileIO.get(warehousePath, contextWithNewOptions);
-        } catch (IOException e) {
-            LOG.warn("Can not get FileIO from options.");
-            throw new RuntimeException(e);
-        }
-    }
-
     @Override
     public String warehouse() {
         return context.options().get(CatalogOptions.WAREHOUSE);
@@ -360,17 +360,43 @@ public class RESTCatalog implements Catalog {
     @Override
     public void createPartition(Identifier identifier, Map<String, String> 
partitionSpec)
             throws TableNotExistException {
-        throw new UnsupportedOperationException();
+        try {
+            CreatePartitionRequest request = new 
CreatePartitionRequest(identifier, partitionSpec);
+            client.post(
+                    resourcePaths.partitions(
+                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                    request,
+                    PartitionResponse.class,
+                    headers());
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
     }
 
     @Override
     public void dropPartition(Identifier identifier, Map<String, String> 
partitions)
-            throws TableNotExistException, PartitionNotExistException {}
+            throws TableNotExistException, PartitionNotExistException {
+        checkNotSystemTable(identifier, "dropPartition");
+        dropPartitionMetadata(identifier, partitions);
+        Table table = getTable(identifier);
+        cleanPartitionsInFileSystem(table, partitions);
+    }
 
     @Override
     public List<PartitionEntry> listPartitions(Identifier identifier)
             throws TableNotExistException {
-        throw new UnsupportedOperationException();
+        FileStoreTable table = (FileStoreTable) getTable(identifier);
+        boolean whetherSupportListPartitions =
+                Boolean.parseBoolean(
+                        
table.options().get(CoreOptions.METASTORE_PARTITIONED_TABLE.key()));
+        if (whetherSupportListPartitions) {
+            RowType rowType = table.schema().logicalPartitionType();
+            return listPartitionsFromServer(identifier, rowType);
+        } else {
+            return 
getTable(identifier).newReadBuilder().newScan().listPartitionEntries();
+        }
     }
 
     @Override
@@ -388,16 +414,14 @@ public class RESTCatalog implements Catalog {
         }
     }
 
-    @VisibleForTesting
-    Map<String, String> fetchOptionsFromServer(
+    protected Map<String, String> fetchOptionsFromServer(
             Map<String, String> headers, Map<String, String> clientProperties) 
{
         ConfigResponse response =
                 client.get(ResourcePaths.V1_CONFIG, ConfigResponse.class, 
headers);
         return response.merge(clientProperties);
     }
 
-    @VisibleForTesting
-    Table getDataOrFormatTable(Identifier identifier) throws 
TableNotExistException {
+    private Table getDataOrFormatTable(Identifier identifier) throws 
TableNotExistException {
         Preconditions.checkArgument(identifier.getSystemTableName() == null);
         GetTableResponse response = getTableResponse(identifier);
         FileStoreTable table =
@@ -420,8 +444,42 @@ public class RESTCatalog implements Catalog {
         return table;
     }
 
-    protected GetTableResponse getTableResponse(Identifier identifier)
+    private List<PartitionEntry> listPartitionsFromServer(Identifier 
identifier, RowType rowType)
             throws TableNotExistException {
+        try {
+            ListPartitionsResponse response =
+                    client.get(
+                            resourcePaths.partitions(
+                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                            ListPartitionsResponse.class,
+                            headers());
+            if (response != null && response.getPartitions() != null) {
+                return response.getPartitions().stream()
+                        .map(p -> convertToPartitionEntry(p, rowType))
+                        .collect(Collectors.toList());
+            } else {
+                return Collections.emptyList();
+            }
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    private void cleanPartitionsInFileSystem(Table table, Map<String, String> 
partitions) {
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        try (FileStoreCommit commit =
+                fileStoreTable
+                        .store()
+                        .newCommit(
+                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
+            commit.dropPartitions(
+                    Collections.singletonList(partitions), 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+        }
+    }
+
+    private GetTableResponse getTableResponse(Identifier identifier) throws 
TableNotExistException {
         try {
             return client.get(
                     resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
@@ -434,6 +492,23 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    private boolean dropPartitionMetadata(Identifier identifier, Map<String, 
String> partitions)
+            throws TableNoPermissionException, PartitionNotExistException {
+        try {
+            DropPartitionRequest request = new 
DropPartitionRequest(partitions);
+            client.delete(
+                    resourcePaths.partitions(
+                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                    request,
+                    headers());
+            return true;
+        } catch (NoSuchResourceException ignore) {
+            throw new PartitionNotExistException(identifier, partitions);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
     private static Map<String, String> configHeaders(Map<String, String> 
properties) {
         return RESTUtil.extractPrefixMap(properties, "header.");
     }
@@ -464,4 +539,29 @@ public class RESTCatalog implements Catalog {
 
         return refreshExecutor;
     }
+
+    private PartitionEntry convertToPartitionEntry(PartitionResponse 
partition, RowType rowType) {
+        InternalRowSerializer serializer = new InternalRowSerializer(rowType);
+        GenericRow row = convertSpecToInternalRow(partition.getSpec(), 
rowType, null);
+        return new PartitionEntry(
+                serializer.toBinaryRow(row).copy(),
+                partition.getRecordCount(),
+                partition.getFileSizeInBytes(),
+                partition.getFileCount(),
+                partition.getLastFileCreationTime());
+    }
+
+    private static FileIO getFileIOFromOptions(CatalogContext context) {
+        try {
+            Options options = context.options();
+            String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
+            Path warehousePath = new Path(warehouseStr);
+            CatalogContext contextWithNewOptions =
+                    CatalogContext.create(options, context.preferIO(), 
context.fallbackIO());
+            return FileIO.get(warehousePath, contextWithNewOptions);
+        } catch (IOException e) {
+            LOG.warn("Can not get FileIO from options.");
+            throw new RuntimeException(e);
+        }
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
index a255d688bc..d90cb5fa4a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTClient.java
@@ -30,4 +30,6 @@ public interface RESTClient extends Closeable {
             String path, RESTRequest body, Class<T> responseType, Map<String, 
String> headers);
 
     <T extends RESTResponse> T delete(String path, Map<String, String> 
headers);
+
+    <T extends RESTResponse> T delete(String path, RESTRequest body, 
Map<String, String> headers);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index f006713fe2..ebfdd2db1e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -82,4 +82,15 @@ public class ResourcePaths {
                 .add("rename")
                 .toString();
     }
+
+    public String partitions(String databaseName, String tableName) {
+        return SLASH.add("v1")
+                .add(prefix)
+                .add("databases")
+                .add(databaseName)
+                .add("tables")
+                .add(tableName)
+                .add("partitions")
+                .toString();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreatePartitionRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreatePartitionRequest.java
new file mode 100644
index 0000000000..e8094ab821
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreatePartitionRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.rest.RESTRequest;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Request for creating partition. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class CreatePartitionRequest implements RESTRequest {
+
+    private static final String FIELD_IDENTIFIER = "identifier";
+    private static final String FIELD_PARTITION_SPEC = "spec";
+
+    @JsonProperty(FIELD_IDENTIFIER)
+    private final Identifier identifier;
+
+    @JsonProperty(FIELD_PARTITION_SPEC)
+    private final Map<String, String> partitionSpec;
+
+    @JsonCreator
+    public CreatePartitionRequest(
+            @JsonProperty(FIELD_IDENTIFIER) Identifier identifier,
+            @JsonProperty(FIELD_PARTITION_SPEC) Map<String, String> 
partitionSpec) {
+        this.identifier = identifier;
+        this.partitionSpec = partitionSpec;
+    }
+
+    @JsonGetter(FIELD_IDENTIFIER)
+    public Identifier getIdentifier() {
+        return identifier;
+    }
+
+    @JsonGetter(FIELD_PARTITION_SPEC)
+    public Map<String, String> getPartitionSpec() {
+        return partitionSpec;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/requests/DropPartitionRequest.java b/paimon-core/src/main/java/org/apache/paimon/rest/requests/DropPartitionRequest.java
new file mode 100644
index 0000000000..4fabf11636
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/requests/DropPartitionRequest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.rest.RESTRequest;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Request for deleting partition. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class DropPartitionRequest implements RESTRequest {
+
+    private static final String FIELD_PARTITION_SPEC = "spec";
+
+    @JsonProperty(FIELD_PARTITION_SPEC)
+    private final Map<String, String> partitionSpec;
+
+    @JsonCreator
+    public DropPartitionRequest(
+            @JsonProperty(FIELD_PARTITION_SPEC) Map<String, String> 
partitionSpec) {
+        this.partitionSpec = partitionSpec;
+    }
+
+    @JsonGetter(FIELD_PARTITION_SPEC)
+    public Map<String, String> getPartitionSpec() {
+        return partitionSpec;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
new file mode 100644
index 0000000000..1f194d208e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.rest.RESTResponse;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/** Response for listing partitions. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class ListPartitionsResponse implements RESTResponse {
+
+    public static final String FIELD_PARTITIONS = "partitions";
+
+    @JsonProperty(FIELD_PARTITIONS)
+    private final List<PartitionResponse> partitions;
+
+    @JsonCreator
+    public ListPartitionsResponse(
+            @JsonProperty(FIELD_PARTITIONS) List<PartitionResponse> 
partitions) {
+        this.partitions = partitions;
+    }
+
+    @JsonGetter(FIELD_PARTITIONS)
+    public List<PartitionResponse> getPartitions() {
+        return partitions;
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java
new file mode 100644
index 0000000000..2706b5d7da
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PartitionResponse.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.rest.RESTResponse;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+
+/** Partition for rest api. */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class PartitionResponse implements RESTResponse {
+
+    public static final String FIELD_SPEC = "spec";
+    public static final String FIELD_RECORD_COUNT = "recordCount";
+    public static final String FIELD_FILE_SIZE_IN_BYTES = "fileSizeInBytes";
+    public static final String FIELD_FILE_COUNT = "fileCount";
+    public static final String FIELD_LAST_FILE_CREATION_TIME = 
"lastFileCreationTime";
+
+    @JsonProperty(FIELD_SPEC)
+    private final Map<String, String> spec;
+
+    @JsonProperty(FIELD_RECORD_COUNT)
+    private final long recordCount;
+
+    @JsonProperty(FIELD_FILE_SIZE_IN_BYTES)
+    private final long fileSizeInBytes;
+
+    @JsonProperty(FIELD_FILE_COUNT)
+    private final long fileCount;
+
+    @JsonProperty(FIELD_LAST_FILE_CREATION_TIME)
+    private final long lastFileCreationTime;
+
+    @JsonCreator
+    public PartitionResponse(
+            @JsonProperty(FIELD_SPEC) Map<String, String> spec,
+            @JsonProperty(FIELD_RECORD_COUNT) long recordCount,
+            @JsonProperty(FIELD_FILE_SIZE_IN_BYTES) long fileSizeInBytes,
+            @JsonProperty(FIELD_FILE_COUNT) long fileCount,
+            @JsonProperty(FIELD_LAST_FILE_CREATION_TIME) long 
lastFileCreationTime) {
+        this.spec = spec;
+        this.recordCount = recordCount;
+        this.fileSizeInBytes = fileSizeInBytes;
+        this.fileCount = fileCount;
+        this.lastFileCreationTime = lastFileCreationTime;
+    }
+
+    @JsonGetter(FIELD_SPEC)
+    public Map<String, String> getSpec() {
+        return spec;
+    }
+
+    @JsonGetter(FIELD_RECORD_COUNT)
+    public long getRecordCount() {
+        return recordCount;
+    }
+
+    @JsonGetter(FIELD_FILE_SIZE_IN_BYTES)
+    public long getFileSizeInBytes() {
+        return fileSizeInBytes;
+    }
+
+    @JsonGetter(FIELD_FILE_COUNT)
+    public long getFileCount() {
+        return fileCount;
+    }
+
+    @JsonGetter(FIELD_LAST_FILE_CREATION_TIME)
+    public long getLastFileCreationTime() {
+        return lastFileCreationTime;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 96d0c9d7c7..9b686b6837 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -18,11 +18,14 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
@@ -30,7 +33,9 @@ import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.types.DataField;
@@ -39,6 +44,7 @@ import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import java.util.ArrayList;
@@ -131,6 +137,26 @@ public class MockRESTMessage {
         return new AlterTableRequest(getChanges());
     }
 
+    public static CreatePartitionRequest createPartitionRequest(String 
tableName) {
+        Identifier identifier = Identifier.create(databaseName(), tableName);
+        return new CreatePartitionRequest(identifier, 
Collections.singletonMap("pt", "1"));
+    }
+
+    public static DropPartitionRequest dropPartitionRequest() {
+        return new DropPartitionRequest(Collections.singletonMap("pt", "1"));
+    }
+
+    public static PartitionResponse partitionResponse() {
+        Map<String, String> spec = new HashMap<>();
+        spec.put("f0", "1");
+        return new PartitionResponse(spec, 1, 1, 1, 1);
+    }
+
+    public static ListPartitionsResponse listPartitionsResponse() {
+        PartitionResponse partition = partitionResponse();
+        return new ListPartitionsResponse(ImmutableList.of(partition));
+    }
+
     public static List<SchemaChange> getChanges() {
         // add option
         SchemaChange addOption = 
SchemaChange.setOption("snapshot.time-retained", "2h");
@@ -202,20 +228,27 @@ public class MockRESTMessage {
         return schemaChanges;
     }
 
+    public static GetTableResponse getTableResponseEnablePartition() {
+        Map<String, String> options = new HashMap<>();
+        options.put("option-1", "value-1");
+        options.put(CoreOptions.METASTORE_PARTITIONED_TABLE.key(), "true");
+        return new GetTableResponse("/tmp/2", 1, schema(options));
+    }
+
     public static GetTableResponse getTableResponse() {
-        return new GetTableResponse("/tmp/1", 1, schema());
+        Map<String, String> options = new HashMap<>();
+        options.put("option-1", "value-1");
+        options.put("option-2", "value-2");
+        return new GetTableResponse("/tmp/1", 1, schema(options));
     }
 
-    private static Schema schema() {
+    private static Schema schema(Map<String, String> options) {
         List<DataField> fields =
                 Arrays.asList(
                         new DataField(0, "f0", new IntType()),
                         new DataField(1, "f1", new IntType()));
         List<String> partitionKeys = Collections.singletonList("f0");
         List<String> primaryKeys = Arrays.asList("f0", "f1");
-        Map<String, String> options = new HashMap<>();
-        options.put("option-1", "value-1");
-        options.put("option-2", "value-2");
         return new Schema(fields, partitionKeys, primaryKeys, options, 
"comment");
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 01555adc3d..67103aaa52 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.rest.requests.CreateTableRequest;
@@ -31,7 +32,9 @@ import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Table;
 
@@ -55,13 +58,6 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 
 /** Test for REST Catalog. */
 public class RESTCatalogTest {
@@ -69,31 +65,19 @@ public class RESTCatalogTest {
     private final ObjectMapper mapper = RESTObjectMapper.create();
     private MockWebServer mockWebServer;
     private RESTCatalog restCatalog;
-    private RESTCatalog mockRestCatalog;
     private String warehouseStr;
+    private String serverUrl;
     @Rule public TemporaryFolder folder = new TemporaryFolder();
 
     @Before
     public void setUp() throws IOException {
         mockWebServer = new MockWebServer();
         mockWebServer.start();
-        String baseUrl = mockWebServer.url("").toString();
-        Options options = new Options();
-        options.set(RESTCatalogOptions.URI, baseUrl);
-        String initToken = "init_token";
-        options.set(RESTCatalogOptions.TOKEN, initToken);
-        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+        serverUrl = mockWebServer.url("").toString();
+        Options options = mockInitOptions();
         warehouseStr = folder.getRoot().getPath();
-        String mockResponse =
-                String.format(
-                        "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
-                        RESTCatalogInternalOptions.PREFIX.key(),
-                        "prefix",
-                        CatalogOptions.WAREHOUSE.key(),
-                        warehouseStr);
-        mockResponse(mockResponse, 200);
+        mockConfig(warehouseStr);
         restCatalog = new RESTCatalog(CatalogContext.create(options));
-        mockRestCatalog = spy(restCatalog);
     }
 
     @After
@@ -154,9 +138,7 @@ public class RESTCatalogTest {
     public void testDropDatabase() throws Exception {
         String name = MockRESTMessage.databaseName();
         mockResponse("", 200);
-        assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false, 
true));
-        verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), 
eq(true));
-        verify(mockRestCatalog, times(0)).listTables(eq(name));
+        assertDoesNotThrow(() -> restCatalog.dropDatabase(name, false, true));
     }
 
     @Test
@@ -166,7 +148,7 @@ public class RESTCatalogTest {
         mockResponse(mapper.writeValueAsString(response), 404);
         assertThrows(
                 Catalog.DatabaseNotExistException.class,
-                () -> mockRestCatalog.dropDatabase(name, false, true));
+                () -> restCatalog.dropDatabase(name, false, true));
     }
 
     @Test
@@ -174,9 +156,7 @@ public class RESTCatalogTest {
         String name = MockRESTMessage.databaseName();
         ErrorResponse response = 
MockRESTMessage.noSuchResourceExceptionErrorResponse();
         mockResponse(mapper.writeValueAsString(response), 404);
-        assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, true, 
true));
-        verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(true), 
eq(true));
-        verify(mockRestCatalog, times(0)).listTables(eq(name));
+        assertDoesNotThrow(() -> restCatalog.dropDatabase(name, true, true));
     }
 
     @Test
@@ -186,9 +166,7 @@ public class RESTCatalogTest {
         ListTablesResponse response = 
MockRESTMessage.listTablesEmptyResponse();
         mockResponse(mapper.writeValueAsString(response), 200);
         mockResponse("", 200);
-        assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false, 
cascade));
-        verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), 
eq(cascade));
-        verify(mockRestCatalog, times(1)).listTables(eq(name));
+        assertDoesNotThrow(() -> restCatalog.dropDatabase(name, false, 
cascade));
     }
 
     @Test
@@ -199,9 +177,7 @@ public class RESTCatalogTest {
         mockResponse(mapper.writeValueAsString(response), 200);
         assertThrows(
                 Catalog.DatabaseNotEmptyException.class,
-                () -> mockRestCatalog.dropDatabase(name, false, cascade));
-        verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), 
eq(cascade));
-        verify(mockRestCatalog, times(1)).listTables(eq(name));
+                () -> restCatalog.dropDatabase(name, false, cascade));
     }
 
     @Test
@@ -209,7 +185,7 @@ public class RESTCatalogTest {
         String name = MockRESTMessage.databaseName();
         AlterDatabaseResponse response = 
MockRESTMessage.alterDatabaseResponse();
         mockResponse(mapper.writeValueAsString(response), 200);
-        assertDoesNotThrow(() -> mockRestCatalog.alterDatabase(name, new 
ArrayList<>(), true));
+        assertDoesNotThrow(() -> restCatalog.alterDatabase(name, new 
ArrayList<>(), true));
     }
 
     @Test
@@ -220,7 +196,7 @@ public class RESTCatalogTest {
         mockResponse(mapper.writeValueAsString(response), 404);
         assertThrows(
                 Catalog.DatabaseNotExistException.class,
-                () -> mockRestCatalog.alterDatabase(name, new ArrayList<>(), 
false));
+                () -> restCatalog.alterDatabase(name, new ArrayList<>(), 
false));
     }
 
     @Test
@@ -228,7 +204,7 @@ public class RESTCatalogTest {
         String name = MockRESTMessage.databaseName();
         ErrorResponse response = 
MockRESTMessage.noSuchResourceExceptionErrorResponse();
         mockResponse(mapper.writeValueAsString(response), 404);
-        assertDoesNotThrow(() -> mockRestCatalog.alterDatabase(name, new 
ArrayList<>(), true));
+        assertDoesNotThrow(() -> restCatalog.alterDatabase(name, new 
ArrayList<>(), true));
     }
 
     @Test
@@ -245,10 +221,8 @@ public class RESTCatalogTest {
         String databaseName = MockRESTMessage.databaseName();
         GetTableResponse response = MockRESTMessage.getTableResponse();
         mockResponse(mapper.writeValueAsString(response), 200);
-        Table result = 
mockRestCatalog.getTable(Identifier.create(databaseName, "table"));
-        // catalog will add path option
+        Table result = restCatalog.getTable(Identifier.create(databaseName, 
"table"));
         assertEquals(response.getSchema().options().size() + 1, 
result.options().size());
-        verify(mockRestCatalog, times(1)).getDataOrFormatTable(any());
     }
 
     @Test
@@ -278,11 +252,10 @@ public class RESTCatalogTest {
         mockResponse(mapper.writeValueAsString(response), 200);
         assertDoesNotThrow(
                 () ->
-                        mockRestCatalog.renameTable(
+                        restCatalog.renameTable(
                                 Identifier.create(databaseName, fromTableName),
                                 Identifier.create(databaseName, toTableName),
                                 true));
-        verify(mockRestCatalog, times(1)).renameTable(any(), any(), 
anyBoolean());
     }
 
     @Test
@@ -294,7 +267,7 @@ public class RESTCatalogTest {
         assertThrows(
                 Catalog.TableNotExistException.class,
                 () ->
-                        mockRestCatalog.renameTable(
+                        restCatalog.renameTable(
                                 Identifier.create(databaseName, fromTableName),
                                 Identifier.create(databaseName, toTableName),
                                 false));
@@ -309,7 +282,7 @@ public class RESTCatalogTest {
         assertThrows(
                 Catalog.TableAlreadyExistException.class,
                 () ->
-                        mockRestCatalog.renameTable(
+                        restCatalog.renameTable(
                                 Identifier.create(databaseName, fromTableName),
                                 Identifier.create(databaseName, toTableName),
                                 false));
@@ -322,10 +295,7 @@ public class RESTCatalogTest {
         GetTableResponse response = MockRESTMessage.getTableResponse();
         mockResponse(mapper.writeValueAsString(response), 200);
         assertDoesNotThrow(
-                () ->
-                        mockRestCatalog.alterTable(
-                                Identifier.create(databaseName, "t1"), 
changes, true));
-        verify(mockRestCatalog, times(1)).alterTable(any(), anyList(), 
anyBoolean());
+                () -> restCatalog.alterTable(Identifier.create(databaseName, 
"t1"), changes, true));
     }
 
     @Test
@@ -336,7 +306,7 @@ public class RESTCatalogTest {
         assertThrows(
                 Catalog.TableNotExistException.class,
                 () ->
-                        mockRestCatalog.alterTable(
+                        restCatalog.alterTable(
                                 Identifier.create(databaseName, "t1"), 
changes, false));
     }
 
@@ -359,6 +329,127 @@ public class RESTCatalogTest {
                 () -> restCatalog.dropTable(Identifier.create(databaseName, 
tableName), false));
     }
 
+    @Test
+    public void testCreatePartition() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("p1", "v1");
+        PartitionResponse response = MockRESTMessage.partitionResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        assertDoesNotThrow(
+                () ->
+                        restCatalog.createPartition(
+                                Identifier.create(databaseName, "table"), 
partitionSpec));
+    }
+
+    @Test
+    public void testCreatePartitionWhenTableNotExist() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("p1", "v1");
+        mockResponse("", 404);
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () ->
+                        restCatalog.createPartition(
+                                Identifier.create(databaseName, "table"), 
partitionSpec));
+    }
+
+    @Test
+    public void testCreatePartitionWhenTableNoPermissionException() throws 
Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("p1", "v1");
+        mockResponse("", 403);
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () ->
+                        restCatalog.createPartition(
+                                Identifier.create(databaseName, "table"), 
partitionSpec));
+    }
+
+    @Test
+    public void testDropPartition() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        partitionSpec.put(response.getSchema().partitionKeys().get(0), "1");
+        mockResponse(mapper.writeValueAsString(""), 200);
+        mockResponse(mapper.writeValueAsString(response), 200);
+        assertThrows(
+                RuntimeException.class,
+                () ->
+                        restCatalog.dropPartition(
+                                Identifier.create(databaseName, "table"), partitionSpec));
+    }
+
+    @Test
+    public void testDropPartitionWhenPartitionNoExist() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        partitionSpec.put(response.getSchema().partitionKeys().get(0), "1");
+        mockResponse(mapper.writeValueAsString(""), 404);
+        mockResponse(mapper.writeValueAsString(response), 200);
+        assertThrows(
+                Catalog.PartitionNotExistException.class,
+                () ->
+                        restCatalog.dropPartition(
+                                Identifier.create(databaseName, "table"), partitionSpec));
+    }
+
+    @Test
+    public void testDropPartitionWhenTableNoPermission() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        partitionSpec.put(response.getSchema().partitionKeys().get(0), "1");
+        mockResponse(mapper.writeValueAsString(""), 403);
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () ->
+                        restCatalog.dropPartition(
+                                Identifier.create(databaseName, "table"), partitionSpec));
+    }
+
+    @Test
+    public void testDropPartitionWhenTableNoExist() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        Map<String, String> partitionSpec = new HashMap<>();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        partitionSpec.put(response.getSchema().partitionKeys().get(0), "1");
+        mockResponse(mapper.writeValueAsString(""), 200);
+        mockResponse("", 404);
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () ->
+                        restCatalog.dropPartition(
+                                Identifier.create(databaseName, "table"), partitionSpec));
+    }
+
+    @Test
+    public void testListPartitionsWhenMetastorePartitionedIsTrue() throws 
Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        GetTableResponse getTableResponse = 
MockRESTMessage.getTableResponseEnablePartition();
+        mockResponse(mapper.writeValueAsString(getTableResponse), 200);
+        ListPartitionsResponse response = 
MockRESTMessage.listPartitionsResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        List<PartitionEntry> result =
+                restCatalog.listPartitions(Identifier.create(databaseName, 
"table"));
+        assertEquals(response.getPartitions().size(), result.size());
+    }
+
+    @Test
+    public void testListPartitionsFromFile() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        mockResponse(mapper.writeValueAsString(response), 200);
+        List<PartitionEntry> partitionEntries =
+                restCatalog.listPartitions(Identifier.create(databaseName, "table"));
+        assertEquals(0, partitionEntries.size());
+    }
+
     private void mockResponse(String mockResponse, int httpCode) {
         MockResponse mockResponseObj =
                 new MockResponse()
@@ -367,4 +458,24 @@ public class RESTCatalogTest {
                         .addHeader("Content-Type", "application/json");
         mockWebServer.enqueue(mockResponseObj);
     }
+
+    private void mockConfig(String warehouseStr) {
+        String mockResponse =
+                String.format(
+                        "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
+                        RESTCatalogInternalOptions.PREFIX.key(),
+                        "prefix",
+                        CatalogOptions.WAREHOUSE.key(),
+                        warehouseStr);
+        mockResponse(mockResponse, 200);
+    }
+
+    public Options mockInitOptions() {
+        Options options = new Options();
+        options.set(RESTCatalogOptions.URI, serverUrl);
+        String initToken = "init_token";
+        options.set(RESTCatalogOptions.TOKEN, initToken);
+        options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+        return options;
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 26b3db615d..6712b7b991 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -21,7 +21,9 @@ package org.apache.paimon.rest;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
@@ -30,11 +32,14 @@ import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.IntType;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.junit.Test;
@@ -44,6 +49,7 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 /** Test for {@link RESTObjectMapper}. */
 public class RESTObjectMapperTest {
@@ -193,4 +199,43 @@ public class RESTObjectMapperTest {
         AlterTableRequest parseData = mapper.readValue(requestStr, 
AlterTableRequest.class);
         assertEquals(parseData.getChanges().size(), 
parseData.getChanges().size());
     }
+
+    @Test
+    public void createPartitionRequestParseTest() throws JsonProcessingException {
+        CreatePartitionRequest request = MockRESTMessage.createPartitionRequest("t1");
+        String requestStr = mapper.writeValueAsString(request);
+        CreatePartitionRequest parseData =
+                mapper.readValue(requestStr, CreatePartitionRequest.class);
+        assertEquals(request.getIdentifier(), parseData.getIdentifier());
+        assertEquals(request.getPartitionSpec(), parseData.getPartitionSpec());
+    }
+
+    @Test
+    public void dropPartitionRequestParseTest() throws JsonProcessingException {
+        DropPartitionRequest request = MockRESTMessage.dropPartitionRequest();
+        String requestStr = mapper.writeValueAsString(request);
+        DropPartitionRequest parseData = mapper.readValue(requestStr, DropPartitionRequest.class);
+        assertEquals(request.getPartitionSpec(), parseData.getPartitionSpec());
+    }
+
+    @Test
+    public void listPartitionsResponseParseTest() throws Exception {
+        ListPartitionsResponse response = 
MockRESTMessage.listPartitionsResponse();
+        String responseStr = mapper.writeValueAsString(response);
+        ListPartitionsResponse parseData =
+                mapper.readValue(responseStr, ListPartitionsResponse.class);
+        assertEquals(
+                response.getPartitions().get(0).getFileCount(),
+                parseData.getPartitions().get(0).getFileCount());
+    }
+
+    @Test
+    public void partitionResponseParseTest() throws Exception {
+        PartitionResponse response = MockRESTMessage.partitionResponse();
+        assertDoesNotThrow(() -> mapper.writeValueAsString(response));
+        assertDoesNotThrow(
+                () ->
+                        mapper.readValue(
+                                mapper.writeValueAsString(response), 
PartitionResponse.class));
+    }
 }
diff --git a/paimon-open-api/rest-catalog-open-api.yaml 
b/paimon-open-api/rest-catalog-open-api.yaml
index 7fefd0254b..7a0c9663b4 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -28,6 +28,21 @@ servers:
   - url: http://localhost:8080
     description: Server URL in Development environment
 paths:
+  /v1/config:
+    get:
+      tags:
+        - config
+      summary: Get Config
+      operationId: getConfig
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ConfigResponse'
+        "500":
+          description: Internal Server Error
   /v1/{prefix}/databases:
     get:
       tags:
@@ -80,6 +95,102 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
+  /v1/{prefix}/databases/{database}:
+    get:
+      tags:
+        - database
+      summary: Get Database
+      operationId: getDatabases
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetDatabaseResponse'
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+    delete:
+      tags:
+        - database
+      summary: Drop Database
+      operationId: dropDatabase
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: Success, no content
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+  /v1/{prefix}/databases/{database}/properties:
+    post:
+      tags:
+        - database
+      summary: Alter Database
+      operationId: alterDatabase
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/AlterDatabaseRequest'
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/AlterDatabaseResponse'
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
   /v1/{prefix}/databases/{database}/tables:
     get:
       tags:
@@ -237,6 +348,8 @@ paths:
           schema:
             type: string
       responses:
+        "200":
+          description: Success, no content
         "404":
           description: Resource not found
           content:
@@ -293,12 +406,12 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
-  /v1/{prefix}/databases/{database}/properties:
-    post:
+  /v1/{prefix}/databases/{database}/tables/{table}/partitions:
+    get:
       tags:
-        - database
-      summary: Alter Database
-      operationId: alterDatabase
+        - partition
+      summary: List partitions
+      operationId: listPartitions
       parameters:
         - name: prefix
           in: path
@@ -310,18 +423,18 @@ paths:
           required: true
           schema:
             type: string
-      requestBody:
-        content:
-          application/json:
-            schema:
-              $ref: '#/components/schemas/AlterDatabaseRequest'
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
       responses:
         "200":
           description: OK
           content:
             application/json:
               schema:
-                $ref: '#/components/schemas/AlterDatabaseResponse'
+                $ref: '#/components/schemas/ListPartitionsResponse'
         "404":
           description: Resource not found
           content:
@@ -330,12 +443,11 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
-  /v1/{prefix}/databases/{database}:
-    get:
+    post:
       tags:
-        - database
-      summary: Get Database
-      operationId: getDatabases
+        - partition
+      summary: Create partition
+      operationId: createPartition
       parameters:
         - name: prefix
           in: path
@@ -347,13 +459,23 @@ paths:
           required: true
           schema:
             type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreatePartitionRequest'
       responses:
         "200":
           description: OK
           content:
             application/json:
               schema:
-                $ref: '#/components/schemas/GetDatabaseResponse'
+                $ref: '#/components/schemas/PartitionResponse'
         "404":
           description: Resource not found
           content:
@@ -364,9 +486,9 @@ paths:
           description: Internal Server Error
     delete:
       tags:
-        - database
-      summary: Drop Database
-      operationId: dropDatabase
+        - partition
+      summary: Drop partition
+      operationId: dropPartition
       parameters:
         - name: prefix
           in: path
@@ -378,7 +500,19 @@ paths:
           required: true
           schema:
             type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/DropPartitionRequest'
       responses:
+        "200":
+          description: Success, no content
         "404":
           description: Resource not found
           content:
@@ -387,21 +521,6 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
-  /v1/config:
-    get:
-      tags:
-        - config
-      summary: Get Config
-      operationId: getConfig
-      responses:
-        "200":
-          description: OK
-          content:
-            application/json:
-              schema:
-                $ref: '#/components/schemas/ConfigResponse'
-        "500":
-          description: Internal Server Error
 components:
   schemas:
     CreateDatabaseRequest:
@@ -413,6 +532,18 @@ components:
           type: object
           additionalProperties:
             type: string
+    CreatePartitionRequest:
+      type: object
+      properties:
+        identifier:
+          $ref: '#/components/schemas/Identifier'
+        spec:
+          type: object
+    DropPartitionRequest:
+      type: object
+      properties:
+        spec:
+          type: object
     CreateDatabaseResponse:
       type: object
       properties:
@@ -468,39 +599,38 @@ components:
         type:
           type: string
           pattern: ^ARRAY.*
+          example: ARRAY
         element:
-          type:
-            $ref: '#/components/schemas/DataType'
+          $ref: '#/components/schemas/DataType'
     MultisetType:
       type: object
       properties:
         type:
           type: string
           pattern: ^MULTISET.*
+          example: MULTISET
         element:
-          type:
-            $ref: '#/components/schemas/DataType'
+          $ref: '#/components/schemas/DataType'
     MapType:
       type: object
       properties:
         type:
           type: string
           pattern: ^MAP.*
+          example: MAP
         key:
-          type:
-            $ref: '#/components/schemas/DataType'
+          $ref: '#/components/schemas/DataType'
         value:
-          type:
-            $ref: '#/components/schemas/DataType'
+          $ref: '#/components/schemas/DataType'
     RowType:
       type: object
       properties:
         type:
           type: string
           pattern: ^ROW.*
+          example: ROW
         fields:
-          type:
-            $ref: '#/components/schemas/DataField'
+          $ref: '#/components/schemas/DataField'
     Identifier:
       type: object
       properties:
@@ -744,7 +874,30 @@ components:
           type: object
           additionalProperties:
             type: string
-
+    ListPartitionsResponse:
+      type: object
+      properties:
+        partitions:
+          type: array
+          items:
+            $ref: '#/components/schemas/PartitionResponse'
+    PartitionResponse:
+      type: object
+      properties:
+        spec:
+          type: object
+        recordCount:
+          type: integer
+          format: int64
+        fileSizeInBytes:
+          type: integer
+          format: int64
+        fileCount:
+          type: integer
+          format: int64
+        lastFileCreationTime:
+          type: integer
+          format: int64
   securitySchemes:
     BearerAuth:
       type: http
diff --git a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 179c23ce46..62f99876a3 100644
--- a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++ b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -22,7 +22,9 @@ import org.apache.paimon.rest.ResourcePaths;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.AlterTableRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreatePartitionRequest;
 import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.DropPartitionRequest;
 import org.apache.paimon.rest.requests.RenameTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
@@ -31,7 +33,9 @@ import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
 import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.rest.responses.PartitionResponse;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -143,6 +147,7 @@ public class RESTCatalogController {
             summary = "Drop Database",
             tags = {"database"})
     @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no content"),
         @ApiResponse(
                 responseCode = "404",
                 description = "Resource not found",
@@ -301,6 +306,7 @@ public class RESTCatalogController {
             summary = "Drop table",
             tags = {"table"})
     @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no content"),
         @ApiResponse(
                 responseCode = "404",
                 description = "Resource not found",
@@ -346,4 +352,78 @@ public class RESTCatalogController {
                         new HashMap<>(),
                         "comment"));
     }
+
+    @Operation(
+            summary = "List partitions",
+            tags = {"partition"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {
+                    @Content(schema = @Schema(implementation = ListPartitionsResponse.class))
+                }),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}/partitions")
+    public ListPartitionsResponse listPartitions(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table) {
+        Map<String, String> spec = new HashMap<>();
+        spec.put("f1", "1");
+        PartitionResponse partition = new PartitionResponse(spec, 1, 2, 3, 4);
+        return new ListPartitionsResponse(ImmutableList.of(partition));
+    }
+
+    @Operation(
+            summary = "Create partition",
+            tags = {"partition"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {@Content(schema = @Schema(implementation = PartitionResponse.class))}),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @PostMapping("/v1/{prefix}/databases/{database}/tables/{table}/partitions")
+    public PartitionResponse createPartition(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table,
+            @RequestBody CreatePartitionRequest request) {
+        Map<String, String> spec = new HashMap<>();
+        spec.put("f1", "1");
+        return new PartitionResponse(spec, 0, 0, 0, 4);
+    }
+
+    @Operation(
+            summary = "Drop partition",
+            tags = {"partition"})
+    @ApiResponses({
+        @ApiResponse(responseCode = "200", description = "Success, no content"),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @DeleteMapping("/v1/{prefix}/databases/{database}/tables/{table}/partitions")
+    public void dropPartition(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table,
+            @RequestBody DropPartitionRequest request) {}
 }

Reply via email to