This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ce0f6bf770 [core] RESTCatalog refactor RESTCatalogServer and add  test 
(#5143)
ce0f6bf770 is described below

commit ce0f6bf770a31e8757a42e4e75e325a85e175d19
Author: jerry <[email protected]>
AuthorDate: Fri Feb 28 14:13:48 2025 +0800

    [core] RESTCatalog refactor RESTCatalogServer and add  test (#5143)
---
 docs/content/spark/sql-ddl.md                      |  44 ++
 .../org/apache/paimon/rest/DataTokenProvider.java  |  61 ++
 .../rest/MetadataInMemoryFileSystemCatalog.java    | 443 -------------
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 725 +++++++++++++--------
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 120 +++-
 .../org/apache/paimon/flink/RESTCatalogITCase.java |  10 +
 paimon-spark/paimon-spark-ut/pom.xml               |   7 +
 .../paimon/spark/SparkCatalogWithRestTest.java     |  84 +++
 8 files changed, 781 insertions(+), 713 deletions(-)

diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md
index cfe105f6ac..8be5304d54 100644
--- a/docs/content/spark/sql-ddl.md
+++ b/docs/content/spark/sql-ddl.md
@@ -116,6 +116,50 @@ spark-sql ... \
     
 ```
 
+```sql
+USE paimon.default;
+```
+#### Creating REST Catalog
+
+By using the Paimon REST catalog, changes to the catalog will be directly 
stored on the remote server.
+
+##### bear token
+```bash
+spark-sql ... \
+    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+    --conf spark.sql.catalog.paimon.metastore=rest \
+    --conf spark.sql.catalog.paimon.uri=<catalog server url> \
+    --conf spark.sql.catalog.paimon.token.provider=bear \
+    --conf spark.sql.catalog.paimon.token=<token>
+    
+```
+
+##### dlf ak
+```bash
+spark-sql ... \
+    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+    --conf spark.sql.catalog.paimon.metastore=rest \
+    --conf spark.sql.catalog.paimon.uri=<catalog server url> \
+    --conf spark.sql.catalog.paimon.token.provider=dlf \
+    --conf spark.sql.catalog.paimon.dlf.accessKeyId=<accessKeyId> \
+    --conf spark.sql.catalog.paimon.dlf.accessKeySecret=<accessKeySecret>
+    
+```
+
+##### dlf sts token
+```bash
+spark-sql ... \
+    --conf spark.sql.catalog.paimon=org.apache.paimon.spark.SparkCatalog \
+    --conf spark.sql.catalog.paimon.metastore=rest \
+    --conf spark.sql.catalog.paimon.uri=<catalog server url> \
+    --conf spark.sql.catalog.paimon.token.provider=dlf \
+    --conf spark.sql.catalog.paimon.dlf.accessKeyId=<accessKeyId> \
+    --conf spark.sql.catalog.paimon.dlf.accessKeySecret=<accessKeySecret> \
+    --conf spark.sql.catalog.paimon.dlf.securityToken=<securityToken> 
+    
+    
+```
+
 ```sql
 USE paimon.default;
 ```
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
new file mode 100644
index 0000000000..081ada0fba
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/DataTokenProvider.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+/** Holds a data token and its expiry for tests; {@link #refresh()} regenerates both. */
+public class DataTokenProvider {
+
+    private Map<String, String> token;
+    private long expiresAtMillis;
+
+    public DataTokenProvider(Map<String, String> token, long expiresAtMillis) {
+        this.token = token;
+        this.expiresAtMillis = expiresAtMillis;
+    }
+
+    public void setExpiresAtMillis(long expiresAtMillis) {
+        this.expiresAtMillis = expiresAtMillis;
+    }
+
+    public Map<String, String> getToken() {
+        return token;
+    }
+
+    public long getExpiresAtMillis() {
+        return expiresAtMillis;
+    }
+
+    public void setToken(Map<String, String> token) {
+        this.token = token;
+    }
+
+    public void refresh() {
+        this.token =
+                ImmutableMap.of(
+                        "ak",
+                        "ak-" + System.currentTimeMillis(),
+                        "sk",
+                        "sk-" + System.currentTimeMillis());
+        this.expiresAtMillis = System.currentTimeMillis();
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
deleted file mode 100644
index bd52d7bf10..0000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/MetadataInMemoryFileSystemCatalog.java
+++ /dev/null
@@ -1,443 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.rest;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.Snapshot;
-import org.apache.paimon.TableType;
-import org.apache.paimon.catalog.CatalogContext;
-import org.apache.paimon.catalog.CatalogFactory;
-import org.apache.paimon.catalog.Database;
-import org.apache.paimon.catalog.FileSystemCatalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.catalog.PropertyChange;
-import org.apache.paimon.catalog.SupportsSnapshots;
-import org.apache.paimon.catalog.TableMetadata;
-import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.partition.Partition;
-import org.apache.paimon.schema.Schema;
-import org.apache.paimon.schema.SchemaChange;
-import org.apache.paimon.schema.SchemaManager;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.utils.Pair;
-import org.apache.paimon.view.View;
-
-import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
-
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.stream.Collectors;
-
-import static org.apache.paimon.CoreOptions.PATH;
-
-/** A catalog for testing RESTCatalog. */
-public class MetadataInMemoryFileSystemCatalog extends FileSystemCatalog
-        implements SupportsSnapshots {
-
-    public final Map<String, Database> databaseStore;
-    public final Map<String, TableMetadata> tableMetadataStore;
-    public final Map<String, List<Partition>> tablePartitionsStore;
-    public final Map<String, View> viewStore;
-    public final Map<String, Snapshot> tableSnapshotStore;
-    public final Map<String, RESTToken> dataTokenStore;
-    public FileSystemCatalog fileSystemCatalog;
-
-    public MetadataInMemoryFileSystemCatalog(
-            FileIO fileIO,
-            Path warehouse,
-            Options options,
-            Map<String, Database> databaseStore,
-            Map<String, TableMetadata> tableMetadataStore,
-            Map<String, Snapshot> tableSnapshotStore,
-            Map<String, List<Partition>> tablePartitionsStore,
-            Map<String, View> viewStore,
-            Map<String, RESTToken> dataTokenStore) {
-        super(fileIO, warehouse, options);
-        this.fileSystemCatalog = new FileSystemCatalog(fileIO, warehouse, 
options);
-        this.databaseStore = databaseStore;
-        this.tableMetadataStore = tableMetadataStore;
-        this.tablePartitionsStore = tablePartitionsStore;
-        this.tableSnapshotStore = tableSnapshotStore;
-        this.viewStore = viewStore;
-        this.dataTokenStore = dataTokenStore;
-    }
-
-    public static MetadataInMemoryFileSystemCatalog create(
-            CatalogContext context,
-            Map<String, Database> databaseStore,
-            Map<String, TableMetadata> tableMetadataStore,
-            Map<String, Snapshot> tableSnapshotStore,
-            Map<String, List<Partition>> tablePartitionsStore,
-            Map<String, View> viewStore,
-            Map<String, RESTToken> dataTokenStore) {
-        String warehouse = 
CatalogFactory.warehouse(context).toUri().toString();
-
-        Path warehousePath = new Path(warehouse);
-        FileIO fileIO;
-
-        try {
-            fileIO = FileIO.get(warehousePath, context);
-            fileIO.checkOrMkdirs(warehousePath);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-
-        return new MetadataInMemoryFileSystemCatalog(
-                fileIO,
-                warehousePath,
-                context.options(),
-                databaseStore,
-                tableMetadataStore,
-                tableSnapshotStore,
-                tablePartitionsStore,
-                viewStore,
-                dataTokenStore);
-    }
-
-    @Override
-    public List<String> listDatabases() {
-        return new ArrayList<>(databaseStore.keySet());
-    }
-
-    @Override
-    protected void createDatabaseImpl(String name, Map<String, String> 
properties) {
-        super.createDatabaseImpl(name, properties);
-        databaseStore.put(name, Database.of(name, properties, null));
-    }
-
-    @Override
-    public Database getDatabaseImpl(String name) throws 
DatabaseNotExistException {
-        if (databaseStore.containsKey(name)) {
-            return databaseStore.get(name);
-        }
-        throw new DatabaseNotExistException(name);
-    }
-
-    @Override
-    protected void dropDatabaseImpl(String name) {
-        super.dropDatabaseImpl(name);
-        databaseStore.remove(name);
-    }
-
-    protected void alterDatabaseImpl(String name, List<PropertyChange> changes)
-            throws DatabaseNotExistException {
-        if (databaseStore.containsKey(name)) {
-            Pair<Map<String, String>, Set<String>> setPropertiesToRemoveKeys =
-                    PropertyChange.getSetPropertiesToRemoveKeys(changes);
-            Map<String, String> setProperties = 
setPropertiesToRemoveKeys.getLeft();
-            Set<String> removeKeys = setPropertiesToRemoveKeys.getRight();
-            Database database = databaseStore.get(name);
-            Map<String, String> parameter = new HashMap<>(database.options());
-            if (!setProperties.isEmpty()) {
-                parameter.putAll(setProperties);
-            }
-            if (!removeKeys.isEmpty()) {
-                parameter.keySet().removeAll(removeKeys);
-            }
-            Database alterDatabase = Database.of(name, parameter, null);
-            databaseStore.put(name, alterDatabase);
-        } else {
-            throw new DatabaseNotExistException(name);
-        }
-    }
-
-    @Override
-    protected List<String> listTablesImpl(String databaseName) {
-        List<String> tables = new ArrayList<>();
-        for (Map.Entry<String, TableMetadata> entry : 
tableMetadataStore.entrySet()) {
-            Identifier identifier = Identifier.fromString(entry.getKey());
-            if (databaseName.equals(identifier.getDatabaseName())) {
-                tables.add(identifier.getTableName());
-            }
-        }
-        return tables;
-    }
-
-    @Override
-    public void createTableImpl(Identifier identifier, Schema schema) {
-        super.createTableImpl(identifier, schema);
-        try {
-            TableMetadata tableMetadata =
-                    createTableMetadata(
-                            identifier, 1L, schema, 
UUID.randomUUID().toString(), false);
-            tableMetadataStore.put(identifier.getFullName(), tableMetadata);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private TableMetadata createTableMetadata(
-            Identifier identifier, long schemaId, Schema schema, String uuid, 
boolean isExternal) {
-        Map<String, String> options = new HashMap<>(schema.options());
-        Path path = getTableLocation(identifier);
-        options.put(PATH.key(), path.toString());
-        TableSchema tableSchema =
-                new TableSchema(
-                        schemaId,
-                        schema.fields(),
-                        schema.fields().size() - 1,
-                        schema.partitionKeys(),
-                        schema.primaryKeys(),
-                        options,
-                        schema.comment());
-        TableMetadata tableMetadata = new TableMetadata(tableSchema, 
isExternal, uuid);
-        return tableMetadata;
-    }
-
-    @Override
-    protected void dropTableImpl(Identifier identifier, List<Path> 
externalPaths) {
-        if (tableMetadataStore.containsKey(identifier.getFullName())) {
-            tableMetadataStore.remove(identifier.getFullName());
-            super.dropTableImpl(identifier, externalPaths);
-        }
-    }
-
-    @Override
-    public void renameTableImpl(Identifier fromTable, Identifier toTable) {
-        if (tableMetadataStore.containsKey(fromTable.getFullName())) {
-            super.renameTableImpl(fromTable, toTable);
-            TableMetadata tableMetadata = 
tableMetadataStore.get(fromTable.getFullName());
-            tableMetadataStore.remove(fromTable.getFullName());
-            tableMetadataStore.put(toTable.getFullName(), tableMetadata);
-        }
-    }
-
-    @Override
-    protected void alterTableImpl(Identifier identifier, List<SchemaChange> 
changes)
-            throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
-        if (tableMetadataStore.containsKey(identifier.getFullName())) {
-            TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
-            TableSchema schema = tableMetadata.schema();
-            Options options = Options.fromMap(schema.options());
-            if (options.get(CoreOptions.TYPE) == TableType.FORMAT_TABLE) {
-                throw new UnsupportedOperationException("Only data table 
support alter table.");
-            }
-            SchemaManager schemaManager = schemaManager(identifier);
-            try {
-                TableSchema newSchema =
-                        runWithLock(identifier, () -> 
schemaManager.commitChanges(changes));
-                TableMetadata newTableMetadata =
-                        createTableMetadata(
-                                identifier,
-                                newSchema.id(),
-                                newSchema.toSchema(),
-                                tableMetadata.uuid(),
-                                tableMetadata.isExternal());
-                tableMetadataStore.put(identifier.getFullName(), 
newTableMetadata);
-            } catch (TableNotExistException
-                    | ColumnAlreadyExistException
-                    | ColumnNotExistException
-                    | RuntimeException e) {
-                throw e;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    private SchemaManager schemaManager(Identifier identifier) {
-        Path path = getTableLocation(identifier);
-        return new SchemaManager(fileIO, path, 
identifier.getBranchNameOrDefault());
-    }
-
-    @Override
-    public void createFormatTable(Identifier identifier, Schema schema) {
-        TableMetadata tableMetadata =
-                createTableMetadata(identifier, 1L, schema, 
UUID.randomUUID().toString(), true);
-        tableMetadataStore.put(identifier.getFullName(), tableMetadata);
-    }
-
-    @Override
-    public TableSchema loadTableSchema(Identifier identifier) throws 
TableNotExistException {
-        if (tableMetadataStore.containsKey(identifier.getFullName())) {
-            return tableMetadataStore.get(identifier.getFullName()).schema();
-        }
-        throw new TableNotExistException(identifier);
-    }
-
-    @Override
-    protected TableMetadata loadTableMetadata(Identifier identifier) throws 
TableNotExistException {
-        if (tableMetadataStore.containsKey(identifier.getFullName())) {
-            return tableMetadataStore.get(identifier.getFullName());
-        }
-        throw new TableNotExistException(identifier);
-    }
-
-    @Override
-    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
-            throws TableNotExistException {
-        getTable(identifier);
-        tablePartitionsStore.put(
-                identifier.getFullName(),
-                partitions.stream()
-                        .map(partition -> spec2Partition(partition))
-                        .collect(Collectors.toList()));
-    }
-
-    @Override
-    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
-            throws TableNotExistException {
-        getTable(identifier);
-        List<Partition> existPartitions = 
tablePartitionsStore.get(identifier.getFullName());
-        partitions.forEach(
-                partition -> {
-                    for (Map.Entry<String, String> entry : 
partition.entrySet()) {
-                        existPartitions.stream()
-                                .filter(
-                                        p ->
-                                                
p.spec().containsKey(entry.getKey())
-                                                        && p.spec()
-                                                                
.get(entry.getKey())
-                                                                
.equals(entry.getValue()))
-                                .findFirst()
-                                .ifPresent(
-                                        existPartition -> 
existPartitions.remove(existPartition));
-                    }
-                });
-    }
-
-    @Override
-    public void alterPartitions(Identifier identifier, List<Partition> 
partitions)
-            throws TableNotExistException {
-        getTable(identifier);
-        List<Partition> existPartitions = 
tablePartitionsStore.get(identifier.getFullName());
-        partitions.forEach(
-                partition -> {
-                    for (Map.Entry<String, String> entry : 
partition.spec().entrySet()) {
-                        existPartitions.stream()
-                                .filter(
-                                        p ->
-                                                
p.spec().containsKey(entry.getKey())
-                                                        && p.spec()
-                                                                
.get(entry.getKey())
-                                                                
.equals(entry.getValue()))
-                                .findFirst()
-                                .ifPresent(
-                                        existPartition -> 
existPartitions.remove(existPartition));
-                    }
-                });
-        existPartitions.addAll(partitions);
-        tablePartitionsStore.put(identifier.getFullName(), existPartitions);
-    }
-
-    @Override
-    public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
-        getTable(identifier);
-        return tablePartitionsStore.get(identifier.getFullName());
-    }
-
-    @Override
-    public View getView(Identifier identifier) throws ViewNotExistException {
-        if (viewStore.containsKey(identifier.getFullName())) {
-            return viewStore.get(identifier.getFullName());
-        }
-        throw new ViewNotExistException(identifier);
-    }
-
-    @Override
-    public void dropView(Identifier identifier, boolean ignoreIfNotExists)
-            throws ViewNotExistException {
-        if (viewStore.containsKey(identifier.getFullName())) {
-            viewStore.remove(identifier.getFullName());
-        }
-        if (!ignoreIfNotExists) {
-            throw new ViewNotExistException(identifier);
-        }
-    }
-
-    @Override
-    public void createView(Identifier identifier, View view, boolean 
ignoreIfExists)
-            throws ViewAlreadyExistException, DatabaseNotExistException {
-        getDatabase(identifier.getDatabaseName());
-        if (viewStore.containsKey(identifier.getFullName()) && 
!ignoreIfExists) {
-            throw new ViewAlreadyExistException(identifier);
-        }
-        viewStore.put(identifier.getFullName(), view);
-    }
-
-    @Override
-    public List<String> listViews(String databaseName) throws 
DatabaseNotExistException {
-        getDatabase(databaseName);
-        return viewStore.keySet().stream()
-                .map(Identifier::fromString)
-                .filter(identifier -> 
identifier.getDatabaseName().equals(databaseName))
-                .map(Identifier::getTableName)
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public void renameView(Identifier fromView, Identifier toView, boolean 
ignoreIfNotExists)
-            throws ViewNotExistException, ViewAlreadyExistException {
-        if (!viewStore.containsKey(fromView.getFullName()) && 
!ignoreIfNotExists) {
-            throw new ViewNotExistException(fromView);
-        }
-        if (viewStore.containsKey(toView.getFullName())) {
-            throw new ViewAlreadyExistException(toView);
-        }
-        if (viewStore.containsKey(fromView.getFullName())) {
-            View view = viewStore.get(fromView.getFullName());
-            viewStore.remove(fromView.getFullName());
-            viewStore.put(toView.getFullName(), view);
-        }
-    }
-
-    @Override
-    public boolean commitSnapshot(
-            Identifier identifier, Snapshot snapshot, List<Partition> 
statistics)
-            throws TableNotExistException {
-        tableSnapshotStore.put(identifier.getFullName(), snapshot);
-        return false;
-    }
-
-    @Override
-    public Optional<Snapshot> loadSnapshot(Identifier identifier) throws 
TableNotExistException {
-        return 
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
-    }
-
-    public RESTToken getToken(Identifier identifier) {
-        if (dataTokenStore.containsKey(identifier.getFullName())) {
-            return dataTokenStore.get(identifier.getFullName());
-        }
-        long currentTimeMillis = System.currentTimeMillis();
-        RESTToken token =
-                new RESTToken(
-                        ImmutableMap.of(
-                                "akId",
-                                "akId" + currentTimeMillis,
-                                "akSecret",
-                                "akSecret" + currentTimeMillis),
-                        currentTimeMillis);
-        dataTokenStore.put(identifier.getFullName(), token);
-        return dataTokenStore.get(identifier.getFullName());
-    }
-
-    private Partition spec2Partition(Map<String, String> spec) {
-        // todo: need update
-        return new Partition(spec, 123, 456, 789, 123);
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 15de6189c1..30d7b0a598 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -23,10 +23,13 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.Database;
+import org.apache.paimon.catalog.FileSystemCatalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.catalog.PropertyChange;
 import org.apache.paimon.catalog.RenamingSnapshotCommit;
 import org.apache.paimon.catalog.TableMetadata;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
@@ -61,11 +64,11 @@ import 
org.apache.paimon.rest.responses.ListPartitionsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
 import org.apache.paimon.rest.responses.ListViewsResponse;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.FormatTable;
-import org.apache.paimon.table.Table;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.utils.BranchManager;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
 import org.apache.paimon.view.ViewSchema;
@@ -76,17 +79,23 @@ import okhttp3.mockwebserver.Dispatcher;
 import okhttp3.mockwebserver.MockResponse;
 import okhttp3.mockwebserver.MockWebServer;
 import okhttp3.mockwebserver.RecordedRequest;
+import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
 
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.TYPE;
+import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
 
 /** Mock REST server for testing. */
@@ -95,7 +104,7 @@ public class RESTCatalogServer {
     private static final String PREFIX = "paimon";
     private static final String DATABASE_URI = 
String.format("/v1/%s/databases", PREFIX);
 
-    private final MetadataInMemoryFileSystemCatalog catalog;
+    private final FileSystemCatalog catalog;
     private final Dispatcher dispatcher;
     private final MockWebServer server;
     private final String authToken;
@@ -111,16 +120,17 @@ public class RESTCatalogServer {
         authToken = initToken;
         Options conf = new Options();
         conf.setString("warehouse", warehouse);
-        this.catalog =
-                MetadataInMemoryFileSystemCatalog.create(
-                        CatalogContext.create(conf),
-                        databaseStore,
-                        tableMetadataStore,
-                        tableSnapshotStore,
-                        tablePartitionsStore,
-                        viewStore,
-                        dataTokenStore);
-        this.dispatcher = initDispatcher(catalog, warehouse, authToken);
+        CatalogContext context = CatalogContext.create(conf);
+        Path warehousePath = new Path(warehouse);
+        FileIO fileIO;
+        try {
+            fileIO = FileIO.get(warehousePath, context);
+            fileIO.checkOrMkdirs(warehousePath);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        this.catalog = new FileSystemCatalog(fileIO, warehousePath, 
context.options());
+        this.dispatcher = initDispatcher(warehouse, authToken);
         MockWebServer mockWebServer = new MockWebServer();
         mockWebServer.setDispatcher(dispatcher);
         server = mockWebServer;
@@ -146,8 +156,7 @@ public class RESTCatalogServer {
         dataTokenStore.put(identifier.getFullName(), token);
     }
 
-    public static Dispatcher initDispatcher(
-            MetadataInMemoryFileSystemCatalog catalog, String warehouse, 
String authToken) {
+    public Dispatcher initDispatcher(String warehouse, String authToken) {
         return new Dispatcher() {
             @Override
             public MockResponse dispatch(RecordedRequest request) {
@@ -159,17 +168,27 @@ public class RESTCatalogServer {
                         return new MockResponse().setResponseCode(401);
                     }
                     if (request.getPath().startsWith("/v1/config")) {
-                        return new MockResponse()
-                                .setResponseCode(200)
-                                .setBody(getConfigBody(warehouse));
+                        String body =
+                                String.format(
+                                        "{\"defaults\": {\"%s\": \"%s\", 
\"%s\": \"%s\", \"%s\": \"%s\"}}",
+                                        
RESTCatalogInternalOptions.PREFIX.key(),
+                                        PREFIX,
+                                        CatalogOptions.WAREHOUSE.key(),
+                                        warehouse,
+                                        "header.test-header",
+                                        "test-value");
+                        return new 
MockResponse().setResponseCode(200).setBody(body);
                     } else if (DATABASE_URI.equals(request.getPath())) {
-                        return databasesApiHandler(catalog, request);
+                        return databasesApiHandler(request);
                     } else if (request.getPath().startsWith(DATABASE_URI)) {
                         String[] resources =
                                 request.getPath()
                                         .substring((DATABASE_URI + 
"/").length())
                                         .split("/");
                         String databaseName = resources[0];
+                        if (!databaseStore.containsKey(databaseName)) {
+                            throw new 
Catalog.DatabaseNotExistException(databaseName);
+                        }
                         boolean isViews = resources.length == 2 && 
"views".equals(resources[1]);
                         boolean isTables = resources.length == 2 && 
"tables".equals(resources[1]);
                         boolean isTableRename =
@@ -226,44 +245,28 @@ public class RESTCatalogServer {
                                 resources.length >= 4
                                         && "tables".equals(resources[1])
                                         && "branches".equals(resources[3]);
-                        if (isDropPartitions) {
+                        Identifier identifier =
+                                resources.length >= 3
+                                        ? Identifier.create(databaseName, 
resources[2])
+                                        : null;
+                        // validate partition
+                        if (isPartitions
+                                || isDropPartitions
+                                || isAlterPartitions
+                                || isMarkDonePartitions) {
                             String tableName = resources[2];
-                            Identifier identifier = 
Identifier.create(databaseName, tableName);
                             Optional<MockResponse> error =
-                                    checkTablePartitioned(catalog, identifier);
+                                    checkTablePartitioned(
+                                            Identifier.create(databaseName, 
tableName));
                             if (error.isPresent()) {
                                 return error.get();
                             }
-                            DropPartitionsRequest dropPartitionsRequest =
-                                    OBJECT_MAPPER.readValue(
-                                            request.getBody().readUtf8(),
-                                            DropPartitionsRequest.class);
-                            catalog.dropPartitions(
-                                    identifier, 
dropPartitionsRequest.getPartitionSpecs());
-                            return new MockResponse().setResponseCode(200);
+                        }
+                        if (isDropPartitions) {
+                            return dropPartitionsHandle(identifier, request);
                         } else if (isAlterPartitions) {
-                            String tableName = resources[2];
-                            Identifier identifier = 
Identifier.create(databaseName, tableName);
-                            Optional<MockResponse> error =
-                                    checkTablePartitioned(catalog, identifier);
-                            if (error.isPresent()) {
-                                return error.get();
-                            }
-                            AlterPartitionsRequest alterPartitionsRequest =
-                                    OBJECT_MAPPER.readValue(
-                                            request.getBody().readUtf8(),
-                                            AlterPartitionsRequest.class);
-                            catalog.alterPartitions(
-                                    identifier, 
alterPartitionsRequest.getPartitions());
-                            return new MockResponse().setResponseCode(200);
+                            return alterPartitionsHandle(identifier, request);
                         } else if (isMarkDonePartitions) {
-                            String tableName = resources[2];
-                            Identifier identifier = 
Identifier.create(databaseName, tableName);
-                            Optional<MockResponse> error =
-                                    checkTablePartitioned(catalog, identifier);
-                            if (error.isPresent()) {
-                                return error.get();
-                            }
                             MarkDonePartitionsRequest 
markDonePartitionsRequest =
                                     OBJECT_MAPPER.readValue(
                                             request.getBody().readUtf8(),
@@ -272,17 +275,8 @@ public class RESTCatalogServer {
                                     identifier, 
markDonePartitionsRequest.getPartitionSpecs());
                             return new MockResponse().setResponseCode(200);
                         } else if (isPartitions) {
-                            String tableName = resources[2];
-                            Optional<MockResponse> error =
-                                    checkTablePartitioned(
-                                            catalog, 
Identifier.create(databaseName, tableName));
-                            if (error.isPresent()) {
-                                return error.get();
-                            }
-                            return partitionsApiHandler(catalog, request, 
databaseName, tableName);
+                            return partitionsApiHandler(request, identifier);
                         } else if (isBranches) {
-                            String tableName = resources[2];
-                            Identifier identifier = 
Identifier.create(databaseName, tableName);
                             FileStoreTable table = (FileStoreTable) 
catalog.getTable(identifier);
                             BranchManager branchManager = 
table.branchManager();
                             switch (request.getMethod()) {
@@ -318,56 +312,25 @@ public class RESTCatalogServer {
                                     return new 
MockResponse().setResponseCode(404);
                             }
                         } else if (isTableToken) {
-                            RESTToken dataToken =
-                                    
catalog.getToken(Identifier.create(databaseName, resources[2]));
-                            GetTableTokenResponse getTableTokenResponse =
-                                    new GetTableTokenResponse(
-                                            dataToken.token(), 
dataToken.expireAtMillis());
-                            return new MockResponse()
-                                    .setResponseCode(200)
-                                    .setBody(
-                                            OBJECT_MAPPER.writeValueAsString(
-                                                    getTableTokenResponse));
+                            return handleDataToken(identifier);
                         } else if (isTableSnapshot) {
-                            String tableName = resources[2];
-                            Optional<Snapshot> snapshotOptional =
-                                    catalog.loadSnapshot(
-                                            Identifier.create(databaseName, 
tableName));
-                            if (!snapshotOptional.isPresent()) {
-                                response =
-                                        new ErrorResponse(
-                                                
ErrorResponseResourceType.SNAPSHOT,
-                                                databaseName,
-                                                "No Snapshot",
-                                                404);
-                                return mockResponse(response, 404);
-                            }
-                            GetTableSnapshotResponse getTableSnapshotResponse =
-                                    new 
GetTableSnapshotResponse(snapshotOptional.get());
-                            return new MockResponse()
-                                    .setResponseCode(200)
-                                    .setBody(
-                                            OBJECT_MAPPER.writeValueAsString(
-                                                    getTableSnapshotResponse));
+                            return handleSnapshot(identifier);
                         } else if (isTableRename) {
-                            return renameTableApiHandler(catalog, request);
+                            return renameTableApiHandler(request);
                         } else if (isTableCommit) {
-                            return commitTableApiHandler(
-                                    catalog, request, databaseName, 
resources[2]);
+                            return commitTableApiHandler(request);
                         } else if (isTable) {
-                            String tableName = resources[2];
-                            return tableApiHandler(catalog, request, 
databaseName, tableName);
+                            return tableApiHandler(request, identifier);
                         } else if (isTables) {
-                            return tablesApiHandler(catalog, request, 
databaseName);
+                            return tablesApiHandler(request, databaseName);
                         } else if (isViews) {
-                            return viewsApiHandler(catalog, request, 
databaseName);
+                            return viewsApiHandler(request, databaseName);
                         } else if (isViewRename) {
-                            return renameViewApiHandler(catalog, request);
+                            return renameViewApiHandler(request);
                         } else if (isView) {
-                            String viewName = resources[2];
-                            return viewApiHandler(catalog, request, 
databaseName, viewName);
+                            return viewApiHandler(request, identifier);
                         } else {
-                            return databaseApiHandler(catalog, request, 
databaseName);
+                            return databaseApiHandler(request, databaseName);
                         }
                     }
                     return new MockResponse().setResponseCode(404);
@@ -458,31 +421,69 @@ public class RESTCatalogServer {
         };
     }
 
-    private static Optional<MockResponse> checkTablePartitioned(
-            Catalog catalog, Identifier identifier) {
-        Table table;
-        try {
-            table = catalog.getTable(identifier);
-        } catch (Catalog.TableNotExistException e) {
-            return Optional.of(
-                    mockResponse(
-                            new ErrorResponse(ErrorResponseResourceType.TABLE, 
null, "", 404),
-                            404));
+    private MockResponse handleDataToken(Identifier tableIdentifier) throws 
Exception {
+        RESTToken dataToken;
+        if (dataTokenStore.containsKey(tableIdentifier.getFullName())) {
+            dataToken = dataTokenStore.get(tableIdentifier.getFullName());
+        } else {
+            long currentTimeMillis = System.currentTimeMillis();
+            dataToken =
+                    new RESTToken(
+                            ImmutableMap.of(
+                                    "akId",
+                                    "akId" + currentTimeMillis,
+                                    "akSecret",
+                                    "akSecret" + currentTimeMillis),
+                            currentTimeMillis);
+            dataTokenStore.put(tableIdentifier.getFullName(), dataToken);
         }
-        boolean partitioned = 
CoreOptions.fromMap(table.options()).partitionedTableInMetastore();
-        if (!partitioned) {
-            return Optional.of(mockResponse(new ErrorResponse(null, null, "", 
501), 501));
+        GetTableTokenResponse getTableTokenResponse =
+                new GetTableTokenResponse(dataToken.token(), 
dataToken.expireAtMillis());
+        return new MockResponse()
+                .setResponseCode(200)
+                
.setBody(OBJECT_MAPPER.writeValueAsString(getTableTokenResponse));
+    }
+
+    private MockResponse handleSnapshot(Identifier identifier) throws 
Exception {
+        RESTResponse response;
+        Optional<Snapshot> snapshotOptional =
+                
Optional.ofNullable(tableSnapshotStore.get(identifier.getFullName()));
+        if (!snapshotOptional.isPresent()) {
+            response =
+                    new ErrorResponse(
+                            ErrorResponseResourceType.SNAPSHOT,
+                            identifier.getDatabaseName(),
+                            "No Snapshot",
+                            404);
+            return mockResponse(response, 404);
         }
-        return Optional.empty();
+        GetTableSnapshotResponse getTableSnapshotResponse =
+                new GetTableSnapshotResponse(snapshotOptional.get());
+        return new MockResponse()
+                .setResponseCode(200)
+                
.setBody(OBJECT_MAPPER.writeValueAsString(getTableSnapshotResponse));
     }
 
-    private static MockResponse commitTableApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName, 
String tableName)
-            throws Exception {
+    private Optional<MockResponse> checkTablePartitioned(Identifier 
identifier) {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+            boolean partitioned =
+                    CoreOptions.fromMap(tableMetadata.schema().options())
+                            .partitionedTableInMetastore();
+            if (!partitioned) {
+                return Optional.of(mockResponse(new ErrorResponse(null, null, 
"", 501), 501));
+            }
+            return Optional.empty();
+        }
+        return Optional.of(
+                mockResponse(
+                        new ErrorResponse(ErrorResponseResourceType.TABLE, 
null, "", 404), 404));
+    }
+
+    private MockResponse commitTableApiHandler(RecordedRequest request) throws 
Exception {
         CommitTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), 
CommitTableRequest.class);
-        FileStoreTable table =
-                (FileStoreTable) 
catalog.getTable(Identifier.create(databaseName, tableName));
+        FileStoreTable table = (FileStoreTable) 
catalog.getTable(requestBody.getIdentifier());
         RenamingSnapshotCommit commit =
                 new RenamingSnapshotCommit(table.snapshotManager(), 
Lock.empty());
         String branchName = requestBody.getIdentifier().getBranchName();
@@ -491,16 +492,16 @@ public class RESTCatalogServer {
         }
         boolean success =
                 commit.commit(requestBody.getSnapshot(), branchName, 
Collections.emptyList());
+        commitSnapshot(requestBody.getIdentifier(), requestBody.getSnapshot(), 
null);
         CommitTableResponse response = new CommitTableResponse(success);
         return mockResponse(response, 200);
     }
 
-    private static MockResponse databasesApiHandler(Catalog catalog, 
RecordedRequest request)
-            throws Exception {
+    private MockResponse databasesApiHandler(RecordedRequest request) throws 
Exception {
         RESTResponse response;
         switch (request.getMethod()) {
             case "GET":
-                List<String> databaseNameList = catalog.listDatabases();
+                List<String> databaseNameList = new 
ArrayList<>(databaseStore.keySet());
                 response = new ListDatabasesResponse(databaseNameList);
                 return mockResponse(response, 200);
             case "POST":
@@ -509,6 +510,8 @@ public class RESTCatalogServer {
                                 request.getBody().readUtf8(), 
CreateDatabaseRequest.class);
                 String databaseName = requestBody.getName();
                 catalog.createDatabase(databaseName, false);
+                databaseStore.put(
+                        databaseName, Database.of(databaseName, 
requestBody.getOptions(), null));
                 response = new CreateDatabaseResponse(databaseName, 
requestBody.getOptions());
                 return mockResponse(response, 200);
             default:
@@ -516,124 +519,215 @@ public class RESTCatalogServer {
         }
     }
 
-    private static MockResponse databaseApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName) 
throws Exception {
+    private MockResponse databaseApiHandler(RecordedRequest request, String 
databaseName)
+            throws Exception {
         RESTResponse response;
-        switch (request.getMethod()) {
-            case "GET":
-                Database database = catalog.getDatabase(databaseName);
-                response =
-                        new GetDatabaseResponse(
-                                UUID.randomUUID().toString(), database.name(), 
database.options());
-                return mockResponse(response, 200);
-            case "DELETE":
-                catalog.dropDatabase(databaseName, false, true);
-                return new MockResponse().setResponseCode(200);
-            case "POST":
-                AlterDatabaseRequest requestBody =
-                        OBJECT_MAPPER.readValue(
-                                request.getBody().readUtf8(), 
AlterDatabaseRequest.class);
-                List<PropertyChange> changes = new ArrayList<>();
-                for (String property : requestBody.getRemovals()) {
-                    changes.add(PropertyChange.removeProperty(property));
-                }
-                for (Map.Entry<String, String> entry : 
requestBody.getUpdates().entrySet()) {
-                    changes.add(PropertyChange.setProperty(entry.getKey(), 
entry.getValue()));
-                }
-                catalog.alterDatabase(databaseName, changes, false);
-                AlterDatabaseResponse alterDatabaseResponse =
-                        new AlterDatabaseResponse(
-                                requestBody.getRemovals(),
-                                requestBody.getUpdates().keySet().stream()
-                                        .collect(Collectors.toList()),
-                                Collections.emptyList());
-                return mockResponse(alterDatabaseResponse, 200);
-            default:
-                return new MockResponse().setResponseCode(404);
+        Database database;
+        if (databaseStore.containsKey(databaseName)) {
+            switch (request.getMethod()) {
+                case "GET":
+                    database = databaseStore.get(databaseName);
+                    response =
+                            new GetDatabaseResponse(
+                                    UUID.randomUUID().toString(),
+                                    database.name(),
+                                    database.options());
+                    return mockResponse(response, 200);
+                case "DELETE":
+                    catalog.dropDatabase(databaseName, false, true);
+                    databaseStore.remove(databaseName);
+                    return new MockResponse().setResponseCode(200);
+                case "POST":
+                    AlterDatabaseRequest requestBody =
+                            OBJECT_MAPPER.readValue(
+                                    request.getBody().readUtf8(), 
AlterDatabaseRequest.class);
+                    List<PropertyChange> changes = new ArrayList<>();
+                    for (String property : requestBody.getRemovals()) {
+                        changes.add(PropertyChange.removeProperty(property));
+                    }
+                    for (Map.Entry<String, String> entry : 
requestBody.getUpdates().entrySet()) {
+                        changes.add(PropertyChange.setProperty(entry.getKey(), 
entry.getValue()));
+                    }
+                    if (databaseStore.containsKey(databaseName)) {
+                        Pair<Map<String, String>, Set<String>> 
setPropertiesToRemoveKeys =
+                                
PropertyChange.getSetPropertiesToRemoveKeys(changes);
+                        Map<String, String> setProperties = 
setPropertiesToRemoveKeys.getLeft();
+                        Set<String> removeKeys = 
setPropertiesToRemoveKeys.getRight();
+                        database = databaseStore.get(databaseName);
+                        Map<String, String> parameter = new 
HashMap<>(database.options());
+                        if (!setProperties.isEmpty()) {
+                            parameter.putAll(setProperties);
+                        }
+                        if (!removeKeys.isEmpty()) {
+                            parameter.keySet().removeAll(removeKeys);
+                        }
+                        Database alterDatabase = Database.of(databaseName, 
parameter, null);
+                        databaseStore.put(databaseName, alterDatabase);
+                    } else {
+                        throw new 
Catalog.DatabaseNotExistException(databaseName);
+                    }
+                    AlterDatabaseResponse alterDatabaseResponse =
+                            new AlterDatabaseResponse(
+                                    requestBody.getRemovals(),
+                                    requestBody.getUpdates().keySet().stream()
+                                            .collect(Collectors.toList()),
+                                    Collections.emptyList());
+                    return mockResponse(alterDatabaseResponse, 200);
+                default:
+                    return new MockResponse().setResponseCode(404);
+            }
         }
+        return new MockResponse().setResponseCode(404);
     }
 
-    private static MockResponse tablesApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName) 
throws Exception {
+    private MockResponse tablesApiHandler(RecordedRequest request, String 
databaseName)
+            throws Exception {
         RESTResponse response;
-        switch (request.getMethod()) {
-            case "GET":
-                response = new 
ListTablesResponse(catalog.listTables(databaseName));
-                return mockResponse(response, 200);
-            case "POST":
-                CreateTableRequest requestBody =
-                        OBJECT_MAPPER.readValue(
-                                request.getBody().readUtf8(), 
CreateTableRequest.class);
-                catalog.createTable(requestBody.getIdentifier(), 
requestBody.getSchema(), false);
-                return new MockResponse().setResponseCode(200);
-            default:
-                return new MockResponse().setResponseCode(404);
+        if (databaseStore.containsKey(databaseName)) {
+            switch (request.getMethod()) {
+                case "GET":
+                    List<String> tables = new ArrayList<>();
+                    for (Map.Entry<String, TableMetadata> entry : 
tableMetadataStore.entrySet()) {
+                        Identifier identifier = 
Identifier.fromString(entry.getKey());
+                        if (databaseName.equals(identifier.getDatabaseName())) 
{
+                            tables.add(identifier.getTableName());
+                        }
+                    }
+                    response = new ListTablesResponse(tables);
+                    return mockResponse(response, 200);
+                case "POST":
+                    CreateTableRequest requestBody =
+                            OBJECT_MAPPER.readValue(
+                                    request.getBody().readUtf8(), 
CreateTableRequest.class);
+                    Identifier identifier = requestBody.getIdentifier();
+                    Schema schema = requestBody.getSchema();
+                    TableMetadata tableMetadata;
+                    if (isFormatTable(schema)) {
+                        tableMetadata = createFormatTable(identifier, schema);
+                    } else {
+                        catalog.createTable(identifier, schema, false);
+                        tableMetadata =
+                                createTableMetadata(
+                                        requestBody.getIdentifier(),
+                                        1L,
+                                        requestBody.getSchema(),
+                                        UUID.randomUUID().toString(),
+                                        false);
+                    }
+                    tableMetadataStore.put(
+                            requestBody.getIdentifier().getFullName(), 
tableMetadata);
+                    return new MockResponse().setResponseCode(200);
+                default:
+                    return new MockResponse().setResponseCode(404);
+            }
         }
+        return mockResponse(
+                new ErrorResponse(ErrorResponseResourceType.DATABASE, null, 
"", 404), 404);
+    }
+
+    private boolean isFormatTable(Schema schema) {
+        return Options.fromMap(schema.options()).get(TYPE) == FORMAT_TABLE;
     }
 
-    private static MockResponse tableApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName, 
String tableName)
+    private MockResponse tableApiHandler(RecordedRequest request, Identifier 
identifier)
             throws Exception {
         RESTResponse response;
-        Identifier identifier = Identifier.create(databaseName, tableName);
-        switch (request.getMethod()) {
-            case "GET":
-                response = getTable(catalog, databaseName, tableName);
-                return mockResponse(response, 200);
-            case "POST":
-                AlterTableRequest requestBody =
-                        OBJECT_MAPPER.readValue(
-                                request.getBody().readUtf8(), 
AlterTableRequest.class);
-                catalog.alterTable(identifier, requestBody.getChanges(), 
false);
-                return new MockResponse().setResponseCode(200);
-            case "DELETE":
-                catalog.dropTable(identifier, false);
-                return new MockResponse().setResponseCode(200);
-            default:
-                return new MockResponse().setResponseCode(404);
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            switch (request.getMethod()) {
+                case "GET":
+                    TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+                    response =
+                            new GetTableResponse(
+                                    tableMetadata.uuid(),
+                                    identifier.getTableName(),
+                                    tableMetadata.isExternal(),
+                                    tableMetadata.schema().id(),
+                                    tableMetadata.schema().toSchema());
+                    return mockResponse(response, 200);
+                case "POST":
+                    AlterTableRequest requestBody =
+                            OBJECT_MAPPER.readValue(
+                                    request.getBody().readUtf8(), 
AlterTableRequest.class);
+                    alterTableImpl(identifier, requestBody.getChanges());
+                    return new MockResponse().setResponseCode(200);
+                case "DELETE":
+                    try {
+                        catalog.dropTable(identifier, false);
+                    } catch (Exception e) {
+                        System.out.println(e.getMessage());
+                    }
+                    tableMetadataStore.remove(identifier.getFullName());
+                    return new MockResponse().setResponseCode(200);
+                default:
+                    return new MockResponse().setResponseCode(404);
+            }
+        } else {
+            throw new Catalog.TableNotExistException(identifier);
         }
     }
 
-    private static MockResponse renameTableApiHandler(Catalog catalog, 
RecordedRequest request)
-            throws Exception {
+    private MockResponse renameTableApiHandler(RecordedRequest request) throws 
Exception {
         RenameTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), 
RenameTableRequest.class);
-        catalog.renameTable(requestBody.getSource(), 
requestBody.getDestination(), false);
+        Identifier fromTable = requestBody.getSource();
+        Identifier toTable = requestBody.getDestination();
+        if (tableMetadataStore.containsKey(fromTable.getFullName())) {
+            TableMetadata tableMetadata = 
tableMetadataStore.get(fromTable.getFullName());
+            if (!isFormatTable(tableMetadata.schema().toSchema())) {
+                catalog.renameTable(requestBody.getSource(), 
requestBody.getDestination(), false);
+            }
+            tableMetadataStore.remove(fromTable.getFullName());
+            tableMetadataStore.put(toTable.getFullName(), tableMetadata);
+        } else {
+            throw new Catalog.TableNotExistException(fromTable);
+        }
         return new MockResponse().setResponseCode(200);
     }
 
-    private static MockResponse partitionsApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName, 
String tableName)
+    private MockResponse partitionsApiHandler(RecordedRequest request, 
Identifier tableIdentifier)
             throws Exception {
         RESTResponse response;
-        Identifier identifier = Identifier.create(databaseName, tableName);
         switch (request.getMethod()) {
             case "GET":
-                List<Partition> partitions = 
catalog.listPartitions(identifier);
+                List<Partition> partitions =
+                        
tablePartitionsStore.get(tableIdentifier.getFullName());
                 response = new ListPartitionsResponse(partitions);
                 return mockResponse(response, 200);
             case "POST":
                 CreatePartitionsRequest requestBody =
                         OBJECT_MAPPER.readValue(
                                 request.getBody().readUtf8(), 
CreatePartitionsRequest.class);
-                catalog.createPartitions(identifier, 
requestBody.getPartitionSpecs());
+                tablePartitionsStore.put(
+                        tableIdentifier.getFullName(),
+                        requestBody.getPartitionSpecs().stream()
+                                .map(partition -> spec2Partition(partition))
+                                .collect(Collectors.toList()));
                 return new MockResponse().setResponseCode(200);
             default:
                 return new MockResponse().setResponseCode(404);
         }
     }
 
-    private static MockResponse viewsApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName) 
throws Exception {
+    private MockResponse viewsApiHandler(RecordedRequest request, String 
databaseName)
+            throws Exception {
         RESTResponse response;
         switch (request.getMethod()) {
             case "GET":
-                response = new 
ListViewsResponse(catalog.listViews(databaseName));
+                List<String> views =
+                        viewStore.keySet().stream()
+                                .map(Identifier::fromString)
+                                .filter(
+                                        identifier ->
+                                                
identifier.getDatabaseName().equals(databaseName))
+                                .map(Identifier::getTableName)
+                                .collect(Collectors.toList());
+                response = new ListViewsResponse(views);
                 return mockResponse(response, 200);
             case "POST":
                 CreateViewRequest requestBody =
                         OBJECT_MAPPER.readValue(
                                 request.getBody().readUtf8(), 
CreateViewRequest.class);
+                Identifier identifier = requestBody.getIdentifier();
                 ViewSchema schema = requestBody.getSchema();
                 ViewImpl view =
                         new ViewImpl(
@@ -643,71 +737,178 @@ public class RESTCatalogServer {
                                 schema.dialects(),
                                 schema.comment(),
                                 schema.options());
-                catalog.createView(requestBody.getIdentifier(), view, false);
+                if (viewStore.containsKey(identifier.getFullName())) {
+                    throw new Catalog.ViewAlreadyExistException(identifier);
+                }
+                viewStore.put(identifier.getFullName(), view);
                 return new MockResponse().setResponseCode(200);
             default:
                 return new MockResponse().setResponseCode(404);
         }
     }
 
-    private static MockResponse viewApiHandler(
-            Catalog catalog, RecordedRequest request, String databaseName, 
String viewName)
+    /**
+     * Serves requests that target a single view held in {@code viewStore}.
+     *
+     * <p>{@code GET} answers with a {@link GetViewResponse} built from the stored view,
+     * {@code DELETE} removes the view, and any other HTTP method yields a 404 response.
+     *
+     * @param request the recorded HTTP request; only its method is inspected here
+     * @param identifier identifier of the view; its full name is the key into the store
+     * @return the mock HTTP response for the request
+     * @throws Catalog.ViewNotExistException if no view is stored under the identifier
+     */
+    private MockResponse viewApiHandler(RecordedRequest request, Identifier identifier)
             throws Exception {
         RESTResponse response;
-        Identifier identifier = Identifier.create(databaseName, viewName);
-        switch (request.getMethod()) {
-            case "GET":
-                View view = catalog.getView(identifier);
-                ViewSchema schema =
-                        new ViewSchema(
-                                view.rowType().getFields(),
-                                view.query(),
-                                view.dialects(),
-                                view.comment().orElse(null),
-                                view.options());
-                response = new GetViewResponse("id", 
identifier.getTableName(), schema);
-                return mockResponse(response, 200);
-            case "DELETE":
-                catalog.dropView(identifier, false);
-                return new MockResponse().setResponseCode(200);
-            default:
-                return new MockResponse().setResponseCode(404);
+        String viewKey = identifier.getFullName();
+        if (viewStore.containsKey(viewKey)) {
+            switch (request.getMethod()) {
+                case "GET":
+                    // Existence is guaranteed by the enclosing check, so no second lookup guard.
+                    View view = viewStore.get(viewKey);
+                    ViewSchema schema =
+                            new ViewSchema(
+                                    view.rowType().getFields(),
+                                    view.query(),
+                                    view.dialects(),
+                                    view.comment().orElse(null),
+                                    view.options());
+                    response = new GetViewResponse("id", identifier.getTableName(), schema);
+                    return mockResponse(response, 200);
+                case "DELETE":
+                    viewStore.remove(viewKey);
+                    return new MockResponse().setResponseCode(200);
+                default:
+                    return new MockResponse().setResponseCode(404);
+            }
         }
+        throw new Catalog.ViewNotExistException(identifier);
     }
 
-    private static MockResponse renameViewApiHandler(Catalog catalog, 
RecordedRequest request)
-            throws Exception {
+    /**
+     * Renames a view inside {@code viewStore} according to the {@link RenameTableRequest} body.
+     *
+     * @param request the recorded HTTP request whose UTF-8 body is the rename request JSON
+     * @return a 200 response once the view is stored under its new full name
+     * @throws Catalog.ViewNotExistException if the source view is not stored
+     * @throws Catalog.ViewAlreadyExistException if a view is already stored under the destination
+     */
+    private MockResponse renameViewApiHandler(RecordedRequest request) throws Exception {
         RenameTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), 
RenameTableRequest.class);
-        catalog.renameView(requestBody.getSource(), 
requestBody.getDestination(), false);
+        Identifier fromView = requestBody.getSource();
+        Identifier toView = requestBody.getDestination();
+        String sourceKey = fromView.getFullName();
+        String targetKey = toView.getFullName();
+        if (!viewStore.containsKey(sourceKey)) {
+            throw new Catalog.ViewNotExistException(fromView);
+        }
+        if (viewStore.containsKey(targetKey)) {
+            throw new Catalog.ViewAlreadyExistException(toView);
+        }
+        // Both guards passed: move the entry from the old key to the new one.
+        viewStore.put(targetKey, viewStore.remove(sourceKey));
         return new MockResponse().setResponseCode(200);
     }
 
-    private static GetTableResponse getTable(Catalog catalog, String 
databaseName, String tableName)
-            throws Exception {
-        Identifier identifier = Identifier.create(databaseName, tableName);
-        Table table = catalog.getTable(identifier);
-        Schema schema;
-        Long schemaId = 1L;
-        if (table instanceof FileStoreTable) {
-            FileStoreTable fileStoreTable = (FileStoreTable) table;
-            schema = fileStoreTable.schema().toSchema();
-            schemaId = fileStoreTable.schema().id();
+    /**
+     * Applies schema changes through the backing catalog and refreshes the entry kept in
+     * {@code tableMetadataStore}.
+     *
+     * <p>Nothing happens when the identifier's full name is not present in the store. The
+     * refreshed metadata keeps the previous UUID and external flag and takes its schema id and
+     * schema from the table reloaded after the change.
+     *
+     * @param identifier table to alter
+     * @param changes schema changes forwarded to {@code catalog.alterTable}
+     * @throws UnsupportedOperationException if the stored schema is a format table
+     */
+    protected void alterTableImpl(Identifier identifier, List<SchemaChange> changes)
+            throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
+                    Catalog.ColumnNotExistException {
+        String tableKey = identifier.getFullName();
+        if (!tableMetadataStore.containsKey(tableKey)) {
+            return;
+        }
+        TableMetadata current = tableMetadataStore.get(tableKey);
+        if (isFormatTable(current.schema().toSchema())) {
+            throw new UnsupportedOperationException("Only data table support alter table.");
+        }
+        try {
+            catalog.alterTable(identifier, changes, false);
+            TableSchema altered = ((FileStoreTable) catalog.getTable(identifier)).schema();
+            tableMetadataStore.put(
+                    tableKey,
+                    createTableMetadata(
+                            identifier,
+                            altered.id(),
+                            altered.toSchema(),
+                            current.uuid(),
+                            current.isExternal()));
+        } catch (Catalog.TableNotExistException
+                | Catalog.ColumnAlreadyExistException
+                | Catalog.ColumnNotExistException
+                | RuntimeException e) {
+            // Declared and unchecked failures propagate unchanged.
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Commits {@code snapshot} through a {@link RenamingSnapshotCommit} built from the table's
+     * snapshot manager and, if that commit succeeds, records the snapshot in
+     * {@code tableSnapshotStore} under the identifier's full name.
+     *
+     * @param identifier table to commit to; a {@code null} branch name is replaced by "main"
+     * @param snapshot snapshot to commit
+     * @param statistics not used by this method
+     * @return the result reported by the underlying commit
+     * @throws Catalog.TableNotExistException if the table cannot be loaded from the catalog
+     */
+    private boolean commitSnapshot(
+            Identifier identifier, Snapshot snapshot, List<Partition> statistics)
+            throws Catalog.TableNotExistException {
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        RenamingSnapshotCommit commit =
+                new RenamingSnapshotCommit(table.snapshotManager(), Lock.empty());
+        String branchName = identifier.getBranchName();
+        if (branchName == null) {
+            branchName = "main";
+        }
+        try {
+            boolean success = commit.commit(snapshot, branchName, Collections.emptyList());
+            if (success) {
+                // Only publish the snapshot when the commit went through; otherwise the store
+                // would advertise a snapshot that was never committed.
+                tableSnapshotStore.put(identifier.getFullName(), snapshot);
+            }
+            return success;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Drops partitions of a table from {@code tablePartitionsStore}.
+     *
+     * <p>A stored partition is removed when its spec contains every key/value pair of one of
+     * the requested specs. Empty requested specs are skipped so they cannot match everything,
+     * and a table without a stored partition list is left untouched.
+     *
+     * @param identifier table whose partitions are dropped
+     * @param request the recorded HTTP request whose UTF-8 body is a {@link DropPartitionsRequest}
+     * @return a 200 response when the table is known
+     * @throws Catalog.TableNotExistException if the table is not in {@code tableMetadataStore}
+     * @throws JsonProcessingException if the request body cannot be parsed
+     */
+    private MockResponse dropPartitionsHandle(Identifier identifier, RecordedRequest request)
+            throws Catalog.TableNotExistException, JsonProcessingException {
+        DropPartitionsRequest dropPartitionsRequest =
+                OBJECT_MAPPER.readValue(request.getBody().readUtf8(), DropPartitionsRequest.class);
+        List<Map<String, String>> partitionSpecs = dropPartitionsRequest.getPartitionSpecs();
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            List<Partition> existPartitions = tablePartitionsStore.get(identifier.getFullName());
+            if (existPartitions != null) {
+                for (Map<String, String> spec : partitionSpecs) {
+                    if (!spec.isEmpty()) {
+                        // Match on the whole requested spec, not on any single key/value pair.
+                        existPartitions.removeIf(
+                                p -> p.spec().entrySet().containsAll(spec.entrySet()));
+                    }
+                }
+            }
+            return new MockResponse().setResponseCode(200);
+
         } else {
-            FormatTable formatTable = (FormatTable) table;
-            List<DataField> fields = formatTable.rowType().getFields();
-            schema =
-                    new Schema(
-                            fields,
-                            table.partitionKeys(),
-                            table.primaryKeys(),
-                            table.options(),
-                            table.comment().orElse(null));
+            throw new Catalog.TableNotExistException(identifier);
         }
-        return new GetTableResponse(table.uuid(), table.name(), false, 
schemaId, schema);
     }
 
-    private static MockResponse mockResponse(RESTResponse response, int 
httpCode) {
+    /**
+     * Replaces or adds partitions of a table in {@code tablePartitionsStore}.
+     *
+     * <p>Every stored partition whose spec equals the spec of a requested partition is removed,
+     * then all requested partitions are appended. A table without a stored partition list
+     * starts from an empty list.
+     *
+     * @param identifier table whose partitions are altered
+     * @param request the recorded HTTP request whose UTF-8 body is an
+     *     {@link AlterPartitionsRequest}
+     * @return a 200 response when the table is known
+     * @throws Catalog.TableNotExistException if the table is not in {@code tableMetadataStore}
+     * @throws JsonProcessingException if the request body cannot be parsed
+     */
+    private MockResponse alterPartitionsHandle(Identifier identifier, RecordedRequest request)
+            throws Catalog.TableNotExistException, JsonProcessingException {
+        if (tableMetadataStore.containsKey(identifier.getFullName())) {
+            AlterPartitionsRequest alterPartitionsRequest =
+                    OBJECT_MAPPER.readValue(
+                            request.getBody().readUtf8(), AlterPartitionsRequest.class);
+            List<Partition> partitions = alterPartitionsRequest.getPartitions();
+            List<Partition> existPartitions = tablePartitionsStore.get(identifier.getFullName());
+            if (existPartitions == null) {
+                // Fully qualified so this block needs no additional import.
+                existPartitions = new java.util.ArrayList<>();
+            }
+            for (Partition partition : partitions) {
+                Map<String, String> spec = partition.spec();
+                // Replace by full spec equality, not by any single matching key/value pair.
+                existPartitions.removeIf(p -> p.spec().equals(spec));
+            }
+            existPartitions.addAll(partitions);
+            tablePartitionsStore.put(identifier.getFullName(), existPartitions);
+            return new MockResponse().setResponseCode(200);
+        } else {
+            throw new Catalog.TableNotExistException(identifier);
+        }
+    }
+
+    private MockResponse mockResponse(RESTResponse response, int httpCode) {
         try {
             return new MockResponse()
                     .setResponseCode(httpCode)
@@ -718,14 +919,30 @@ public class RESTCatalogServer {
         }
     }
 
-    private static String getConfigBody(String warehouseStr) {
-        return String.format(
-                "{\"defaults\": {\"%s\": \"%s\", \"%s\": \"%s\", \"%s\": 
\"%s\"}}",
-                RESTCatalogInternalOptions.PREFIX.key(),
-                PREFIX,
-                CatalogOptions.WAREHOUSE.key(),
-                warehouseStr,
-                "header.test-header",
-                "test-value");
+    /**
+     * Builds the {@link TableMetadata} this server keeps for a table.
+     *
+     * <p>The schema options are copied and extended with the table location reported by the
+     * backing catalog under the {@code PATH} option key.
+     *
+     * @param identifier table identifier, used to resolve the table location
+     * @param schemaId id given to the resulting {@link TableSchema}
+     * @param schema source of fields, partition keys, primary keys, options and comment
+     * @param uuid UUID stored in the metadata
+     * @param isExternal external flag stored in the metadata
+     * @return the assembled metadata
+     */
+    private TableMetadata createTableMetadata(
+            Identifier identifier, long schemaId, Schema schema, String uuid, boolean isExternal) {
+        Map<String, String> tableOptions = new HashMap<>(schema.options());
+        tableOptions.put(PATH.key(), catalog.getTableLocation(identifier).toString());
+        // NOTE(review): the third argument is derived from the field count alone; presumably it
+        // is the highest field id, which need not equal size - 1 after schema changes - confirm.
+        int lastFieldIndex = schema.fields().size() - 1;
+        return new TableMetadata(
+                new TableSchema(
+                        schemaId,
+                        schema.fields(),
+                        lastFieldIndex,
+                        schema.partitionKeys(),
+                        schema.primaryKeys(),
+                        tableOptions,
+                        schema.comment()),
+                isExternal,
+                uuid);
+    }
+
+    /**
+     * Builds metadata for a format table: schema id 1, a freshly generated random UUID, and the
+     * external flag set.
+     */
+    private TableMetadata createFormatTable(Identifier identifier, Schema schema) {
+        String generatedUuid = UUID.randomUUID().toString();
+        return createTableMetadata(identifier, 1L, schema, generatedUuid, true);
+    }
+
+    /**
+     * Wraps a partition spec into a {@link Partition} for the in-memory partition store.
+     *
+     * <p>The four numeric arguments are fixed placeholder values, not derived from any table
+     * data (see the todo below).
+     */
+    private Partition spec2Partition(Map<String, String> spec) {
+        // todo: need update
+        return new Partition(spec, 123, 456, 789, 123);
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index b9a550950c..92c6db658d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -23,15 +23,25 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
+import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.rest.auth.AuthProviderEnum;
 import org.apache.paimon.rest.auth.BearTokenAuthProvider;
 import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.NotAuthorizedException;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 
@@ -43,11 +53,13 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
@@ -125,12 +137,7 @@ class RESTCatalogTest extends CatalogTestBase {
 
     @Test
     void testRefreshFileIO() throws Exception {
-        Options options = new Options();
-        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
-        options.set(RESTCatalogOptions.TOKEN, initToken);
-        options.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
-        this.catalog = new RESTCatalog(CatalogContext.create(options));
+        this.catalog = initDataTokenCatalog();
         List<Identifier> identifiers =
                 Lists.newArrayList(
                         Identifier.create("test_db_a", "test_table_a"),
@@ -140,29 +147,101 @@ class RESTCatalogTest extends CatalogTestBase {
             createTable(identifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
             FileStoreTable fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier);
             assertEquals(true, 
fileStoreTable.fileIO().exists(fileStoreTable.location()));
+
+            RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
+            RESTToken fileDataToken = fileIO.validToken();
+            RESTToken serverDataToken =
+                    
restCatalogServer.dataTokenStore.get(identifier.getFullName());
+            assertEquals(serverDataToken, fileDataToken);
         }
     }
 
     @Test
-    void testSnapshotFromREST() throws Catalog.TableNotExistException {
+    void testRefreshFileIOWhenExpired() throws Exception {
+        // Checks that the table's RESTTokenFileIO returns the token currently registered on the
+        // server: first the initial one, then the replacement registered afterwards.
+        this.catalog = initDataTokenCatalog();
+        Identifier identifier =
+                Identifier.create("test_data_token", 
"table_for_testing_date_token");
+        // The second constructor argument is the current time; presumably this is the expiry
+        // timestamp, so the token is already at its expiry when it is first used - confirm.
+        RESTToken expiredDataToken =
+                new RESTToken(
+                        ImmutableMap.of("akId", "akId", "akSecret", 
UUID.randomUUID().toString()),
+                        System.currentTimeMillis());
+        restCatalogServer.setDataToken(identifier, expiredDataToken);
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+        FileStoreTable fileStoreTable = (FileStoreTable) 
catalog.getTable(identifier);
+        RESTTokenFileIO fileIO = (RESTTokenFileIO) fileStoreTable.fileIO();
+        RESTToken fileDataToken = fileIO.validToken();
+        assertEquals(expiredDataToken, fileDataToken);
+        // Register a replacement token stamped 100 seconds into the future.
+        RESTToken newDataToken =
+                new RESTToken(
+                        ImmutableMap.of("akId", "akId", "akSecret", 
UUID.randomUUID().toString()),
+                        System.currentTimeMillis() + 100_000);
+        restCatalogServer.setDataToken(identifier, newDataToken);
+        // The next request for a valid token must return the replacement.
+        RESTToken nextFileDataToken = fileIO.validToken();
+        assertEquals(newDataToken, nextFileDataToken);
+        // The replacement's expireAtMillis is strictly later than the first token's.
+        assertEquals(true, nextFileDataToken.expireAtMillis() - 
fileDataToken.expireAtMillis() > 0);
+    }
+
+    @Test
+    void testSnapshotFromREST() throws Exception {
         Options options = new Options();
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
         RESTCatalog catalog = new RESTCatalog(CatalogContext.create(options));
-        Identifier hasSnapshotTable = Identifier.create("test_db_a", 
"my_snapshot_table");
+        Identifier hasSnapshotTableIdentifier = Identifier.create("test_db_a", 
"my_snapshot_table");
+        createTable(hasSnapshotTableIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
         long id = 10086;
         long millis = System.currentTimeMillis();
-        restCatalogServer.setTableSnapshot(hasSnapshotTable, 
createSnapshotWithMillis(id, millis));
-        Optional<Snapshot> snapshot = catalog.loadSnapshot(hasSnapshotTable);
+        restCatalogServer.setTableSnapshot(
+                hasSnapshotTableIdentifier, createSnapshotWithMillis(id, 
millis));
+        Optional<Snapshot> snapshot = 
catalog.loadSnapshot(hasSnapshotTableIdentifier);
         assertThat(snapshot).isPresent();
         assertThat(snapshot.get().id()).isEqualTo(id);
         assertThat(snapshot.get().timeMillis()).isEqualTo(millis);
-
-        snapshot = catalog.loadSnapshot(Identifier.create("test_db_a", 
"unknown"));
+        Identifier noSnapshotTableIdentifier = 
Identifier.create("test_db_a_1", "unknown");
+        createTable(noSnapshotTableIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
+        snapshot = catalog.loadSnapshot(noSnapshotTableIdentifier);
         assertThat(snapshot).isEmpty();
     }
 
+    @Test
+    public void testBatchRecordsWrite() throws Exception {
+        // Writes three single-int rows through the batch write API, then reads them back from
+        // the same table and checks that exactly those rows are returned as inserts.
+        Identifier tableIdentifier = Identifier.create("my_db", "my_table");
+        createTable(tableIdentifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+        FileStoreTable tableTestWrite = (FileStoreTable) catalog.getTable(tableIdentifier);
+
+        // write
+        BatchWriteBuilder writeBuilder = tableTestWrite.newBatchWriteBuilder();
+        BatchTableWrite write = writeBuilder.newWrite();
+        for (int value : new int[] {12, 5, 18}) {
+            write.write(GenericRow.of(value));
+        }
+        List<CommitMessage> messages = write.prepareCommit();
+        BatchTableCommit commit = writeBuilder.newCommit();
+        commit.commit(messages);
+        write.close();
+        commit.close();
+
+        // read
+        ReadBuilder readBuilder = tableTestWrite.newReadBuilder();
+        List<Split> splits = readBuilder.newScan().plan().splits();
+        TableRead read = readBuilder.newRead();
+        RecordReader<InternalRow> reader = read.createReader(splits);
+        List<String> actual = new ArrayList<>();
+        reader.forEachRemaining(
+                row ->
+                        actual.add(
+                                String.format(
+                                        "%s[%d]", row.getRowKind().shortString(), row.getInt(0))));
+
+        assertThat(actual).containsExactlyInAnyOrder("+I[5]", "+I[12]", "+I[18]");
+    }
+
     @Test
     void testBranches() throws Exception {
         String databaseName = "testBranchTable";
@@ -199,6 +278,11 @@ class RESTCatalogTest extends CatalogTestBase {
         return true;
     }
 
+    // TODO implement this
+    @Override
+    @Test
+    public void testTableUUID() {}
+
     private void createTable(
             Identifier identifier, Map<String, String> options, List<String> 
partitionKeys)
             throws Exception {
@@ -214,8 +298,12 @@ class RESTCatalogTest extends CatalogTestBase {
                 true);
     }
 
-    // TODO implement this
-    @Override
-    @Test
-    public void testTableUUID() {}
+    /**
+     * Creates a {@link RESTCatalog} pointed at the test server, using the initial token with the
+     * BEAR token provider and with the data-token option enabled.
+     */
+    private Catalog initDataTokenCatalog() {
+        Options dataTokenOptions = new Options();
+        dataTokenOptions.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        dataTokenOptions.set(RESTCatalogOptions.TOKEN, initToken);
+        dataTokenOptions.set(RESTCatalogOptions.DATA_TOKEN_ENABLED, true);
+        dataTokenOptions.set(
+                RESTCatalogOptions.TOKEN_PROVIDER, AuthProviderEnum.BEAR.identifier());
+        CatalogContext context = CatalogContext.create(dataTokenOptions);
+        return new RESTCatalog(context);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
index dc202ec872..cf66ee425b 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RESTCatalogITCase.java
@@ -105,6 +105,16 @@ class RESTCatalogITCase extends CatalogITCaseBase {
         return options;
     }
 
+    @Test
+    public void testWriteAndRead() {
+        // Insert two rows via batch SQL, then select everything back and compare.
+        String insertSql =
+                String.format(
+                        "INSERT INTO %s.%s VALUES ('1', 11), ('2', 22)",
+                        DATABASE_NAME, TABLE_NAME);
+        batchSql(insertSql);
+        String selectSql = String.format("SELECT * FROM %s.%s", DATABASE_NAME, TABLE_NAME);
+        assertThat(batchSql(selectSql))
+                .containsExactlyInAnyOrder(Row.of("1", 11.0D), Row.of("2", 22.0D));
+    }
+
     @Override
     protected String getTempDirPath() {
         return this.warehouse;
diff --git a/paimon-spark/paimon-spark-ut/pom.xml 
b/paimon-spark/paimon-spark-ut/pom.xml
index 552eb8474f..78127bd46f 100644
--- a/paimon-spark/paimon-spark-ut/pom.xml
+++ b/paimon-spark/paimon-spark-ut/pom.xml
@@ -157,6 +157,13 @@ under the License.
             <artifactId>protobuf-java</artifactId>
             <version>${protobuf-java.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>mockwebserver</artifactId>
+            <version>${okhttp.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
new file mode 100644
index 0000000000..cd2acdbe86
--- /dev/null
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithRestTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark;
+
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+
+import org.apache.spark.sql.SparkSession;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for Spark reading from a REST catalog. Each test runs against a fresh
+ * {@link RESTCatalogServer} whose warehouse is a JUnit temporary directory.
+ */
+public class SparkCatalogWithRestTest {
+
+    private RESTCatalogServer restCatalogServer;
+    private String serverUrl;
+    private String warehouse;
+    private final String initToken = "init_token";
+    @TempDir java.nio.file.Path tempFile;
+
+    /** Starts a REST catalog server on the temporary warehouse and remembers its URL. */
+    @BeforeEach
+    public void before() throws IOException {
+        warehouse = tempFile.toUri().toString();
+        restCatalogServer = new RESTCatalogServer(warehouse, initToken);
+        restCatalogServer.start();
+        serverUrl = restCatalogServer.getUrl();
+    }
+
+    /** Shuts the REST catalog server down after each test. */
+    @AfterEach()
+    public void after() throws Exception {
+        restCatalogServer.shutdown();
+    }
+
+    /**
+     * Creates a database and a table through Spark SQL, checks that {@code SHOW TABLES} lists
+     * exactly that table, drops it, and checks that no table is listed afterwards.
+     */
+    @Test
+    public void testTable() {
+        SparkSession spark =
+                SparkSession.builder()
+                        .config("spark.sql.catalog.paimon", SparkCatalog.class.getName())
+                        .config("spark.sql.catalog.paimon.metastore", "rest")
+                        .config("spark.sql.catalog.paimon.uri", serverUrl)
+                        .config("spark.sql.catalog.paimon.token", initToken)
+                        .config(
+                                "spark.sql.catalog.paimon.token.provider",
+                                AuthProviderEnum.BEAR.identifier())
+                        .master("local[2]")
+                        .getOrCreate();
+        try {
+            spark.sql("CREATE DATABASE paimon.db2");
+            spark.sql("USE paimon.db2");
+            spark.sql(
+                    "CREATE TABLE t1 (a INT, b INT, c STRING) TBLPROPERTIES"
+                            + " ('primary-key'='a', 'bucket'='4', 'file.format'='avro')");
+            assertThat(
+                            spark.sql("SHOW TABLES").collectAsList().stream()
+                                    .map(s -> s.get(1))
+                                    .map(Object::toString))
+                    .containsExactlyInAnyOrder("t1");
+            spark.sql("DROP TABLE t1");
+            // assertThat(boolean) alone verifies nothing; assert on the collection itself.
+            assertThat(spark.sql("SHOW TABLES").collectAsList()).isEmpty();
+        } finally {
+            // Close the session even when an assertion above fails.
+            spark.close();
+        }
+    }
+}


Reply via email to