This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 3eb3fd8458 [rest] Abstract paged methods in RESTCatalog and add more tests (#5208)
3eb3fd8458 is described below

commit 3eb3fd845859c9262af4548febcedc020686c50b
Author: jerry <[email protected]>
AuthorDate: Wed Mar 5 20:32:07 2025 +0800

    [rest] Abstract paged methods in RESTCatalog and add more tests (#5208)
---
 .../java/org/apache/paimon/catalog/Catalog.java    |   8 +
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 207 ++++++------
 .../paimon/rest/auth/DLFAuthProviderFactory.java   |   2 +-
 .../rest/responses/ListPartitionsResponse.java     |   8 +-
 .../rest/responses/ListTableDetailsResponse.java   |   9 +-
 .../paimon/rest/responses/ListTablesResponse.java  |   9 +-
 .../rest/responses/ListViewDetailsResponse.java    |   9 +-
 .../paimon/rest/responses/ListViewsResponse.java   |   9 +-
 .../paimon/rest/responses/PagedResponse.java       |  30 ++
 .../org/apache/paimon/catalog/CatalogTestBase.java |  16 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  | 261 ++++++++++-----
 .../org/apache/paimon/rest/RESTCatalogTest.java    | 354 ++++++++++++++++-----
 .../org/apache/paimon/rest/TestPagedResponse.java  |  45 +++
 .../rest/auth/DLFAuthProviderFactoryTest.java      |   2 +-
 14 files changed, 675 insertions(+), 294 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index 5a5beb7117..54f16bae60 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -596,6 +596,10 @@ public interface Catalog extends AutoCloseable {
             this.database = database;
         }
 
+        public DatabaseNoPermissionException(String database) {
+            this(database, null);
+        }
+
         public String database() {
             return database;
         }
@@ -655,6 +659,10 @@ public interface Catalog extends AutoCloseable {
             this.identifier = identifier;
         }
 
+        public TableNoPermissionException(Identifier identifier) {
+            this(identifier, null);
+        }
+
         public Identifier identifier() {
             return identifier;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 2016427b89..c0e173c38d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -72,6 +72,7 @@ import 
org.apache.paimon.rest.responses.ListTableDetailsResponse;
 import org.apache.paimon.rest.responses.ListTablesResponse;
 import org.apache.paimon.rest.responses.ListViewDetailsResponse;
 import org.apache.paimon.rest.responses.ListViewsResponse;
+import org.apache.paimon.rest.responses.PagedResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
@@ -101,6 +102,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
@@ -269,31 +271,14 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
     @Override
     public List<String> listTables(String databaseName) throws 
DatabaseNotExistException {
         try {
-            String pageToken = null;
-            Map<String, String> queryParams = Maps.newHashMap();
-            List<String> tables = new ArrayList<>();
-            do {
-                if (pageToken != null) {
-                    queryParams.put(PAGE_TOKEN, pageToken);
-                }
-                ListTablesResponse response =
-                        client.get(
+            return listDataFromPageApi(
+                    queryParams -> {
+                        return client.get(
                                 resourcePaths.tables(databaseName),
                                 queryParams,
                                 ListTablesResponse.class,
                                 restAuthFunction);
-                if (Objects.nonNull(response)) {
-                    pageToken = response.getNextPageToken();
-                    tables.addAll(response.getTables());
-                } else {
-                    LOG.warn(
-                            "response of listTables for {} is null with params 
{}",
-                            databaseName,
-                            queryParams);
-                    break;
-                }
-            } while (StringUtils.isNotEmpty(pageToken));
-            return tables;
+                    });
         } catch (NoSuchResourceException e) {
             throw new DatabaseNotExistException(databaseName);
         }
@@ -311,7 +296,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
                             queryParams,
                             ListTablesResponse.class,
                             restAuthFunction);
-            if (Objects.nonNull(response) && 
Objects.nonNull(response.getTables())) {
+            if (Objects.nonNull(response.getTables())) {
                 return new PagedList<>(response.getTables(), 
response.getNextPageToken());
             } else {
                 LOG.warn(
@@ -338,7 +323,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
                             queryParams,
                             ListTableDetailsResponse.class,
                             restAuthFunction);
-            if (Objects.nonNull(response) && 
Objects.nonNull(response.getTableDetails())) {
+            if (Objects.nonNull(response.getTableDetails())) {
                 return new PagedList<>(
                         response.getTableDetails().stream()
                                 .map(
@@ -383,38 +368,6 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
                 null);
     }
 
-    private FileIO fileIOForData(Path path, Identifier identifier) {
-        return dataTokenEnabled
-                ? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
-                : fileIOFromOptions(path);
-    }
-
-    private FileIO fileIOFromOptions(Path path) {
-        try {
-            return FileIO.get(path, context);
-        } catch (IOException e) {
-            throw new UncheckedIOException(e);
-        }
-    }
-
-    protected GetTableTokenResponse loadTableToken(Identifier identifier)
-            throws TableNotExistException {
-        GetTableTokenResponse response;
-        try {
-            response =
-                    client.get(
-                            resourcePaths.tableToken(
-                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
-                            GetTableTokenResponse.class,
-                            restAuthFunction);
-        } catch (NoSuchResourceException e) {
-            throw new TableNotExistException(identifier);
-        } catch (ForbiddenException e) {
-            throw new TableNoPermissionException(identifier, e);
-        }
-        return response;
-    }
-
     @Override
     public Optional<Snapshot> loadSnapshot(Identifier identifier) throws 
TableNotExistException {
         GetTableSnapshotResponse response;
@@ -466,7 +419,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             response =
                     client.get(
                             resourcePaths.table(
-                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
                             GetTableResponse.class,
                             restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -537,7 +490,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
         try {
             AlterTableRequest request = new AlterTableRequest(changes);
             client.post(
-                    resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
+                    resourcePaths.table(identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -568,7 +521,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
         checkNotSystemTable(identifier, "dropTable");
         try {
             client.delete(
-                    resourcePaths.table(identifier.getDatabaseName(), 
identifier.getTableName()),
+                    resourcePaths.table(identifier.getDatabaseName(), 
identifier.getObjectName()),
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
@@ -586,7 +539,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             CreatePartitionsRequest request = new 
CreatePartitionsRequest(partitions);
             client.post(
                     resourcePaths.partitions(
-                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                            identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -603,7 +556,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             DropPartitionsRequest request = new 
DropPartitionsRequest(partitions);
             client.post(
                     resourcePaths.dropPartitions(
-                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                            identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -626,7 +579,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             AlterPartitionsRequest request = new 
AlterPartitionsRequest(partitions);
             client.post(
                     resourcePaths.alterPartitions(
-                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                            identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -643,7 +596,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             MarkDonePartitionsRequest request = new 
MarkDonePartitionsRequest(partitions);
             client.post(
                     resourcePaths.markDonePartitions(
-                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                            identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -656,33 +609,15 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
     @Override
     public List<Partition> listPartitions(Identifier identifier) throws 
TableNotExistException {
         try {
-            String pageToken = null;
-            Map<String, String> queryParams = Maps.newHashMap();
-            List<Partition> partitions = new ArrayList<>();
-            do {
-                if (pageToken != null) {
-                    queryParams.put(PAGE_TOKEN, pageToken);
-                }
-                ListPartitionsResponse response =
-                        client.get(
+            return listDataFromPageApi(
+                    queryParams -> {
+                        return client.get(
                                 resourcePaths.partitions(
-                                        identifier.getDatabaseName(), 
identifier.getTableName()),
+                                        identifier.getDatabaseName(), 
identifier.getObjectName()),
                                 queryParams,
                                 ListPartitionsResponse.class,
                                 restAuthFunction);
-                if (Objects.nonNull(response)) {
-                    pageToken = response.getNextPageToken();
-                    partitions.addAll(response.getPartitions());
-                } else {
-                    LOG.warn(
-                            "response of listPartitions for {}.{} is null with 
params {}",
-                            identifier.getDatabaseName(),
-                            identifier.getTableName(),
-                            queryParams);
-                    break;
-                }
-            } while (StringUtils.isNotEmpty(pageToken));
-            return partitions;
+                    });
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (ForbiddenException e) {
@@ -699,7 +634,8 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
         try {
             CreateBranchRequest request = new CreateBranchRequest(branch, 
fromTag);
             client.post(
-                    resourcePaths.branches(identifier.getDatabaseName(), 
identifier.getTableName()),
+                    resourcePaths.branches(
+                            identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -722,7 +658,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
         try {
             client.delete(
                     resourcePaths.branch(
-                            identifier.getDatabaseName(), 
identifier.getTableName(), branch),
+                            identifier.getDatabaseName(), 
identifier.getObjectName(), branch),
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
             throw new BranchNotExistException(identifier, branch, e);
@@ -737,7 +673,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             ForwardBranchRequest request = new ForwardBranchRequest(branch);
             client.post(
                     resourcePaths.forwardBranch(
-                            identifier.getDatabaseName(), 
identifier.getTableName()),
+                            identifier.getDatabaseName(), 
identifier.getObjectName()),
                     request,
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
@@ -753,10 +689,10 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             ListBranchesResponse response =
                     client.get(
                             resourcePaths.branches(
-                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
                             ListBranchesResponse.class,
                             restAuthFunction);
-            if (response == null || response.branches() == null) {
+            if (response.branches() == null) {
                 return Collections.emptyList();
             }
             return response.branches();
@@ -776,16 +712,15 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             ListPartitionsResponse response =
                     client.get(
                             resourcePaths.partitions(
-                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
                             queryParams,
                             ListPartitionsResponse.class,
                             restAuthFunction);
-            if (Objects.nonNull(response)) {
+            if (Objects.nonNull(response.getPartitions())) {
                 return new PagedList<>(response.getPartitions(), 
response.getNextPageToken());
             } else {
                 return new PagedList<>(Collections.emptyList(), null);
             }
-
         } catch (NoSuchResourceException e) {
             throw new TableNotExistException(identifier);
         } catch (ForbiddenException e) {
@@ -802,7 +737,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             GetViewResponse response =
                     client.get(
                             resourcePaths.view(
-                                    identifier.getDatabaseName(), 
identifier.getTableName()),
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
                             GetViewResponse.class,
                             restAuthFunction);
             ViewSchema schema = response.getSchema();
@@ -823,7 +758,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             throws ViewNotExistException {
         try {
             client.delete(
-                    resourcePaths.view(identifier.getDatabaseName(), 
identifier.getTableName()),
+                    resourcePaths.view(identifier.getDatabaseName(), 
identifier.getObjectName()),
                     restAuthFunction);
         } catch (NoSuchResourceException e) {
             if (!ignoreIfNotExists) {
@@ -859,31 +794,14 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
     @Override
     public List<String> listViews(String databaseName) throws 
DatabaseNotExistException {
         try {
-            String pageToken = null;
-            Map<String, String> queryParams = Maps.newHashMap();
-            List<String> views = new ArrayList<>();
-            do {
-                if (pageToken != null) {
-                    queryParams.put(PAGE_TOKEN, pageToken);
-                }
-                ListViewsResponse response =
-                        client.get(
+            return listDataFromPageApi(
+                    queryParams -> {
+                        return client.get(
                                 resourcePaths.views(databaseName),
                                 queryParams,
                                 ListViewsResponse.class,
                                 restAuthFunction);
-                if (Objects.nonNull(response)) {
-                    pageToken = response.getNextPageToken();
-                    views.addAll(response.getViews());
-                } else {
-                    LOG.warn(
-                            "response of listViews for {} is null with params 
{}",
-                            databaseName,
-                            queryParams);
-                    break;
-                }
-            } while (StringUtils.isNotEmpty(pageToken));
-            return views;
+                    });
         } catch (NoSuchResourceException e) {
             throw new DatabaseNotExistException(databaseName);
         }
@@ -901,7 +819,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
                             queryParams,
                             ListViewsResponse.class,
                             restAuthFunction);
-            if (Objects.nonNull(response) && 
Objects.nonNull(response.getViews())) {
+            if (Objects.nonNull(response.getViews())) {
                 return new PagedList<>(response.getViews(), 
response.getNextPageToken());
             } else {
                 LOG.warn(
@@ -928,7 +846,7 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
                             queryParams,
                             ListViewDetailsResponse.class,
                             restAuthFunction);
-            if (Objects.nonNull(response) && 
Objects.nonNull(response.getViewDetails())) {
+            if (Objects.nonNull(response.getViewDetails())) {
                 return new PagedList<>(
                         response.getViewDetails().stream()
                                 .map(
@@ -996,6 +914,45 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
         return restAuthFunction.apply(restAuthParameter);
     }
 
+    protected GetTableTokenResponse loadTableToken(Identifier identifier)
+            throws TableNotExistException {
+        GetTableTokenResponse response;
+        try {
+            response =
+                    client.get(
+                            resourcePaths.tableToken(
+                                    identifier.getDatabaseName(), 
identifier.getObjectName()),
+                            GetTableTokenResponse.class,
+                            restAuthFunction);
+        } catch (NoSuchResourceException e) {
+            throw new TableNotExistException(identifier);
+        } catch (ForbiddenException e) {
+            throw new TableNoPermissionException(identifier, e);
+        }
+        return response;
+    }
+
+    protected <T> List<T> listDataFromPageApi(
+            Function<Map<String, String>, PagedResponse<T>> pageApi) {
+        List<T> results = new ArrayList<>();
+        Map<String, String> queryParams = Maps.newHashMap();
+        String pageToken = null;
+        do {
+            if (pageToken != null) {
+                queryParams.put(PAGE_TOKEN, pageToken);
+            }
+            PagedResponse<T> response = pageApi.apply(queryParams);
+            pageToken = response.getNextPageToken();
+            if (response.data() != null) {
+                results.addAll(response.data());
+            }
+            if (pageToken == null || response.data() == null || 
response.data().isEmpty()) {
+                break;
+            }
+        } while (StringUtils.isNotEmpty(pageToken));
+        return results;
+    }
+
     private ScheduledExecutorService tokenRefreshExecutor() {
         if (refreshExecutor == null) {
             synchronized (this) {
@@ -1008,6 +965,20 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
         return refreshExecutor;
     }
 
+    private FileIO fileIOForData(Path path, Identifier identifier) {
+        return dataTokenEnabled
+                ? new RESTTokenFileIO(catalogLoader(), this, identifier, path)
+                : fileIOFromOptions(path);
+    }
+
+    private FileIO fileIOFromOptions(Path path) {
+        try {
+            return FileIO.get(path, context);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
     private Table loadTableByResponse(GetTableResponse getTableResponse, 
String databaseName)
             throws TableNotExistException {
         Identifier identifier = Identifier.create(databaseName, 
getTableResponse.getName());
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
index 79dd517106..d81f37bb91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/auth/DLFAuthProviderFactory.java
@@ -58,7 +58,7 @@ public class DLFAuthProviderFactory implements AuthProviderFactory {
 
     protected static String parseRegionFromUri(String uri) {
         try {
-            String regex = 
"dlf-(?:pre-)?([a-z]+-[a-z]+(?:-\\d+)?)(?:-internal)?";
+            String regex = "dlf-(?:pre-)?([a-z]+-[a-z]+(?:-\\d+)?)";
 
             Pattern pattern = Pattern.compile(regex);
             Matcher matcher = pattern.matcher(uri);
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
index 8c91d88ca6..ddbcd55d6d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListPartitionsResponse.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.rest.responses;
 
 import org.apache.paimon.partition.Partition;
-import org.apache.paimon.rest.RESTResponse;
 
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -30,7 +29,7 @@ import java.util.List;
 
 /** Response for listing partitions. */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ListPartitionsResponse implements RESTResponse {
+public class ListPartitionsResponse implements PagedResponse<Partition> {
 
     public static final String FIELD_PARTITIONS = "partitions";
 
@@ -63,4 +62,9 @@ public class ListPartitionsResponse implements RESTResponse {
     public String getNextPageToken() {
         return this.nextPageToken;
     }
+
+    @Override
+    public List<Partition> data() {
+        return getPartitions();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTableDetailsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTableDetailsResponse.java
index 84f55e842f..4383dfbcac 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTableDetailsResponse.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTableDetailsResponse.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.rest.responses;
 
-import org.apache.paimon.rest.RESTResponse;
-
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -29,7 +27,7 @@ import java.util.List;
 
 /** Response for listing table details. */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ListTableDetailsResponse implements RESTResponse {
+public class ListTableDetailsResponse implements 
PagedResponse<GetTableResponse> {
 
     private static final String FIELD_TABLE_DETAILS = "tableDetails";
     private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
@@ -62,4 +60,9 @@ public class ListTableDetailsResponse implements RESTResponse 
{
     public String getNextPageToken() {
         return this.nextPageToken;
     }
+
+    @Override
+    public List<GetTableResponse> data() {
+        return getTableDetails();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java
index 419ea91484..356c31610f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListTablesResponse.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.rest.responses;
 
-import org.apache.paimon.rest.RESTResponse;
-
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -29,7 +27,7 @@ import java.util.List;
 
 /** Response for listing tables. */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ListTablesResponse implements RESTResponse {
+public class ListTablesResponse implements PagedResponse<String> {
 
     private static final String FIELD_TABLES = "tables";
     private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
@@ -61,4 +59,9 @@ public class ListTablesResponse implements RESTResponse {
     public String getNextPageToken() {
         return this.nextPageToken;
     }
+
+    @Override
+    public List<String> data() {
+        return getTables();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewDetailsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewDetailsResponse.java
index 3a080f1f25..90c51d6738 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewDetailsResponse.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewDetailsResponse.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.rest.responses;
 
-import org.apache.paimon.rest.RESTResponse;
-
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -29,7 +27,7 @@ import java.util.List;
 
 /** Response for listing view details. */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ListViewDetailsResponse implements RESTResponse {
+public class ListViewDetailsResponse implements PagedResponse<GetViewResponse> 
{
 
     private static final String FIELD_VIEW_DETAILS = "viewDetails";
     private static final String FIELD_NEXT_PAGE_TOKEN = "nextPageToken";
@@ -62,4 +60,9 @@ public class ListViewDetailsResponse implements RESTResponse {
     public String getNextPageToken() {
         return this.nextPageToken;
     }
+
+    @Override
+    public List<GetViewResponse> data() {
+        return getViewDetails();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewsResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewsResponse.java
index 851412ca5f..74cb47515b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewsResponse.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/ListViewsResponse.java
@@ -18,8 +18,6 @@
 
 package org.apache.paimon.rest.responses;
 
-import org.apache.paimon.rest.RESTResponse;
-
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -29,7 +27,7 @@ import java.util.List;
 
 /** Response for listing views. */
 @JsonIgnoreProperties(ignoreUnknown = true)
-public class ListViewsResponse implements RESTResponse {
+public class ListViewsResponse implements PagedResponse<String> {
 
     private static final String FIELD_VIEWS = "views";
 
@@ -62,4 +60,9 @@ public class ListViewsResponse implements RESTResponse {
     public String getNextPageToken() {
         return this.nextPageToken;
     }
+
+    @Override
+    public List<String> data() {
+        return getViews();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/responses/PagedResponse.java b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PagedResponse.java
new file mode 100644
index 0000000000..9cc16e09ac
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/responses/PagedResponse.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest.responses;
+
+import org.apache.paimon.rest.RESTResponse;
+
+import java.util.List;
+
+/** Response for page. */
+public interface PagedResponse<T> extends RESTResponse {
+    List<T> data();
+
+    String getNextPageToken();
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 1013a562e4..de383bccc3 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -492,16 +492,6 @@ public abstract class CatalogTestBase {
         Table dataTable = catalog.getTable(identifier);
         assertThat(dataTable).isNotNull();
 
-        // Get system table throws Exception when table contains multiple '$' 
separator
-        assertThatExceptionOfType(IllegalArgumentException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.getTable(
-                                        Identifier.create(
-                                                "test_db", 
"test_table$snapshots$snapshots")))
-                .withMessage(
-                        "System table can only contain one '$' separator, but 
this is: test_table$snapshots$snapshots");
-
         // Get system table throws TableNotExistException when data table does 
not exist
         assertThatExceptionOfType(Catalog.TableNotExistException.class)
                 .isThrownBy(
@@ -1292,6 +1282,12 @@ public abstract class CatalogTestBase {
                                 catalog.markDonePartitions(
                                         Identifier.create(databaseName, 
"non_existing_table"),
                                         partitionSpecs));
+        assertThatExceptionOfType(Catalog.TableNotExistException.class)
+                .isThrownBy(
+                        () ->
+                                catalog.dropPartitions(
+                                        Identifier.create(databaseName, 
"non_existing_table"),
+                                        partitionSpecs));
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index cce4a94df9..7a11a0a62d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -127,11 +127,13 @@ public class RESTCatalogServer {
     private final MockWebServer server;
     private final String authToken;
 
-    public final Map<String, Database> databaseStore = new HashMap<>();
-    public final Map<String, TableMetadata> tableMetadataStore = new 
HashMap<>();
-    public final Map<String, List<Partition>> tablePartitionsStore = new 
HashMap<>();
-    public final Map<String, View> viewStore = new HashMap<>();
-    public final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
+    private final Map<String, Database> databaseStore = new HashMap<>();
+    private final Map<String, TableMetadata> tableMetadataStore = new 
HashMap<>();
+    private final Map<String, List<Partition>> tablePartitionsStore = new 
HashMap<>();
+    private final Map<String, View> viewStore = new HashMap<>();
+    private final Map<String, Snapshot> tableSnapshotStore = new HashMap<>();
+    private final List<String> noPermissionDatabases = new ArrayList<>();
+    private final List<String> noPermissionTables = new ArrayList<>();
     public final ConfigResponse configResponse;
     public final String warehouse;
 
@@ -189,6 +191,14 @@ public class RESTCatalogServer {
         DataTokenStore.removeDataToken(warehouse, identifier.getFullName());
     }
 
+    public void addNoPermissionDatabase(String database) {
+        noPermissionDatabases.add(database);
+    }
+
+    public void addNoPermissionTable(Identifier identifier) {
+        noPermissionTables.add(identifier.getFullName());
+    }
+
     public RESTToken getDataToken(Identifier identifier) {
         return DataTokenStore.getDataToken(warehouse, 
identifier.getFullName());
     }
@@ -214,6 +224,9 @@ public class RESTCatalogServer {
                                         .substring((databaseUri + 
"/").length())
                                         .split("/");
                         String databaseName = resources[0];
+                        if (noPermissionDatabases.contains(databaseName)) {
+                            throw new 
Catalog.DatabaseNoPermissionException(databaseName);
+                        }
                         if (!databaseStore.containsKey(databaseName)) {
                             throw new 
Catalog.DatabaseNotExistException(databaseName);
                         }
@@ -280,8 +293,19 @@ public class RESTCatalogServer {
                                         && "branches".equals(resources[3]);
                         Identifier identifier =
                                 resources.length >= 3
+                                                && 
!"rename".equals(resources[2])
+                                                && 
!"commit".equals(resources[2])
                                         ? Identifier.create(databaseName, 
resources[2])
                                         : null;
+                        if (identifier != null && 
"tables".equals(resources[1])) {
+                            if (!identifier.isSystemTable()
+                                    && 
!tableMetadataStore.containsKey(identifier.getFullName())) {
+                                throw new 
Catalog.TableNotExistException(identifier);
+                            }
+                            if 
(noPermissionTables.contains(identifier.getFullName())) {
+                                throw new 
Catalog.TableNoPermissionException(identifier);
+                            }
+                        }
                         // validate partition
                         if (isPartitions
                                 || isDropPartitions
@@ -310,40 +334,7 @@ public class RESTCatalogServer {
                         } else if (isPartitions) {
                             return partitionsApiHandle(request, identifier);
                         } else if (isBranches) {
-                            FileStoreTable table = (FileStoreTable) 
catalog.getTable(identifier);
-                            BranchManager branchManager = 
table.branchManager();
-                            switch (request.getMethod()) {
-                                case "DELETE":
-                                    String branch = resources[4];
-                                    table.deleteBranch(branch);
-                                    return new 
MockResponse().setResponseCode(200);
-                                case "GET":
-                                    List<String> branches = 
branchManager.branches();
-                                    response = new 
ListBranchesResponse(branches);
-                                    return mockResponse(response, 200);
-                                case "POST":
-                                    if (resources.length == 5) {
-                                        ForwardBranchRequest requestBody =
-                                                OBJECT_MAPPER.readValue(
-                                                        
request.getBody().readUtf8(),
-                                                        
ForwardBranchRequest.class);
-                                        
branchManager.fastForward(requestBody.branch());
-                                    } else {
-                                        CreateBranchRequest requestBody =
-                                                OBJECT_MAPPER.readValue(
-                                                        
request.getBody().readUtf8(),
-                                                        
CreateBranchRequest.class);
-                                        if (requestBody.fromTag() == null) {
-                                            
branchManager.createBranch(requestBody.branch());
-                                        } else {
-                                            branchManager.createBranch(
-                                                    requestBody.branch(), 
requestBody.fromTag());
-                                        }
-                                    }
-                                    return new 
MockResponse().setResponseCode(200);
-                                default:
-                                    return new 
MockResponse().setResponseCode(404);
-                            }
+                            return branchApiHandle(resources, request, 
identifier);
                         } else if (isTableToken) {
                             return getDataTokenHandle(identifier);
                         } else if (isTableSnapshot) {
@@ -395,6 +386,22 @@ public class RESTCatalogServer {
                                     e.getMessage(),
                                     404);
                     return mockResponse(response, 404);
+                } catch (Catalog.DatabaseNoPermissionException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.DATABASE,
+                                    e.database(),
+                                    e.getMessage(),
+                                    403);
+                    return mockResponse(response, 403);
+                } catch (Catalog.TableNoPermissionException e) {
+                    response =
+                            new ErrorResponse(
+                                    ErrorResponseResourceType.TABLE,
+                                    e.identifier().getTableName(),
+                                    e.getMessage(),
+                                    403);
+                    return mockResponse(response, 403);
                 } catch (Catalog.DatabaseAlreadyExistException e) {
                     response =
                             new ErrorResponse(
@@ -519,6 +526,12 @@ public class RESTCatalogServer {
         CommitTableRequest requestBody =
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), 
CommitTableRequest.class);
         Identifier identifier = requestBody.getIdentifier();
+        if 
(noPermissionTables.contains(requestBody.getIdentifier().getFullName())) {
+            throw new 
Catalog.TableNoPermissionException(requestBody.getIdentifier());
+        }
+        if 
(!tableMetadataStore.containsKey(requestBody.getIdentifier().getFullName())) {
+            throw new 
Catalog.TableNotExistException(requestBody.getIdentifier());
+        }
         FileStoreTable table = getFileTable(identifier);
         RenamingSnapshotCommit commit =
                 new RenamingSnapshotCommit(table.snapshotManager(), 
Lock.empty());
@@ -545,6 +558,9 @@ public class RESTCatalogServer {
                         OBJECT_MAPPER.readValue(
                                 request.getBody().readUtf8(), 
CreateDatabaseRequest.class);
                 String databaseName = requestBody.getName();
+                if (noPermissionDatabases.contains(databaseName)) {
+                    throw new 
Catalog.DatabaseNoPermissionException(databaseName);
+                }
                 catalog.createDatabase(databaseName, false);
                 databaseStore.put(
                         databaseName, Database.of(databaseName, 
requestBody.getOptions(), null));
@@ -825,37 +841,45 @@ public class RESTCatalogServer {
     private MockResponse tableHandle(RecordedRequest request, Identifier 
identifier)
             throws Exception {
         RESTResponse response;
-        if (tableMetadataStore.containsKey(identifier.getFullName())) {
-            switch (request.getMethod()) {
-                case "GET":
-                    TableMetadata tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
-                    response =
-                            new GetTableResponse(
-                                    tableMetadata.uuid(),
-                                    identifier.getTableName(),
-                                    tableMetadata.isExternal(),
-                                    tableMetadata.schema().id(),
-                                    tableMetadata.schema().toSchema());
-                    return mockResponse(response, 200);
-                case "POST":
-                    AlterTableRequest requestBody =
-                            OBJECT_MAPPER.readValue(
-                                    request.getBody().readUtf8(), 
AlterTableRequest.class);
-                    alterTableImpl(identifier, requestBody.getChanges());
-                    return new MockResponse().setResponseCode(200);
-                case "DELETE":
-                    try {
-                        catalog.dropTable(identifier, false);
-                    } catch (Exception e) {
-                        System.out.println(e.getMessage());
-                    }
-                    tableMetadataStore.remove(identifier.getFullName());
-                    return new MockResponse().setResponseCode(200);
-                default:
-                    return new MockResponse().setResponseCode(404);
-            }
-        } else {
-            throw new Catalog.TableNotExistException(identifier);
+        if (noPermissionTables.contains(identifier.getFullName())) {
+            throw new Catalog.TableNoPermissionException(identifier);
+        }
+        switch (request.getMethod()) {
+            case "GET":
+                TableMetadata tableMetadata;
+                identifier.isSystemTable();
+                if (identifier.isSystemTable()) {
+                    TableSchema schema = catalog.loadTableSchema(identifier);
+                    tableMetadata =
+                            createTableMetadata(
+                                    identifier, schema.id(), 
schema.toSchema(), null, false);
+                } else {
+                    tableMetadata = 
tableMetadataStore.get(identifier.getFullName());
+                }
+                response =
+                        new GetTableResponse(
+                                tableMetadata.uuid(),
+                                identifier.getTableName(),
+                                tableMetadata.isExternal(),
+                                tableMetadata.schema().id(),
+                                tableMetadata.schema().toSchema());
+                return mockResponse(response, 200);
+            case "POST":
+                AlterTableRequest requestBody =
+                        OBJECT_MAPPER.readValue(
+                                request.getBody().readUtf8(), 
AlterTableRequest.class);
+                alterTableImpl(identifier, requestBody.getChanges());
+                return new MockResponse().setResponseCode(200);
+            case "DELETE":
+                try {
+                    catalog.dropTable(identifier, false);
+                } catch (Exception e) {
+                    System.out.println(e.getMessage());
+                }
+                tableMetadataStore.remove(identifier.getFullName());
+                return new MockResponse().setResponseCode(200);
+            default:
+                return new MockResponse().setResponseCode(404);
         }
     }
 
@@ -864,11 +888,16 @@ public class RESTCatalogServer {
                 OBJECT_MAPPER.readValue(request.getBody().readUtf8(), 
RenameTableRequest.class);
         Identifier fromTable = requestBody.getSource();
         Identifier toTable = requestBody.getDestination();
-        if (tableMetadataStore.containsKey(fromTable.getFullName())) {
+        if (noPermissionTables.contains(fromTable.getFullName())) {
+            throw new Catalog.TableNoPermissionException(fromTable);
+        } else if (tableMetadataStore.containsKey(fromTable.getFullName())) {
             TableMetadata tableMetadata = 
tableMetadataStore.get(fromTable.getFullName());
             if (!isFormatTable(tableMetadata.schema().toSchema())) {
                 catalog.renameTable(requestBody.getSource(), 
requestBody.getDestination(), false);
             }
+            if (tableMetadataStore.containsKey(toTable.getFullName())) {
+                throw new Catalog.TableAlreadyExistException(toTable);
+            }
             tableMetadataStore.remove(fromTable.getFullName());
             tableMetadataStore.put(toTable.getFullName(), tableMetadata);
         } else {
@@ -884,8 +913,14 @@ public class RESTCatalogServer {
                 && Objects.nonNull(request.getRequestUrl())) {
             switch (request.getMethod()) {
                 case "GET":
-                    List<Partition> partitions =
-                            
tablePartitionsStore.get(tableIdentifier.getFullName());
+                    List<Partition> partitions = new ArrayList<>();
+                    for (Map.Entry<String, List<Partition>> entry :
+                            tablePartitionsStore.entrySet()) {
+                        String tableName = 
Identifier.fromString(entry.getKey()).getTableName();
+                        if (tableName.equals(tableIdentifier.getTableName())) {
+                            partitions.addAll(entry.getValue());
+                        }
+                    }
                     return generateFinalListPartitionsResponse(request, 
partitions);
                 case "POST":
                     CreatePartitionsRequest requestBody =
@@ -908,6 +943,86 @@ public class RESTCatalogServer {
         }
     }
 
+    private MockResponse branchApiHandle(
+            String[] resources, RecordedRequest request, Identifier 
identifier) throws Exception {
+        RESTResponse response;
+        FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+        BranchManager branchManager = table.branchManager();
+        String fromTag = "";
+        String branch = "";
+        Identifier branchIdentifier;
+        try {
+            switch (request.getMethod()) {
+                case "DELETE":
+                    branch = resources[4];
+                    branchIdentifier =
+                            new Identifier(
+                                    identifier.getDatabaseName(),
+                                    identifier.getTableName(),
+                                    branch);
+                    table.deleteBranch(branch);
+                    tableMetadataStore.remove(branchIdentifier.getFullName());
+                    return new MockResponse().setResponseCode(200);
+                case "GET":
+                    List<String> branches = branchManager.branches();
+                    response = new ListBranchesResponse(branches.isEmpty() ? 
null : branches);
+                    return mockResponse(response, 200);
+                case "POST":
+                    if (resources.length == 5) {
+                        ForwardBranchRequest requestBody =
+                                OBJECT_MAPPER.readValue(
+                                        request.getBody().readUtf8(), 
ForwardBranchRequest.class);
+                        branch = requestBody.branch();
+                        branchManager.fastForward(requestBody.branch());
+                    } else {
+                        CreateBranchRequest requestBody =
+                                OBJECT_MAPPER.readValue(
+                                        request.getBody().readUtf8(), 
CreateBranchRequest.class);
+                        branch = requestBody.branch();
+                        if (requestBody.fromTag() == null) {
+                            branchManager.createBranch(requestBody.branch());
+                        } else {
+                            fromTag = requestBody.fromTag();
+                            branchManager.createBranch(requestBody.branch(), 
requestBody.fromTag());
+                        }
+                        branchIdentifier =
+                                new Identifier(
+                                        identifier.getDatabaseName(),
+                                        identifier.getTableName(),
+                                        requestBody.branch());
+                        tableMetadataStore.put(
+                                branchIdentifier.getFullName(),
+                                
tableMetadataStore.get(identifier.getFullName()));
+                    }
+                    return new MockResponse().setResponseCode(200);
+                default:
+                    return new MockResponse().setResponseCode(404);
+            }
+        } catch (Exception e) {
+            if (e.getMessage().contains("Tag")) {
+                response =
+                        new ErrorResponse(
+                                ErrorResponseResourceType.TAG, fromTag, 
e.getMessage(), 404);
+                return mockResponse(response, 404);
+            }
+            if (e.getMessage().contains("Branch name")
+                    && e.getMessage().contains("already exists")) {
+                response =
+                        new ErrorResponse(
+                                ErrorResponseResourceType.BRANCH, branch, 
e.getMessage(), 409);
+                return mockResponse(response, 409);
+            }
+            if (e.getMessage().contains("Branch name")
+                    && e.getMessage().contains("doesn't exist")) {
+                response =
+                        new ErrorResponse(
+                                ErrorResponseResourceType.BRANCH, branch, 
e.getMessage(), 404);
+                return mockResponse(response, 404);
+            }
+        }
+        return new MockResponse().setResponseCode(404);
+    }
+
     private MockResponse generateFinalListPartitionsResponse(
             RecordedRequest request, List<Partition> partitions) {
         RESTResponse response;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 124a82154a..5e47bbe7b6 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -24,6 +24,8 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogContext;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.catalog.PropertyChange;
+import org.apache.paimon.catalog.SupportsBranches;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.options.CatalogOptions;
@@ -36,6 +38,7 @@ import org.apache.paimon.rest.auth.RESTAuthParameter;
 import org.apache.paimon.rest.exceptions.NotAuthorizedException;
 import org.apache.paimon.rest.responses.ConfigResponse;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
@@ -49,6 +52,7 @@ import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.view.View;
 
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.apache.paimon.shade.guava30.com.google.common.collect.Maps;
@@ -68,9 +72,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
 import static org.apache.paimon.CoreOptions.METASTORE_TAG_TO_PARTITION;
+import static org.apache.paimon.catalog.Catalog.SYSTEM_DATABASE_NAME;
+import static org.apache.paimon.rest.RESTCatalog.PAGE_TOKEN;
 import static 
org.apache.paimon.utils.SnapshotManagerTest.createSnapshotWithMillis;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
@@ -89,6 +96,7 @@ class RESTCatalogTest extends CatalogTestBase {
     private ConfigResponse config;
     private Options options = new Options();
     private String dataPath;
+    private RESTCatalog restCatalog;
 
     @BeforeEach
     @Override
@@ -112,7 +120,8 @@ class RESTCatalogTest extends CatalogTestBase {
         options.set(RESTCatalogOptions.URI, restCatalogServer.getUrl());
         options.set(RESTCatalogOptions.TOKEN, initToken);
         options.set(RESTCatalogOptions.TOKEN_PROVIDER, 
AuthProviderEnum.BEAR.identifier());
-        this.catalog = new RESTCatalog(CatalogContext.create(options));
+        this.restCatalog = new RESTCatalog(CatalogContext.create(options));
+        this.catalog = restCatalog;
     }
 
     @AfterEach
@@ -133,7 +142,6 @@ class RESTCatalogTest extends CatalogTestBase {
 
     @Test
     void testHeader() {
-        RESTCatalog restCatalog = (RESTCatalog) catalog;
         Map<String, String> parameters = new HashMap<>();
         parameters.put("k1", "v1");
         parameters.put("k2", "v2");
@@ -145,6 +153,121 @@ class RESTCatalogTest extends CatalogTestBase {
         assertEquals(headers.get(serverDefineHeaderName), 
serverDefineHeaderValue);
     }
 
+    @Test
+    void testDatabaseApiWhenNoPermission() {
+        String database = "test_no_permission_db";
+        restCatalogServer.addNoPermissionDatabase(database);
+        assertThrows(
+                Catalog.DatabaseNoPermissionException.class,
+                () -> catalog.createDatabase(database, false, 
Maps.newHashMap()));
+        assertThrows(
+                Catalog.DatabaseNoPermissionException.class, () -> 
catalog.getDatabase(database));
+        assertThrows(
+                Catalog.DatabaseNoPermissionException.class,
+                () -> catalog.dropDatabase(database, false, false));
+        assertThrows(
+                Catalog.DatabaseNoPermissionException.class,
+                () ->
+                        catalog.alterDatabase(
+                                database,
+                                
Lists.newArrayList(PropertyChange.setProperty("key1", "value1")),
+                                false));
+    }
+
+    @Test
+    void testApiWhenDatabaseNoExistAndNotIgnore() {
+        String database = "test_no_exist_db";
+        assertThrows(
+                Catalog.DatabaseNotExistException.class,
+                () -> catalog.dropDatabase(database, false, false));
+        assertThrows(
+                Catalog.DatabaseNotExistException.class,
+                () ->
+                        catalog.alterDatabase(
+                                database,
+                                
Lists.newArrayList(PropertyChange.setProperty("key1", "value1")),
+                                false));
+        assertThrows(
+                Catalog.DatabaseNotExistException.class,
+                () -> catalog.listTablesPaged(database, 100, null));
+        assertThrows(
+                Catalog.DatabaseNotExistException.class,
+                () -> catalog.listTableDetailsPaged(database, 100, null));
+    }
+
+    @Test
+    void testGetSystemDatabase() throws Catalog.DatabaseNotExistException {
+        assertThat(catalog.getDatabase(SYSTEM_DATABASE_NAME).name())
+                .isEqualTo(SYSTEM_DATABASE_NAME);
+    }
+
+    @Test
+    void testApiWhenTableNoPermission() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"no_permission_table");
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+        restCatalogServer.addNoPermissionTable(identifier);
+        assertThrows(Catalog.TableNoPermissionException.class, () -> 
catalog.getTable(identifier));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () ->
+                        catalog.alterTable(
+                                identifier,
+                                Lists.newArrayList(
+                                        SchemaChange.addColumn("col2", 
DataTypes.DATE())),
+                                false));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> catalog.dropTable(identifier, false));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () ->
+                        catalog.renameTable(
+                                identifier,
+                                Identifier.create("test_table_db", 
"no_permission_table2"),
+                                false));
+        assertThrows(
+                Catalog.TableNoPermissionException.class, () -> 
catalog.listPartitions(identifier));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> catalog.listPartitionsPaged(identifier, 100, null));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> restCatalog.createBranch(identifier, "test_branch", 
null));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> restCatalog.listBranches(identifier));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> restCatalog.dropBranch(identifier, "test_branch"));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> restCatalog.fastForward(identifier, "test_branch"));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> restCatalog.loadTableToken(identifier));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () -> restCatalog.loadSnapshot(identifier));
+        assertThrows(
+                Catalog.TableNoPermissionException.class,
+                () ->
+                        restCatalog.commitSnapshot(
+                                identifier,
+                                createSnapshotWithMillis(1L, 
System.currentTimeMillis()),
+                                new ArrayList<Partition>()));
+    }
+
+    @Test
+    void renameWhenTargetTableExist() throws Exception {
+        Identifier identifier = Identifier.create("test_table_db", 
"rename_table");
+        Identifier targetIdentifier = Identifier.create("test_table_db", 
"target_table");
+        createTable(identifier, Maps.newHashMap(), Lists.newArrayList("col1"));
+        createTable(targetIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
+        assertThrows(
+                Catalog.TableAlreadyExistException.class,
+                () -> catalog.renameTable(identifier, targetIdentifier, 
false));
+    }
+
     @Test
     public void testListTables() throws Exception {
         super.testListTables();
@@ -153,25 +276,22 @@ class RESTCatalogTest extends CatalogTestBase {
         String[] tableNames = {"table4", "table5", "table1", "table2", 
"table3"};
         String[] sortedTableNames = 
Arrays.stream(tableNames).sorted().toArray(String[]::new);
         Options options = new Options(this.catalog.options());
-        try (RESTCatalog restCatalog = new 
RESTCatalog(CatalogContext.create(options))) {
-
-            restCatalog.createDatabase(databaseName, false);
-            List<String> restTables = restCatalog.listTables(databaseName);
-            assertThat(restTables).isEmpty();
-
-            // List tables returns a list with the names of all tables in the 
database
+        restCatalog.createDatabase(databaseName, false);
+        List<String> restTables = restCatalog.listTables(databaseName);
+        assertThat(restTables).isEmpty();
 
-            for (String tableName : tableNames) {
-                restCatalog.createTable(
-                        Identifier.create(databaseName, tableName), 
DEFAULT_TABLE_SCHEMA, false);
-            }
-            restTables = restCatalog.listTables(databaseName);
-            assertThat(restTables).containsExactly(sortedTableNames);
+        // List tables returns a list with the names of all tables in the 
database
 
-            // List tables throws DatabaseNotExistException when the database 
does not exist
-            assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
-                    .isThrownBy(() -> 
restCatalog.listTables("non_existing_db"));
+        for (String tableName : tableNames) {
+            restCatalog.createTable(
+                    Identifier.create(databaseName, tableName), 
DEFAULT_TABLE_SCHEMA, false);
         }
+        restTables = restCatalog.listTables(databaseName);
+        assertThat(restTables).containsExactly(sortedTableNames);
+
+        // List tables throws DatabaseNotExistException when the database does 
not exist
+        assertThatExceptionOfType(Catalog.DatabaseNotExistException.class)
+                .isThrownBy(() -> restCatalog.listTables("non_existing_db"));
     }
 
     @Test
@@ -311,30 +431,23 @@ class RESTCatalogTest extends CatalogTestBase {
 
     @Test
     void testListViews() throws Exception {
-        Options options = new Options(this.catalog.options());
-        String databaseName;
+        String databaseName = "views_paged_db";
         List<String> views;
         String[] viewNames = new String[] {"view1", "view2", "view3", "abd", 
"def", "opr", "xyz"};
         String[] sortedViewNames = 
Arrays.stream(viewNames).sorted().toArray(String[]::new);
-        List<String> restViews;
-        try (RESTCatalog restCatalog = new 
RESTCatalog(CatalogContext.create(options))) {
-
-            // List tables returns an empty list when there are no tables in 
the database
-            databaseName = "views_paged_db";
-            restCatalog.createDatabase(databaseName, false);
-            views = restCatalog.listViews(databaseName);
-            assertThat(views).isEmpty();
-
-            View view = buildView(databaseName);
+        // List tables returns an empty list when there are no tables in the 
database
+        restCatalog.createDatabase(databaseName, false);
+        views = restCatalog.listViews(databaseName);
+        assertThat(views).isEmpty();
 
-            for (String viewName : viewNames) {
-                restCatalog.createView(Identifier.create(databaseName, 
viewName), view, false);
-            }
+        View view = buildView(databaseName);
 
-            // when maxResults is null or 0, the page length is set to a 
server configured value
-            restViews = restCatalog.listViews(databaseName);
+        for (String viewName : viewNames) {
+            restCatalog.createView(Identifier.create(databaseName, viewName), 
view, false);
         }
-        assertThat(restViews).containsExactly(sortedViewNames);
+
+        // when maxResults is null or 0, the page length is set to a server 
configured value
+        
assertThat(restCatalog.listViews(databaseName)).containsExactly(sortedViewNames);
     }
 
     @Test
@@ -467,13 +580,26 @@ class RESTCatalogTest extends CatalogTestBase {
 
     @Test
     void testListPartitionsWhenMetastorePartitionedIsTrue() throws Exception {
+        String branchName = "test_branch";
         Identifier identifier = Identifier.create("test_db", "test_table");
+        Identifier branchIdentifier = new Identifier("test_db", "test_table", 
branchName);
+        assertThrows(
+                Catalog.TableNotExistException.class, () -> 
restCatalog.listPartitions(identifier));
+
         createTable(
                 identifier,
                 ImmutableMap.of(METASTORE_PARTITIONED_TABLE.key(), "" + true),
                 Lists.newArrayList("col1"));
         List<Partition> result = catalog.listPartitions(identifier);
         assertEquals(0, result.size());
+        List<Map<String, String>> partitionSpecs =
+                Arrays.asList(
+                        Collections.singletonMap("dt", "20250101"),
+                        Collections.singletonMap("dt", "20250102"));
+        restCatalog.createBranch(identifier, branchName, null);
+        restCatalog.createPartitions(branchIdentifier, 
Lists.newArrayList(partitionSpecs));
+        
assertThat(catalog.listPartitions(identifier).stream().map(Partition::spec))
+                .containsExactlyInAnyOrder(partitionSpecs.get(0), 
partitionSpecs.get(1));
     }
 
     @Test
@@ -499,34 +625,23 @@ class RESTCatalogTest extends CatalogTestBase {
                         .sorted(Comparator.comparing(i -> i.get("dt")))
                         .toArray(Map[]::new);
 
-        Options options = new Options(this.catalog.options());
-        Identifier identifier;
-        try (RESTCatalog restCatalog = new 
RESTCatalog(CatalogContext.create(options))) {
-
-            String databaseName = "partitions_db";
-            identifier = Identifier.create(databaseName, "table");
-            Schema schema =
-                    Schema.newBuilder()
-                            .option(METASTORE_PARTITIONED_TABLE.key(), "true")
-                            .option(METASTORE_TAG_TO_PARTITION.key(), "dt")
-                            .column("col", DataTypes.INT())
-                            .column("dt", DataTypes.STRING())
-                            .partitionKeys("dt")
-                            .build();
-
-            restCatalog.createDatabase(databaseName, true);
-            restCatalog.createTable(identifier, schema, true);
-            restCatalog.createPartitions(identifier, partitionSpecs);
-
-            List<Partition> restPartitions = 
restCatalog.listPartitions(identifier);
-            
assertThat(restPartitions.stream().map(Partition::spec)).containsExactly(sortedSpecs);
-
-            assertThatExceptionOfType(Catalog.TableNotExistException.class)
-                    .isThrownBy(
-                            () ->
-                                    restCatalog.listPartitions(
-                                            Identifier.create(databaseName, 
"non_existing_table")));
-        }
+        String databaseName = "partitions_db";
+        Identifier identifier = Identifier.create(databaseName, "table");
+        Schema schema =
+                Schema.newBuilder()
+                        .option(METASTORE_PARTITIONED_TABLE.key(), "true")
+                        .option(METASTORE_TAG_TO_PARTITION.key(), "dt")
+                        .column("col", DataTypes.INT())
+                        .column("dt", DataTypes.STRING())
+                        .partitionKeys("dt")
+                        .build();
+
+        restCatalog.createDatabase(databaseName, true);
+        restCatalog.createTable(identifier, schema, true);
+        restCatalog.createPartitions(identifier, partitionSpecs);
+
+        List<Partition> restPartitions = 
restCatalog.listPartitions(identifier);
+        
assertThat(restPartitions.stream().map(Partition::spec)).containsExactly(sortedSpecs);
     }
 
     @Test
@@ -543,6 +658,11 @@ class RESTCatalogTest extends CatalogTestBase {
         catalog.dropDatabase(databaseName, true, true);
         catalog.createDatabase(databaseName, true);
         Identifier identifier = Identifier.create(databaseName, "table");
+
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () -> catalog.listPartitionsPaged(identifier, 10, 
"dt=20250101"));
+
         catalog.createTable(
                 identifier,
                 Schema.newBuilder()
@@ -604,16 +724,6 @@ class RESTCatalogTest extends CatalogTestBase {
                 partitionSpecs.get(4),
                 partitionSpecs.get(3));
         assertNull(pagedPartitions.getNextPageToken());
-
-        // List partitions paged throws TableNotExistException when the table 
does not exist
-        final int finalMaxResults = maxResults;
-        assertThatExceptionOfType(Catalog.TableNotExistException.class)
-                .isThrownBy(
-                        () ->
-                                catalog.listPartitionsPaged(
-                                        Identifier.create(databaseName, 
"non_existing_table"),
-                                        finalMaxResults,
-                                        "dt=20250101"));
     }
 
     @Test
@@ -665,6 +775,19 @@ class RESTCatalogTest extends CatalogTestBase {
     void testSnapshotFromREST() throws Exception {
         RESTCatalog catalog = (RESTCatalog) this.catalog;
         Identifier hasSnapshotTableIdentifier = Identifier.create("test_db_a", 
"my_snapshot_table");
+
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () -> restCatalog.loadSnapshot(hasSnapshotTableIdentifier));
+
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () ->
+                        restCatalog.commitSnapshot(
+                                hasSnapshotTableIdentifier,
+                                createSnapshotWithMillis(1L, 
System.currentTimeMillis()),
+                                new ArrayList<Partition>()));
+
         createTable(hasSnapshotTableIdentifier, Maps.newHashMap(), 
Lists.newArrayList("col1"));
         long id = 10086;
         long millis = System.currentTimeMillis();
@@ -743,16 +866,93 @@ class RESTCatalogTest extends CatalogTestBase {
         catalog.dropDatabase(databaseName, true, true);
         catalog.createDatabase(databaseName, true);
         Identifier identifier = Identifier.create(databaseName, "table");
+
+        assertThrows(
+                Catalog.TableNotExistException.class,
+                () -> restCatalog.createBranch(identifier, "my_branch", null));
+
+        assertThrows(
+                Catalog.TableNotExistException.class, () -> 
restCatalog.listBranches(identifier));
+
         catalog.createTable(
                 identifier, Schema.newBuilder().column("col", 
DataTypes.INT()).build(), true);
-
-        RESTCatalog restCatalog = (RESTCatalog) catalog;
+        assertThrows(
+                SupportsBranches.TagNotExistException.class,
+                () -> restCatalog.createBranch(identifier, "my_branch", 
"tag"));
         restCatalog.createBranch(identifier, "my_branch", null);
+        Identifier branchIdentifier = new Identifier(databaseName, "table", 
"my_branch");
+        assertThat(restCatalog.getTable(branchIdentifier)).isNotNull();
+        assertThrows(
+                SupportsBranches.BranchAlreadyExistException.class,
+                () -> restCatalog.createBranch(identifier, "my_branch", null));
         
assertThat(restCatalog.listBranches(identifier)).containsOnly("my_branch");
         restCatalog.dropBranch(identifier, "my_branch");
+
+        assertThrows(
+                SupportsBranches.BranchNotExistException.class,
+                () -> restCatalog.dropBranch(identifier, "no_exist_branch"));
+        assertThrows(
+                SupportsBranches.BranchNotExistException.class,
+                () -> restCatalog.fastForward(identifier, "no_exist_branch"));
         assertThat(restCatalog.listBranches(identifier)).isEmpty();
     }
 
+    @Test
+    void testListDataFromPageApiWhenLastPageTokenIsNull() {
+        List<Integer> testData = ImmutableList.of(1, 2, 3, 4, 5, 6, 7);
+        int maxResults = 2;
+        AtomicInteger fetchTimes = new AtomicInteger(0);
+        List<Integer> fetchData =
+                restCatalog.listDataFromPageApi(
+                        queryParams -> {
+                            return generateTestPagedResponse(
+                                    queryParams, testData, maxResults, 
fetchTimes, true);
+                        });
+        assertEquals(fetchTimes.get(), 4);
+        assertThat(fetchData).containsSequence(testData);
+    }
+
+    @Test
+    void testListDataFromPageApiWhenLastPageTokenIsNotNullAndDataIsNull() {
+        List<Integer> testData = ImmutableList.of(1, 2, 3, 4, 5, 6);
+        int maxResults = 2;
+        AtomicInteger fetchTimes = new AtomicInteger(0);
+        List<Integer> fetchData =
+                restCatalog.listDataFromPageApi(
+                        queryParams -> {
+                            return generateTestPagedResponse(
+                                    queryParams, testData, maxResults, 
fetchTimes, false);
+                        });
+
+        assertEquals(fetchTimes.get(), testData.size() / maxResults + 1);
+        assertThat(fetchData).containsSequence(testData);
+    }
+
+    private TestPagedResponse generateTestPagedResponse(
+            Map<String, String> queryParams,
+            List<Integer> testData,
+            int maxResults,
+            AtomicInteger fetchTimes,
+            boolean supportPageTokenNull) {
+        String nextToken = queryParams.getOrDefault(PAGE_TOKEN, null);
+        fetchTimes.incrementAndGet();
+        if (nextToken == null) {
+            return new TestPagedResponse(maxResults + "", testData.subList(0, 
maxResults));
+        } else {
+            int index = Integer.parseInt(nextToken);
+            if (index >= testData.size()) {
+                return new TestPagedResponse(null, null);
+            } else {
+                int endIndex = Math.min((index + maxResults), testData.size());
+                String nextPageToken =
+                        supportPageTokenNull && endIndex >= (testData.size())
+                                ? null
+                                : endIndex + "";
+                return new TestPagedResponse(nextPageToken, 
testData.subList(index, endIndex));
+            }
+        }
+    }
+
     @Override
     protected boolean supportsFormatTable() {
         return true;
@@ -786,7 +986,7 @@ class RESTCatalogTest extends CatalogTestBase {
     private void createTable(
             Identifier identifier, Map<String, String> options, List<String> 
partitionKeys)
             throws Exception {
-        catalog.createDatabase(identifier.getDatabaseName(), false);
+        catalog.createDatabase(identifier.getDatabaseName(), true);
         catalog.createTable(
                 identifier,
                 new Schema(
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/TestPagedResponse.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/TestPagedResponse.java
new file mode 100644
index 0000000000..ed49666aca
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/TestPagedResponse.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.rest;
+
+import org.apache.paimon.rest.responses.PagedResponse;
+
+import java.util.List;
+
+/** test for page response. */
+public class TestPagedResponse implements PagedResponse<Integer> {
+
+    private final String nextPageToken;
+    private final List<Integer> data;
+
+    public TestPagedResponse(String nextPageToken, List<Integer> data) {
+        this.nextPageToken = nextPageToken;
+        this.data = data;
+    }
+
+    @Override
+    public List<Integer> data() {
+        return this.data;
+    }
+
+    @Override
+    public String getNextPageToken() {
+        return this.nextPageToken;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthProviderFactoryTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthProviderFactoryTest.java
index 723cf01d08..5dfc5768cc 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthProviderFactoryTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/rest/auth/DLFAuthProviderFactoryTest.java
@@ -29,7 +29,6 @@ class DLFAuthProviderFactoryTest {
     @Test
     void getRegion() {
         String region = "cn-hangzhou";
-        String ipPortUri = "http://127.0.0.1:8080";
         String url = "https://dlf-" + region + "-internal.aliyuncs.com";
         assertEquals(region, DLFAuthProviderFactory.parseRegionFromUri(url));
         url = "https://dlf-" + region + ".aliyuncs.com";
@@ -41,6 +40,7 @@ class DLFAuthProviderFactoryTest {
         assertEquals(region, DLFAuthProviderFactory.parseRegionFromUri(url));
         url = "https://dlf-" + region + "-internal.aliyuncs.com";
         assertEquals(region, DLFAuthProviderFactory.parseRegionFromUri(url));
+        String ipPortUri = "http://127.0.0.1:8080";
         assertThrows(
                 IllegalArgumentException.class,
                 () -> DLFAuthProviderFactory.parseRegionFromUri(ipPortUri));

Reply via email to