This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 998ad3ea30 [core] RESTCatalog: add table test and fix branch (#5313)
998ad3ea30 is described below

commit 998ad3ea303fdafb34f13ab1e8b697eba4d8269f
Author: jerry <[email protected]>
AuthorDate: Mon Mar 24 16:42:45 2025 +0800

    [core] RESTCatalog: add table test and fix branch (#5313)
---
 .../org/apache/paimon/utils/BranchManager.java     |  29 ++++++
 .../apache/paimon/utils/CatalogBranchManager.java  |  27 ++++-
 .../paimon/utils/FileSystemBranchManager.java      |  32 ++----
 ...STCatalogTest.java => RESTCatalogMockTest.java} | 110 +++++++++++----------
 .../org/apache/paimon/rest/RESTCatalogServer.java  |  55 ++++++++---
 .../apache/paimon/rest/RESTCatalogTestBase.java    |  27 ++++-
 .../table/SimpleTableInRESTCatalogMockTest.java    |  64 ++++++++++++
 .../table/SimpleTableInRESTCatalogTestBase.java    | 106 ++++++++++++++++++++
 .../apache/paimon/table/SimpleTableTestBase.java   |  60 +++++------
 paimon-open-api/rest-catalog-open-api.yaml         |  12 +--
 10 files changed, 385 insertions(+), 137 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
index f54eb98752..d144db7d51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java
@@ -24,6 +24,8 @@ import javax.annotation.Nullable;
 
 import java.util.List;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Manager for {@code Branch}. */
 public interface BranchManager {
 
@@ -58,4 +60,31 @@ public interface BranchManager {
     static boolean isMainBranch(String branch) {
         return branch.equals(DEFAULT_MAIN_BRANCH);
     }
+
+    static void validateBranch(String branchName) {
+        checkArgument(
+                !BranchManager.isMainBranch(branchName),
+                String.format(
+                        "Branch name '%s' is the default branch and cannot be 
used.",
+                        DEFAULT_MAIN_BRANCH));
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(branchName),
+                "Branch name '%s' is blank.",
+                branchName);
+        checkArgument(
+                !branchName.chars().allMatch(Character::isDigit),
+                "Branch name cannot be pure numeric string but is '%s'.",
+                branchName);
+    }
+
+    static void fastForwardValidate(String branchName) {
+        checkArgument(
+                !branchName.equals(DEFAULT_MAIN_BRANCH),
+                "Branch name '%s' do not use in fast-forward.",
+                branchName);
+        checkArgument(
+                !StringUtils.isNullOrWhitespaceOnly(branchName),
+                "Branch name '%s' is blank.",
+                branchName);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index d87f97badd..92357e5802 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -40,8 +40,19 @@ public class CatalogBranchManager implements BranchManager {
     private void executePost(ThrowingConsumer<Catalog, Exception> func) {
         executeGet(
                 catalog -> {
-                    func.accept(catalog);
-                    return null;
+                    try {
+                        func.accept(catalog);
+                        return null;
+                    } catch (Catalog.BranchNotExistException e) {
+                        throw new IllegalArgumentException(
+                                String.format("Branch name '%s' doesn't 
exist.", e.branch()));
+                    } catch (Catalog.TagNotExistException e) {
+                        throw new IllegalArgumentException(
+                                String.format("Tag '%s' doesn't exist.", 
e.tag()));
+                    } catch (Catalog.BranchAlreadyExistException e) {
+                        throw new IllegalArgumentException(
+                                String.format("Branch name '%s' already 
exists..", e.branch()));
+                    }
                 });
     }
 
@@ -62,7 +73,11 @@ public class CatalogBranchManager implements BranchManager {
 
     @Override
     public void createBranch(String branchName, @Nullable String tagName) {
-        executePost(catalog -> catalog.createBranch(identifier, branchName, 
tagName));
+        executePost(
+                catalog -> {
+                    BranchManager.validateBranch(branchName);
+                    catalog.createBranch(identifier, branchName, tagName);
+                });
     }
 
     @Override
@@ -72,7 +87,11 @@ public class CatalogBranchManager implements BranchManager {
 
     @Override
     public void fastForward(String branchName) {
-        executePost(catalog -> catalog.fastForward(identifier, branchName));
+        executePost(
+                catalog -> {
+                    BranchManager.fastForwardValidate(branchName);
+                    catalog.fastForward(identifier, branchName);
+                });
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
index 3d4dd7df43..ab3905f2ec 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/FileSystemBranchManager.java
@@ -74,7 +74,6 @@ public class FileSystemBranchManager implements BranchManager 
{
     @Override
     public void createBranch(String branchName) {
         validateBranch(branchName);
-
         try {
             TableSchema latestSchema = schemaManager.latest().get();
             copySchemasToBranch(branchName, latestSchema.id());
@@ -139,14 +138,7 @@ public class FileSystemBranchManager implements 
BranchManager {
 
     @Override
     public void fastForward(String branchName) {
-        checkArgument(
-                !branchName.equals(DEFAULT_MAIN_BRANCH),
-                "Branch name '%s' do not use in fast-forward.",
-                branchName);
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(branchName),
-                "Branch name '%s' is blank.",
-                branchName);
+        BranchManager.fastForwardValidate(branchName);
         checkArgument(branchExists(branchName), "Branch name '%s' doesn't 
exist.", branchName);
 
         Long earliestSnapshotId = 
snapshotManager.copyWithBranch(branchName).earliestSnapshotId();
@@ -207,6 +199,11 @@ public class FileSystemBranchManager implements 
BranchManager {
         return fileExists(branchPath);
     }
 
+    public void validateBranch(String branchName) {
+        BranchManager.validateBranch(branchName);
+        checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
+    }
+
     @Override
     public List<String> branches() {
         try {
@@ -218,23 +215,6 @@ public class FileSystemBranchManager implements 
BranchManager {
         }
     }
 
-    private void validateBranch(String branchName) {
-        checkArgument(
-                !BranchManager.isMainBranch(branchName),
-                String.format(
-                        "Branch name '%s' is the default branch and cannot be 
used.",
-                        DEFAULT_MAIN_BRANCH));
-        checkArgument(
-                !StringUtils.isNullOrWhitespaceOnly(branchName),
-                "Branch name '%s' is blank.",
-                branchName);
-        checkArgument(!branchExists(branchName), "Branch name '%s' already 
exists.", branchName);
-        checkArgument(
-                !branchName.chars().allMatch(Character::isDigit),
-                "Branch name cannot be pure numeric string but is '%s'.",
-                branchName);
-    }
-
     private void copySchemasToBranch(String branchName, long schemaId) throws 
IOException {
         for (int i = 0; i <= schemaId; i++) {
             if (schemaManager.schemaExists(i)) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java
similarity index 76%
rename from 
paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java
index e9370760f8..65770bdecb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/MockRESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogMockTest.java
@@ -42,6 +42,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
@@ -50,7 +51,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 /** Test REST Catalog on Mocked REST server. */
-class MockRESTCatalogTest extends RESTCatalogTestBase {
+class RESTCatalogMockTest extends RESTCatalogTestBase {
 
     private RESTCatalogServer restCatalogServer;
     private final String initToken = "init_token";
@@ -58,32 +59,21 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
     private final String serverDefineHeaderValue = "test-value";
     private String dataPath;
     private AuthProvider authProvider;
+    private Map<String, String> authMap;
 
     @BeforeEach
     @Override
     public void setUp() throws Exception {
         super.setUp();
         dataPath = warehouse;
-        String restWarehouse = UUID.randomUUID().toString();
-        this.config =
-                new ConfigResponse(
-                        ImmutableMap.of(
-                                RESTCatalogInternalOptions.PREFIX.key(),
-                                "paimon",
-                                "header." + serverDefineHeaderName,
-                                serverDefineHeaderValue,
-                                CatalogOptions.WAREHOUSE.key(),
-                                restWarehouse),
-                        ImmutableMap.of());
         this.authProvider = new BearTokenAuthProvider(initToken);
-        restCatalogServer =
-                new RESTCatalogServer(dataPath, authProvider, this.config, 
restWarehouse);
-        restCatalogServer.start();
-        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
-        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
-        options.set(RESTCatalogOptions.TOKEN, initToken);
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
-        this.restCatalog = new RESTCatalog(CatalogContext.create(options));
+        this.authMap =
+                ImmutableMap.of(
+                        RESTCatalogOptions.TOKEN.key(),
+                        initToken,
+                        RESTCatalogOptions.TOKEN_PROVIDER.key(),
+                        AuthProviderEnum.BEAR.identifier());
+        this.restCatalog = initCatalog(false);
         this.catalog = restCatalog;
     }
 
@@ -105,30 +95,24 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
 
     @Test
     void testDlfStSTokenAuth() throws Exception {
-        String restWarehouse = UUID.randomUUID().toString();
         String akId = "akId" + UUID.randomUUID();
         String akSecret = "akSecret" + UUID.randomUUID();
         String securityToken = "securityToken" + UUID.randomUUID();
         String region = "cn-hangzhou";
-        DLFAuthProvider authProvider =
-                DLFAuthProvider.buildAKToken(akId, akSecret, securityToken, 
region);
-        restCatalogServer =
-                new RESTCatalogServer(dataPath, authProvider, this.config, 
restWarehouse);
-        restCatalogServer.start();
-        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
-        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.DLF.identifier());
-        options.set(RESTCatalogOptions.DLF_REGION, region);
-        options.set(RESTCatalogOptions.DLF_ACCESS_KEY_ID, akId);
-        options.set(RESTCatalogOptions.DLF_ACCESS_KEY_SECRET, akSecret);
-        options.set(RESTCatalogOptions.DLF_SECURITY_TOKEN, securityToken);
-        RESTCatalog restCatalog = new 
RESTCatalog(CatalogContext.create(options));
+        this.authProvider = DLFAuthProvider.buildAKToken(akId, akSecret, 
securityToken, region);
+        this.authMap =
+                ImmutableMap.of(
+                        RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.DLF.identifier(),
+                        RESTCatalogOptions.DLF_REGION.key(), region,
+                        RESTCatalogOptions.DLF_ACCESS_KEY_ID.key(), akId,
+                        RESTCatalogOptions.DLF_ACCESS_KEY_SECRET.key(), 
akSecret,
+                        RESTCatalogOptions.DLF_SECURITY_TOKEN.key(), 
securityToken);
+        RESTCatalog restCatalog = initCatalog(false);
         testDlfAuth(restCatalog);
     }
 
     @Test
     void testDlfStSTokenPathAuth() throws Exception {
-        String restWarehouse = UUID.randomUUID().toString();
         String region = "cn-hangzhou";
         String tokenPath = dataPath + UUID.randomUUID();
         generateTokenAndWriteToFile(tokenPath);
@@ -138,17 +122,13 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
                         new Options(
                                 ImmutableMap.of(
                                         
RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath)));
-        DLFAuthProvider authProvider =
-                DLFAuthProvider.buildRefreshToken(tokenLoader, 1000_000L, 
region);
-        restCatalogServer =
-                new RESTCatalogServer(dataPath, authProvider, this.config, 
restWarehouse);
-        restCatalogServer.start();
-        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
-        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
-        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.DLF.identifier());
-        options.set(RESTCatalogOptions.DLF_REGION, region);
-        options.set(RESTCatalogOptions.DLF_TOKEN_PATH, tokenPath);
-        RESTCatalog restCatalog = new 
RESTCatalog(CatalogContext.create(options));
+        this.authProvider = DLFAuthProvider.buildRefreshToken(tokenLoader, 
1000_000L, region);
+        this.authMap =
+                ImmutableMap.of(
+                        RESTCatalogOptions.TOKEN_PROVIDER.key(), 
AuthProviderEnum.DLF.identifier(),
+                        RESTCatalogOptions.DLF_REGION.key(), region,
+                        RESTCatalogOptions.DLF_TOKEN_PATH.key(), tokenPath);
+        RESTCatalog restCatalog = initCatalog(false);
         testDlfAuth(restCatalog);
         File file = new File(tokenPath);
         file.delete();
@@ -184,12 +164,8 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
     }
 
     @Override
-    protected Catalog newRestCatalogWithDataToken() {
-        options.set(RESTTokenFileIO.DATA_TOKEN_ENABLED, true);
-        options.set(
-                RESTTestFileIO.DATA_PATH_CONF_KEY,
-                dataPath.replaceFirst("file", RESTFileIOTestLoader.SCHEME));
-        return new RESTCatalog(CatalogContext.create(options));
+    protected Catalog newRestCatalogWithDataToken() throws IOException {
+        return initCatalog(true);
     }
 
     @Override
@@ -234,4 +210,34 @@ class MockRESTCatalogTest extends RESTCatalogTestBase {
                 fileCount,
                 lastFileCreationTime);
     }
+
+    private RESTCatalog initCatalog(boolean enableDataToken) throws 
IOException {
+        String restWarehouse = UUID.randomUUID().toString();
+        this.config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                "header." + serverDefineHeaderName,
+                                serverDefineHeaderValue,
+                                RESTTokenFileIO.DATA_TOKEN_ENABLED.key(),
+                                enableDataToken + "",
+                                CatalogOptions.WAREHOUSE.key(),
+                                restWarehouse),
+                        ImmutableMap.of());
+        restCatalogServer =
+                new RESTCatalogServer(dataPath, this.authProvider, 
this.config, restWarehouse);
+        restCatalogServer.start();
+        for (Map.Entry<String, String> entry : this.authMap.entrySet()) {
+            options.set(entry.getKey(), entry.getValue());
+        }
+        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        String path =
+                enableDataToken
+                        ? dataPath.replaceFirst("file", 
RESTFileIOTestLoader.SCHEME)
+                        : dataPath;
+        options.set(RESTTestFileIO.DATA_PATH_CONF_KEY, path);
+        return new RESTCatalog(CatalogContext.create(options));
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 373d830019..b755c7772c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -106,6 +106,7 @@ import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
 import static org.apache.paimon.rest.RESTObjectMapper.OBJECT_MAPPER;
@@ -603,7 +604,16 @@ public class RESTCatalogServer {
         FileStoreTable table = getFileTable(identifier);
         String identifierWithSnapshotId = 
geTableFullNameWithSnapshotId(identifier, snapshotId);
         if 
(tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) {
-            rollbackTo(identifier, table, snapshotId);
+            table =
+                    table.copy(
+                            Collections.singletonMap(
+                                    SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), 
"true"));
+            long latestSnapshotId = table.snapshotManager().latestSnapshotId();
+            table.rollbackTo(snapshotId);
+            cleanSnapshot(identifier, snapshotId, latestSnapshotId);
+            tableLatestSnapshotStore.put(
+                    identifier.getFullName(),
+                    
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
             return new MockResponse().setResponseCode(200);
         }
         return mockResponse(
@@ -616,12 +626,20 @@ public class RESTCatalogServer {
         FileStoreTable table = getFileTable(identifier);
         boolean isExist = table.tagManager().tagExists(tagName);
         if (isExist) {
-            table.tagManager().get(tagName);
             Snapshot snapshot = 
table.tagManager().getOrThrow(tagName).trimToSnapshot();
             String identifierWithSnapshotId =
                     geTableFullNameWithSnapshotId(identifier, snapshot.id());
             if 
(tableWithSnapshotId2SnapshotStore.containsKey(identifierWithSnapshotId)) {
-                rollbackTo(identifier, table, snapshot.id());
+                table =
+                        table.copy(
+                                Collections.singletonMap(
+                                        
SNAPSHOT_CLEAN_EMPTY_DIRECTORIES.key(), "true"));
+                long latestSnapshotId = 
table.snapshotManager().latestSnapshotId();
+                table.rollbackTo(tagName);
+                cleanSnapshot(identifier, snapshot.id(), latestSnapshotId);
+                tableLatestSnapshotStore.put(
+                        identifier.getFullName(),
+                        
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
                 return new MockResponse().setResponseCode(200);
             }
         }
@@ -629,19 +647,14 @@ public class RESTCatalogServer {
                 new ErrorResponse(ErrorResponseResourceType.TAG, "" + tagName, 
"", 404), 404);
     }
 
-    private void rollbackTo(Identifier identifier, FileStoreTable table, Long 
snapshotId) {
-        String identifierWithSnapshotId = 
geTableFullNameWithSnapshotId(identifier, snapshotId);
-        long latestSnapshotId = table.latestSnapshot().get().id();
+    private void cleanSnapshot(Identifier identifier, Long snapshotId, Long 
latestSnapshotId)
+            throws IOException {
         if (latestSnapshotId > snapshotId) {
             for (long i = snapshotId + 1; i < latestSnapshotId + 1; i++) {
                 tableWithSnapshotId2SnapshotStore.remove(
                         geTableFullNameWithSnapshotId(identifier, i));
             }
         }
-        table.rollbackTo(snapshotId);
-        tableLatestSnapshotStore.put(
-                identifier.getFullName(),
-                
tableWithSnapshotId2SnapshotStore.get(identifierWithSnapshotId));
     }
 
     private MockResponse databasesApiHandler(
@@ -1060,6 +1073,14 @@ public class RESTCatalogServer {
                     if (resources.length == 6) {
                         branch = RESTUtil.decodeString(resources[4]);
                         branchManager.fastForward(branch);
+                        branchIdentifier =
+                                new Identifier(
+                                        identifier.getDatabaseName(),
+                                        identifier.getTableName(),
+                                        branch);
+                        tableLatestSnapshotStore.put(
+                                identifier.getFullName(),
+                                
tableLatestSnapshotStore.get(branchIdentifier.getFullName()));
                     } else {
                         CreateBranchRequest requestBody =
                                 OBJECT_MAPPER.readValue(data, 
CreateBranchRequest.class);
@@ -1075,6 +1096,9 @@ public class RESTCatalogServer {
                                         identifier.getDatabaseName(),
                                         identifier.getTableName(),
                                         requestBody.branch());
+                        tableLatestSnapshotStore.put(
+                                branchIdentifier.getFullName(),
+                                
tableLatestSnapshotStore.get(identifier.getFullName()));
                         tableMetadataStore.put(
                                 branchIdentifier.getFullName(),
                                 
tableMetadataStore.get(identifier.getFullName()));
@@ -1515,8 +1539,15 @@ public class RESTCatalogServer {
             Identifier identifier, long schemaId, Schema schema, String uuid, 
boolean isExternal) {
         Map<String, String> options = new HashMap<>(schema.options());
         Path path = catalog.getTableLocation(identifier);
-        String restPath =
-                path.toString().replaceFirst(LocalFileIOLoader.SCHEME, 
RESTFileIOTestLoader.SCHEME);
+        String restPath = path.toString();
+        if (this.configResponse
+                .getDefaults()
+                .getOrDefault(RESTTokenFileIO.DATA_TOKEN_ENABLED.key(), 
"false")
+                .equals("true")) {
+            restPath =
+                    path.toString()
+                            .replaceFirst(LocalFileIOLoader.SCHEME, 
RESTFileIOTestLoader.SCHEME);
+        }
         options.put(PATH.key(), restPath);
         TableSchema tableSchema =
                 new TableSchema(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
index 11beccb2a8..3d507a680b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTestBase.java
@@ -960,6 +960,31 @@ public abstract class RESTCatalogTestBase extends 
CatalogTestBase {
         assertThat(result).containsExactlyInAnyOrder("+I[5]", "+I[12]", 
"+I[18]");
     }
 
+    @Test
+    public void testBranchBatchRecordsWrite() throws Exception {
+        Identifier tableIdentifier = Identifier.create("my_db", "my_table");
+
+        Identifier tableBranchIdentifier =
+                new Identifier(
+                        tableIdentifier.getDatabaseName(),
+                        tableIdentifier.getTableName(),
+                        "branch1");
+        createTable(tableIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
+        FileStoreTable tableTestWrite = (FileStoreTable) 
catalog.getTable(tableIdentifier);
+        // write
+        batchWrite(tableTestWrite, Lists.newArrayList(12, 5, 18));
+        restCatalog.createBranch(tableIdentifier, 
tableBranchIdentifier.getBranchName(), null);
+        FileStoreTable branchTableTestWrite =
+                (FileStoreTable) catalog.getTable(tableBranchIdentifier);
+        batchWrite(branchTableTestWrite, Lists.newArrayList(1, 9, 2));
+        // read
+        List<String> result = batchRead(tableTestWrite);
+        List<String> branchResult = batchRead(branchTableTestWrite);
+        assertThat(result).containsExactlyInAnyOrder("+I[5]", "+I[12]", 
"+I[18]");
+        assertThat(branchResult)
+                .containsExactlyInAnyOrder("+I[5]", "+I[12]", "+I[18]", 
"+I[2]", "+I[1]", "+I[9]");
+    }
+
     @Test
     void testBranches() throws Exception {
         String databaseName = "testBranchTable";
@@ -1099,7 +1124,7 @@ public abstract class RESTCatalogTestBase extends 
CatalogTestBase {
                 true);
     }
 
-    protected abstract Catalog newRestCatalogWithDataToken();
+    protected abstract Catalog newRestCatalogWithDataToken() throws 
IOException;
 
     protected abstract void revokeTablePermission(Identifier identifier);
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogMockTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogMockTest.java
new file mode 100644
index 0000000000..2e603af51d
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogMockTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.options.CatalogOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.rest.RESTCatalogInternalOptions;
+import org.apache.paimon.rest.RESTCatalogOptions;
+import org.apache.paimon.rest.RESTCatalogServer;
+import org.apache.paimon.rest.auth.AuthProvider;
+import org.apache.paimon.rest.auth.AuthProviderEnum;
+import org.apache.paimon.rest.auth.BearTokenAuthProvider;
+import org.apache.paimon.rest.responses.ConfigResponse;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
+
+import java.io.IOException;
+import java.util.UUID;
+
+/** test table in rest catalog. */
+public class SimpleTableInRESTCatalogMockTest extends 
SimpleTableInRESTCatalogTestBase {
+
+    @Override
+    protected RESTCatalog createRESTCatalog() throws IOException {
+        String restWarehouse = UUID.randomUUID().toString();
+        String initToken = UUID.randomUUID().toString();
+        ConfigResponse config =
+                new ConfigResponse(
+                        ImmutableMap.of(
+                                RESTCatalogInternalOptions.PREFIX.key(),
+                                "paimon",
+                                CatalogOptions.WAREHOUSE.key(),
+                                restWarehouse),
+                        ImmutableMap.of());
+        AuthProvider authProvider = new BearTokenAuthProvider(initToken);
+        RESTCatalogServer restCatalogServer =
+                new RESTCatalogServer(dataPath, authProvider, config, 
restWarehouse);
+        restCatalogServer.start();
+        Options options = new Options();
+        options.set(CatalogOptions.WAREHOUSE.key(), restWarehouse);
+        options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
+        options.set(RESTCatalogOptions.TOKEN, initToken);
+        options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
+        return new RESTCatalog(CatalogContext.create(options));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogTestBase.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogTestBase.java
new file mode 100644
index 0000000000..c550ffe2af
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableInRESTCatalogTestBase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.rest.RESTCatalog;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+
+/** base test table in rest catalog. */
+public abstract class SimpleTableInRESTCatalogTestBase extends 
SimpleTableTestBase {
+    protected RESTCatalog restCatalog;
+    protected String dataPath;
+
+    @BeforeEach
+    public void before() throws Exception {
+        super.before();
+        dataPath = tablePath.toString();
+        restCatalog = createRESTCatalog();
+        restCatalog.createDatabase(identifier.getDatabaseName(), true);
+    }
+
+    protected abstract RESTCatalog createRESTCatalog() throws IOException;
+
+    @BeforeEach
+    public void after() throws Exception {
+        super.after();
+        restCatalog.dropTable(identifier, true);
+    }
+
+    @Override
+    protected FileStoreTable createFileStoreTable(Consumer<Options> configure, 
RowType rowType)
+            throws Exception {
+        Options conf = new Options();
+        configure.accept(conf);
+        if (!conf.contains(BUCKET_KEY) && conf.get(BUCKET) != -1) {
+            conf.set(BUCKET_KEY, "a");
+        }
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Collections.emptyList(),
+                                conf.toMap(),
+                                ""));
+        if (restCatalog
+                .listTables(identifier.getDatabaseName())
+                .contains(identifier.getTableName())) {
+            List<SchemaChange> schemaChangeList = new ArrayList<>();
+            for (Map.Entry<String, String> entry : conf.toMap().entrySet()) {
+                schemaChangeList.add(SchemaChange.setOption(entry.getKey(), 
entry.getValue()));
+            }
+            restCatalog.alterTable(identifier, schemaChangeList, true);
+        } else {
+            restCatalog.createTable(identifier, tableSchema.toSchema(), false);
+        }
+        return (FileStoreTable) restCatalog.getTable(identifier);
+    }
+
+    @Override
+    protected FileStoreTable createBranchTable(String branch) throws Exception 
{
+        if (!restCatalog.listBranches(identifier).contains(branch)) {
+            restCatalog.createBranch(identifier, branch, null);
+        }
+        return (FileStoreTable)
+                restCatalog.getTable(
+                        new Identifier(
+                                identifier.getDatabaseName(), 
identifier.getTableName(), branch));
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
index cbf9e106ae..3ffe71a496 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SimpleTableTestBase.java
@@ -22,6 +22,7 @@ import org.apache.paimon.CoreOptions;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.TestFileStore;
+import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryRowWriter;
 import org.apache.paimon.data.BinaryString;
@@ -80,7 +81,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.File;
-import java.io.IOException;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
@@ -180,16 +180,18 @@ public abstract class SimpleTableTestBase {
     @TempDir java.nio.file.Path tempDir;
 
     protected Path tablePath;
+    protected Identifier identifier;
     protected String commitUser;
 
     @BeforeEach
-    public void before() {
-        tablePath = new Path(TraceableFileIO.SCHEME + "://" + 
tempDir.toString());
+    public void before() throws Exception {
+        identifier = Identifier.create("default", "table_test");
+        tablePath = new Path(String.format("%s://%s", TraceableFileIO.SCHEME, 
tempDir.toString()));
         commitUser = UUID.randomUUID().toString();
     }
 
     @AfterEach
-    public void after() throws IOException {
+    public void after() throws Exception {
         // assert all connections are closed
         Predicate<Path> pathPredicate = path -> 
path.toString().contains(tempDir.toString());
         assertThat(TraceableFileIO.openInputStreams(pathPredicate)).isEmpty();
@@ -442,7 +444,8 @@ public abstract class SimpleTableTestBase {
         commit.abort(messages);
 
         FileStatus[] files =
-                LocalFileIO.create().listStatus(new Path(tablePath + 
"/pt=1/bucket-0"));
+                LocalFileIO.create()
+                        .listStatus(new Path(table.location().toString() + 
"/pt=1/bucket-0"));
         assertThat(files).isEmpty();
         write.close();
         commit.close();
@@ -586,7 +589,6 @@ public abstract class SimpleTableTestBase {
                             conf.set(SNAPSHOT_NUM_RETAINED_MIN, 3);
                             conf.set(SNAPSHOT_NUM_RETAINED_MAX, 3);
                         });
-
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
         for (int i = 0; i < 10; i++) {
@@ -605,7 +607,7 @@ public abstract class SimpleTableTestBase {
         }
 
         SnapshotManager snapshotManager =
-                newSnapshotManager(FileIOFinder.find(tablePath), 
table.location());
+                newSnapshotManager(FileIOFinder.find(table.location()), 
table.location());
         Long latestSnapshotId = snapshotManager.latestSnapshotId();
         assertThat(latestSnapshotId).isNotNull();
         for (int i = 1; i <= latestSnapshotId; i++) {
@@ -794,7 +796,7 @@ public abstract class SimpleTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                Files.walk(new 
File(table.location().toUri().getPath()).toPath())
                         .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(14);
     }
@@ -820,7 +822,7 @@ public abstract class SimpleTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                Files.walk(new 
File(table.location().toUri().getPath()).toPath())
                         .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(15);
         // table-path
@@ -872,7 +874,7 @@ public abstract class SimpleTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                Files.walk(new 
File(table.location().toUri().getPath()).toPath())
                         .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(16);
         // case 0 plus 1:
@@ -913,7 +915,7 @@ public abstract class SimpleTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                Files.walk(new 
File(table.location().toUri().getPath()).toPath())
                         .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(23);
         // case 0 plus 7:
@@ -969,7 +971,7 @@ public abstract class SimpleTableTestBase {
                 
.containsExactlyInAnyOrder("0|0|0|binary|varbinary|mapKey:mapVal|multiset");
 
         List<java.nio.file.Path> files =
-                Files.walk(new File(tablePath.toUri().getPath()).toPath())
+                Files.walk(new 
File(table.location().toUri().getPath()).toPath())
                         .collect(Collectors.toList());
         assertThat(files.size()).isEqualTo(16);
         // rollback snapshot case 0 plus 1:
@@ -1016,7 +1018,7 @@ public abstract class SimpleTableTestBase {
         table.createTag("test-tag", 2);
 
         // verify that tag file exist
-        TagManager tagManager = new TagManager(new TraceableFileIO(), 
tablePath);
+        TagManager tagManager = new TagManager(new TraceableFileIO(), 
table.location());
         assertThat(tagManager.tagExists("test-tag")).isTrue();
 
         // verify that test-tag is equal to snapshot 2
@@ -1040,7 +1042,7 @@ public abstract class SimpleTableTestBase {
             commit.commit(0, write.prepareCommit(false, 1));
             table.createTag("test-tag", 1);
             // verify that tag file exist
-            TagManager tagManager = new TagManager(new TraceableFileIO(), 
tablePath);
+            TagManager tagManager = new TagManager(new TraceableFileIO(), 
table.location());
             assertThat(tagManager.tagExists("test-tag")).isTrue();
             // verify that test-tag is equal to snapshot 1
             Snapshot tagged = 
tagManager.getOrThrow("test-tag").trimToSnapshot();
@@ -1049,7 +1051,8 @@ public abstract class SimpleTableTestBase {
             // snapshot 2
             write.write(rowData(2, 20, 200L));
             commit.commit(1, write.prepareCommit(false, 2));
-            SnapshotManager snapshotManager = newSnapshotManager(new 
TraceableFileIO(), tablePath);
+            SnapshotManager snapshotManager =
+                    newSnapshotManager(new TraceableFileIO(), 
table.location());
             // The snapshot 1 is expired.
             assertThat(snapshotManager.snapshotExists(1)).isFalse();
             table.createTag("test-tag-2", 1);
@@ -1072,7 +1075,7 @@ public abstract class SimpleTableTestBase {
             // snapshot 2
             write.write(rowData(1, 10, 100L));
             commit.commit(1, write.prepareCommit(false, 2));
-            TagManager tagManager = new TagManager(new TraceableFileIO(), 
tablePath);
+            TagManager tagManager = new TagManager(new TraceableFileIO(), 
table.location());
             table.createTag("test-tag", 1);
             // verify that tag file exist
             assertThat(tagManager.tagExists("test-tag")).isTrue();
@@ -1087,7 +1090,6 @@ public abstract class SimpleTableTestBase {
     @Test
     public void testCreateBranch() throws Exception {
         FileStoreTable table = createFileStoreTable();
-
         try (StreamTableWrite write = table.newWrite(commitUser);
                 StreamTableCommit commit = table.newCommit(commitUser)) {
             // snapshot 1
@@ -1101,7 +1103,7 @@ public abstract class SimpleTableTestBase {
         table.createTag("test-tag", 2);
 
         // verify that tag file exist
-        TagManager tagManager = new TagManager(new TraceableFileIO(), 
tablePath);
+        TagManager tagManager = new TagManager(new TraceableFileIO(), 
table.location());
         assertThat(tagManager.tagExists("test-tag")).isTrue();
 
         // verify that test-tag is equal to snapshot 2
@@ -1124,14 +1126,14 @@ public abstract class SimpleTableTestBase {
 
         // verify snapshot in test-branch is equal to snapshot 2
         SnapshotManager snapshotManager =
-                newSnapshotManager(new TraceableFileIO(), tablePath, 
"test-branch");
+                newSnapshotManager(new TraceableFileIO(), table.location(), 
"test-branch");
         Snapshot branchSnapshot =
                 Snapshot.fromPath(new TraceableFileIO(), 
snapshotManager.snapshotPath(2));
         assertThat(branchSnapshot.equals(snapshot2)).isTrue();
 
         // verify schema in test-branch is equal to schema 0
         SchemaManager schemaManager =
-                new SchemaManager(new TraceableFileIO(), tablePath, 
"test-branch");
+                new SchemaManager(new TraceableFileIO(), table.location(), 
"test-branch");
         TableSchema branchSchema =
                 TableSchema.fromPath(new TraceableFileIO(), 
schemaManager.toSchemaPath(0));
         TableSchema schema0 = schemaManager.schema(0);
@@ -1288,7 +1290,8 @@ public abstract class SimpleTableTestBase {
                         "2|20|200|binary|varbinary|mapKey:mapVal|multiset");
 
         // verify snapshot in branch1 and main branch is same
-        SnapshotManager snapshotManager = newSnapshotManager(new 
TraceableFileIO(), tablePath);
+        SnapshotManager snapshotManager =
+                newSnapshotManager(new TraceableFileIO(), table.location());
         Snapshot branchSnapshot =
                 Snapshot.fromPath(
                         new TraceableFileIO(),
@@ -1298,7 +1301,7 @@ public abstract class SimpleTableTestBase {
         assertThat(branchSnapshot.equals(snapshot)).isTrue();
 
         // verify schema in branch1 and main branch is same
-        SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), 
tablePath);
+        SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), 
table.location());
         TableSchema branchSchema =
                 TableSchema.fromPath(
                         new TraceableFileIO(),
@@ -1535,19 +1538,6 @@ public abstract class SimpleTableTestBase {
         commit.close();
     }
 
-    @Test
-    public void testSchemaPathOption() throws Exception {
-        String fakePath = "fake path";
-        FileStoreTable table = createFileStoreTable(conf -> 
conf.set(CoreOptions.PATH, fakePath));
-        String originSchemaPath = 
table.schema().options().get(CoreOptions.PATH.key());
-        assertThat(originSchemaPath).isEqualTo(fakePath);
-        // reset PATH of schema option to table location
-        table = table.copy(Collections.emptyMap());
-        String schemaPath = 
table.schema().options().get(CoreOptions.PATH.key());
-        String tablePath = table.location().toString();
-        assertThat(schemaPath).isEqualTo(tablePath);
-    }
-
     @Test
     public void testBranchWriteAndRead() throws Exception {
         FileStoreTable table = createFileStoreTable();
diff --git a/paimon-open-api/rest-catalog-open-api.yaml 
b/paimon-open-api/rest-catalog-open-api.yaml
index c30f09f8ac..211be78a68 100644
--- a/paimon-open-api/rest-catalog-open-api.yaml
+++ b/paimon-open-api/rest-catalog-open-api.yaml
@@ -1519,13 +1519,13 @@ components:
       properties:
         message:
           type: string
+        resourceType:
+          type: string
+        resourceName:
+          type: string
         code:
           type: integer
           format: int32
-        stack:
-          type: array
-          items:
-            type: string
     CreateTableRequest:
       type: object
       properties:
@@ -1606,9 +1606,7 @@ components:
       properties:
         database:
           type: string
-        table:
-          type: string
-        branch:
+        object:
           type: string
     Schema:
       type: object


Reply via email to