This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 406c4e58d7 [core] Add supportsVersionManagement to Catalog (#5326)
406c4e58d7 is described below

commit 406c4e58d7dac8a8dc50e1e04a8e13c1f2cfb5b6
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 21 23:15:34 2025 +0800

    [core] Add supportsVersionManagement to Catalog (#5326)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java |  25 +++++
 .../java/org/apache/paimon/catalog/Catalog.java    | 117 ++++++++++++---------
 .../paimon/catalog/CatalogSnapshotCommit.java      |  31 ++----
 .../org/apache/paimon/catalog/CatalogUtils.java    |   3 +-
 .../org/apache/paimon/catalog/DelegateCatalog.java |   5 +
 .../java/org/apache/paimon/rest/RESTCatalog.java   |  31 ++++++
 .../paimon/table/AbstractFileStoreTable.java       |  12 +--
 .../apache/paimon/table/CatalogEnvironment.java    |  17 ++-
 .../apache/paimon/utils/CatalogBranchManager.java  |  37 ++-----
 .../paimon/operation/PartitionExpireTest.java      |   2 +-
 .../org/apache/paimon/rest/RESTCatalogServer.java  |   3 +-
 11 files changed, 163 insertions(+), 120 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 498e13e38b..18e2f8141e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -38,6 +38,7 @@ import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
 import org.apache.paimon.table.object.ObjectTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.types.RowType;
 
@@ -479,6 +480,30 @@ public abstract class AbstractCatalog implements Catalog {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public boolean supportsVersionManagement() {
+        return false;
+    }
+
+    @Override
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {}
+
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        Table table = getTable(identifier);
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.truncatePartitions(partitions);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void alterPartitions(Identifier identifier, 
List<PartitionStatistics> partitions)
+            throws TableNotExistException {}
+
     /**
      * Create a {@link FormatTable} identified by the given {@link Identifier}.
      *
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
index fa2bc0ea80..3698f7a670 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Catalog.java
@@ -28,7 +28,6 @@ import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
-import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.view.View;
 
 import javax.annotation.Nullable;
@@ -464,7 +463,60 @@ public interface Catalog extends AutoCloseable {
         throw new UnsupportedOperationException();
     }
 
-    // ==================== Branch methods ==========================
+    // ==================== Version management methods 
==========================
+
+    /**
+     * Whether this catalog supports version management for tables. If not, 
corresponding methods
+     * will throw an {@link UnsupportedOperationException}. This affects the 
following methods:
+     *
+     * <ul>
+     *   <li>{@link #commitSnapshot(Identifier, Snapshot, List)}.
+     *   <li>{@link #loadSnapshot(Identifier)}.
+     *   <li>{@link #rollbackTo(Identifier, Instant)}.
+     *   <li>{@link #createBranch(Identifier, String, String)}.
+     *   <li>{@link #dropBranch(Identifier, String)}.
+     *   <li>{@link #fastForward(Identifier, String)}.
+     *   <li>{@link #listBranches(Identifier)}.
+     * </ul>
+     */
+    boolean supportsVersionManagement();
+
+    /**
+     * Commit the {@link Snapshot} for table identified by the given {@link 
Identifier}.
+     *
+     * @param identifier Path of the table
+     * @param snapshot Snapshot to be committed
+     * @param statistics statistics information of this change
+     * @return Success or not
+     * @throws Catalog.TableNotExistException if the target does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    boolean commitSnapshot(
+            Identifier identifier, Snapshot snapshot, 
List<PartitionStatistics> statistics)
+            throws Catalog.TableNotExistException;
+
+    /**
+     * Return the snapshot of table identified by the given {@link Identifier}.
+     *
+     * @param identifier Path of the table
+     * @return The requested snapshot of the table
+     * @throws Catalog.TableNotExistException if the target does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    Optional<TableSnapshot> loadSnapshot(Identifier identifier)
+            throws Catalog.TableNotExistException;
+
+    /**
+     * Roll back the table identified by the given {@link Identifier} to the given instant.
+     *
+     * @param identifier path of the table
+     * @param instant like snapshotId or tagName
+     * @throws Catalog.TableNotExistException if the table does not exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
+     */
+    void rollbackTo(Identifier identifier, Instant instant) throws 
Catalog.TableNotExistException;
 
     /**
      * Create a new branch for this table. By default, an empty branch will be 
created using the
@@ -477,6 +529,8 @@ public interface Catalog extends AutoCloseable {
      * @throws TableNotExistException if the table in identifier doesn't exist
      * @throws BranchAlreadyExistException if the branch already exists
      * @throws TagNotExistException if the tag doesn't exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
      */
     void createBranch(Identifier identifier, String branch, @Nullable String 
fromTag)
             throws TableNotExistException, BranchAlreadyExistException, 
TagNotExistException;
@@ -487,6 +541,8 @@ public interface Catalog extends AutoCloseable {
      * @param identifier path of the table, cannot be system or branch name.
      * @param branch the branch name
      * @throws BranchNotExistException if the branch doesn't exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
      */
     void dropBranch(Identifier identifier, String branch) throws 
BranchNotExistException;
 
@@ -496,6 +552,8 @@ public interface Catalog extends AutoCloseable {
      * @param identifier path of the table, cannot be system or branch name.
      * @param branch the branch name
      * @throws BranchNotExistException if the branch doesn't exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
      */
     void fastForward(Identifier identifier, String branch) throws 
BranchNotExistException;
 
@@ -504,43 +562,11 @@ public interface Catalog extends AutoCloseable {
      *
      * @param identifier path of the table, cannot be system or branch name.
      * @throws TableNotExistException if the table in identifier doesn't exist
+     * @throws UnsupportedOperationException if the catalog does not {@link
+     *     #supportsVersionManagement()}
      */
     List<String> listBranches(Identifier identifier) throws 
TableNotExistException;
 
-    // ==================== Snapshot Operations ==========================
-
-    /**
-     * Commit the {@link Snapshot} for table identified by the given {@link 
Identifier}.
-     *
-     * @param identifier Path of the table
-     * @param snapshot Snapshot to be committed
-     * @param statistics statistics information of this change
-     * @return Success or not
-     * @throws Catalog.TableNotExistException if the target does not exist
-     */
-    boolean commitSnapshot(
-            Identifier identifier, Snapshot snapshot, 
List<PartitionStatistics> statistics)
-            throws Catalog.TableNotExistException;
-
-    /**
-     * Return the snapshot of table identified by the given {@link Identifier}.
-     *
-     * @param identifier Path of the table
-     * @return The requested snapshot of the table
-     * @throws Catalog.TableNotExistException if the target does not exist
-     */
-    Optional<TableSnapshot> loadSnapshot(Identifier identifier)
-            throws Catalog.TableNotExistException;
-
-    /**
-     * rollback table by the given {@link Identifier} and instant.
-     *
-     * @param identifier path of the table
-     * @param instant like snapshotId or tagName
-     * @throws Catalog.TableNotExistException if the table does not exist
-     */
-    void rollbackTo(Identifier identifier, Instant instant) throws 
Catalog.TableNotExistException;
-
     // ==================== Partition Modifications ==========================
 
     /**
@@ -550,8 +576,8 @@ public interface Catalog extends AutoCloseable {
      * @param partitions partitions to be created
      * @throws TableNotExistException if the table does not exist
      */
-    default void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
-            throws TableNotExistException {}
+    void createPartitions(Identifier identifier, List<Map<String, String>> 
partitions)
+            throws TableNotExistException;
 
     /**
      * Drop partitions of the specify table. Ignore non-existent partitions.
@@ -560,15 +586,8 @@ public interface Catalog extends AutoCloseable {
      * @param partitions partitions to be deleted
      * @throws TableNotExistException if the table does not exist
      */
-    default void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
-            throws TableNotExistException {
-        Table table = getTable(identifier);
-        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
-            commit.truncatePartitions(partitions);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
+    void dropPartitions(Identifier identifier, List<Map<String, String>> 
partitions)
+            throws TableNotExistException;
 
     /**
      * Alter partitions of the specify table. For non-existent partitions, 
partitions will be
@@ -578,8 +597,8 @@ public interface Catalog extends AutoCloseable {
      * @param partitions partitions to be altered
      * @throws TableNotExistException if the table does not exist
      */
-    default void alterPartitions(Identifier identifier, 
List<PartitionStatistics> partitions)
-            throws TableNotExistException {}
+    void alterPartitions(Identifier identifier, List<PartitionStatistics> 
partitions)
+            throws TableNotExistException;
 
     // ==================== Catalog Information ==========================
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
index 491cd22d88..62e5b58ac5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogSnapshotCommit.java
@@ -22,8 +22,6 @@ import org.apache.paimon.Snapshot;
 import org.apache.paimon.partition.PartitionStatistics;
 import org.apache.paimon.utils.SnapshotManager;
 
-import javax.annotation.Nullable;
-
 import java.util.List;
 
 /** A {@link SnapshotCommit} using {@link Catalog} to commit. */
@@ -31,25 +29,18 @@ public class CatalogSnapshotCommit implements 
SnapshotCommit {
 
     private final Catalog catalog;
     private final Identifier identifier;
-    private final RenamingSnapshotCommit renamingCommit;
 
-    public CatalogSnapshotCommit(
-            Catalog catalog, Identifier identifier, RenamingSnapshotCommit 
renamingCommit) {
+    public CatalogSnapshotCommit(Catalog catalog, Identifier identifier) {
         this.catalog = catalog;
         this.identifier = identifier;
-        this.renamingCommit = renamingCommit;
     }
 
     @Override
     public boolean commit(Snapshot snapshot, String branch, 
List<PartitionStatistics> statistics)
             throws Exception {
-        try {
-            Identifier newIdentifier =
-                    new Identifier(identifier.getDatabaseName(), 
identifier.getTableName(), branch);
-            return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
-        } catch (UnsupportedOperationException e) {
-            return renamingCommit.commit(snapshot, branch, statistics);
-        }
+        Identifier newIdentifier =
+                new Identifier(identifier.getDatabaseName(), 
identifier.getTableName(), branch);
+        return catalog.commitSnapshot(newIdentifier, snapshot, statistics);
     }
 
     @Override
@@ -63,24 +54,14 @@ public class CatalogSnapshotCommit implements 
SnapshotCommit {
         private static final long serialVersionUID = 1L;
 
         private final CatalogLoader catalogLoader;
-        @Nullable private final CatalogLockFactory lockFactory;
-        @Nullable private final CatalogLockContext lockContext;
 
-        public Factory(
-                CatalogLoader catalogLoader,
-                @Nullable CatalogLockFactory lockFactory,
-                @Nullable CatalogLockContext lockContext) {
+        public Factory(CatalogLoader catalogLoader) {
             this.catalogLoader = catalogLoader;
-            this.lockFactory = lockFactory;
-            this.lockContext = lockContext;
         }
 
         @Override
         public SnapshotCommit create(Identifier identifier, SnapshotManager 
snapshotManager) {
-            RenamingSnapshotCommit renamingCommit =
-                    new RenamingSnapshotCommit.Factory(lockFactory, 
lockContext)
-                            .create(identifier, snapshotManager);
-            return new CatalogSnapshotCommit(catalogLoader.load(), identifier, 
renamingCommit);
+            return new CatalogSnapshotCommit(catalogLoader.load(), identifier);
         }
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index 416941e6a6..bdbfe41587 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -194,7 +194,8 @@ public class CatalogUtils {
                         metadata.uuid(),
                         catalog.catalogLoader(),
                         lockFactory,
-                        lockContext);
+                        lockContext,
+                        catalog.supportsVersionManagement());
         Path path = new Path(schema.options().get(PATH.key()));
         FileStoreTable table =
                 FileStoreTableFactory.create(dataFileIO.apply(path), path, 
schema, catalogEnv);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
index 39c8914192..756881af17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/DelegateCatalog.java
@@ -135,6 +135,11 @@ public abstract class DelegateCatalog implements Catalog {
         wrapped.alterTable(identifier, changes, ignoreIfNotExists);
     }
 
+    @Override
+    public boolean supportsVersionManagement() {
+        return wrapped.supportsVersionManagement();
+    }
+
     @Override
     public Optional<TableSnapshot> loadSnapshot(Identifier identifier)
             throws TableNotExistException {
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 48b665cf3a..7ea1f84a2c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -76,6 +76,7 @@ import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.Instant;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.TableSnapshot;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
@@ -383,6 +384,11 @@ public class RESTCatalog implements Catalog {
         return Optional.of(response.getSnapshot());
     }
 
+    @Override
+    public boolean supportsVersionManagement() {
+        return true;
+    }
+
     @Override
     public boolean commitSnapshot(
             Identifier identifier, Snapshot snapshot, 
List<PartitionStatistics> statistics)
@@ -727,6 +733,31 @@ public class RESTCatalog implements Catalog {
         }
     }
 
+    @Override
+    public void createPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        // Partitions of the REST Catalog server are automatically calculated 
and do not require
+        // explicit creation.
+    }
+
+    @Override
+    public void dropPartitions(Identifier identifier, List<Map<String, 
String>> partitions)
+            throws TableNotExistException {
+        Table table = getTable(identifier);
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.truncatePartitions(partitions);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void alterPartitions(Identifier identifier, 
List<PartitionStatistics> partitions)
+            throws TableNotExistException {
+        // The partition statistics of the REST Catalog server are 
automatically calculated and do
+        // not require special reporting.
+    }
+
     @Override
     public View getView(Identifier identifier) throws ViewNotExistException {
         try {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 4a4aa88cd9..2e9647f9d5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -711,14 +711,12 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
 
     @Override
     public BranchManager branchManager() {
-        FileSystemBranchManager branchManager =
-                new FileSystemBranchManager(
-                        fileIO, path, snapshotManager(), tagManager(), 
schemaManager());
-        if (catalogEnvironment.catalogLoader() != null) {
-            return new CatalogBranchManager(
-                    catalogEnvironment.catalogLoader(), identifier(), 
branchManager);
+        if (catalogEnvironment.catalogLoader() != null
+                && catalogEnvironment.supportsVersionManagement()) {
+            return new 
CatalogBranchManager(catalogEnvironment.catalogLoader(), identifier());
         }
-        return branchManager;
+        return new FileSystemBranchManager(
+                fileIO, path, snapshotManager(), tagManager(), 
schemaManager());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java 
b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
index ef7e18be2c..3d7cb3c248 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/CatalogEnvironment.java
@@ -44,22 +44,25 @@ public class CatalogEnvironment implements Serializable {
     @Nullable private final CatalogLoader catalogLoader;
     @Nullable private final CatalogLockFactory lockFactory;
     @Nullable private final CatalogLockContext lockContext;
+    private final boolean supportsVersionManagement;
 
     public CatalogEnvironment(
             @Nullable Identifier identifier,
             @Nullable String uuid,
             @Nullable CatalogLoader catalogLoader,
             @Nullable CatalogLockFactory lockFactory,
-            @Nullable CatalogLockContext lockContext) {
+            @Nullable CatalogLockContext lockContext,
+            boolean supportsVersionManagement) {
         this.identifier = identifier;
         this.uuid = uuid;
         this.catalogLoader = catalogLoader;
         this.lockFactory = lockFactory;
         this.lockContext = lockContext;
+        this.supportsVersionManagement = supportsVersionManagement;
     }
 
     public static CatalogEnvironment empty() {
-        return new CatalogEnvironment(null, null, null, null, null);
+        return new CatalogEnvironment(null, null, null, null, null, false);
     }
 
     @Nullable
@@ -81,13 +84,17 @@ public class CatalogEnvironment implements Serializable {
         return PartitionHandler.create(catalog, identifier);
     }
 
+    public boolean supportsVersionManagement() {
+        return supportsVersionManagement;
+    }
+
     @Nullable
     public SnapshotCommit snapshotCommit(SnapshotManager snapshotManager) {
         SnapshotCommit.Factory factory;
-        if (catalogLoader == null) {
-            factory = new RenamingSnapshotCommit.Factory(lockFactory, 
lockContext);
+        if (catalogLoader != null && supportsVersionManagement) {
+            factory = new CatalogSnapshotCommit.Factory(catalogLoader);
         } else {
-            factory = new CatalogSnapshotCommit.Factory(catalogLoader, 
lockFactory, lockContext);
+            factory = new RenamingSnapshotCommit.Factory(lockFactory, 
lockContext);
         }
         return factory.create(identifier, snapshotManager);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
index 5ba283c0d5..d87f97badd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/utils/CatalogBranchManager.java
@@ -31,15 +31,10 @@ public class CatalogBranchManager implements BranchManager {
 
     private final CatalogLoader catalogLoader;
     private final Identifier identifier;
-    private final FileSystemBranchManager branchManager;
 
-    public CatalogBranchManager(
-            CatalogLoader catalogLoader,
-            Identifier identifier,
-            FileSystemBranchManager branchManager) {
+    public CatalogBranchManager(CatalogLoader catalogLoader, Identifier 
identifier) {
         this.catalogLoader = catalogLoader;
         this.identifier = identifier;
-        this.branchManager = branchManager;
     }
 
     private void executePost(ThrowingConsumer<Catalog, Exception> func) {
@@ -62,46 +57,26 @@ public class CatalogBranchManager implements BranchManager {
 
     @Override
     public void createBranch(String branchName) {
-        try {
-            executePost(catalog -> catalog.createBranch(identifier, 
branchName, null));
-        } catch (UnsupportedOperationException e) {
-            branchManager.createBranch(branchName);
-        }
+        executePost(catalog -> catalog.createBranch(identifier, branchName, 
null));
     }
 
     @Override
     public void createBranch(String branchName, @Nullable String tagName) {
-        try {
-            executePost(catalog -> catalog.createBranch(identifier, 
branchName, tagName));
-        } catch (UnsupportedOperationException e) {
-            branchManager.createBranch(branchName, tagName);
-        }
+        executePost(catalog -> catalog.createBranch(identifier, branchName, 
tagName));
     }
 
     @Override
     public void dropBranch(String branchName) {
-        try {
-            executePost(catalog -> catalog.dropBranch(identifier, branchName));
-        } catch (UnsupportedOperationException e) {
-            branchManager.dropBranch(branchName);
-        }
+        executePost(catalog -> catalog.dropBranch(identifier, branchName));
     }
 
     @Override
     public void fastForward(String branchName) {
-        try {
-            executePost(catalog -> catalog.fastForward(identifier, 
branchName));
-        } catch (UnsupportedOperationException e) {
-            branchManager.fastForward(branchName);
-        }
+        executePost(catalog -> catalog.fastForward(identifier, branchName));
     }
 
     @Override
     public List<String> branches() {
-        try {
-            return executeGet(catalog -> catalog.listBranches(identifier));
-        } catch (UnsupportedOperationException e) {
-            return branchManager.branches();
-        }
+        return executeGet(catalog -> catalog.listBranches(identifier));
     }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 2f835440aa..aba37d4cce 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -134,7 +134,7 @@ public class PartitionExpireTest {
                 };
 
         CatalogEnvironment env =
-                new CatalogEnvironment(null, null, null, null, null) {
+                new CatalogEnvironment(null, null, null, null, null, false) {
 
                     @Override
                     public PartitionHandler partitionHandler() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java 
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 62382e10b1..98e11830a2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -1546,7 +1546,8 @@ public class RESTCatalogServer {
                             tableMetadata.uuid(),
                             catalog.catalogLoader(),
                             catalog.lockFactory().orElse(null),
-                            catalog.lockContext().orElse(null));
+                            catalog.lockContext().orElse(null),
+                            false);
             Path path = new Path(schema.options().get(PATH.key()));
             FileIO dataFileIO = catalog.fileIO();
             FileStoreTable table =

Reply via email to