This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2050d841db [core] Refactor FileStore.newCommit and avoid using it 
(#5212)
2050d841db is described below

commit 2050d841db25e5116a81f0c95d4dac3718a8048f
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 5 17:51:03 2025 +0800

    [core] Refactor FileStore.newCommit and avoid using it (#5212)
---
 .../java/org/apache/paimon/AbstractFileStore.java  | 39 +++++++++++++++++-----
 .../org/apache/paimon/AppendOnlyFileStore.java     | 19 -----------
 .../src/main/java/org/apache/paimon/FileStore.java | 15 +++++++--
 .../java/org/apache/paimon/KeyValueFileStore.java  | 19 -----------
 .../org/apache/paimon/catalog/AbstractCatalog.java | 15 +++------
 .../iceberg/AppendOnlyIcebergCommitCallback.java   | 35 -------------------
 ...mitCallback.java => IcebergCommitCallback.java} | 17 +++++++---
 .../iceberg/PrimaryKeyIcebergCommitCallback.java   | 36 --------------------
 .../paimon/privilege/PrivilegedFileStore.java      | 23 ++++++++++---
 .../java/org/apache/paimon/rest/RESTCatalog.java   | 18 ++++------
 .../paimon/table/AbstractFileStoreTable.java       |  4 +--
 .../apache/paimon/table/sink/BatchTableCommit.java | 15 +++++++++
 .../apache/paimon/table/sink/TableCommitImpl.java  | 16 +++++++++
 .../org/apache/paimon/TestAppendFileStore.java     |  2 +-
 .../test/java/org/apache/paimon/TestFileStore.java |  6 ++--
 .../paimon/operation/FileStoreCommitTest.java      |  4 +--
 .../paimon/operation/PartitionExpireTest.java      |  6 ++--
 .../apache/paimon/operation/TestCommitThread.java  |  2 +-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  6 ++--
 .../flink/procedure/ExpirePartitionsProcedure.java | 10 +++---
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 13 ++++----
 .../paimon/flink/action/DropPartitionAction.java   | 17 +++-------
 .../flink/action/ExpirePartitionsAction.java       | 10 +++---
 .../flink/procedure/CompactManifestProcedure.java  | 24 ++++++-------
 .../flink/procedure/DropPartitionProcedure.java    | 21 ++++--------
 .../flink/procedure/ExpirePartitionsProcedure.java | 10 +++---
 .../SupportsRowLevelOperationFlinkTableSink.java   | 19 ++++-------
 .../FileStoreTableStatisticsTestBase.java          | 39 ++++++++++------------
 .../iceberg/IcebergHiveMetadataCommitter.java      |  2 +-
 .../spark/procedure/CompactManifestProcedure.java  | 15 ++++-----
 .../spark/procedure/ExpirePartitionsProcedure.java | 10 +++---
 .../paimon/spark/PaimonPartitionManagement.scala   |  4 +--
 .../commands/DeleteFromPaimonTableCommand.scala    |  6 ++--
 .../commands/PaimonAnalyzeTableColumnCommand.scala |  4 +--
 .../commands/PaimonTruncateTableCommand.scala      |  9 +++--
 35 files changed, 217 insertions(+), 293 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 0ee0d667b2..b7a0350a4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergCommitCallback;
+import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.index.HashIndexFile;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.manifest.IndexManifestFile;
@@ -50,6 +52,7 @@ import org.apache.paimon.stats.StatsFile;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.PartitionHandler;
 import org.apache.paimon.table.sink.CallbackUtils;
 import org.apache.paimon.table.sink.CommitCallback;
@@ -282,7 +285,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
     }
 
     @Override
-    public FileStoreCommitImpl newCommit(String commitUser) {
+    public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable 
table) {
         SnapshotManager snapshotManager = snapshotManager();
         SnapshotCommit snapshotCommit = 
catalogEnvironment.snapshotCommit(snapshotManager);
         if (snapshotCommit == null) {
@@ -313,7 +316,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 newStatsFileHandler(),
                 bucketMode(),
                 options.scanManifestParallelism(),
-                createCommitCallbacks(commitUser),
+                createCommitCallbacks(commitUser, table),
                 options.commitMaxRetries(),
                 options.commitTimeout());
     }
@@ -365,7 +368,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
 
     public abstract Comparator<InternalRow> newKeyComparator();
 
-    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+    private List<CommitCallback> createCommitCallbacks(String commitUser, 
FileStoreTable table) {
         List<CommitCallback> callbacks =
                 new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
 
@@ -397,28 +400,48 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
             }
         }
 
+        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+                != IcebergOptions.StorageType.DISABLED) {
+            callbacks.add(new IcebergCommitCallback(table, commitUser));
+        }
+
         return callbacks;
     }
 
     @Override
     @Nullable
-    public PartitionExpire newPartitionExpire(String commitUser) {
+    public PartitionExpire newPartitionExpire(String commitUser, 
FileStoreTable table) {
         Duration partitionExpireTime = options.partitionExpireTime();
         if (partitionExpireTime == null || partitionType().getFieldCount() == 
0) {
             return null;
         }
 
+        return newPartitionExpire(
+                commitUser,
+                table,
+                partitionExpireTime,
+                options.partitionExpireCheckInterval(),
+                PartitionExpireStrategy.createPartitionExpireStrategy(options, 
partitionType()));
+    }
+
+    @Override
+    public PartitionExpire newPartitionExpire(
+            String commitUser,
+            FileStoreTable table,
+            Duration expirationTime,
+            Duration checkInterval,
+            PartitionExpireStrategy expireStrategy) {
         PartitionHandler partitionHandler = null;
         if (options.partitionedTableInMetastore()) {
             partitionHandler = catalogEnvironment.partitionHandler();
         }
 
         return new PartitionExpire(
-                partitionExpireTime,
-                options.partitionExpireCheckInterval(),
-                PartitionExpireStrategy.createPartitionExpireStrategy(options, 
partitionType()),
+                expirationTime,
+                checkInterval,
+                expireStrategy,
                 newScan(ScanType.FOR_COMMIT),
-                newCommit(commitUser),
+                newCommit(commitUser, table),
                 partitionHandler,
                 options.endInputCheckPartitionExpire(),
                 options.partitionExpireMaxNum());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 4ee81feae7..dc5171a744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,8 +22,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
-import org.apache.paimon.iceberg.IcebergOptions;
 import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -34,10 +32,8 @@ import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.types.RowType;
 
 import java.util.Comparator;
@@ -171,19 +167,4 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     public Comparator<InternalRow> newKeyComparator() {
         return null;
     }
-
-    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
-        List<CommitCallback> callbacks = 
super.createCommitCallbacks(commitUser);
-
-        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
-                != IcebergOptions.StorageType.DISABLED) {
-            callbacks.add(
-                    new AppendOnlyIcebergCommitCallback(
-                            new AppendOnlyFileStoreTable(
-                                    fileIO, pathFactory().root(), schema, 
catalogEnvironment),
-                            commitUser));
-        }
-
-        return callbacks;
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 76306eacd7..4d50021b3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -32,9 +32,11 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -48,6 +50,7 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.List;
 
 /**
@@ -87,7 +90,7 @@ public interface FileStore<T> {
 
     FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter);
 
-    FileStoreCommit newCommit(String commitUser);
+    FileStoreCommit newCommit(String commitUser, FileStoreTable table);
 
     SnapshotDeletion newSnapshotDeletion();
 
@@ -98,7 +101,15 @@ public interface FileStore<T> {
     TagDeletion newTagDeletion();
 
     @Nullable
-    PartitionExpire newPartitionExpire(String commitUser);
+    PartitionExpire newPartitionExpire(String commitUser, FileStoreTable 
table);
+
+    @Nullable
+    PartitionExpire newPartitionExpire(
+            String commitUser,
+            FileStoreTable table,
+            Duration expirationTime,
+            Duration checkInterval,
+            PartitionExpireStrategy expireStrategy);
 
     TagAutoManager newTagCreationManager();
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 3e18446496..18316901bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -23,8 +23,6 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.iceberg.IcebergOptions;
-import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
 import org.apache.paimon.index.HashIndexMaintainer;
 import org.apache.paimon.index.IndexMaintainer;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
@@ -43,8 +41,6 @@ import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.PrimaryKeyFileStoreTable;
-import org.apache.paimon.table.sink.CommitCallback;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -265,19 +261,4 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     public Comparator<InternalRow> newKeyComparator() {
         return keyComparatorSupplier.get();
     }
-
-    protected List<CommitCallback> createCommitCallbacks(String commitUser) {
-        List<CommitCallback> callbacks = 
super.createCommitCallbacks(commitUser);
-
-        if 
(options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
-                != IcebergOptions.StorageType.DISABLED) {
-            callbacks.add(
-                    new PrimaryKeyIcebergCommitCallback(
-                            new PrimaryKeyFileStoreTable(
-                                    fileIO, pathFactory().root(), schema, 
catalogEnvironment),
-                            commitUser));
-        }
-
-        return callbacks;
-    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index b2ef6afd44..7bda21eb51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,7 +24,6 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.schema.Schema;
@@ -35,7 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.object.ObjectTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.system.SystemTableLoader;
 import org.apache.paimon.types.RowType;
 
@@ -55,7 +54,6 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
 import static org.apache.paimon.CoreOptions.PATH;
 import static org.apache.paimon.CoreOptions.TYPE;
-import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -168,13 +166,10 @@ public abstract class AbstractCatalog implements Catalog {
             throws TableNotExistException {
         checkNotSystemTable(identifier, "dropPartition");
         Table table = getTable(identifier);
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        try (FileStoreCommit commit =
-                fileStoreTable
-                        .store()
-                        .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
-            commit.dropPartitions(partitions, 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.truncatePartitions(partitions);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java
deleted file mode 100644
index 3356107545..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.iceberg;
-
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.table.FileStoreTable;
-
-/** {@link AbstractIcebergCommitCallback} for append only tables. */
-public class AppendOnlyIcebergCommitCallback extends 
AbstractIcebergCommitCallback {
-
-    public AppendOnlyIcebergCommitCallback(FileStoreTable table, String 
commitUser) {
-        super(table, commitUser);
-    }
-
-    @Override
-    protected boolean shouldAddFileToIceberg(DataFileMeta meta) {
-        return true;
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
similarity index 98%
rename from 
paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
rename to 
paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 71e5b57fbe..e1b205d9b0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -83,12 +83,12 @@ import java.util.stream.Collectors;
  * A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg 
readers can read
  * Paimon's {@link RawFile}.
  */
-public abstract class AbstractIcebergCommitCallback implements CommitCallback {
+public class IcebergCommitCallback implements CommitCallback {
 
     // see org.apache.iceberg.hadoop.Util
     private static final String VERSION_HINT_FILENAME = "version-hint.text";
 
-    protected final FileStoreTable table;
+    private final FileStoreTable table;
     private final String commitUser;
 
     private final IcebergPathFactory pathFactory;
@@ -102,7 +102,7 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
     // Public interface
     // 
-------------------------------------------------------------------------------------
 
-    public AbstractIcebergCommitCallback(FileStoreTable table, String 
commitUser) {
+    public IcebergCommitCallback(FileStoreTable table, String commitUser) {
         this.table = table;
         this.commitUser = commitUser;
 
@@ -125,7 +125,7 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
         try {
             metadataCommitterFactory =
                     FactoryUtil.discoverFactory(
-                            
AbstractIcebergCommitCallback.class.getClassLoader(),
+                            IcebergCommitCallback.class.getClassLoader(),
                             IcebergMetadataCommitterFactory.class,
                             storageType.toString());
         } catch (FactoryException ignore) {
@@ -488,7 +488,14 @@ public abstract class AbstractIcebergCommitCallback 
implements CommitCallback {
                 addedFiles);
     }
 
-    protected abstract boolean shouldAddFileToIceberg(DataFileMeta meta);
+    private boolean shouldAddFileToIceberg(DataFileMeta meta) {
+        if (table.primaryKeys().isEmpty()) {
+            return true;
+        } else {
+            int maxLevel = table.coreOptions().numLevels() - 1;
+            return meta.level() == maxLevel;
+        }
+    }
 
     private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(
             Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles, long 
currentSnapshotId)
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java
 
b/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java
deleted file mode 100644
index 12b9c3e604..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.iceberg;
-
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.table.FileStoreTable;
-
-/** {@link AbstractIcebergCommitCallback} for primary key tables. */
-public class PrimaryKeyIcebergCommitCallback extends 
AbstractIcebergCommitCallback {
-
-    public PrimaryKeyIcebergCommitCallback(FileStoreTable table, String 
commitUser) {
-        super(table, commitUser);
-    }
-
-    @Override
-    protected boolean shouldAddFileToIceberg(DataFileMeta meta) {
-        int maxLevel = table.coreOptions().numLevels() - 1;
-        return meta.level() == maxLevel;
-    }
-}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index ed899ba833..ebcac8252c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -36,9 +36,11 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.SplitRead;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.service.ServiceManager;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.TagCallback;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.types.RowType;
@@ -52,6 +54,7 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
 
 import javax.annotation.Nullable;
 
+import java.time.Duration;
 import java.util.List;
 
 /** {@link FileStore} with privilege checks. */
@@ -150,9 +153,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> 
{
     }
 
     @Override
-    public FileStoreCommit newCommit(String commitUser) {
+    public FileStoreCommit newCommit(String commitUser, FileStoreTable table) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newCommit(commitUser);
+        return wrapped.newCommit(commitUser, table);
     }
 
     @Override
@@ -181,9 +184,21 @@ public class PrivilegedFileStore<T> implements 
FileStore<T> {
 
     @Nullable
     @Override
-    public PartitionExpire newPartitionExpire(String commitUser) {
+    public PartitionExpire newPartitionExpire(String commitUser, 
FileStoreTable table) {
         privilegeChecker.assertCanInsert(identifier);
-        return wrapped.newPartitionExpire(commitUser);
+        return wrapped.newPartitionExpire(commitUser, table);
+    }
+
+    @Override
+    public PartitionExpire newPartitionExpire(
+            String commitUser,
+            FileStoreTable table,
+            Duration expirationTime,
+            Duration checkInterval,
+            PartitionExpireStrategy expireStrategy) {
+        privilegeChecker.assertCanInsert(identifier);
+        return wrapped.newPartitionExpire(
+                commitUser, table, expirationTime, checkInterval, 
expireStrategy);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 7ded01995e..2016427b89 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -31,7 +31,6 @@ import org.apache.paimon.catalog.SupportsSnapshots;
 import org.apache.paimon.catalog.TableMetadata;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.partition.Partition;
 import org.apache.paimon.rest.auth.AuthSession;
@@ -76,9 +75,8 @@ import org.apache.paimon.rest.responses.ListViewsResponse;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
 import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.view.View;
 import org.apache.paimon.view.ViewImpl;
@@ -105,7 +103,6 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.stream.Collectors;
 
-import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
 import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -613,14 +610,11 @@ public class RESTCatalog implements Catalog, 
SupportsSnapshots, SupportsBranches
             throw new TableNotExistException(identifier);
         } catch (NotImplementedException ignored) {
             // not a metastore partitioned table
-            FileStoreTable fileStoreTable = (FileStoreTable) 
getTable(identifier);
-            try (FileStoreCommit commit =
-                    fileStoreTable
-                            .store()
-                            .newCommit(
-                                    createCommitUser(
-                                            
fileStoreTable.coreOptions().toConfiguration()))) {
-                commit.dropPartitions(partitions, 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+            try (BatchTableCommit commit =
+                    getTable(identifier).newBatchWriteBuilder().newCommit()) {
+                commit.truncatePartitions(partitions);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
         }
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 1771a5998f..46554e24d6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -455,9 +455,9 @@ abstract class AbstractFileStoreTable implements 
FileStoreTable {
         }
 
         return new TableCommitImpl(
-                store().newCommit(commitUser),
+                store().newCommit(commitUser, this),
                 snapshotExpire,
-                options.writeOnly() ? null : 
store().newPartitionExpire(commitUser),
+                options.writeOnly() ? null : 
store().newPartitionExpire(commitUser, this),
                 options.writeOnly() ? null : store().newTagCreationManager(),
                 CoreOptions.fromMap(options()).consumerExpireTime(),
                 new ConsumerManager(fileIO, path, snapshotManager().branch()),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
index f0c9b59e31..84215980d5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
@@ -18,9 +18,12 @@
 
 package org.apache.paimon.table.sink;
 
+import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.stats.Statistics;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * A {@link TableCommit} for batch processing. Recommended for one-time 
committing.
@@ -56,4 +59,16 @@ public interface BatchTableCommit extends TableCommit {
      * logically deleted and will be deleted after the snapshot expires.
      */
     void truncateTable();
+
+    /**
+     * Truncate partitions, like normal {@link #commit}, files are not 
immediately deleted, they are
+     * only logically deleted and will be deleted after the snapshot expires.
+     */
+    void truncatePartitions(List<Map<String, String>> partitionSpecs);
+
+    /** Commit new statistics. Generates a snapshot with {@link 
CommitKind#ANALYZE}. */
+    void updateStatistics(Statistics statistics);
+
+    /** Compact the manifest entries. Generates a snapshot with {@link 
CommitKind#COMPACT}. */
+    void compactManifests();
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 44c89ae06b..e91153f5bc 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.metrics.CommitMetrics;
+import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
@@ -165,6 +166,21 @@ public class TableCommitImpl implements InnerTableCommit {
         commit.truncateTable(COMMIT_IDENTIFIER);
     }
 
+    @Override
+    public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
+        commit.dropPartitions(partitionSpecs, COMMIT_IDENTIFIER);
+    }
+
+    @Override
+    public void updateStatistics(Statistics statistics) {
+        commit.commitStatistics(statistics, COMMIT_IDENTIFIER);
+    }
+
+    @Override
+    public void compactManifests() {
+        commit.compactManifest();
+    }
+
     private void checkCommitted() {
         checkState(!batchCommitted, "BatchTableCommit only support one-time 
committing.");
         batchCommitted = true;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 94f5777c10..3c794e146c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -93,7 +93,7 @@ public class TestAppendFileStore extends AppendOnlyFileStore {
     }
 
     public FileStoreCommitImpl newCommit() {
-        return super.newCommit(commitUser);
+        return super.newCommit(commitUser, null);
     }
 
     public void commit(CommitMessage... commitMessages) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 2187418ee0..0da4489e3f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -157,7 +157,7 @@ public class TestFileStore extends KeyValueFileStore {
     }
 
     public FileStoreCommitImpl newCommit() {
-        return super.newCommit(commitUser);
+        return super.newCommit(commitUser, null);
     }
 
     public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, 
long millisRetained) {
@@ -262,7 +262,7 @@ public class TestFileStore extends KeyValueFileStore {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
 
-        try (FileStoreCommit commit = newCommit(commitUser)) {
+        try (FileStoreCommit commit = newCommit(commitUser, null)) {
             commit.dropPartitions(partitions, Long.MAX_VALUE);
         }
 
@@ -352,7 +352,7 @@ public class TestFileStore extends KeyValueFileStore {
             snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
         }
 
-        try (FileStoreCommit commit = newCommit(commitUser)) {
+        try (FileStoreCommit commit = newCommit(commitUser, null)) {
             commitFunction.accept(commit, committable);
         }
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index b40dc32d53..561a144c5c 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -187,7 +187,7 @@ public class FileStoreCommitTest {
         Path firstSnapshotPath = 
snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
         LocalFileIO.create().deleteQuietly(firstSnapshotPath);
         // this test succeeds if this call does not fail
-        try (FileStoreCommit commit = 
store.newCommit(UUID.randomUUID().toString())) {
+        try (FileStoreCommit commit = 
store.newCommit(UUID.randomUUID().toString(), null)) {
             commit.filterCommitted(Collections.singletonList(new 
ManifestCommittable(999L)));
         }
     }
@@ -208,7 +208,7 @@ public class FileStoreCommitTest {
         }
 
         // all commit identifiers should be filtered out
-        try (FileStoreCommit commit = store.newCommit(user)) {
+        try (FileStoreCommit commit = store.newCommit(user, null)) {
             assertThat(
                             commit.filterCommitted(
                                     commitIdentifiers.stream()
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 438fe3fcbb..e0c5ac696b 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -115,7 +115,8 @@ public class PartitionExpireTest {
                                 table.store()
                                         .newCommit(
                                                 createCommitUser(
-                                                        
table.coreOptions().toConfiguration()))) {
+                                                        
table.coreOptions().toConfiguration()),
+                                                null)) {
                             commit.dropPartitions(partitions, 
BatchWriteBuilder.COMMIT_IDENTIFIER);
                         }
                     }
@@ -365,7 +366,8 @@ public class PartitionExpireTest {
     }
 
     private PartitionExpire newExpire() {
-        return newExpireTable().store().newPartitionExpire("");
+        FileStoreTable table = newExpireTable();
+        return table.store().newPartitionExpire("", table);
     }
 
     private FileStoreTable newExpireTable() {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 872cb554f5..ba735594a9 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -96,7 +96,7 @@ public class TestCommitThread extends Thread {
         this.write = safeStore.newWrite(commitUser);
         this.commit =
                 retryArtificialException(
-                        () -> 
testStore.newCommit(commitUser).ignoreEmptyCommit(false));
+                        () -> testStore.newCommit(commitUser, 
null).ignoreEmptyCommit(false));
 
         this.commitIdentifier = 0;
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 9427ed61c9..94bc52305d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
@@ -89,7 +88,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1023,8 +1021,8 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         reader.close();
 
         // truncate table
-        FileStoreCommit truncateCommit = 
table.store().newCommit(UUID.randomUUID().toString());
-        truncateCommit.truncateTable(2);
+        BatchTableCommit truncateCommit = 
table.newBatchWriteBuilder().newCommit();
+        truncateCommit.truncateTable();
         truncateCommit.close();
 
         // test parquet row ranges filtering
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 36e2bd1f09..2b0e0d1c68 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ 
b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -79,15 +79,13 @@ public class ExpirePartitionsProcedure extends 
ProcedureBase {
         map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), 
timestampPattern);
 
         PartitionExpire partitionExpire =
-                new PartitionExpire(
+                fileStore.newPartitionExpire(
+                        "",
+                        fileStoreTable,
                         TimeUtils.parseDuration(expirationTime),
                         Duration.ofMillis(0L),
                         createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), 
fileStore.partitionType()),
-                        fileStore.newScan(),
-                        fileStore.newCommit(""),
-                        fileStoreTable.catalogEnvironment().partitionHandler(),
-                        fileStore.options().partitionExpireMaxNum());
+                                CoreOptions.fromMap(map), 
fileStore.partitionType()));
         if (maxExpires != null) {
             partitionExpire.withMaxExpireNum(maxExpires);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 876c51244c..ac59c827b4 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -30,7 +30,6 @@ import org.apache.paimon.flink.procedure.ProcedureUtil;
 import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
 import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
 import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
@@ -39,7 +38,7 @@ import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataTypeRoot;
@@ -1550,12 +1549,12 @@ public class FlinkCatalog extends AbstractCatalog {
                 return;
             }
 
-            FileStoreTable storeTable = (FileStoreTable) table;
-            Statistics tableStats = statistics.apply(storeTable);
+            Statistics tableStats = statistics.apply((FileStoreTable) table);
             if (tableStats != null) {
-                String commitUser = 
storeTable.coreOptions().createCommitUser();
-                try (FileStoreCommit commit = 
storeTable.store().newCommit(commitUser)) {
-                    commit.commitStatistics(tableStats, 
BatchWriteBuilder.COMMIT_IDENTIFIER);
+                try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+                    commit.updateStatistics(tableStats);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
                 }
             }
         } catch (Catalog.TableNotExistException e) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index 6f568dfe61..86a5a10b80 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -18,20 +18,17 @@
 
 package org.apache.paimon.flink.action;
 
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
 
 import java.util.List;
 import java.util.Map;
 
-import static org.apache.paimon.CoreOptions.createCommitUser;
-
 /** Table drop partition action for Flink. */
 public class DropPartitionAction extends TableActionBase {
 
     private final List<Map<String, String>> partitions;
-    private final FileStoreCommit commit;
+    private final BatchTableCommit commit;
 
     public DropPartitionAction(
             String databaseName,
@@ -47,17 +44,11 @@ public class DropPartitionAction extends TableActionBase {
         }
 
         this.partitions = partitions;
-
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        this.commit =
-                fileStoreTable
-                        .store()
-                        .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
+        this.commit = table.newBatchWriteBuilder().newCommit();
     }
 
     @Override
     public void run() throws Exception {
-        commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
+        commit.truncatePartitions(partitions);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 7448d8f4d3..b025c3a954 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -57,15 +57,13 @@ public class ExpirePartitionsAction extends TableActionBase 
{
         FileStoreTable fileStoreTable = (FileStoreTable) table;
         FileStore<?> fileStore = fileStoreTable.store();
         this.partitionExpire =
-                new PartitionExpire(
+                fileStore.newPartitionExpire(
+                        "",
+                        fileStoreTable,
                         TimeUtils.parseDuration(expirationTime),
                         Duration.ofMillis(0L),
                         createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), 
fileStore.partitionType()),
-                        fileStore.newScan(),
-                        fileStore.newCommit(""),
-                        fileStoreTable.catalogEnvironment().partitionHandler(),
-                        fileStore.options().partitionExpireMaxNum());
+                                CoreOptions.fromMap(map), 
fileStore.partitionType()));
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
index 3e52322a6f..c6b0a170b0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
@@ -19,8 +19,8 @@
 package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 
 import org.apache.flink.table.annotation.ArgumentHint;
 import org.apache.flink.table.annotation.DataTypeHint;
@@ -42,18 +42,14 @@ public class CompactManifestProcedure extends ProcedureBase 
{
     @ProcedureHint(argument = {@ArgumentHint(name = "table", type = 
@DataTypeHint("STRING"))})
     public String[] call(ProcedureContext procedureContext, String tableId) 
throws Exception {
 
-        FileStoreTable table =
-                (FileStoreTable)
-                        table(tableId)
-                                .copy(
-                                        Collections.singletonMap(
-                                                
CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
-
-        try (FileStoreCommit commit =
-                table.store()
-                        .newCommit(table.coreOptions().createCommitUser())
-                        .ignoreEmptyCommit(false)) {
-            commit.compactManifest();
+        Table table =
+                table(tableId)
+                        .copy(
+                                Collections.singletonMap(
+                                        CoreOptions.COMMIT_USER_PREFIX.key(), 
COMMIT_USER));
+
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.compactManifests();
         }
         return new String[] {"success"};
     }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 3a231758f4..cef7c33f34 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -20,14 +20,12 @@ package org.apache.paimon.flink.procedure;
 
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
 
-import static org.apache.paimon.CoreOptions.createCommitUser;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /**
@@ -50,16 +48,11 @@ public class DropPartitionProcedure extends ProcedureBase {
         checkArgument(
                 partitionStrings.length > 0, "drop-partition procedure must 
specify partitions.");
 
-        FileStoreTable fileStoreTable =
-                (FileStoreTable) 
catalog.getTable(Identifier.fromString(tableId));
-        try (FileStoreCommit commit =
-                fileStoreTable
-                        .store()
-                        .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
-            commit.dropPartitions(
-                    ParameterUtils.getPartitions(partitionStrings),
-                    BatchWriteBuilder.COMMIT_IDENTIFIER);
+        Table table = catalog.getTable(Identifier.fromString(tableId));
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            
commit.truncatePartitions(ParameterUtils.getPartitions(partitionStrings));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
 
         return new String[] {"Success"};
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index b9435e12ed..582b4711fd 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -83,15 +83,13 @@ public class ExpirePartitionsProcedure extends 
ProcedureBase {
         map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), 
timestampPattern);
 
         PartitionExpire partitionExpire =
-                new PartitionExpire(
+                fileStore.newPartitionExpire(
+                        "",
+                        fileStoreTable,
                         TimeUtils.parseDuration(expirationTime),
                         Duration.ofMillis(0L),
                         createPartitionExpireStrategy(
-                                CoreOptions.fromMap(map), 
fileStore.partitionType()),
-                        fileStore.newScan(),
-                        fileStore.newCommit(""),
-                        fileStoreTable.catalogEnvironment().partitionHandler(),
-                        fileStore.options().partitionExpireMaxNum());
+                                CoreOptions.fromMap(map), 
fileStore.partitionType()));
         if (maxExpires != null) {
             partitionExpire.withMaxExpireNum(maxExpires);
         }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c0d19abff2..86be6f347a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -23,14 +23,12 @@ import org.apache.paimon.CoreOptions.MergeEngine;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.flink.PredicateConverter;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
 
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -60,7 +58,6 @@ import static 
org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
 import static 
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
 import static 
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
-import static org.apache.paimon.CoreOptions.createCommitUser;
 import static 
org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
@@ -163,20 +160,16 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
 
     @Override
     public Optional<Long> executeDeletion() {
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        try (FileStoreCommit commit =
-                fileStoreTable
-                        .store()
-                        .newCommit(
-                                
createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
-            long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
             if (deletePredicate == null) {
-                commit.truncateTable(identifier);
+                commit.truncateTable();
             } else {
                 checkArgument(deleteIsDropPartition());
-                
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
+                
commit.truncatePartitions(Collections.singletonList(deletePartitions()));
             }
             return Optional.empty();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
     }
 
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 42a47ea1e2..90e5e6e811 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -18,19 +18,18 @@
 
 package org.apache.paimon.flink.source.statistics;
 
-import org.apache.paimon.FileStore;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.flink.source.DataTableSource;
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.stats.ColStats;
 import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.types.BigIntType;
@@ -83,14 +82,13 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null));
 
-        FileStore<?> fileStore = table.store();
-        FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
-        Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+        BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+        Snapshot latestSnapshot = table.latestSnapshot().get();
         Statistics colStats =
                 new Statistics(
                         latestSnapshot.id(), latestSnapshot.schemaId(), 9L, 
null, colStatsMap);
-        fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
-        fileStoreCommit.close();
+        commit.updateStatistics(colStats);
+        commit.close();
         DataTableSource scanSource = new DataTableSource(identifier, table, 
false, null, null);
         
Assertions.assertThat(scanSource.reportStatistics().getRowCount()).isEqualTo(9L);
         Map<String, ColumnStats> expectedColStats = new HashMap<>();
@@ -170,14 +168,13 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null));
 
-        FileStore<?> fileStore = table.store();
-        FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
-        Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+        BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+        Snapshot latestSnapshot = table.latestSnapshot().get();
         Statistics colStats =
                 new Statistics(
                         latestSnapshot.id(), latestSnapshot.schemaId(), 9L, 
null, colStatsMap);
-        fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
-        fileStoreCommit.close();
+        commit.updateStatistics(colStats);
+        commit.close();
 
         Map<String, ColumnStats> expectedColStats = new HashMap<>();
         expectedColStats.put(
@@ -249,14 +246,13 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null));
 
-        FileStore<?> fileStore = table.store();
-        FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
-        Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+        BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+        Snapshot latestSnapshot = table.latestSnapshot().get();
         Statistics colStats =
                 new Statistics(
                         latestSnapshot.id(), latestSnapshot.schemaId(), 9L, 
null, colStatsMap);
-        fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
-        fileStoreCommit.close();
+        commit.updateStatistics(colStats);
+        commit.close();
 
         Map<String, ColumnStats> expectedColStats = new HashMap<>();
         expectedColStats.put(
@@ -328,14 +324,13 @@ public abstract class FileStoreTableStatisticsTestBase {
                         null,
                         null));
 
-        FileStore<?> fileStore = table.store();
-        FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
-        Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+        BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+        Snapshot latestSnapshot = table.latestSnapshot().get();
         Statistics colStats =
                 new Statistics(
                         latestSnapshot.id(), latestSnapshot.schemaId(), 9L, 
null, colStatsMap);
-        fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
-        fileStoreCommit.close();
+        commit.updateStatistics(colStats);
+        commit.close();
 
         Map<String, ColumnStats> expectedColStats = new HashMap<>();
         expectedColStats.put(
diff --git 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
index 5eb2bfd27a..faba89fffd 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
@@ -50,7 +50,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.stream.Collectors;
 
-import static 
org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath;
+import static 
org.apache.paimon.iceberg.IcebergCommitCallback.catalogDatabasePath;
 
 /**
  * {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive 
metastore, so the table can
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
index dd064d892c..12a3a286f4 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
@@ -18,8 +18,8 @@
 
 package org.apache.paimon.spark.procedure;
 
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
 
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.catalog.Identifier;
@@ -67,13 +67,12 @@ public class CompactManifestProcedure extends BaseProcedure 
{
     public InternalRow[] call(InternalRow args) {
 
         Identifier tableIdent = toIdentifier(args.getString(0), 
PARAMETERS[0].name());
-        FileStoreTable table = (FileStoreTable) 
loadSparkTable(tableIdent).getTable();
+        Table table = loadSparkTable(tableIdent).getTable();
 
-        try (FileStoreCommit commit =
-                table.store()
-                        .newCommit(table.coreOptions().createCommitUser())
-                        .ignoreEmptyCommit(false)) {
-            commit.compactManifest();
+        try (BatchTableCommit commit = 
table.newBatchWriteBuilder().newCommit()) {
+            commit.compactManifests();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
         }
 
         return new InternalRow[] {newInternalRow(true)};
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 4b9d50db8d..dd9388b67a 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -93,15 +93,13 @@ public class ExpirePartitionsProcedure extends 
BaseProcedure {
                     map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), 
timestampPattern);
 
                     PartitionExpire partitionExpire =
-                            new PartitionExpire(
+                            fileStore.newPartitionExpire(
+                                    "",
+                                    fileStoreTable,
                                     TimeUtils.parseDuration(expirationTime),
                                     Duration.ofMillis(0L),
                                     createPartitionExpireStrategy(
-                                            CoreOptions.fromMap(map), 
fileStore.partitionType()),
-                                    fileStore.newScan(),
-                                    fileStore.newCommit(""),
-                                    
fileStoreTable.catalogEnvironment().partitionHandler(),
-                                    
fileStore.options().partitionExpireMaxNum());
+                                            CoreOptions.fromMap(map), 
fileStore.partitionType()));
                     if (maxExpires != null) {
                         partitionExpire.withMaxExpireNum(maxExpires);
                     }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 1817add879..64ebf0ed97 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -75,9 +75,9 @@ trait PaimonPartitionManagement extends 
SupportsAtomicPartitionManagement {
             partitionHandler.close()
           }
         } else {
-          val commit: FileStoreCommit = 
fileStoreTable.store.newCommit(UUID.randomUUID.toString)
+          val commit = fileStoreTable.newBatchWriteBuilder().newCommit()
           try {
-            commit.dropPartitions(partitions, 
BatchWriteBuilder.COMMIT_IDENTIFIER)
+            commit.truncatePartitions(partitions)
           } finally {
             commit.close()
           }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 799551aa6b..222cbe31e7 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -54,9 +54,9 @@ case class DeleteFromPaimonTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
 
-    val commit = fileStore.newCommit(UUID.randomUUID.toString)
+    val commit = table.newBatchWriteBuilder().newCommit()
     if (condition == null || condition == TrueLiteral) {
-      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+      commit.truncateTable()
     } else {
       val (partitionCondition, otherCondition) = 
splitPruePartitionAndOtherPredicates(
         condition,
@@ -94,7 +94,7 @@ case class DeleteFromPaimonTableCommand(
           partition => 
rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
         }
         if (dropPartitions.nonEmpty) {
-          commit.dropPartitions(dropPartitions.asJava, 
BatchWriteBuilder.COMMIT_IDENTIFIER)
+          commit.truncatePartitions(dropPartitions.asJava)
         } else {
           writer.commit(Seq.empty)
         }
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 9a88ca2e4c..c9a18bd9fd 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -92,8 +92,8 @@ case class PaimonAnalyzeTableColumnCommand(
       colStatsMap)
 
     // commit stats
-    val commit = table.store.newCommit(UUID.randomUUID.toString)
-    commit.commitStatistics(stats, BatchWriteBuilder.COMMIT_IDENTIFIER)
+    val commit = table.newBatchWriteBuilder().newCommit()
+    commit.updateStatistics(stats)
     commit.close()
 
     Seq.empty[Row]
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index 9d501e11f9..f55c5011e2 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -37,14 +37,13 @@ case class PaimonTruncateTableCommand(v2Table: SparkTable, 
partitionSpec: TableP
   override def table: FileStoreTable = 
v2Table.getTable.asInstanceOf[FileStoreTable]
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    val commit = table.store.newCommit(UUID.randomUUID.toString)
+    val commit = table.newBatchWriteBuilder().newCommit()
 
     if (partitionSpec.isEmpty) {
-      commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+      commit.truncateTable()
     } else {
-      commit.dropPartitions(
-        Collections.singletonList(partitionSpec.asJava),
-        BatchWriteBuilder.COMMIT_IDENTIFIER
+      commit.truncatePartitions(
+        Collections.singletonList(partitionSpec.asJava)
       )
     }
 

Reply via email to