This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ac36bb357 [core] Support Table API in RESTCatalog (#4736)
9ac36bb357 is described below

commit 9ac36bb3576e626c886ddbe8aadefe6a43b630c3
Author: jerry <[email protected]>
AuthorDate: Tue Dec 24 19:22:09 2024 +0800

    [core] Support Table API in RESTCatalog (#4736)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  93 +----
 .../java/org/apache/paimon/catalog/Catalog.java    |  33 ++
 .../org/apache/paimon/catalog/CatalogUtils.java    | 101 ++++++
 .../apache/paimon/catalog/FileSystemCatalog.java   |   5 +-
 .../java/org/apache/paimon/catalog/Identifier.java |  30 +-
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 282 ++++++++++++++--
 .../org/apache/paimon/rest/RESTCatalogFactory.java |   2 +-
 .../org/apache/paimon/rest/RESTObjectMapper.java   |  25 ++
 .../java/org/apache/paimon/rest/ResourcePaths.java |  19 ++
 .../CreateTableRequest.java}                       |  38 ++-
 .../apache/paimon/rest/requests/SchemaChanges.java | 238 +++++++++++++
 .../UpdateTableRequest.java}                       |  37 +-
 ...atabasesResponse.java => GetTableResponse.java} |  34 +-
 .../rest/responses/ListDatabasesResponse.java      |   6 +-
 .../{DatabaseName.java => ListTablesResponse.java} |  23 +-
 .../main/java/org/apache/paimon/schema/Schema.java |  33 +-
 .../org/apache/paimon/schema/SchemaChange.java     | 116 ++++++-
 .../org/apache/paimon/utils/JsonSerdeUtil.java     |  30 +-
 .../org/apache/paimon/rest/MockRESTMessage.java    | 141 +++++++-
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 168 ++++++++-
 .../apache/paimon/rest/RESTObjectMapperTest.java   |  62 +++-
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   8 +-
 .../org/apache/paimon/hive/PaimonMetaHook.java     |   4 +-
 .../org/apache/paimon/hive/CreateTableITCase.java  |  20 +-
 .../org/apache/paimon/hive/HiveLocationTest.java   |   4 +-
 .../org/apache/paimon/hive/HiveReadITCaseBase.java |   6 +-
 paimon-open-api/rest-catalog-open-api.yaml         | 376 ++++++++++++++++++++-
 .../paimon/open/api/RESTCatalogController.java     | 140 +++++++-
 .../paimon/open/api/config/OpenAPIConfig.java      |   5 +-
 29 files changed, 1838 insertions(+), 241 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index d7447c37dd..ef6c0e3348 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -20,7 +20,6 @@ package org.apache.paimon.catalog;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.TableType;
-import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -59,8 +58,11 @@ import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.CoreOptions.createCommitUser;
-import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
-import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
+import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
@@ -94,31 +96,16 @@ public abstract class AbstractCatalog implements Catalog {
         return fileIO;
     }
 
-    public Optional<CatalogLockFactory> lockFactory() {
-        if (!lockEnabled()) {
-            return Optional.empty();
-        }
-
-        String lock = catalogOptions.get(LOCK_TYPE);
-        if (lock == null) {
-            return defaultLockFactory();
-        }
-
-        return Optional.of(
-                FactoryUtil.discoverFactory(
-                        AbstractCatalog.class.getClassLoader(), 
CatalogLockFactory.class, lock));
-    }
-
     public Optional<CatalogLockFactory> defaultLockFactory() {
         return Optional.empty();
     }
 
     public Optional<CatalogLockContext> lockContext() {
-        return Optional.of(CatalogLockContext.fromOptions(catalogOptions));
+        return CatalogUtils.lockContext(catalogOptions);
     }
 
     protected boolean lockEnabled() {
-        return 
catalogOptions.getOptional(LOCK_ENABLED).orElse(fileIO.isObjectStore());
+        return CatalogUtils.lockEnabled(catalogOptions, fileIO);
     }
 
     protected boolean allowCustomTablePath() {
@@ -397,20 +384,7 @@ public abstract class AbstractCatalog implements Catalog {
                                     identifier.getTableName(),
                                     identifier.getBranchName(),
                                     null));
-            if (!(originTable instanceof FileStoreTable)) {
-                throw new UnsupportedOperationException(
-                        String.format(
-                                "Only data table support system tables, but 
this table %s is %s.",
-                                identifier, originTable.getClass()));
-            }
-            Table table =
-                    SystemTableLoader.load(
-                            
Preconditions.checkNotNull(identifier.getSystemTableName()),
-                            (FileStoreTable) originTable);
-            if (table == null) {
-                throw new TableNotExistException(identifier);
-            }
-            return table;
+            return CatalogUtils.getSystemTable(identifier, originTable);
         } else {
             return getDataOrFormatTable(identifier);
         }
@@ -428,7 +402,8 @@ public abstract class AbstractCatalog implements Catalog {
                                 identifier,
                                 tableMeta.uuid,
                                 Lock.factory(
-                                        lockFactory().orElse(null),
+                                        lockFactory(catalogOptions, fileIO(), 
defaultLockFactory())
+                                                .orElse(null),
                                         lockContext().orElse(null),
                                         identifier),
                                 
metastoreClientFactory(identifier).orElse(null)));
@@ -472,7 +447,7 @@ public abstract class AbstractCatalog implements Catalog {
      * @return The warehouse path for the database
      */
     public Path newDatabasePath(String database) {
-        return newDatabasePath(warehouse(), database);
+        return CatalogUtils.newDatabasePath(warehouse(), database);
     }
 
     public Map<String, Map<String, Path>> allTablePaths() {
@@ -507,16 +482,6 @@ public abstract class AbstractCatalog implements Catalog {
         return new Path(newDatabasePath(identifier.getDatabaseName()), 
identifier.getTableName());
     }
 
-    protected static void checkNotBranch(Identifier identifier, String method) 
{
-        if (identifier.getBranchName() != null) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Cannot '%s' for branch table '%s', "
-                                    + "please modify the table with the 
default branch.",
-                            method, identifier));
-        }
-    }
-
     protected void assertMainBranch(Identifier identifier) {
         if (identifier.getBranchName() != null
                 && !DEFAULT_MAIN_BRANCH.equals(identifier.getBranchName())) {
@@ -525,46 +490,10 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected static boolean isTableInSystemDatabase(Identifier identifier) {
-        return isSystemDatabase(identifier.getDatabaseName()) || 
identifier.isSystemTable();
-    }
-
-    protected static void checkNotSystemTable(Identifier identifier, String 
method) {
-        if (isTableInSystemDatabase(identifier)) {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Cannot '%s' for system table '%s', please use 
data table.",
-                            method, identifier));
-        }
-    }
-
     private void copyTableDefaultOptions(Map<String, String> options) {
         tableDefaultOptions.forEach(options::putIfAbsent);
     }
 
-    public static Path newTableLocation(String warehouse, Identifier 
identifier) {
-        checkNotBranch(identifier, "newTableLocation");
-        checkNotSystemTable(identifier, "newTableLocation");
-        return new Path(
-                newDatabasePath(warehouse, identifier.getDatabaseName()),
-                identifier.getTableName());
-    }
-
-    public static Path newDatabasePath(String warehouse, String database) {
-        return new Path(warehouse, database + DB_SUFFIX);
-    }
-
-    public static boolean isSystemDatabase(String database) {
-        return SYSTEM_DATABASE_NAME.equals(database);
-    }
-
-    /** Validate database cannot be a system database. */
-    protected void checkNotSystemDatabase(String database) {
-        if (isSystemDatabase(database)) {
-            throw new ProcessSystemDatabaseException();
-        }
-    }
-
     private void validateAutoCreateClose(Map<String, String> options) {
         checkArgument(
                 !Boolean.parseBoolean(
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 37ea6fa5e2..904c969107 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -433,6 +433,22 @@ public interface Catalog extends AutoCloseable {
         }
     }
 
+    /** Exception for trying to operate on the database that doesn't have 
permission. */
+    class DatabaseNoPermissionException extends RuntimeException {
+        private static final String MSG = "Database %s has no permission.";
+
+        private final String database;
+
+        public DatabaseNoPermissionException(String database, Throwable cause) 
{
+            super(String.format(MSG, database), cause);
+            this.database = database;
+        }
+
+        public String database() {
+            return database;
+        }
+    }
+
     /** Exception for trying to create a table that already exists. */
     class TableAlreadyExistException extends Exception {
 
@@ -475,6 +491,23 @@ public interface Catalog extends AutoCloseable {
         }
     }
 
+    /** Exception for trying to operate on the table that doesn't have 
permission. */
+    class TableNoPermissionException extends RuntimeException {
+
+        private static final String MSG = "Table %s has no permission.";
+
+        private final Identifier identifier;
+
+        public TableNoPermissionException(Identifier identifier, Throwable 
cause) {
+            super(String.format(MSG, identifier.getFullName()), cause);
+            this.identifier = identifier;
+        }
+
+        public Identifier identifier() {
+            return identifier;
+        }
+    }
+
     /** Exception for trying to operate on a partition that doesn't exist. */
     class PartitionNotExistException extends Exception {
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 043da0504d..826e2c0847 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -18,12 +18,24 @@
 
 package org.apache.paimon.catalog;
 
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.system.SystemTableLoader;
+import org.apache.paimon.utils.Preconditions;
 
 import java.util.Map;
+import java.util.Optional;
 
+import static org.apache.paimon.catalog.Catalog.DB_SUFFIX;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
+import static org.apache.paimon.options.CatalogOptions.LOCK_ENABLED;
+import static org.apache.paimon.options.CatalogOptions.LOCK_TYPE;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
 
 /** Utils for {@link Catalog}. */
@@ -60,4 +72,93 @@ public class CatalogUtils {
     public static Map<String, String> tableDefaultOptions(Map<String, String> 
options) {
         return convertToPropertiesPrefixKey(options, 
TABLE_DEFAULT_OPTION_PREFIX);
     }
+
+    public static boolean isSystemDatabase(String database) {
+        return SYSTEM_DATABASE_NAME.equals(database);
+    }
+
+    /** Validate database cannot be a system database. */
+    public static void checkNotSystemDatabase(String database) {
+        if (isSystemDatabase(database)) {
+            throw new Catalog.ProcessSystemDatabaseException();
+        }
+    }
+
+    public static boolean isTableInSystemDatabase(Identifier identifier) {
+        return isSystemDatabase(identifier.getDatabaseName()) || 
identifier.isSystemTable();
+    }
+
+    public static void checkNotSystemTable(Identifier identifier, String 
method) {
+        if (isTableInSystemDatabase(identifier)) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot '%s' for system table '%s', please use 
data table.",
+                            method, identifier));
+        }
+    }
+
+    public static Path newDatabasePath(String warehouse, String database) {
+        return new Path(warehouse, database + DB_SUFFIX);
+    }
+
+    public static Path newTableLocation(String warehouse, Identifier 
identifier) {
+        checkNotBranch(identifier, "newTableLocation");
+        checkNotSystemTable(identifier, "newTableLocation");
+        return new Path(
+                newDatabasePath(warehouse, identifier.getDatabaseName()),
+                identifier.getTableName());
+    }
+
+    public static void checkNotBranch(Identifier identifier, String method) {
+        if (identifier.getBranchName() != null) {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Cannot '%s' for branch table '%s', "
+                                    + "please modify the table with the 
default branch.",
+                            method, identifier));
+        }
+    }
+
+    public static Optional<CatalogLockFactory> lockFactory(
+            Options options, FileIO fileIO, Optional<CatalogLockFactory> 
defaultLockFactoryOpt) {
+        boolean lockEnabled = lockEnabled(options, fileIO);
+        if (!lockEnabled) {
+            return Optional.empty();
+        }
+
+        String lock = options.get(LOCK_TYPE);
+        if (lock == null) {
+            return defaultLockFactoryOpt;
+        }
+
+        return Optional.of(
+                FactoryUtil.discoverFactory(
+                        AbstractCatalog.class.getClassLoader(), 
CatalogLockFactory.class, lock));
+    }
+
+    public static Optional<CatalogLockContext> lockContext(Options options) {
+        return Optional.of(CatalogLockContext.fromOptions(options));
+    }
+
+    public static boolean lockEnabled(Options options, FileIO fileIO) {
+        return options.getOptional(LOCK_ENABLED).orElse(fileIO != null && 
fileIO.isObjectStore());
+    }
+
+    public static Table getSystemTable(Identifier identifier, Table 
originTable)
+            throws Catalog.TableNotExistException {
+        if (!(originTable instanceof FileStoreTable)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Only data table support system tables, but this 
table %s is %s.",
+                            identifier, originTable.getClass()));
+        }
+        Table table =
+                SystemTableLoader.load(
+                        
Preconditions.checkNotNull(identifier.getSystemTableName()),
+                        (FileStoreTable) originTable);
+        if (table == null) {
+            throw new Catalog.TableNotExistException(identifier);
+        }
+        return table;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
index cb0c358259..577dd9674e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/FileSystemCatalog.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
+import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 
 /** A catalog implementation for {@link FileIO}. */
@@ -123,7 +124,9 @@ public class FileSystemCatalog extends AbstractCatalog {
     private SchemaManager schemaManager(Identifier identifier) {
         Path path = getTableLocation(identifier);
         CatalogLock catalogLock =
-                lockFactory().map(fac -> 
fac.createLock(assertGetLockContext())).orElse(null);
+                lockFactory(catalogOptions, fileIO(), defaultLockFactory())
+                        .map(fac -> fac.createLock(assertGetLockContext()))
+                        .orElse(null);
         return new SchemaManager(fileIO, path, 
identifier.getBranchNameOrDefault())
                 .withLock(catalogLock == null ? null : 
Lock.fromCatalog(catalogLock, identifier));
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index 01456f0b3a..ac6996821b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -26,6 +26,12 @@ import org.apache.paimon.utils.BranchManager;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -40,10 +46,14 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  * @since 0.4.0
  */
 @Public
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Identifier implements Serializable {
 
     private static final long serialVersionUID = 1L;
 
+    private static final String FIELD_DATABASE_NAME = "database";
+    private static final String FIELD_OBJECT_NAME = "object";
+
     public static final RowType SCHEMA =
             new RowType(
                     false,
@@ -53,14 +63,22 @@ public class Identifier implements Serializable {
 
     public static final String UNKNOWN_DATABASE = "unknown";
 
+    @JsonProperty(FIELD_DATABASE_NAME)
     private final String database;
+
+    @JsonProperty(FIELD_OBJECT_NAME)
     private final String object;
 
     private transient String table;
+
     private transient String branch;
+
     private transient String systemTable;
 
-    public Identifier(String database, String object) {
+    @JsonCreator
+    public Identifier(
+            @JsonProperty(FIELD_DATABASE_NAME) String database,
+            @JsonProperty(FIELD_OBJECT_NAME) String object) {
         this.database = database;
         this.object = object;
     }
@@ -89,40 +107,48 @@ public class Identifier implements Serializable {
         this.systemTable = systemTable;
     }
 
+    @JsonGetter(FIELD_DATABASE_NAME)
     public String getDatabaseName() {
         return database;
     }
 
+    @JsonGetter(FIELD_OBJECT_NAME)
     public String getObjectName() {
         return object;
     }
 
+    @JsonIgnore
     public String getFullName() {
         return UNKNOWN_DATABASE.equals(this.database)
                 ? object
                 : String.format("%s.%s", database, object);
     }
 
+    @JsonIgnore
     public String getTableName() {
         splitObjectName();
         return table;
     }
 
+    @JsonIgnore
     public @Nullable String getBranchName() {
         splitObjectName();
         return branch;
     }
 
+    @JsonIgnore
     public String getBranchNameOrDefault() {
         String branch = getBranchName();
         return branch == null ? BranchManager.DEFAULT_MAIN_BRANCH : branch;
     }
 
+    @JsonIgnore
     public @Nullable String getSystemTableName() {
         splitObjectName();
         return systemTable;
     }
 
+    @JsonIgnore
     public boolean isSystemTable() {
         return getSystemTableName() != null;
     }
@@ -158,10 +184,12 @@ public class Identifier implements Serializable {
         }
     }
 
+    @JsonIgnore
     public String getEscapedFullName() {
         return getEscapedFullName('`');
     }
 
+    @JsonIgnore
     public String getEscapedFullName(char escapeChar) {
         return String.format(
                 "%c%s%c.%c%s%c", escapeChar, database, escapeChar, escapeChar, 
object, escapeChar);
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 8b53bef848..4836710601 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -18,70 +18,102 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.TableType;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Database;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.manifest.PartitionEntry;
+import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.rest.auth.AuthSession;
 import org.apache.paimon.rest.auth.CredentialsProvider;
 import org.apache.paimon.rest.auth.CredentialsProviderFactory;
 import org.apache.paimon.rest.exceptions.AlreadyExistsException;
+import org.apache.paimon.rest.exceptions.ForbiddenException;
 import org.apache.paimon.rest.exceptions.NoSuchResourceException;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.SchemaChanges;
+import org.apache.paimon.rest.requests.UpdateTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
-import org.apache.paimon.rest.responses.DatabaseName;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListTablesResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
 
 import 
org.apache.paimon.shade.guava30.com.google.common.annotations.VisibleForTesting;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.stream.Collectors;
+import java.util.function.Supplier;
 
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.lockContext;
+import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
+import static org.apache.paimon.catalog.CatalogUtils.newTableLocation;
 import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 import static 
org.apache.paimon.utils.ThreadPoolUtils.createScheduledThreadPool;
 
 /** A catalog implementation for REST. */
 public class RESTCatalog implements Catalog {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(RESTCatalog.class);
     private static final ObjectMapper OBJECT_MAPPER = 
RESTObjectMapper.create();
 
     private final RESTClient client;
     private final ResourcePaths resourcePaths;
-    private final Options options;
     private final Map<String, String> baseHeader;
     private final AuthSession catalogAuth;
+    private final CatalogContext context;
+    private final Optional<FileIO> fileIOOptional;
 
     private volatile ScheduledExecutorService refreshExecutor = null;
 
-    public RESTCatalog(Options options) {
-        if (options.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
+    public RESTCatalog(CatalogContext catalogContext) {
+        Options catalogOptions = catalogContext.options();
+        if (catalogOptions.getOptional(CatalogOptions.WAREHOUSE).isPresent()) {
             throw new IllegalArgumentException("Can not config warehouse in 
RESTCatalog.");
         }
-        String uri = options.get(RESTCatalogOptions.URI);
+        String uri = catalogOptions.get(RESTCatalogOptions.URI);
         Optional<Duration> connectTimeout =
-                options.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
-        Optional<Duration> readTimeout = 
options.getOptional(RESTCatalogOptions.READ_TIMEOUT);
-        Integer threadPoolSize = 
options.get(RESTCatalogOptions.THREAD_POOL_SIZE);
+                
catalogOptions.getOptional(RESTCatalogOptions.CONNECTION_TIMEOUT);
+        Optional<Duration> readTimeout =
+                catalogOptions.getOptional(RESTCatalogOptions.READ_TIMEOUT);
+        Integer threadPoolSize = 
catalogOptions.get(RESTCatalogOptions.THREAD_POOL_SIZE);
         HttpClientOptions httpClientOptions =
                 new HttpClientOptions(
                         uri,
@@ -91,10 +123,10 @@ public class RESTCatalog implements Catalog {
                         threadPoolSize,
                         DefaultErrorHandler.getInstance());
         this.client = new HttpClient(httpClientOptions);
-        this.baseHeader = configHeaders(options.toMap());
+        this.baseHeader = configHeaders(catalogOptions.toMap());
         CredentialsProvider credentialsProvider =
                 CredentialsProviderFactory.createCredentialsProvider(
-                        options, RESTCatalog.class.getClassLoader());
+                        catalogOptions, RESTCatalog.class.getClassLoader());
         if (credentialsProvider.keepRefreshed()) {
             this.catalogAuth =
                     AuthSession.fromRefreshCredentialsProvider(
@@ -104,26 +136,47 @@ public class RESTCatalog implements Catalog {
             this.catalogAuth = new AuthSession(this.baseHeader, 
credentialsProvider);
         }
         Map<String, String> initHeaders =
-                RESTUtil.merge(configHeaders(options.toMap()), 
this.catalogAuth.getHeaders());
-        this.options = new Options(fetchOptionsFromServer(initHeaders, 
options.toMap()));
+                RESTUtil.merge(
+                        configHeaders(catalogOptions.toMap()), 
this.catalogAuth.getHeaders());
+        Options options = new Options(fetchOptionsFromServer(initHeaders, 
initHeaders));
+        this.context =
+                CatalogContext.create(
+                        options, catalogContext.preferIO(), 
catalogContext.fallbackIO());
         this.resourcePaths =
-                ResourcePaths.forCatalogProperties(
-                        this.options.get(RESTCatalogInternalOptions.PREFIX));
+                
ResourcePaths.forCatalogProperties(options.get(RESTCatalogInternalOptions.PREFIX));
+        this.fileIOOptional = getFileIOFromOptions(context);
+    }
+
+    private static Optional<FileIO> getFileIOFromOptions(CatalogContext 
context) {
+        try {
+            Options options = context.options();
+            String warehouseStr = options.get(CatalogOptions.WAREHOUSE);
+            Path warehousePath = new Path(warehouseStr);
+            CatalogContext contextWithNewOptions =
+                    CatalogContext.create(options, context.preferIO(), 
context.fallbackIO());
+            return Optional.of(FileIO.get(warehousePath, 
contextWithNewOptions));
+        } catch (Exception ignore) {
+            LOG.warn("Can not get FileIO from options.");
+        }
+        return Optional.empty();
     }
 
     @Override
     public String warehouse() {
-        throw new UnsupportedOperationException();
+        return context.options().get(CatalogOptions.WAREHOUSE);
     }
 
     @Override
     public Map<String, String> options() {
-        return this.options.toMap();
+        return context.options().toMap();
     }
 
     @Override
     public FileIO fileIO() {
-        throw new UnsupportedOperationException();
+        if (this.fileIOOptional.isPresent()) {
+            return this.fileIOOptional.get();
+        }
+        throw new RuntimeException("FileIO is not configured.");
     }
 
     @Override
@@ -131,9 +184,7 @@ public class RESTCatalog implements Catalog {
         ListDatabasesResponse response =
                 client.get(resourcePaths.databases(), 
ListDatabasesResponse.class, headers());
         if (response.getDatabases() != null) {
-            return response.getDatabases().stream()
-                    .map(DatabaseName::getName)
-                    .collect(Collectors.toList());
+            return response.getDatabases();
         }
         return ImmutableList.of();
     }
@@ -141,6 +192,7 @@ public class RESTCatalog implements Catalog {
     @Override
     public void createDatabase(String name, boolean ignoreIfExists, 
Map<String, String> properties)
             throws DatabaseAlreadyExistException {
+        checkNotSystemDatabase(name);
         CreateDatabaseRequest request = new CreateDatabaseRequest(name, 
properties);
         try {
             client.post(
@@ -149,11 +201,16 @@ public class RESTCatalog implements Catalog {
             if (!ignoreIfExists) {
                 throw new DatabaseAlreadyExistException(name);
             }
+        } catch (ForbiddenException e) {
+            throw new DatabaseNoPermissionException(name, e);
         }
     }
 
     @Override
     public Database getDatabase(String name) throws DatabaseNotExistException {
+        if (isSystemDatabase(name)) {
+            return Database.of(name);
+        }
         try {
             GetDatabaseResponse response =
                     client.get(resourcePaths.database(name), 
GetDatabaseResponse.class, headers());
@@ -161,12 +218,15 @@ public class RESTCatalog implements Catalog {
                     name, response.options(), response.comment().orElseGet(() 
-> null));
         } catch (NoSuchResourceException e) {
             throw new DatabaseNotExistException(name);
+        } catch (ForbiddenException e) {
+            throw new DatabaseNoPermissionException(name, e);
         }
     }
 
     @Override
     public void dropDatabase(String name, boolean ignoreIfNotExists, boolean 
cascade)
             throws DatabaseNotExistException, DatabaseNotEmptyException {
+        checkNotSystemDatabase(name);
         try {
             if (!cascade && !this.listTables(name).isEmpty()) {
                 throw new DatabaseNotEmptyException(name);
@@ -176,12 +236,15 @@ public class RESTCatalog implements Catalog {
             if (!ignoreIfNotExists) {
                 throw new DatabaseNotExistException(name);
             }
+        } catch (ForbiddenException e) {
+            throw new DatabaseNoPermissionException(name, e);
         }
     }
 
     @Override
     public void alterDatabase(String name, List<PropertyChange> changes, 
boolean ignoreIfNotExists)
             throws DatabaseNotExistException {
+        checkNotSystemDatabase(name);
         try {
             Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys =
                     PropertyChange.getSetPropertiesToRemoveKeys(changes);
@@ -202,42 +265,94 @@ public class RESTCatalog implements Catalog {
             if (!ignoreIfNotExists) {
                 throw new DatabaseNotExistException(name);
             }
+        } catch (ForbiddenException e) {
+            throw new DatabaseNoPermissionException(name, e);
         }
     }
 
-    @Override
-    public Table getTable(Identifier identifier) throws TableNotExistException 
{
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
-        return new ArrayList<String>();
+        ListTablesResponse response =
+                client.get(resourcePaths.tables(databaseName), 
ListTablesResponse.class, headers());
+        if (response.getTables() != null) {
+            return response.getTables();
+        }
+        return ImmutableList.of();
     }
 
     @Override
-    public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
-            throws TableNotExistException {
-        throw new UnsupportedOperationException();
+    public Table getTable(Identifier identifier) throws TableNotExistException 
{
+        if (SYSTEM_DATABASE_NAME.equals(identifier.getDatabaseName())) {
+            return getAllInSystemDatabase(identifier);
+        } else if (identifier.isSystemTable()) {
+            return getSystemTable(identifier);
+        } else {
+            return getDataOrFormatTable(identifier);
+        }
     }
 
     @Override
     public void createTable(Identifier identifier, Schema schema, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException {
-        throw new UnsupportedOperationException();
+        try {
+            CreateTableRequest request = new CreateTableRequest(identifier, 
schema);
+            client.post(
+                    resourcePaths.tables(identifier.getDatabaseName()),
+                    request,
+                    GetTableResponse.class,
+                    headers());
+        } catch (AlreadyExistsException e) {
+            if (!ignoreIfExists) {
+                throw new TableAlreadyExistException(identifier);
+            }
+        }
     }
 
     @Override
     public void renameTable(Identifier fromTable, Identifier toTable, boolean 
ignoreIfNotExists)
             throws TableNotExistException, TableAlreadyExistException {
-        throw new UnsupportedOperationException();
+        try {
+            updateTable(fromTable, toTable, new ArrayList<>());
+        } catch (NoSuchResourceException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(fromTable);
+            }
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(fromTable, e);
+        } catch (AlreadyExistsException e) {
+            throw new TableAlreadyExistException(toTable);
+        }
     }
 
     @Override
     public void alterTable(
             Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        throw new UnsupportedOperationException();
+        try {
+            updateTable(identifier, identifier, changes);
+        } catch (NoSuchResourceException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(identifier);
+            }
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
+    @Override
+    public void dropTable(Identifier identifier, boolean ignoreIfNotExists)
+            throws TableNotExistException {
+        try {
+            client.delete(
+                    resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
+                    headers());
+        } catch (NoSuchResourceException e) {
+            if (!ignoreIfNotExists) {
+                throw new TableNotExistException(identifier);
+            }
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
     }
 
     @Override
@@ -258,7 +373,7 @@ public class RESTCatalog implements Catalog {
 
     @Override
     public boolean caseSensitive() {
-        return options.getOptional(CASE_SENSITIVE).orElse(true);
+        return context.options().getOptional(CASE_SENSITIVE).orElse(true);
     }
 
     @Override
@@ -279,6 +394,64 @@ public class RESTCatalog implements Catalog {
         return response.merge(clientProperties);
     }
 
+    @VisibleForTesting
+    void updateTable(
+            Identifier fromTable, Identifier newTableIdentifier, 
List<SchemaChange> changes) {
+        UpdateTableRequest request =
+                new UpdateTableRequest(newTableIdentifier, new 
SchemaChanges(changes));
+        client.post(
+                resourcePaths.table(fromTable.getDatabaseName(), 
fromTable.getTableName()),
+                request,
+                GetTableResponse.class,
+                headers());
+    }
+
+    @VisibleForTesting
+    Table getDataOrFormatTable(Identifier identifier) throws 
TableNotExistException {
+        Preconditions.checkArgument(identifier.getSystemTableName() == null);
+        TableSchema tableSchema = getDataTableSchema(identifier);
+        Lock.Factory lockFactory =
+                Lock.factory(
+                        lockFactory(context.options(), fileIO(), 
Optional.empty()).orElse(null),
+                        lockContext(context.options()).orElse(null),
+                        identifier);
+        // MetastoreClient is not used in RESTCatalog so null is ok.
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        fileIO(),
+                        newTableLocation(warehouse(), identifier),
+                        tableSchema,
+                        new CatalogEnvironment(identifier, null, lockFactory, 
null));
+        CoreOptions options = table.coreOptions();
+        if (options.type() == TableType.OBJECT_TABLE) {
+            String objectLocation = options.objectLocation();
+            checkNotNull(objectLocation, "Object location should not be null 
for object table.");
+            table =
+                    ObjectTable.builder()
+                            .underlyingTable(table)
+                            .objectLocation(objectLocation)
+                            .objectFileIO(this.fileIO())
+                            .build();
+        }
+        return table;
+    }
+
+    protected TableSchema getDataTableSchema(Identifier identifier) throws 
TableNotExistException {
+        try {
+            GetTableResponse response =
+                    client.get(
+                            resourcePaths.table(
+                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                            GetTableResponse.class,
+                            headers());
+            return response.getSchema();
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+    }
+
     private static Map<String, String> configHeaders(Map<String, String> 
properties) {
         return RESTUtil.extractPrefixMap(properties, "header.");
     }
@@ -287,6 +460,47 @@ public class RESTCatalog implements Catalog {
         return catalogAuth.getHeaders();
     }
 
+    private Table getAllInSystemDatabase(Identifier identifier) throws 
TableNotExistException {
+        String tableName = identifier.getTableName();
+        Supplier<Map<String, Map<String, Path>>> getAllTablePathsFunction =
+                () -> {
+                    try {
+                        Map<String, Map<String, Path>> allPaths = new 
HashMap<>();
+                        for (String database : listDatabases()) {
+                            Map<String, Path> tableMap =
+                                    allPaths.computeIfAbsent(database, d -> 
new HashMap<>());
+                            for (String table : listTables(database)) {
+                                Path tableLocation =
+                                        newTableLocation(
+                                                warehouse(), 
Identifier.create(database, table));
+                                tableMap.put(table, tableLocation);
+                            }
+                        }
+                        return allPaths;
+                    } catch (DatabaseNotExistException e) {
+                        throw new RuntimeException("Database is deleted while 
listing", e);
+                    }
+                };
+        Table table =
+                SystemTableLoader.loadGlobal(
+                        tableName, fileIO(), getAllTablePathsFunction, 
context.options());
+        if (table == null) {
+            throw new TableNotExistException(identifier);
+        }
+        return table;
+    }
+
+    private Table getSystemTable(Identifier identifier) throws 
TableNotExistException {
+        Table originTable =
+                getDataOrFormatTable(
+                        new Identifier(
+                                identifier.getDatabaseName(),
+                                identifier.getTableName(),
+                                identifier.getBranchName(),
+                                null));
+        return CatalogUtils.getSystemTable(identifier, originTable);
+    }
+
     private ScheduledExecutorService tokenRefreshExecutor() {
         if (refreshExecutor == null) {
             synchronized (this) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java
index a5c773cb4b..aa6e6f4d41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalogFactory.java
@@ -33,6 +33,6 @@ public class RESTCatalogFactory implements CatalogFactory {
 
     @Override
     public Catalog create(CatalogContext context) {
-        return new RESTCatalog(context.options());
+        return new RESTCatalog(context);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java
index b1c83e9022..ce20158d0b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTObjectMapper.java
@@ -18,18 +18,43 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.schema.SchemaSerializer;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeJsonParser;
+
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
+import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.Module;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.SerializationFeature;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
 
+import static org.apache.paimon.utils.JsonSerdeUtil.registerJsonObjects;
+
 /** Object mapper for REST request and response. */
 public class RESTObjectMapper {
     public static ObjectMapper create() {
         ObjectMapper mapper = new ObjectMapper();
         mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
         mapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
+        mapper.registerModule(createPaimonRestJacksonModule());
         mapper.registerModule(new JavaTimeModule());
         return mapper;
     }
+
+    public static Module createPaimonRestJacksonModule() {
+        SimpleModule module = new SimpleModule("Paimon_REST");
+        registerJsonObjects(
+                module, TableSchema.class, SchemaSerializer.INSTANCE, 
SchemaSerializer.INSTANCE);
+        registerJsonObjects(
+                module,
+                DataField.class,
+                DataField::serializeJson,
+                DataTypeJsonParser::parseDataField);
+        registerJsonObjects(
+                module, DataType.class, DataType::serializeJson, 
DataTypeJsonParser::parseDataType);
+        return module;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
index 51277454ff..567dfea490 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/ResourcePaths.java
@@ -52,4 +52,23 @@ public class ResourcePaths {
                 .add("properties")
                 .toString();
     }
+
+    public String tables(String databaseName) {
+        return SLASH.add("v1")
+                .add(prefix)
+                .add("databases")
+                .add(databaseName)
+                .add("tables")
+                .toString();
+    }
+
+    public String table(String databaseName, String tableName) {
+        return SLASH.add("v1")
+                .add(prefix)
+                .add("databases")
+                .add(databaseName)
+                .add("tables")
+                .add(tableName)
+                .toString();
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateTableRequest.java
similarity index 54%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateTableRequest.java
index 9a93b2fd1e..794dd33c46 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/CreateTableRequest.java
@@ -16,29 +16,43 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest.responses;
+package org.apache.paimon.rest.requests;
 
-import org.apache.paimon.rest.RESTMessage;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.rest.RESTRequest;
+import org.apache.paimon.schema.Schema;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-/** Class for Database entity. */
-public class DatabaseName implements RESTMessage {
+/** Request for creating table. */
+public class CreateTableRequest implements RESTRequest {
 
-    private static final String FIELD_NAME = "name";
+    private static final String FIELD_IDENTIFIER = "identifier";
+    private static final String FIELD_SCHEMA = "schema";
 
-    @JsonProperty(FIELD_NAME)
-    private String name;
+    @JsonProperty(FIELD_IDENTIFIER)
+    private Identifier identifier;
+
+    @JsonProperty(FIELD_SCHEMA)
+    private Schema schema;
 
     @JsonCreator
-    public DatabaseName(@JsonProperty(FIELD_NAME) String name) {
-        this.name = name;
+    public CreateTableRequest(
+            @JsonProperty(FIELD_IDENTIFIER) Identifier identifier,
+            @JsonProperty(FIELD_SCHEMA) Schema schema) {
+        this.schema = schema;
+        this.identifier = identifier;
+    }
+
+    @JsonGetter(FIELD_IDENTIFIER)
+    public Identifier getIdentifier() {
+        return identifier;
     }
 
-    @JsonGetter(FIELD_NAME)
-    public String getName() {
-        return this.name;
+    @JsonGetter(FIELD_SCHEMA)
+    public Schema getSchema() {
+        return schema;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java
new file mode 100644
index 0000000000..1c3e419f13
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/SchemaChanges.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.requests;
+
+import org.apache.paimon.schema.SchemaChange;
+
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/** Schema changes to serialize List of SchemaChange . */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class SchemaChanges {
+
+    private static final String FIELD_SET_OPTIONS = "set-options";
+    private static final String FIELD_REMOVE_OPTIONS = "remove-options";
+    private static final String FIELD_COMMENT = "comment";
+    private static final String FIELD_ADD_COLUMNS = "add-columns";
+    private static final String FIELD_RENAME_COLUMNS = "rename-columns";
+    private static final String FIELD_DROP_COLUMNS = "drop-columns";
+    private static final String FIELD_UPDATE_COLUMN_TYPES = 
"update-column-types";
+    private static final String FIELD_UPDATE_COLUMN_NULLABILITIES = 
"update-column-nullabilities";
+    private static final String FIELD_UPDATE_COLUMN_COMMENTS = 
"update-column-comments";
+    private static final String FIELD_UPDATE_COLUMN_POSITIONS = 
"update-column-positions";
+
+    @JsonProperty(FIELD_SET_OPTIONS)
+    private Map<String, String> setOptions;
+
+    @JsonProperty(FIELD_REMOVE_OPTIONS)
+    private List<String> removeOptions;
+
+    @JsonProperty(FIELD_COMMENT)
+    private String comment;
+
+    @JsonProperty(FIELD_ADD_COLUMNS)
+    private List<SchemaChange.AddColumn> addColumns;
+
+    @JsonProperty(FIELD_RENAME_COLUMNS)
+    private List<SchemaChange.RenameColumn> renameColumns;
+
+    @JsonProperty(FIELD_DROP_COLUMNS)
+    private List<String> dropColumns;
+
+    @JsonProperty(FIELD_UPDATE_COLUMN_TYPES)
+    private List<SchemaChange.UpdateColumnType> updateColumnTypes;
+
+    @JsonProperty(FIELD_UPDATE_COLUMN_NULLABILITIES)
+    private List<SchemaChange.UpdateColumnNullability> 
updateColumnNullabilities;
+
+    @JsonProperty(FIELD_UPDATE_COLUMN_COMMENTS)
+    private List<SchemaChange.UpdateColumnComment> updateColumnComments;
+
+    @JsonProperty(FIELD_UPDATE_COLUMN_POSITIONS)
+    private List<SchemaChange.Move> updateColumnPositions;
+
+    @JsonCreator
+    public SchemaChanges(
+            @JsonProperty(FIELD_SET_OPTIONS) Map<String, String> setOptions,
+            @JsonProperty(FIELD_REMOVE_OPTIONS) List<String> removeOptions,
+            @JsonProperty(FIELD_COMMENT) String comment,
+            @JsonProperty(FIELD_ADD_COLUMNS) List<SchemaChange.AddColumn> 
addColumns,
+            @JsonProperty(FIELD_RENAME_COLUMNS) 
List<SchemaChange.RenameColumn> renameColumns,
+            @JsonProperty(FIELD_DROP_COLUMNS) List<String> dropColumns,
+            @JsonProperty(FIELD_UPDATE_COLUMN_TYPES)
+                    List<SchemaChange.UpdateColumnType> updateColumnTypes,
+            @JsonProperty(FIELD_UPDATE_COLUMN_NULLABILITIES)
+                    List<SchemaChange.UpdateColumnNullability> 
updateColumnNullabilities,
+            @JsonProperty(FIELD_UPDATE_COLUMN_COMMENTS)
+                    List<SchemaChange.UpdateColumnComment> 
updateColumnComments,
+            @JsonProperty(FIELD_UPDATE_COLUMN_POSITIONS)
+                    List<SchemaChange.Move> updateColumnPositions) {
+        this.setOptions = setOptions;
+        this.removeOptions = removeOptions;
+        this.comment = comment;
+        this.addColumns = addColumns;
+        this.renameColumns = renameColumns;
+        this.dropColumns = dropColumns;
+        this.updateColumnTypes = updateColumnTypes;
+        this.updateColumnNullabilities = updateColumnNullabilities;
+        this.updateColumnComments = updateColumnComments;
+        this.updateColumnPositions = updateColumnPositions;
+    }
+
+    public SchemaChanges(List<SchemaChange> changes) {
+        Map<String, String> setOptions = new HashMap<>();
+        List<String> removeOptions = new ArrayList<>();
+        String comment = null;
+        List<SchemaChange.AddColumn> addColumns = new ArrayList<>();
+        List<SchemaChange.RenameColumn> renameColumns = new ArrayList<>();
+        List<String> dropColumns = new ArrayList<>();
+        List<SchemaChange.UpdateColumnType> updateColumnTypes = new 
ArrayList<>();
+        List<SchemaChange.UpdateColumnNullability> updateColumnNullabilities = 
new ArrayList<>();
+        List<SchemaChange.UpdateColumnComment> updateColumnComments = new 
ArrayList<>();
+        List<SchemaChange.Move> updateColumnPositions = new ArrayList<>();
+        for (SchemaChange change : changes) {
+            if (change instanceof SchemaChange.SetOption) {
+                setOptions.put(
+                        ((SchemaChange.SetOption) change).key(),
+                        ((SchemaChange.SetOption) change).value());
+            } else if (change instanceof SchemaChange.RemoveOption) {
+                removeOptions.add(((SchemaChange.RemoveOption) change).key());
+            } else if (change instanceof SchemaChange.UpdateComment) {
+                comment = ((SchemaChange.UpdateComment) change).comment();
+            } else if (change instanceof SchemaChange.AddColumn) {
+                addColumns.add((SchemaChange.AddColumn) change);
+            } else if (change instanceof SchemaChange.RenameColumn) {
+                renameColumns.add((SchemaChange.RenameColumn) change);
+            } else if (change instanceof SchemaChange.DropColumn) {
+                dropColumns.addAll(Arrays.asList(((SchemaChange.DropColumn) 
change).fieldNames()));
+            } else if (change instanceof SchemaChange.UpdateColumnType) {
+                updateColumnTypes.add((SchemaChange.UpdateColumnType) change);
+            } else if (change instanceof SchemaChange.UpdateColumnNullability) 
{
+                
updateColumnNullabilities.add((SchemaChange.UpdateColumnNullability) change);
+            } else if (change instanceof SchemaChange.UpdateColumnComment) {
+                updateColumnComments.add((SchemaChange.UpdateColumnComment) 
change);
+            } else if (change instanceof SchemaChange.UpdateColumnPosition) {
+                updateColumnPositions.add(((SchemaChange.UpdateColumnPosition) 
change).move());
+            }
+        }
+        this.setOptions = setOptions;
+        this.removeOptions = removeOptions;
+        this.comment = comment;
+        this.addColumns = addColumns;
+        this.renameColumns = renameColumns;
+        this.dropColumns = dropColumns;
+        this.updateColumnTypes = updateColumnTypes;
+        this.updateColumnNullabilities = updateColumnNullabilities;
+        this.updateColumnComments = updateColumnComments;
+        this.updateColumnPositions = updateColumnPositions;
+    }
+
+    @JsonGetter(FIELD_SET_OPTIONS)
+    public Map<String, String> getSetOptions() {
+        return setOptions;
+    }
+
+    @JsonGetter(FIELD_REMOVE_OPTIONS)
+    public List<String> getRemoveOptions() {
+        return removeOptions;
+    }
+
+    @JsonGetter(FIELD_COMMENT)
+    public String getComment() {
+        return comment;
+    }
+
+    @JsonGetter(FIELD_ADD_COLUMNS)
+    public List<SchemaChange.AddColumn> getAddColumns() {
+        return addColumns;
+    }
+
+    @JsonGetter(FIELD_RENAME_COLUMNS)
+    public List<SchemaChange.RenameColumn> getRenameColumns() {
+        return renameColumns;
+    }
+
+    @JsonGetter(FIELD_DROP_COLUMNS)
+    public List<String> getDropColumns() {
+        return dropColumns;
+    }
+
+    @JsonGetter(FIELD_UPDATE_COLUMN_TYPES)
+    public List<SchemaChange.UpdateColumnType> getUpdateColumnTypes() {
+        return updateColumnTypes;
+    }
+
+    @JsonGetter(FIELD_UPDATE_COLUMN_NULLABILITIES)
+    public List<SchemaChange.UpdateColumnNullability> 
getUpdateColumnNullabilities() {
+        return updateColumnNullabilities;
+    }
+
+    @JsonGetter(FIELD_UPDATE_COLUMN_COMMENTS)
+    public List<SchemaChange.UpdateColumnComment> getUpdateColumnComments() {
+        return updateColumnComments;
+    }
+
+    @JsonGetter(FIELD_UPDATE_COLUMN_POSITIONS)
+    public List<SchemaChange.Move> getUpdateColumnPositions() {
+        return updateColumnPositions;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SchemaChanges that = (SchemaChanges) o;
+        return Objects.equals(setOptions, that.setOptions)
+                && Objects.equals(removeOptions, that.removeOptions)
+                && Objects.equals(comment, that.comment)
+                && Objects.equals(addColumns, that.addColumns)
+                && Objects.equals(renameColumns, that.renameColumns)
+                && Objects.equals(dropColumns, that.dropColumns)
+                && Objects.equals(updateColumnTypes, that.updateColumnTypes)
+                && Objects.equals(updateColumnNullabilities, 
that.updateColumnNullabilities)
+                && Objects.equals(updateColumnComments, 
that.updateColumnComments)
+                && Objects.equals(updateColumnPositions, 
that.updateColumnPositions);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                setOptions,
+                removeOptions,
+                comment,
+                addColumns,
+                renameColumns,
+                dropColumns,
+                updateColumnTypes,
+                updateColumnNullabilities,
+                updateColumnComments,
+                updateColumnPositions);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/UpdateTableRequest.java
similarity index 51%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/requests/UpdateTableRequest.java
index 9a93b2fd1e..b522dc8ea1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/requests/UpdateTableRequest.java
@@ -16,29 +16,42 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.rest.responses;
+package org.apache.paimon.rest.requests;
 
-import org.apache.paimon.rest.RESTMessage;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.rest.RESTRequest;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-/** Class for Database entity. */
-public class DatabaseName implements RESTMessage {
+/** Request for updating table. */
+public class UpdateTableRequest implements RESTRequest {
 
-    private static final String FIELD_NAME = "name";
+    private static final String FIELD_IDENTIFIER_NAME = "identifier-change";
+    private static final String FIELD_SCHEMA_CHANGES_NAME = "schema-changes";
 
-    @JsonProperty(FIELD_NAME)
-    private String name;
+    @JsonProperty(FIELD_IDENTIFIER_NAME)
+    private Identifier identifierChange;
+
+    @JsonProperty(FIELD_SCHEMA_CHANGES_NAME)
+    private SchemaChanges changes;
 
     @JsonCreator
-    public DatabaseName(@JsonProperty(FIELD_NAME) String name) {
-        this.name = name;
+    public UpdateTableRequest(
+            @JsonProperty(FIELD_IDENTIFIER_NAME) Identifier identifierChange,
+            @JsonProperty(FIELD_SCHEMA_CHANGES_NAME) SchemaChanges changes) {
+        this.identifierChange = identifierChange;
+        this.changes = changes;
+    }
+
+    @JsonGetter(FIELD_IDENTIFIER_NAME)
+    public Identifier getIdentifierChange() {
+        return identifierChange;
     }
 
-    @JsonGetter(FIELD_NAME)
-    public String getName() {
-        return this.name;
+    @JsonGetter(FIELD_SCHEMA_CHANGES_NAME)
+    public SchemaChanges getChanges() {
+        return changes;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableResponse.java
similarity index 59%
copy from 
paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
copy to 
paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableResponse.java
index 38773f354b..671c50cac5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/GetTableResponse.java
@@ -19,27 +19,39 @@
 package org.apache.paimon.rest.responses;
 
 import org.apache.paimon.rest.RESTResponse;
+import org.apache.paimon.schema.TableSchema;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-import java.util.List;
+/** Response for getting table. */
+public class GetTableResponse implements RESTResponse {
 
-/** Response for listing databases. */
-public class ListDatabasesResponse implements RESTResponse {
-    private static final String FIELD_DATABASES = "databases";
+    private static final String FIELD_LOCATION = "location";
+    private static final String FIELD_SCHEMA = "schema";
 
-    @JsonProperty(FIELD_DATABASES)
-    private List<DatabaseName> databases;
+    @JsonProperty(FIELD_LOCATION)
+    private final String location;
+
+    @JsonProperty(FIELD_SCHEMA)
+    private final TableSchema schema;
 
     @JsonCreator
-    public ListDatabasesResponse(@JsonProperty(FIELD_DATABASES) 
List<DatabaseName> databases) {
-        this.databases = databases;
+    public GetTableResponse(
+            @JsonProperty(FIELD_LOCATION) String location,
+            @JsonProperty(FIELD_SCHEMA) TableSchema schema) {
+        this.location = location;
+        this.schema = schema;
+    }
+
+    @JsonGetter(FIELD_LOCATION)
+    public String getLocation() {
+        return this.location;
     }
 
-    @JsonGetter(FIELD_DATABASES)
-    public List<DatabaseName> getDatabases() {
-        return this.databases;
+    @JsonGetter(FIELD_SCHEMA)
+    public TableSchema getSchema() {
+        return this.schema;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
index 38773f354b..64a17a6be7 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListDatabasesResponse.java
@@ -31,15 +31,15 @@ public class ListDatabasesResponse implements RESTResponse {
     private static final String FIELD_DATABASES = "databases";
 
     @JsonProperty(FIELD_DATABASES)
-    private List<DatabaseName> databases;
+    private List<String> databases;
 
     @JsonCreator
-    public ListDatabasesResponse(@JsonProperty(FIELD_DATABASES) 
List<DatabaseName> databases) {
+    public ListDatabasesResponse(@JsonProperty(FIELD_DATABASES) List<String> 
databases) {
         this.databases = databases;
     }
 
     @JsonGetter(FIELD_DATABASES)
-    public List<DatabaseName> getDatabases() {
+    public List<String> getDatabases() {
         return this.databases;
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java
similarity index 70%
rename from 
paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java
rename to 
paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java
index 9a93b2fd1e..bccaa48438 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/rest/responses/DatabaseName.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java
@@ -18,27 +18,28 @@
 
 package org.apache.paimon.rest.responses;
 
-import org.apache.paimon.rest.RESTMessage;
+import org.apache.paimon.rest.RESTResponse;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 
-/** Class for Database entity. */
-public class DatabaseName implements RESTMessage {
+import java.util.List;
 
-    private static final String FIELD_NAME = "name";
+/** Response for listing tables. */
+public class ListTablesResponse implements RESTResponse {
+    private static final String FIELD_TABLES = "tables";
 
-    @JsonProperty(FIELD_NAME)
-    private String name;
+    @JsonProperty(FIELD_TABLES)
+    private List<String> tables;
 
     @JsonCreator
-    public DatabaseName(@JsonProperty(FIELD_NAME) String name) {
-        this.name = name;
+    public ListTablesResponse(@JsonProperty(FIELD_TABLES) List<String> tables) 
{
+        this.tables = tables;
     }
 
-    @JsonGetter(FIELD_NAME)
-    public String getName() {
-        return this.name;
+    @JsonGetter(FIELD_TABLES)
+    public List<String> getTables() {
+        return this.tables;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
index a3b30d81a3..ee09841588 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/Schema.java
@@ -26,6 +26,11 @@ import org.apache.paimon.types.ReassignFieldId;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Preconditions;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
@@ -46,24 +51,37 @@ import java.util.stream.Collectors;
  * @since 0.4.0
  */
 @Public
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Schema {
 
+    private static final String FIELD_FIELDS = "fields";
+    private static final String FIELD_PARTITION_KEYS = "partition-keys";
+    private static final String FIELD_PRIMARY_KEYS = "primary-keys";
+    private static final String FIELD_OPTIONS = "options";
+    private static final String FIELD_COMMENT = "comment";
+
+    @JsonProperty(FIELD_FIELDS)
     private final List<DataField> fields;
 
+    @JsonProperty(FIELD_PARTITION_KEYS)
     private final List<String> partitionKeys;
 
+    @JsonProperty(FIELD_PRIMARY_KEYS)
     private final List<String> primaryKeys;
 
+    @JsonProperty(FIELD_OPTIONS)
     private final Map<String, String> options;
 
+    @JsonProperty(FIELD_COMMENT)
     private final String comment;
 
+    @JsonCreator
     public Schema(
-            List<DataField> fields,
-            List<String> partitionKeys,
-            List<String> primaryKeys,
-            Map<String, String> options,
-            String comment) {
+            @JsonProperty(FIELD_FIELDS) List<DataField> fields,
+            @JsonProperty(FIELD_PARTITION_KEYS) List<String> partitionKeys,
+            @JsonProperty(FIELD_PRIMARY_KEYS) List<String> primaryKeys,
+            @JsonProperty(FIELD_OPTIONS) Map<String, String> options,
+            @JsonProperty(FIELD_COMMENT) String comment) {
         this.options = new HashMap<>(options);
         this.partitionKeys = normalizePartitionKeys(partitionKeys);
         this.primaryKeys = normalizePrimaryKeys(primaryKeys);
@@ -75,22 +93,27 @@ public class Schema {
         return new RowType(false, fields);
     }
 
+    @JsonGetter(FIELD_FIELDS)
     public List<DataField> fields() {
         return fields;
     }
 
+    @JsonGetter(FIELD_PARTITION_KEYS)
     public List<String> partitionKeys() {
         return partitionKeys;
     }
 
+    @JsonGetter(FIELD_PRIMARY_KEYS)
     public List<String> primaryKeys() {
         return primaryKeys;
     }
 
+    @JsonGetter(FIELD_OPTIONS)
     public Map<String, String> options() {
         return options;
     }
 
+    @JsonGetter(FIELD_COMMENT)
     public String comment() {
         return comment;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
index cefa3c6eb9..a600b089c5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -21,6 +21,11 @@ package org.apache.paimon.schema;
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.types.DataType;
 
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
@@ -221,36 +226,58 @@ public interface SchemaChange extends Serializable {
     }
 
     /** A SchemaChange to add a field. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     final class AddColumn implements SchemaChange {
 
         private static final long serialVersionUID = 1L;
 
+        private static final String FIELD_FILED_NAMES = "field-names";
+        private static final String FIELD_DATA_TYPE = "data-type";
+        private static final String FIELD_COMMENT = "comment";
+        private static final String FIELD_MOVE = "move";
+
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String[] fieldNames;
+
+        @JsonProperty(FIELD_DATA_TYPE)
         private final DataType dataType;
+
+        @JsonProperty(FIELD_COMMENT)
         private final String description;
+
+        @JsonProperty(FIELD_MOVE)
         private final Move move;
 
-        private AddColumn(String[] fieldNames, DataType dataType, String 
description, Move move) {
+        @JsonCreator
+        private AddColumn(
+                @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames,
+                @JsonProperty(FIELD_DATA_TYPE) DataType dataType,
+                @JsonProperty(FIELD_COMMENT) String description,
+                @JsonProperty(FIELD_MOVE) Move move) {
             this.fieldNames = fieldNames;
             this.dataType = dataType;
             this.description = description;
             this.move = move;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String[] fieldNames() {
             return fieldNames;
         }
 
+        @JsonGetter(FIELD_DATA_TYPE)
         public DataType dataType() {
             return dataType;
         }
 
         @Nullable
+        @JsonGetter(FIELD_COMMENT)
         public String description() {
             return description;
         }
 
         @Nullable
+        @JsonGetter(FIELD_MOVE)
         public Move move() {
             return move;
         }
@@ -267,7 +294,7 @@ public interface SchemaChange extends Serializable {
             return Arrays.equals(fieldNames, addColumn.fieldNames)
                     && dataType.equals(addColumn.dataType)
                     && Objects.equals(description, addColumn.description)
-                    && move.equals(addColumn.move);
+                    && Objects.equals(move, addColumn.move);
         }
 
         @Override
@@ -280,22 +307,34 @@ public interface SchemaChange extends Serializable {
     }
 
     /** A SchemaChange to rename a field. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     final class RenameColumn implements SchemaChange {
 
         private static final long serialVersionUID = 1L;
 
+        private static final String FIELD_FILED_NAMES = "field-names";
+        private static final String FIELD_NEW_NAME = "new-name";
+
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String[] fieldNames;
+
+        @JsonProperty(FIELD_NEW_NAME)
         private final String newName;
 
-        private RenameColumn(String[] fieldNames, String newName) {
+        @JsonCreator
+        private RenameColumn(
+                @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames,
+                @JsonProperty(FIELD_NEW_NAME) String newName) {
             this.fieldNames = fieldNames;
             this.newName = newName;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String[] fieldNames() {
             return fieldNames;
         }
 
+        @JsonGetter(FIELD_NEW_NAME)
         public String newName() {
             return newName;
         }
@@ -322,16 +361,22 @@ public interface SchemaChange extends Serializable {
     }
 
     /** A SchemaChange to drop a field. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     final class DropColumn implements SchemaChange {
 
         private static final long serialVersionUID = 1L;
 
+        private static final String FIELD_FILED_NAMES = "field-names";
+
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String[] fieldNames;
 
-        private DropColumn(String[] fieldNames) {
+        @JsonCreator
+        private DropColumn(@JsonProperty(FIELD_FILED_NAMES) String[] 
fieldNames) {
             this.fieldNames = fieldNames;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String[] fieldNames() {
             return fieldNames;
         }
@@ -355,30 +400,44 @@ public interface SchemaChange extends Serializable {
     }
 
     /** A SchemaChange to update the field type. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     final class UpdateColumnType implements SchemaChange {
 
         private static final long serialVersionUID = 1L;
+        private static final String FIELD_FILED_NAMES = "field-names";
+        private static final String FIELD_NEW_DATA_TYPE = "new-data-type";
+        private static final String FIELD_KEEP_NULLABILITY = 
"keep-nullability";
 
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String[] fieldNames;
+
+        @JsonProperty(FIELD_NEW_DATA_TYPE)
         private final DataType newDataType;
         // If true, do not change the target field nullability
+        @JsonProperty(FIELD_KEEP_NULLABILITY)
         private final boolean keepNullability;
 
+        @JsonCreator
         private UpdateColumnType(
-                String[] fieldNames, DataType newDataType, boolean 
keepNullability) {
+                @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames,
+                @JsonProperty(FIELD_NEW_DATA_TYPE) DataType newDataType,
+                @JsonProperty(FIELD_KEEP_NULLABILITY) boolean keepNullability) 
{
             this.fieldNames = fieldNames;
             this.newDataType = newDataType;
             this.keepNullability = keepNullability;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String[] fieldNames() {
             return fieldNames;
         }
 
+        @JsonGetter(FIELD_NEW_DATA_TYPE)
         public DataType newDataType() {
             return newDataType;
         }
 
+        @JsonGetter(FIELD_KEEP_NULLABILITY)
         public boolean keepNullability() {
             return keepNullability;
         }
@@ -438,6 +497,7 @@ public interface SchemaChange extends Serializable {
     }
 
     /** Represents a requested column move in a struct. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     class Move implements Serializable {
 
         public enum MoveType {
@@ -465,24 +525,40 @@ public interface SchemaChange extends Serializable {
 
         private static final long serialVersionUID = 1L;
 
+        private static final String FIELD_FILED_NAMES = "field-name";
+        private static final String FIELD_REFERENCE_FIELD_NAME = 
"reference-field-name";
+        private static final String FIELD_TYPE = "type";
+
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String fieldName;
+
+        @JsonProperty(FIELD_REFERENCE_FIELD_NAME)
         private final String referenceFieldName;
+
+        @JsonProperty(FIELD_TYPE)
         private final MoveType type;
 
-        public Move(String fieldName, String referenceFieldName, MoveType 
type) {
+        @JsonCreator
+        public Move(
+                @JsonProperty(FIELD_FILED_NAMES) String fieldName,
+                @JsonProperty(FIELD_REFERENCE_FIELD_NAME) String 
referenceFieldName,
+                @JsonProperty(FIELD_TYPE) MoveType type) {
             this.fieldName = fieldName;
             this.referenceFieldName = referenceFieldName;
             this.type = type;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String fieldName() {
             return fieldName;
         }
 
+        @JsonGetter(FIELD_REFERENCE_FIELD_NAME)
         public String referenceFieldName() {
             return referenceFieldName;
         }
 
+        @JsonGetter(FIELD_TYPE)
         public MoveType type() {
             return type;
         }
@@ -508,22 +584,34 @@ public interface SchemaChange extends Serializable {
     }
 
     /** A SchemaChange to update the (nested) field nullability. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     final class UpdateColumnNullability implements SchemaChange {
 
         private static final long serialVersionUID = 1L;
 
+        private static final String FIELD_FILED_NAMES = "field-names";
+        private static final String FIELD_NEW_NULLABILITY = "new-nullability";
+
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String[] fieldNames;
+
+        @JsonProperty(FIELD_NEW_NULLABILITY)
         private final boolean newNullability;
 
-        public UpdateColumnNullability(String[] fieldNames, boolean 
newNullability) {
+        @JsonCreator
+        public UpdateColumnNullability(
+                @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames,
+                @JsonProperty(FIELD_NEW_NULLABILITY) boolean newNullability) {
             this.fieldNames = fieldNames;
             this.newNullability = newNullability;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String[] fieldNames() {
             return fieldNames;
         }
 
+        @JsonGetter(FIELD_NEW_NULLABILITY)
         public boolean newNullability() {
             return newNullability;
         }
@@ -550,22 +638,34 @@ public interface SchemaChange extends Serializable {
     }
 
     /** A SchemaChange to update the (nested) field comment. */
+    @JsonIgnoreProperties(ignoreUnknown = true)
     final class UpdateColumnComment implements SchemaChange {
 
         private static final long serialVersionUID = 1L;
 
+        private static final String FIELD_FILED_NAMES = "field-names";
+        private static final String FIELD_NEW_COMMENT = "new-comment";
+
+        @JsonProperty(FIELD_FILED_NAMES)
         private final String[] fieldNames;
+
+        @JsonProperty(FIELD_NEW_COMMENT)
         private final String newDescription;
 
-        public UpdateColumnComment(String[] fieldNames, String newDescription) 
{
+        @JsonCreator
+        public UpdateColumnComment(
+                @JsonProperty(FIELD_FILED_NAMES) String[] fieldNames,
+                @JsonProperty(FIELD_NEW_COMMENT) String newDescription) {
             this.fieldNames = fieldNames;
             this.newDescription = newDescription;
         }
 
+        @JsonGetter(FIELD_FILED_NAMES)
         public String[] fieldNames() {
             return fieldNames;
         }
 
+        @JsonGetter(FIELD_NEW_COMMENT)
         public String newDescription() {
             return newDescription;
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
index a919d83c87..edc6dac5f9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/JsonSerdeUtil.java
@@ -154,21 +154,7 @@ public class JsonSerdeUtil {
         }
     }
 
-    private static Module createPaimonJacksonModule() {
-        SimpleModule module = new SimpleModule("Paimon");
-        registerJsonObjects(
-                module, TableSchema.class, SchemaSerializer.INSTANCE, 
SchemaSerializer.INSTANCE);
-        registerJsonObjects(
-                module,
-                DataField.class,
-                DataField::serializeJson,
-                DataTypeJsonParser::parseDataField);
-        registerJsonObjects(
-                module, DataType.class, DataType::serializeJson, 
DataTypeJsonParser::parseDataType);
-        return module;
-    }
-
-    private static <T> void registerJsonObjects(
+    public static <T> void registerJsonObjects(
             SimpleModule module,
             Class<T> clazz,
             JsonSerializer<T> serializer,
@@ -192,6 +178,20 @@ public class JsonSerdeUtil {
                 });
     }
 
+    private static Module createPaimonJacksonModule() {
+        SimpleModule module = new SimpleModule("Paimon");
+        registerJsonObjects(
+                module, TableSchema.class, SchemaSerializer.INSTANCE, 
SchemaSerializer.INSTANCE);
+        registerJsonObjects(
+                module,
+                DataField.class,
+                DataField::serializeJson,
+                DataTypeJsonParser::parseDataField);
+        registerJsonObjects(
+                module, DataType.class, DataType::serializeJson, 
DataTypeJsonParser::parseDataType);
+        return module;
+    }
+
     /**
      * Parses the provided JSON string and casts it to the specified type of 
{@link JsonNode}.
      *
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
index 821257a0e1..3e9f32ba08 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTMessage.java
@@ -18,18 +18,34 @@
 
 package org.apache.paimon.rest;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.SchemaChanges;
+import org.apache.paimon.rest.requests.UpdateTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
-import org.apache.paimon.rest.responses.DatabaseName;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.RowType;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -63,9 +79,8 @@ public class MockRESTMessage {
     }
 
     public static ListDatabasesResponse listDatabasesResponse(String name) {
-        DatabaseName databaseName = new DatabaseName(name);
-        List<DatabaseName> databaseNameList = new ArrayList<>();
-        databaseNameList.add(databaseName);
+        List<String> databaseNameList = new ArrayList<>();
+        databaseNameList.add(name);
         return new ListDatabasesResponse(databaseNameList);
     }
 
@@ -83,4 +98,122 @@ public class MockRESTMessage {
         return new AlterDatabaseResponse(
                 Lists.newArrayList("remove"), Lists.newArrayList("add"), new 
ArrayList<>());
     }
+
+    public static ListTablesResponse listTablesResponse() {
+        return new ListTablesResponse(Lists.newArrayList("table"));
+    }
+
+    public static ListTablesResponse listTablesEmptyResponse() {
+        return new ListTablesResponse(Lists.newArrayList());
+    }
+
+    public static CreateTableRequest createTableRequest(String name) {
+        Identifier identifier = Identifier.create(databaseName(), name);
+        Map<String, String> options = new HashMap<>();
+        options.put("k1", "v1");
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .column("col2", DataTypes.STRING())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .options(options)
+                        .build();
+        return new CreateTableRequest(identifier, schema);
+    }
+
+    public static UpdateTableRequest updateTableRequest(String toTableName) {
+        Identifier identifierChange = Identifier.create(databaseName(), 
toTableName);
+        SchemaChanges changes = new SchemaChanges(getChanges());
+        return new UpdateTableRequest(identifierChange, changes);
+    }
+
+    public static List<SchemaChange> getChanges() {
+        // add option
+        SchemaChange addOption = 
SchemaChange.setOption("snapshot.time-retained", "2h");
+        // remove option
+        SchemaChange removeOption = 
SchemaChange.removeOption("compaction.max.file-num");
+        // add column
+        SchemaChange addColumn =
+                SchemaChange.addColumn("col1_after", 
DataTypes.ARRAY(DataTypes.STRING()));
+        SchemaChange addColumnMap =
+                SchemaChange.addColumn(
+                        "col1_map_type", DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()));
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT(),
+                            DataTypes.INT(),
+                            DataTypes.BIGINT(),
+                            DataTypes.BINARY(1),
+                            DataTypes.VARBINARY(1),
+                            DataTypes.MAP(DataTypes.VARCHAR(8), 
DataTypes.VARCHAR(8)),
+                            DataTypes.MULTISET(DataTypes.VARCHAR(8))
+                        },
+                        new String[] {"pt", "a", "b", "c", "d", "e", "f"});
+        SchemaChange addColumnRowType = SchemaChange.addColumn("col_row_type", 
rowType);
+        // add a column after col1
+        SchemaChange.Move after = SchemaChange.Move.after("col1_after", 
"col1");
+        SchemaChange addColumnAfterField =
+                SchemaChange.addColumn("col7", DataTypes.STRING(), "", after);
+        // rename column
+        SchemaChange renameColumn = SchemaChange.renameColumn("col3", 
"col3_new_name");
+        // drop column
+        SchemaChange dropColumn = SchemaChange.dropColumn("col6");
+        // update column comment
+        SchemaChange updateColumnComment =
+                SchemaChange.updateColumnComment(new String[] {"col4"}, "col4 
field");
+        // update nested column comment
+        SchemaChange updateNestedColumnComment =
+                SchemaChange.updateColumnComment(new String[] {"col5", "f1"}, 
"col5 f1 field");
+        // update column type
+        SchemaChange updateColumnType = SchemaChange.updateColumnType("col4", 
DataTypes.DOUBLE());
+        // update column position, you need to pass in a parameter of type Move
+        SchemaChange updateColumnPosition =
+                
SchemaChange.updateColumnPosition(SchemaChange.Move.first("col4"));
+        // update column nullability
+        SchemaChange updateColumnNullability =
+                SchemaChange.updateColumnNullability(new String[] {"col4"}, 
false);
+        // update nested column nullability
+        SchemaChange updateNestedColumnNullability =
+                SchemaChange.updateColumnNullability(new String[] {"col5", 
"f2"}, false);
+
+        List<SchemaChange> schemaChanges = new ArrayList<>();
+        schemaChanges.add(addOption);
+        schemaChanges.add(removeOption);
+        schemaChanges.add(addColumn);
+        schemaChanges.add(addColumnMap);
+        schemaChanges.add(addColumnRowType);
+        schemaChanges.add(addColumnAfterField);
+        schemaChanges.add(renameColumn);
+        schemaChanges.add(dropColumn);
+        schemaChanges.add(updateColumnComment);
+        schemaChanges.add(updateNestedColumnComment);
+        schemaChanges.add(updateColumnType);
+        schemaChanges.add(updateColumnPosition);
+        schemaChanges.add(updateColumnNullability);
+        schemaChanges.add(updateNestedColumnNullability);
+        return schemaChanges;
+    }
+
+    public static GetTableResponse getTableResponse() {
+        return new GetTableResponse("location", tableSchema());
+    }
+
+    private static TableSchema tableSchema() {
+        List<DataField> fields =
+                Arrays.asList(
+                        new DataField(0, "f0", new IntType()),
+                        new DataField(1, "f1", new IntType()));
+        List<String> partitionKeys = Collections.singletonList("f0");
+        List<String> primaryKeys = Arrays.asList("f0", "f1");
+        Map<String, String> options = new HashMap<>();
+        options.put("option-1", "value-1");
+        options.put("option-2", "value-2");
+        // set path for test as if not set system will add one
+        options.put(CoreOptions.PATH.key(), "/a/b/c");
+        return new TableSchema(1, fields, 1, partitionKeys, primaryKeys, 
options, "comment");
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 9b15829295..a3c4ab3401 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -19,14 +19,21 @@
 package org.apache.paimon.rest;
 
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.requests.CreateTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.Table;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@@ -35,7 +42,9 @@ import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -46,11 +55,12 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
 
 /** Test for REST Catalog. */
 public class RESTCatalogTest {
@@ -59,6 +69,9 @@ public class RESTCatalogTest {
     private MockWebServer mockWebServer;
     private RESTCatalog restCatalog;
     private RESTCatalog mockRestCatalog;
+    private CatalogContext context;
+    private String warehouseStr;
+    @Rule public TemporaryFolder folder = new TemporaryFolder();
 
     @Before
     public void setUp() throws IOException {
@@ -70,12 +83,17 @@ public class RESTCatalogTest {
         String initToken = "init_token";
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.THREAD_POOL_SIZE, 1);
+        warehouseStr = folder.getRoot().getPath();
         String mockResponse =
                 String.format(
-                        "{\"defaults\": {\"%s\": \"%s\"}}",
-                        RESTCatalogInternalOptions.PREFIX.key(), "prefix");
+                        "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\"}}",
+                        RESTCatalogInternalOptions.PREFIX.key(),
+                        "prefix",
+                        CatalogOptions.WAREHOUSE.key(),
+                        warehouseStr);
         mockResponse(mockResponse, 200);
-        restCatalog = new RESTCatalog(options);
+        context = CatalogContext.create(options);
+        restCatalog = new RESTCatalog(context);
         mockRestCatalog = spy(restCatalog);
     }
 
@@ -87,8 +105,10 @@ public class RESTCatalogTest {
     @Test
     public void testInitFailWhenDefineWarehouse() {
         Options options = new Options();
-        options.set(CatalogOptions.WAREHOUSE, "/a/b/c");
-        assertThrows(IllegalArgumentException.class, () -> new 
RESTCatalog(options));
+        options.set(CatalogOptions.WAREHOUSE, warehouseStr);
+        assertThrows(
+                IllegalArgumentException.class,
+                () -> new RESTCatalog(CatalogContext.create(options)));
     }
 
     @Test
@@ -164,8 +184,9 @@ public class RESTCatalogTest {
     public void testDropDatabaseWhenCascadeIsFalseAndNoTables() throws 
Exception {
         String name = MockRESTMessage.databaseName();
         boolean cascade = false;
+        ListTablesResponse response = 
MockRESTMessage.listTablesEmptyResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
         mockResponse("", 200);
-        when(mockRestCatalog.listTables(name)).thenReturn(new ArrayList<>());
         assertDoesNotThrow(() -> mockRestCatalog.dropDatabase(name, false, 
cascade));
         verify(mockRestCatalog, times(1)).dropDatabase(eq(name), eq(false), 
eq(cascade));
         verify(mockRestCatalog, times(1)).listTables(eq(name));
@@ -175,10 +196,8 @@ public class RESTCatalogTest {
     public void testDropDatabaseWhenCascadeIsFalseAndTablesExist() throws 
Exception {
         String name = MockRESTMessage.databaseName();
         boolean cascade = false;
-        mockResponse("", 200);
-        List<String> tables = new ArrayList<>();
-        tables.add("t1");
-        when(mockRestCatalog.listTables(name)).thenReturn(tables);
+        ListTablesResponse response = MockRESTMessage.listTablesResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
         assertThrows(
                 Catalog.DatabaseNotEmptyException.class,
                 () -> mockRestCatalog.dropDatabase(name, false, cascade));
@@ -213,6 +232,133 @@ public class RESTCatalogTest {
         assertDoesNotThrow(() -> mockRestCatalog.alterDatabase(name, new 
ArrayList<>(), true));
     }
 
+    @Test
+    public void testListTables() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        ListTablesResponse response = MockRESTMessage.listTablesResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        List<String> result = restCatalog.listTables(databaseName);
+        assertEquals(response.getTables().size(), result.size());
+    }
+
+    @Test
+    public void testGetTable() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        Table result = 
mockRestCatalog.getTable(Identifier.create(databaseName, "table"));
+        assertEquals(response.getSchema().options().size(), 
result.options().size());
+        verify(mockRestCatalog, times(1)).getDataOrFormatTable(any());
+    }
+
+    @Test
+    public void testCreateTable() throws Exception {
+        CreateTableRequest request = 
MockRESTMessage.createTableRequest("table");
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        assertDoesNotThrow(
+                () -> restCatalog.createTable(request.getIdentifier(), 
request.getSchema(), false));
+    }
+
+    @Test
+    public void testCreateTableWhenTableAlreadyExistAndIgnoreIfExistsIsFalse() 
throws Exception {
+        CreateTableRequest request = 
MockRESTMessage.createTableRequest("table");
+        mockResponse("", 409);
+        assertThrows(
+                Catalog.TableAlreadyExistException.class,
+                () -> restCatalog.createTable(request.getIdentifier(), 
request.getSchema(), false));
+    }
+
+    @Test
+    public void testRenameTable() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        String fromTableName = "fromTable";
+        String toTableName = "toTable";
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        assertDoesNotThrow(
+                () ->
+                        mockRestCatalog.renameTable(
+                                Identifier.create(databaseName, fromTableName),
+                                Identifier.create(databaseName, toTableName),
+                                true));
+        verify(mockRestCatalog, times(1)).updateTable(any(), any(), anyList());
+    }
+
+    @Test
+    public void testRenameTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        String fromTableName = "fromTable";
+        String toTableName = "toTable";
+        mockResponse("", 404);
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () ->
+                        mockRestCatalog.renameTable(
+                                Identifier.create(databaseName, fromTableName),
+                                Identifier.create(databaseName, toTableName),
+                                false));
+    }
+
+    @Test
+    public void testRenameTableWhenToTableAlreadyExist() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        String fromTableName = "fromTable";
+        String toTableName = "toTable";
+        mockResponse("", 409);
+        assertThrows(
+                Catalog.TableAlreadyExistException.class,
+                () ->
+                        mockRestCatalog.renameTable(
+                                Identifier.create(databaseName, fromTableName),
+                                Identifier.create(databaseName, toTableName),
+                                false));
+    }
+
+    @Test
+    public void testAlterTable() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        List<SchemaChange> changes = MockRESTMessage.getChanges();
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        mockResponse(mapper.writeValueAsString(response), 200);
+        assertDoesNotThrow(
+                () ->
+                        mockRestCatalog.alterTable(
+                                Identifier.create(databaseName, "t1"), 
changes, true));
+        verify(mockRestCatalog, times(1)).updateTable(any(), any(), anyList());
+    }
+
+    @Test
+    public void testAlterTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        List<SchemaChange> changes = MockRESTMessage.getChanges();
+        mockResponse("", 404);
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () ->
+                        mockRestCatalog.alterTable(
+                                Identifier.create(databaseName, "t1"), 
changes, false));
+    }
+
+    @Test
+    public void testDropTable() throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        String tableName = "table";
+        mockResponse("", 200);
+        assertDoesNotThrow(
+                () -> restCatalog.dropTable(Identifier.create(databaseName, 
tableName), true));
+    }
+
+    @Test
+    public void testDropTableWhenTableNotExistAndIgnoreIfNotExistsIsFalse() 
throws Exception {
+        String databaseName = MockRESTMessage.databaseName();
+        String tableName = "table";
+        mockResponse("", 404);
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () -> restCatalog.dropTable(Identifier.create(databaseName, 
tableName), false));
+    }
+
     private void mockResponse(String mockResponse, int httpCode) {
         MockResponse mockResponseObj =
                 new MockResponse()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
index 0e5a71be39..9cc362881f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTObjectMapperTest.java
@@ -20,12 +20,19 @@ package org.apache.paimon.rest;
 
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.UpdateTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.IntType;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -103,7 +110,7 @@ public class RESTObjectMapperTest {
         ListDatabasesResponse parseData =
                 mapper.readValue(responseStr, ListDatabasesResponse.class);
         assertEquals(response.getDatabases().size(), 
parseData.getDatabases().size());
-        assertEquals(name, parseData.getDatabases().get(0).getName());
+        assertEquals(name, parseData.getDatabases().get(0));
     }
 
     @Test
@@ -125,4 +132,57 @@ public class RESTObjectMapperTest {
         assertEquals(response.getUpdated().size(), 
parseData.getUpdated().size());
         assertEquals(response.getMissing().size(), 
parseData.getMissing().size());
     }
+
+    @Test
+    public void createTableRequestParseTest() throws Exception {
+        CreateTableRequest request = MockRESTMessage.createTableRequest("t1");
+        String requestStr = mapper.writeValueAsString(request);
+        CreateTableRequest parseData = mapper.readValue(requestStr, 
CreateTableRequest.class);
+        assertEquals(request.getIdentifier(), parseData.getIdentifier());
+        assertEquals(request.getSchema(), parseData.getSchema());
+    }
+
+    // This test is to guarantee the compatibility of field name in 
RESTCatalog.
+    @Test
+    public void dataFieldParseTest() throws Exception {
+        int id = 1;
+        String name = "col1";
+        IntType type = DataTypes.INT();
+        String descStr = "desc";
+        String dataFieldStr =
+                String.format(
+                        "{\"id\": %d,\"name\":\"%s\",\"type\":\"%s\", 
\"description\":\"%s\"}",
+                        id, name, type, descStr);
+        DataField parseData = mapper.readValue(dataFieldStr, DataField.class);
+        assertEquals(id, parseData.id());
+        assertEquals(name, parseData.name());
+        assertEquals(type, parseData.type());
+        assertEquals(descStr, parseData.description());
+    }
+
+    @Test
+    public void updateTableRequestParseTest() throws Exception {
+        UpdateTableRequest request = MockRESTMessage.updateTableRequest("t2");
+        String requestStr = mapper.writeValueAsString(request);
+        UpdateTableRequest parseData = mapper.readValue(requestStr, 
UpdateTableRequest.class);
+        assertEquals(request.getIdentifierChange(), 
parseData.getIdentifierChange());
+        assertEquals(request.getChanges(), parseData.getChanges());
+    }
+
+    @Test
+    public void getTableResponseParseTest() throws Exception {
+        GetTableResponse response = MockRESTMessage.getTableResponse();
+        String responseStr = mapper.writeValueAsString(response);
+        GetTableResponse parseData = mapper.readValue(responseStr, 
GetTableResponse.class);
+        assertEquals(response.getLocation(), parseData.getLocation());
+        assertEquals(response.getSchema(), parseData.getSchema());
+    }
+
+    @Test
+    public void listTablesResponseParseTest() throws Exception {
+        ListTablesResponse response = MockRESTMessage.listTablesResponse();
+        String responseStr = mapper.writeValueAsString(response);
+        ListTablesResponse parseData = mapper.readValue(responseStr, 
ListTablesResponse.class);
+        assertEquals(response.getTables(), parseData.getTables());
+    }
 }
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 0be872a58c..fd22ca2032 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -100,6 +100,11 @@ import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
+import static org.apache.paimon.catalog.CatalogUtils.isSystemDatabase;
+import static org.apache.paimon.catalog.CatalogUtils.lockFactory;
 import static org.apache.paimon.hive.HiveCatalogLock.acquireTimeout;
 import static org.apache.paimon.hive.HiveCatalogLock.checkMaxSleep;
 import static org.apache.paimon.hive.HiveCatalogOptions.HADOOP_CONF_DIR;
@@ -632,7 +637,8 @@ public class HiveCatalog extends AbstractCatalog {
                             identifier,
                             tableMeta.uuid(),
                             Lock.factory(
-                                    lockFactory().orElse(null),
+                                    lockFactory(catalogOptions, fileIO(), 
defaultLockFactory())
+                                            .orElse(null),
                                     lockContext().orElse(null),
                                     identifier),
                             metastoreClientFactory(identifier).orElse(null)));
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
index 5cc826b554..38fa4dfe4d 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/main/java/org/apache/paimon/hive/PaimonMetaHook.java
@@ -19,8 +19,8 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -87,7 +87,7 @@ public class PaimonMetaHook implements HiveMetaHook {
             org.apache.hadoop.fs.Path hadoopPath =
                     getDnsPath(new org.apache.hadoop.fs.Path(warehouse), conf);
             warehouse = hadoopPath.toUri().toString();
-            location = AbstractCatalog.newTableLocation(warehouse, 
identifier).toUri().toString();
+            location = CatalogUtils.newTableLocation(warehouse, 
identifier).toUri().toString();
             table.getSd().setLocation(location);
         }
 
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
index 15856c3c06..992272b0f6 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/CreateTableITCase.java
@@ -18,10 +18,10 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
@@ -94,7 +94,7 @@ public class CreateTableITCase extends HiveTestBase {
                         Maps.newHashMap(),
                         "");
         Identifier identifier = Identifier.create(DATABASE_TEST, tableName);
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
 
         // Create hive external table
@@ -189,7 +189,7 @@ public class CreateTableITCase extends HiveTestBase {
 
         // check the paimon table schema
         Identifier identifier = Identifier.create(DATABASE_TEST, tableName);
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         Optional<TableSchema> tableSchema =
                 new SchemaManager(LocalFileIO.create(), tablePath).latest();
         assertThat(tableSchema).isPresent();
@@ -245,7 +245,7 @@ public class CreateTableITCase extends HiveTestBase {
         }
         // check the paimon table name and schema
         Identifier identifier = Identifier.create(DATABASE_TEST, 
tableName.toLowerCase());
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, path);
         CatalogContext catalogContext = CatalogContext.create(conf);
@@ -310,7 +310,7 @@ public class CreateTableITCase extends HiveTestBase {
 
         // check the paimon db name态table name and schema
         Identifier identifier = Identifier.create(upperDB.toLowerCase(), 
tableName.toLowerCase());
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         Options conf = new Options();
         conf.set(CatalogOptions.WAREHOUSE, path);
         CatalogContext catalogContext = CatalogContext.create(conf);
@@ -355,7 +355,7 @@ public class CreateTableITCase extends HiveTestBase {
 
         // check the paimon table schema
         Identifier identifier = Identifier.create(DATABASE_TEST, tableName);
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         Optional<TableSchema> tableSchema =
                 new SchemaManager(LocalFileIO.create(), tablePath).latest();
         assertThat(tableSchema).isPresent();
@@ -397,7 +397,7 @@ public class CreateTableITCase extends HiveTestBase {
 
         // check the paimon table schema
         Identifier identifier = Identifier.create(DATABASE_TEST, tableName);
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         Optional<TableSchema> tableSchema =
                 new SchemaManager(LocalFileIO.create(), tablePath).latest();
         assertThat(tableSchema).isPresent();
@@ -441,7 +441,7 @@ public class CreateTableITCase extends HiveTestBase {
 
         // check the paimon table schema
         Identifier identifier = Identifier.create(DATABASE_TEST, tableName);
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         Optional<TableSchema> tableSchema =
                 new SchemaManager(LocalFileIO.create(), tablePath).latest();
         assertThat(tableSchema).isPresent();
@@ -489,7 +489,7 @@ public class CreateTableITCase extends HiveTestBase {
                             Maps.newHashMap(),
                             "");
             Identifier identifier = Identifier.create(DATABASE_TEST, 
tableName);
-            Path tablePath = AbstractCatalog.newTableLocation(path, 
identifier);
+            Path tablePath = CatalogUtils.newTableLocation(path, identifier);
             new SchemaManager(LocalFileIO.create(), 
tablePath).createTable(schema);
 
             String hiveSql =
@@ -533,7 +533,7 @@ public class CreateTableITCase extends HiveTestBase {
             } catch (Exception ignore) {
             } finally {
                 Identifier identifier = Identifier.create(DATABASE_TEST, 
tableName);
-                Path tablePath = AbstractCatalog.newTableLocation(path, 
identifier);
+                Path tablePath = CatalogUtils.newTableLocation(path, 
identifier);
                 boolean isPresent =
                         new SchemaManager(LocalFileIO.create(), 
tablePath).latest().isPresent();
                 Assertions.assertThat(isPresent).isFalse();
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java
index f3fe03fbba..7e52b89279 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveLocationTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.hive;
 
-import org.apache.paimon.catalog.AbstractCatalog;
 import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -257,7 +257,7 @@ public class HiveLocationTest {
 
             Identifier identifier = Identifier.create(dbName, tableName);
             String location =
-                    AbstractCatalog.newTableLocation(warehouse, 
identifier).toUri().toString();
+                    CatalogUtils.newTableLocation(warehouse, 
identifier).toUri().toString();
 
             String createTableSqlStr =
                     getCreateTableSqlStr(tableName, location, 
locationInProperties);
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java
index 4b16788ee7..882215f7c0 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveReadITCaseBase.java
@@ -19,7 +19,7 @@
 package org.apache.paimon.hive;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.Decimal;
@@ -975,7 +975,7 @@ public abstract class HiveReadITCaseBase extends 
HiveTestBase {
                         Maps.newHashMap(),
                         "");
         Identifier identifier = Identifier.create(DATABASE_TEST, tableName);
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         new SchemaManager(LocalFileIO.create(), tablePath).createTable(schema);
 
         // Create hive external table
@@ -1057,7 +1057,7 @@ public abstract class HiveReadITCaseBase extends 
HiveTestBase {
         commit.close();
 
         // add column, do some ddl which will generate a new version schema-n 
file.
-        Path tablePath = AbstractCatalog.newTableLocation(path, identifier);
+        Path tablePath = CatalogUtils.newTableLocation(path, identifier);
         SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
tablePath);
         schemaManager.commitChanges(SchemaChange.addColumn("N1", 
DataTypes.STRING()));
 
diff --git a/paimon-open-api/rest-catalog-open-api.yaml 
b/paimon-open-api/rest-catalog-open-api.yaml
index f7f9529f53..f3f3f33349 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -80,6 +80,159 @@ paths:
                 $ref: '#/components/schemas/ErrorResponse'
         "500":
           description: Internal Server Error
+  /v1/{prefix}/databases/{database}/tables:
+    get:
+      tags:
+        - table
+      summary: List tables
+      operationId: listTables
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ListTablesResponse'
+        "500":
+          description: Internal Server Error
+    post:
+      tags:
+        - table
+      summary: Create table
+      operationId: createTable
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/CreateTableRequest'
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetTableResponse'
+        "500":
+          description: Internal Server Error
+  /v1/{prefix}/databases/{database}/tables/{table}:
+    get:
+      tags:
+        - table
+      summary: Get table
+      operationId: getTable
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetTableResponse'
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
+    post:
+      tags:
+        - table
+      summary: Update table
+      operationId: updateTable
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: table
+          in: path
+          required: true
+          schema:
+            type: string
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/UpdateTableRequest'
+      responses:
+        "200":
+          description: OK
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetTableResponse'
+        "500":
+          description: Internal Server Error
+    delete:
+      tags:
+        - table
+      summary: Drop table
+      operationId: dropTable
+      parameters:
+        - name: prefix
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: database
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        "404":
+          description: Resource not found
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ErrorResponse'
+        "500":
+          description: Internal Server Error
   /v1/{prefix}/databases/{database}/properties:
     post:
       tags:
@@ -200,6 +353,15 @@ components:
           type: object
           additionalProperties:
             type: string
+    CreateDatabaseResponse:
+      type: object
+      properties:
+        name:
+          type: string
+        options:
+          type: object
+          additionalProperties:
+            type: string
     ErrorResponse:
       type: object
       properties:
@@ -212,15 +374,208 @@ components:
           type: array
           items:
             type: string
-    CreateDatabaseResponse:
+    CreateTableRequest:
       type: object
       properties:
+        identifier:
+          $ref: '#/components/schemas/Identifier'
+        schema:
+          $ref: '#/components/schemas/Schema'
+    DataField:
+      type: object
+      properties:
+        id:
+          type: integer
+          format: int32
         name:
           type: string
+        type:
+          $ref: '#/components/schemas/DataType'
+        description:
+          type: string
+    DataType:
+      anyOf:
+        - type: string
+        - type: object
+    Identifier:
+      type: object
+      properties:
+        database:
+          type: string
+        table:
+          type: string
+        branch:
+          type: string
+    Schema:
+      type: object
+      properties:
+        fields:
+          type: array
+          items:
+            $ref: '#/components/schemas/DataField'
+        partition-keys:
+          type: array
+          items:
+            type: string
+        primary-keys:
+          type: array
+          items:
+            type: string
         options:
           type: object
           additionalProperties:
             type: string
+        comment:
+          type: string
+    GetTableResponse:
+      type: object
+      properties:
+        location:
+          type: string
+        schema:
+          $ref: '#/components/schemas/TableSchema'
+    TableSchema:
+      type: object
+      properties:
+        version:
+          type: integer
+          format: int32
+        id:
+          type: integer
+          format: int64
+        highestFieldId:
+          type: integer
+          format: int32
+        partitionKeys:
+          type: array
+          items:
+            type: string
+        primaryKeys:
+          type: array
+          items:
+            type: string
+        options:
+          type: object
+          additionalProperties:
+            type: string
+        comment:
+          type: string
+        timeMillis:
+          type: integer
+          format: int64
+    AddColumn:
+      type: object
+      properties:
+        field-names:
+          type: array
+          items:
+            type: string
+        data-types:
+          $ref: '#/components/schemas/DataType'
+        comment:
+          type: string
+        move:
+          $ref: '#/components/schemas/Move'
+    Move:
+      type: object
+      properties:
+        field-name:
+          type: string
+        reference-field-name:
+          type: string
+        type:
+          type: string
+          enum:
+            - FIRST
+            - AFTER
+            - BEFORE
+            - LAST
+    RenameColumn:
+      type: object
+      properties:
+        field-names:
+          type: array
+          items:
+            type: string
+        new-name:
+          type: string
+    SchemaChanges:
+      type: object
+      properties:
+        set-options:
+          type: object
+          additionalProperties:
+            type: string
+        remove-options:
+          type: array
+          items:
+            type: string
+        comment:
+          type: string
+        add-columns:
+          type: array
+          items:
+            $ref: '#/components/schemas/AddColumn'
+        rename-columns:
+          type: array
+          items:
+            $ref: '#/components/schemas/RenameColumn'
+        drop-columns:
+          type: array
+          items:
+            type: string
+        update-column-types:
+          type: array
+          items:
+            $ref: '#/components/schemas/UpdateColumnType'
+        update-column-nullabilities:
+          type: array
+          items:
+            $ref: '#/components/schemas/UpdateColumnNullability'
+        update-column-comments:
+          type: array
+          items:
+            $ref: '#/components/schemas/UpdateColumnComment'
+        update-column-positions:
+          type: array
+          items:
+            $ref: '#/components/schemas/Move'
+    UpdateColumnComment:
+      type: object
+      properties:
+        field-names:
+          type: array
+          items:
+            type: string
+        new-comment:
+          type: string
+    UpdateColumnNullability:
+      type: object
+      properties:
+        field-names:
+          type: array
+          items:
+            type: string
+        new-nullability:
+          type: boolean
+    UpdateColumnType:
+      type: object
+      properties:
+        field-names:
+          type: array
+          items:
+            type: string
+        new-data-types:
+          $ref: '#/components/schemas/DataType'
+        keep-nullability:
+          type: boolean
+    UpdateTableRequest:
+      type: object
+      properties:
+        identifier-change:
+          $ref: '#/components/schemas/Identifier'
+        schema-changes:
+          $ref: '#/components/schemas/SchemaChanges'
     AlterDatabaseRequest:
       type: object
       properties:
@@ -247,18 +602,13 @@ components:
           type: array
           items:
             type: string
-    DatabaseName:
-      type: object
-      properties:
-        name:
-          type: string
     ListDatabasesResponse:
       type: object
       properties:
         databases:
           type: array
           items:
-            $ref: '#/components/schemas/DatabaseName'
+            type: string
     GetDatabaseResponse:
       type: object
       properties:
@@ -268,6 +618,13 @@ components:
           type: object
           additionalProperties:
             type: string
+    ListTablesResponse:
+      type: object
+      properties:
+        tables:
+          type: array
+          items:
+            type: string
     ConfigResponse:
       type: object
       properties:
@@ -279,3 +636,8 @@ components:
           type: object
           additionalProperties:
             type: string
+
+  securitySchemes:
+    BearerAuth:
+      type: http
+      scheme: bearer
diff --git 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
index 5331b65d71..0341b2556e 100644
--- 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
+++ 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/RESTCatalogController.java
@@ -21,13 +21,17 @@ package org.apache.paimon.open.api;
 import org.apache.paimon.rest.ResourcePaths;
 import org.apache.paimon.rest.requests.AlterDatabaseRequest;
 import org.apache.paimon.rest.requests.CreateDatabaseRequest;
+import org.apache.paimon.rest.requests.CreateTableRequest;
+import org.apache.paimon.rest.requests.UpdateTableRequest;
 import org.apache.paimon.rest.responses.AlterDatabaseResponse;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.rest.responses.CreateDatabaseResponse;
-import org.apache.paimon.rest.responses.DatabaseName;
 import org.apache.paimon.rest.responses.ErrorResponse;
 import org.apache.paimon.rest.responses.GetDatabaseResponse;
+import org.apache.paimon.rest.responses.GetTableResponse;
 import org.apache.paimon.rest.responses.ListDatabasesResponse;
+import org.apache.paimon.rest.responses.ListTablesResponse;
+import org.apache.paimon.schema.TableSchema;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
@@ -48,7 +52,7 @@ import org.springframework.web.bind.annotation.RestController;
 import java.util.HashMap;
 import java.util.Map;
 
-/** * RESTCatalog management APIs. */
+/** RESTCatalog management APIs. */
 @CrossOrigin(origins = "http://localhost:8081";)
 @RestController
 public class RESTCatalogController {
@@ -86,7 +90,7 @@ public class RESTCatalogController {
     })
     @GetMapping("/v1/{prefix}/databases")
     public ListDatabasesResponse listDatabases(@PathVariable String prefix) {
-        return new ListDatabasesResponse(ImmutableList.of(new 
DatabaseName("account")));
+        return new ListDatabasesResponse(ImmutableList.of("account"));
     }
 
     @Operation(
@@ -181,4 +185,134 @@ public class RESTCatalogController {
                 Lists.newArrayList("add"),
                 Lists.newArrayList("missing"));
     }
+
+    @Operation(
+            summary = "List tables",
+            tags = {"table"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {@Content(schema = @Schema(implementation = 
ListTablesResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @GetMapping("/v1/{prefix}/databases/{database}/tables")
+    public ListTablesResponse listTables(
+            @PathVariable String prefix, @PathVariable String database) {
+        return new ListTablesResponse(ImmutableList.of("user"));
+    }
+
+    @Operation(
+            summary = "Get table",
+            tags = {"table"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {@Content(schema = @Schema(implementation = 
GetTableResponse.class))}),
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @GetMapping("/v1/{prefix}/databases/{database}/tables/{table}")
+    public GetTableResponse getTable(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table) {
+        return new GetTableResponse(
+                "location",
+                new TableSchema(
+                        1,
+                        1,
+                        ImmutableList.of(),
+                        1,
+                        ImmutableList.of(),
+                        ImmutableList.of(),
+                        new HashMap<>(),
+                        "comment",
+                        1L));
+    }
+
+    @Operation(
+            summary = "Create table",
+            tags = {"table"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {@Content(schema = @Schema(implementation = 
GetTableResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @PostMapping("/v1/{prefix}/databases/{database}/tables")
+    public GetTableResponse createTable(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @RequestBody CreateTableRequest request) {
+        return new GetTableResponse(
+                "location",
+                new TableSchema(
+                        1,
+                        1,
+                        ImmutableList.of(),
+                        1,
+                        ImmutableList.of(),
+                        ImmutableList.of(),
+                        new HashMap<>(),
+                        "comment",
+                        1L));
+    }
+
+    @Operation(
+            summary = "Update table",
+            tags = {"table"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "200",
+                content = {@Content(schema = @Schema(implementation = 
GetTableResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @PostMapping("/v1/{prefix}/databases/{database}/tables/{table}")
+    public GetTableResponse updateTable(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table,
+            @RequestBody UpdateTableRequest request) {
+        return new GetTableResponse(
+                "location",
+                new TableSchema(
+                        1,
+                        1,
+                        ImmutableList.of(),
+                        1,
+                        ImmutableList.of(),
+                        ImmutableList.of(),
+                        new HashMap<>(),
+                        "comment",
+                        1L));
+    }
+
+    @Operation(
+            summary = "Drop table",
+            tags = {"table"})
+    @ApiResponses({
+        @ApiResponse(
+                responseCode = "404",
+                description = "Resource not found",
+                content = {@Content(schema = @Schema(implementation = 
ErrorResponse.class))}),
+        @ApiResponse(
+                responseCode = "500",
+                content = {@Content(schema = @Schema())})
+    })
+    @DeleteMapping("/v1/{prefix}/databases/{database}/tables/table")
+    public void dropTable(
+            @PathVariable String prefix,
+            @PathVariable String database,
+            @PathVariable String table) {}
 }
diff --git 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/config/OpenAPIConfig.java
 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/config/OpenAPIConfig.java
index 71ac066d4a..0074cde627 100644
--- 
a/paimon-open-api/src/main/java/org/apache/paimon/open/api/config/OpenAPIConfig.java
+++ 
b/paimon-open-api/src/main/java/org/apache/paimon/open/api/config/OpenAPIConfig.java
@@ -22,6 +22,7 @@ import io.swagger.v3.oas.models.OpenAPI;
 import io.swagger.v3.oas.models.info.Info;
 import io.swagger.v3.oas.models.info.License;
 import io.swagger.v3.oas.models.responses.ApiResponses;
+import io.swagger.v3.oas.models.security.SecurityRequirement;
 import io.swagger.v3.oas.models.servers.Server;
 import org.springdoc.core.customizers.OpenApiCustomiser;
 import org.springframework.beans.factory.annotation.Value;
@@ -56,9 +57,11 @@ public class OpenAPIConfig {
                         .version("1.0")
                         .description("This API exposes endpoints to 
RESTCatalog.")
                         .license(mitLicense);
+        SecurityRequirement securityRequirement = new SecurityRequirement();
+        securityRequirement.addList("BearerAuth");
         List<Server> servers = new ArrayList<>();
         servers.add(server);
-        return new OpenAPI().info(info).servers(servers);
+        return new 
OpenAPI().info(info).servers(servers).addSecurityItem(securityRequirement);
     }
 
     /** Sort response alphabetically. So the api generate will in same order 
everytime. */


Reply via email to