This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2050d841db [core] Refactor FileStore.newCommit and avoid using it (#5212)
2050d841db is described below
commit 2050d841db25e5116a81f0c95d4dac3718a8048f
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 5 17:51:03 2025 +0800
[core] Refactor FileStore.newCommit and avoid using it (#5212)
---
.../java/org/apache/paimon/AbstractFileStore.java | 39 +++++++++++++++++-----
.../org/apache/paimon/AppendOnlyFileStore.java | 19 -----------
.../src/main/java/org/apache/paimon/FileStore.java | 15 +++++++--
.../java/org/apache/paimon/KeyValueFileStore.java | 19 -----------
.../org/apache/paimon/catalog/AbstractCatalog.java | 15 +++------
.../iceberg/AppendOnlyIcebergCommitCallback.java | 35 -------------------
...mitCallback.java => IcebergCommitCallback.java} | 17 +++++++---
.../iceberg/PrimaryKeyIcebergCommitCallback.java | 36 --------------------
.../paimon/privilege/PrivilegedFileStore.java | 23 ++++++++++---
.../java/org/apache/paimon/rest/RESTCatalog.java | 18 ++++------
.../paimon/table/AbstractFileStoreTable.java | 4 +--
.../apache/paimon/table/sink/BatchTableCommit.java | 15 +++++++++
.../apache/paimon/table/sink/TableCommitImpl.java | 16 +++++++++
.../org/apache/paimon/TestAppendFileStore.java | 2 +-
.../test/java/org/apache/paimon/TestFileStore.java | 6 ++--
.../paimon/operation/FileStoreCommitTest.java | 4 +--
.../paimon/operation/PartitionExpireTest.java | 6 ++--
.../apache/paimon/operation/TestCommitThread.java | 2 +-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 6 ++--
.../flink/procedure/ExpirePartitionsProcedure.java | 10 +++---
.../java/org/apache/paimon/flink/FlinkCatalog.java | 13 ++++----
.../paimon/flink/action/DropPartitionAction.java | 17 +++-------
.../flink/action/ExpirePartitionsAction.java | 10 +++---
.../flink/procedure/CompactManifestProcedure.java | 24 ++++++-------
.../flink/procedure/DropPartitionProcedure.java | 21 ++++--------
.../flink/procedure/ExpirePartitionsProcedure.java | 10 +++---
.../SupportsRowLevelOperationFlinkTableSink.java | 19 ++++-------
.../FileStoreTableStatisticsTestBase.java | 39 ++++++++++------------
.../iceberg/IcebergHiveMetadataCommitter.java | 2 +-
.../spark/procedure/CompactManifestProcedure.java | 15 ++++-----
.../spark/procedure/ExpirePartitionsProcedure.java | 10 +++---
.../paimon/spark/PaimonPartitionManagement.scala | 4 +--
.../commands/DeleteFromPaimonTableCommand.scala | 6 ++--
.../commands/PaimonAnalyzeTableColumnCommand.scala | 4 +--
.../commands/PaimonTruncateTableCommand.scala | 9 +++--
35 files changed, 217 insertions(+), 293 deletions(-)
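The net effect of the refactor, as a minimal before/after sketch assembled
from the hunks below (illustrative only; `table` stands for any
FileStoreTable handle, `partitions` for a List<Map<String, String>>):

    // Before: internal FileStoreCommit, manual commit user and identifier
    try (FileStoreCommit commit =
            table.store().newCommit(createCommitUser(table.coreOptions().toConfiguration()))) {
        commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
    }

    // After: public batch API; no store() access, no identifier bookkeeping
    try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
        commit.truncatePartitions(partitions);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }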
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 0ee0d667b2..b7a0350a4b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -25,6 +25,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergCommitCallback;
+import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.index.HashIndexFile;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.manifest.IndexManifestFile;
@@ -50,6 +52,7 @@ import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PartitionHandler;
import org.apache.paimon.table.sink.CallbackUtils;
import org.apache.paimon.table.sink.CommitCallback;
@@ -282,7 +285,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
}
@Override
- public FileStoreCommitImpl newCommit(String commitUser) {
+ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
SnapshotManager snapshotManager = snapshotManager();
SnapshotCommit snapshotCommit =
catalogEnvironment.snapshotCommit(snapshotManager);
if (snapshotCommit == null) {
@@ -313,7 +316,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
newStatsFileHandler(),
bucketMode(),
options.scanManifestParallelism(),
- createCommitCallbacks(commitUser),
+ createCommitCallbacks(commitUser, table),
options.commitMaxRetries(),
options.commitTimeout());
}
@@ -365,7 +368,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
public abstract Comparator<InternalRow> newKeyComparator();
- protected List<CommitCallback> createCommitCallbacks(String commitUser) {
+ private List<CommitCallback> createCommitCallbacks(String commitUser, FileStoreTable table) {
List<CommitCallback> callbacks =
new ArrayList<>(CallbackUtils.loadCommitCallbacks(options));
@@ -397,28 +400,48 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
}
}
+ if (options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
+ != IcebergOptions.StorageType.DISABLED) {
+ callbacks.add(new IcebergCommitCallback(table, commitUser));
+ }
+
return callbacks;
}
@Override
@Nullable
- public PartitionExpire newPartitionExpire(String commitUser) {
+ public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table) {
Duration partitionExpireTime = options.partitionExpireTime();
if (partitionExpireTime == null || partitionType().getFieldCount() == 0) {
return null;
}
+ return newPartitionExpire(
+ commitUser,
+ table,
+ partitionExpireTime,
+ options.partitionExpireCheckInterval(),
+ PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()));
+ }
+
+ @Override
+ public PartitionExpire newPartitionExpire(
+ String commitUser,
+ FileStoreTable table,
+ Duration expirationTime,
+ Duration checkInterval,
+ PartitionExpireStrategy expireStrategy) {
PartitionHandler partitionHandler = null;
if (options.partitionedTableInMetastore()) {
partitionHandler = catalogEnvironment.partitionHandler();
}
return new PartitionExpire(
- partitionExpireTime,
- options.partitionExpireCheckInterval(),
- PartitionExpireStrategy.createPartitionExpireStrategy(options, partitionType()),
+ expirationTime,
+ checkInterval,
+ expireStrategy,
newScan(ScanType.FOR_COMMIT),
- newCommit(commitUser),
+ newCommit(commitUser, table),
partitionHandler,
options.endInputCheckPartitionExpire(),
options.partitionExpireMaxNum());
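With callback registration centralized here, enabling the Iceberg
compatibility layer stays a pure configuration concern. A small sketch,
assuming the option constants as they appear in this hunk (HADOOP_CATALOG
is one assumed non-DISABLED value):

    // Any storage type other than DISABLED makes AbstractFileStore register
    // IcebergCommitCallback for both append-only and primary-key tables.
    Options options = new Options();
    options.set(IcebergOptions.METADATA_ICEBERG_STORAGE, IcebergOptions.StorageType.HADOOP_CATALOG);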
diff --git a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 4ee81feae7..dc5171a744 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -22,8 +22,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.iceberg.AppendOnlyIcebergCommitCallback;
-import org.apache.paimon.iceberg.IcebergOptions;
import org.apache.paimon.manifest.ManifestCacheFilter;
import org.apache.paimon.operation.AppendOnlyFileStoreScan;
import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -34,10 +32,8 @@ import org.apache.paimon.operation.RawFileSplitRead;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.types.RowType;
import java.util.Comparator;
@@ -171,19 +167,4 @@ public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
public Comparator<InternalRow> newKeyComparator() {
return null;
}
-
- protected List<CommitCallback> createCommitCallbacks(String commitUser) {
- List<CommitCallback> callbacks = super.createCommitCallbacks(commitUser);
-
- if (options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
- != IcebergOptions.StorageType.DISABLED) {
- callbacks.add(
- new AppendOnlyIcebergCommitCallback(
- new AppendOnlyFileStoreTable(
- fileIO, pathFactory().root(), schema, catalogEnvironment),
- commitUser));
- }
-
- return callbacks;
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 76306eacd7..4d50021b3c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -32,9 +32,11 @@ import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -48,6 +50,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.List;
/**
@@ -87,7 +90,7 @@ public interface FileStore<T> {
FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter manifestFilter);
- FileStoreCommit newCommit(String commitUser);
+ FileStoreCommit newCommit(String commitUser, FileStoreTable table);
SnapshotDeletion newSnapshotDeletion();
@@ -98,7 +101,15 @@ public interface FileStore<T> {
TagDeletion newTagDeletion();
@Nullable
- PartitionExpire newPartitionExpire(String commitUser);
+ PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table);
+
+ @Nullable
+ PartitionExpire newPartitionExpire(
+ String commitUser,
+ FileStoreTable table,
+ Duration expirationTime,
+ Duration checkInterval,
+ PartitionExpireStrategy expireStrategy);
TagAutoManager newTagCreationManager();
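The new five-argument overload lets callers obtain a fully wired
PartitionExpire without constructing one by hand; a sketch matching the
procedure rewrites later in this patch (`fileStore`, `fileStoreTable`,
`expirationTime` and `map` as used there):

    PartitionExpire partitionExpire =
            fileStore.newPartitionExpire(
                    "", // commit user
                    fileStoreTable,
                    TimeUtils.parseDuration(expirationTime),
                    Duration.ofMillis(0L),
                    createPartitionExpireStrategy(
                            CoreOptions.fromMap(map), fileStore.partitionType()));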
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 3e18446496..18316901bf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -23,8 +23,6 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.iceberg.IcebergOptions;
-import org.apache.paimon.iceberg.PrimaryKeyIcebergCommitCallback;
import org.apache.paimon.index.HashIndexMaintainer;
import org.apache.paimon.index.IndexMaintainer;
import org.apache.paimon.io.KeyValueFileReaderFactory;
@@ -43,8 +41,6 @@ import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
-import org.apache.paimon.table.PrimaryKeyFileStoreTable;
-import org.apache.paimon.table.sink.CommitCallback;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.KeyComparatorSupplier;
@@ -265,19 +261,4 @@ public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
public Comparator<InternalRow> newKeyComparator() {
return keyComparatorSupplier.get();
}
-
- protected List<CommitCallback> createCommitCallbacks(String commitUser) {
- List<CommitCallback> callbacks = super.createCommitCallbacks(commitUser);
-
- if (options.toConfiguration().get(IcebergOptions.METADATA_ICEBERG_STORAGE)
- != IcebergOptions.StorageType.DISABLED) {
- callbacks.add(
- new PrimaryKeyIcebergCommitCallback(
- new PrimaryKeyFileStoreTable(
- fileIO, pathFactory().root(), schema, catalogEnvironment),
- commitUser));
- }
-
- return callbacks;
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index b2ef6afd44..7bda21eb51 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -24,7 +24,6 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.schema.Schema;
@@ -35,7 +34,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.object.ObjectTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.system.SystemTableLoader;
import org.apache.paimon.types.RowType;
@@ -55,7 +54,6 @@ import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.OBJECT_LOCATION;
import static org.apache.paimon.CoreOptions.PATH;
import static org.apache.paimon.CoreOptions.TYPE;
-import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -168,13 +166,10 @@ public abstract class AbstractCatalog implements Catalog {
throws TableNotExistException {
checkNotSystemTable(identifier, "dropPartition");
Table table = getTable(identifier);
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- try (FileStoreCommit commit =
- fileStoreTable
- .store()
- .newCommit(
- createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
- commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(partitions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java
deleted file mode 100644
index 3356107545..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AppendOnlyIcebergCommitCallback.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.iceberg;
-
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.table.FileStoreTable;
-
-/** {@link AbstractIcebergCommitCallback} for append only tables. */
-public class AppendOnlyIcebergCommitCallback extends AbstractIcebergCommitCallback {
-
- public AppendOnlyIcebergCommitCallback(FileStoreTable table, String commitUser) {
- super(table, commitUser);
- }
-
- @Override
- protected boolean shouldAddFileToIceberg(DataFileMeta meta) {
- return true;
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
similarity index 98%
rename from paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
rename to paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
index 71e5b57fbe..e1b205d9b0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergCommitCallback.java
@@ -83,12 +83,12 @@ import java.util.stream.Collectors;
* A {@link CommitCallback} to create Iceberg compatible metadata, so Iceberg readers can read
* Paimon's {@link RawFile}.
*/
-public abstract class AbstractIcebergCommitCallback implements CommitCallback {
+public class IcebergCommitCallback implements CommitCallback {
// see org.apache.iceberg.hadoop.Util
private static final String VERSION_HINT_FILENAME = "version-hint.text";
- protected final FileStoreTable table;
+ private final FileStoreTable table;
private final String commitUser;
private final IcebergPathFactory pathFactory;
@@ -102,7 +102,7 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
// Public interface
// -------------------------------------------------------------------------------------
- public AbstractIcebergCommitCallback(FileStoreTable table, String commitUser) {
+ public IcebergCommitCallback(FileStoreTable table, String commitUser) {
this.table = table;
this.commitUser = commitUser;
@@ -125,7 +125,7 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
try {
metadataCommitterFactory =
FactoryUtil.discoverFactory(
- AbstractIcebergCommitCallback.class.getClassLoader(),
+ IcebergCommitCallback.class.getClassLoader(),
IcebergMetadataCommitterFactory.class,
storageType.toString());
} catch (FactoryException ignore) {
@@ -488,7 +488,14 @@ public abstract class AbstractIcebergCommitCallback implements CommitCallback {
addedFiles);
}
- protected abstract boolean shouldAddFileToIceberg(DataFileMeta meta);
+ private boolean shouldAddFileToIceberg(DataFileMeta meta) {
+ if (table.primaryKeys().isEmpty()) {
+ return true;
+ } else {
+ int maxLevel = table.coreOptions().numLevels() - 1;
+ return meta.level() == maxLevel;
+ }
+ }
private List<IcebergManifestFileMeta> createNewlyAddedManifestFileMetas(
Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles, long currentSnapshotId)
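The merged shouldAddFileToIceberg keeps both former behaviors: append-only
tables (no primary keys) expose every data file, while primary-key tables
expose only files on the maximum LSM level, where rows are fully merged.
Restated as a single predicate (same logic as the hunk above):

    boolean visibleToIceberg =
            table.primaryKeys().isEmpty()
                    || meta.level() == table.coreOptions().numLevels() - 1;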
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java
deleted file mode 100644
index 12b9c3e604..0000000000
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/PrimaryKeyIcebergCommitCallback.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.iceberg;
-
-import org.apache.paimon.io.DataFileMeta;
-import org.apache.paimon.table.FileStoreTable;
-
-/** {@link AbstractIcebergCommitCallback} for primary key tables. */
-public class PrimaryKeyIcebergCommitCallback extends AbstractIcebergCommitCallback {
-
- public PrimaryKeyIcebergCommitCallback(FileStoreTable table, String commitUser) {
- super(table, commitUser);
- }
-
- @Override
- protected boolean shouldAddFileToIceberg(DataFileMeta meta) {
- int maxLevel = table.coreOptions().numLevels() - 1;
- return meta.level() == maxLevel;
- }
-}
diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
index ed899ba833..ebcac8252c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java
@@ -36,9 +36,11 @@ import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.SplitRead;
import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFileHandler;
import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.TagCallback;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.types.RowType;
@@ -52,6 +54,7 @@ import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cach
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.List;
/** {@link FileStore} with privilege checks. */
@@ -150,9 +153,9 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
}
@Override
- public FileStoreCommit newCommit(String commitUser) {
+ public FileStoreCommit newCommit(String commitUser, FileStoreTable table) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newCommit(commitUser);
+ return wrapped.newCommit(commitUser, table);
}
@Override
@@ -181,9 +184,21 @@ public class PrivilegedFileStore<T> implements FileStore<T> {
@Nullable
@Override
- public PartitionExpire newPartitionExpire(String commitUser) {
+ public PartitionExpire newPartitionExpire(String commitUser, FileStoreTable table) {
privilegeChecker.assertCanInsert(identifier);
- return wrapped.newPartitionExpire(commitUser);
+ return wrapped.newPartitionExpire(commitUser, table);
+ }
+
+ @Override
+ public PartitionExpire newPartitionExpire(
+ String commitUser,
+ FileStoreTable table,
+ Duration expirationTime,
+ Duration checkInterval,
+ PartitionExpireStrategy expireStrategy) {
+ privilegeChecker.assertCanInsert(identifier);
+ return wrapped.newPartitionExpire(
+ commitUser, table, expirationTime, checkInterval, expireStrategy);
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 7ded01995e..2016427b89 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -31,7 +31,6 @@ import org.apache.paimon.catalog.SupportsSnapshots;
import org.apache.paimon.catalog.TableMetadata;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.rest.auth.AuthSession;
@@ -76,9 +75,8 @@ import org.apache.paimon.rest.responses.ListViewsResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.view.View;
import org.apache.paimon.view.ViewImpl;
@@ -105,7 +103,6 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
-import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -613,14 +610,11 @@ public class RESTCatalog implements Catalog, SupportsSnapshots, SupportsBranches
throw new TableNotExistException(identifier);
} catch (NotImplementedException ignored) {
// not a metastore partitioned table
- FileStoreTable fileStoreTable = (FileStoreTable) getTable(identifier);
- try (FileStoreCommit commit =
- fileStoreTable
- .store()
- .newCommit(
- createCommitUser(
- fileStoreTable.coreOptions().toConfiguration()))) {
- commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
+ try (BatchTableCommit commit =
+ getTable(identifier).newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(partitions);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
index 1771a5998f..46554e24d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java
@@ -455,9 +455,9 @@ abstract class AbstractFileStoreTable implements FileStoreTable {
}
return new TableCommitImpl(
- store().newCommit(commitUser),
+ store().newCommit(commitUser, this),
snapshotExpire,
- options.writeOnly() ? null : store().newPartitionExpire(commitUser),
+ options.writeOnly() ? null : store().newPartitionExpire(commitUser, this),
options.writeOnly() ? null : store().newTagCreationManager(),
CoreOptions.fromMap(options()).consumerExpireTime(),
new ConsumerManager(fileIO, path, snapshotManager().branch()),
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
index f0c9b59e31..84215980d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchTableCommit.java
@@ -18,9 +18,12 @@
package org.apache.paimon.table.sink;
+import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.stats.Statistics;
import java.util.List;
+import java.util.Map;
/**
* A {@link TableCommit} for batch processing. Recommended for one-time committing.
@@ -56,4 +59,16 @@ public interface BatchTableCommit extends TableCommit {
* logically deleted and will be deleted after the snapshot expires.
*/
void truncateTable();
+
+ /**
+ * Truncate partitions, like normal {@link #commit}, files are not immediately deleted, they are
+ * only logically deleted and will be deleted after the snapshot expires.
+ */
+ void truncatePartitions(List<Map<String, String>> partitionSpecs);
+
+ /** Commit new statistics. Generates a snapshot with {@link CommitKind#ANALYZE}. */
+ void updateStatistics(Statistics statistics);
+
+ /** Compact the manifest entries. Generates a snapshot with {@link CommitKind#COMPACT}. */
+ void compactManifests();
}
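Together the three new methods cover the metadata-only commits that
previously required a FileStoreCommit. A usage sketch, assuming a
FileStoreTable handle named `table` and a prepared Statistics instance
named `stats`; note close() declares a checked Exception, and each commit
object is treated as single-use per the interface's one-time committing
recommendation:

    try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
        commit.compactManifests(); // snapshot with CommitKind.COMPACT
    }
    try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
        commit.updateStatistics(stats); // snapshot with CommitKind.ANALYZE
    }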
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 44c89ae06b..e91153f5bc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -29,6 +29,7 @@ import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
+import org.apache.paimon.stats.Statistics;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
@@ -165,6 +166,21 @@ public class TableCommitImpl implements InnerTableCommit {
commit.truncateTable(COMMIT_IDENTIFIER);
}
+ @Override
+ public void truncatePartitions(List<Map<String, String>> partitionSpecs) {
+ commit.dropPartitions(partitionSpecs, COMMIT_IDENTIFIER);
+ }
+
+ @Override
+ public void updateStatistics(Statistics statistics) {
+ commit.commitStatistics(statistics, COMMIT_IDENTIFIER);
+ }
+
+ @Override
+ public void compactManifests() {
+ commit.compactManifest();
+ }
+
private void checkCommitted() {
checkState(!batchCommitted, "BatchTableCommit only support one-time committing.");
batchCommitted = true;
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 94f5777c10..3c794e146c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -93,7 +93,7 @@ public class TestAppendFileStore extends AppendOnlyFileStore {
}
public FileStoreCommitImpl newCommit() {
- return super.newCommit(commitUser);
+ return super.newCommit(commitUser, null);
}
public void commit(CommitMessage... commitMessages) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 2187418ee0..0da4489e3f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -157,7 +157,7 @@ public class TestFileStore extends KeyValueFileStore {
}
public FileStoreCommitImpl newCommit() {
- return super.newCommit(commitUser);
+ return super.newCommit(commitUser, null);
}
public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long millisRetained) {
@@ -262,7 +262,7 @@ public class TestFileStore extends KeyValueFileStore {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
- try (FileStoreCommit commit = newCommit(commitUser)) {
+ try (FileStoreCommit commit = newCommit(commitUser, null)) {
commit.dropPartitions(partitions, Long.MAX_VALUE);
}
@@ -352,7 +352,7 @@ public class TestFileStore extends KeyValueFileStore {
snapshotIdBeforeCommit = Snapshot.FIRST_SNAPSHOT_ID - 1;
}
- try (FileStoreCommit commit = newCommit(commitUser)) {
+ try (FileStoreCommit commit = newCommit(commitUser, null)) {
commitFunction.accept(commit, committable);
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index b40dc32d53..561a144c5c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -187,7 +187,7 @@ public class FileStoreCommitTest {
Path firstSnapshotPath = snapshotManager.snapshotPath(Snapshot.FIRST_SNAPSHOT_ID);
LocalFileIO.create().deleteQuietly(firstSnapshotPath);
// this test succeeds if this call does not fail
- try (FileStoreCommit commit = store.newCommit(UUID.randomUUID().toString())) {
+ try (FileStoreCommit commit = store.newCommit(UUID.randomUUID().toString(), null)) {
commit.filterCommitted(Collections.singletonList(new ManifestCommittable(999L)));
}
}
@@ -208,7 +208,7 @@ public class FileStoreCommitTest {
}
// all commit identifiers should be filtered out
- try (FileStoreCommit commit = store.newCommit(user)) {
+ try (FileStoreCommit commit = store.newCommit(user, null)) {
assertThat(
commit.filterCommitted(
commitIdentifiers.stream()
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index 438fe3fcbb..e0c5ac696b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -115,7 +115,8 @@ public class PartitionExpireTest {
table.store()
.newCommit(
createCommitUser(
- table.coreOptions().toConfiguration()))) {
+ table.coreOptions().toConfiguration()),
+ null)) {
commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
}
}
@@ -365,7 +366,8 @@ public class PartitionExpireTest {
}
private PartitionExpire newExpire() {
- return newExpireTable().store().newPartitionExpire("");
+ FileStoreTable table = newExpireTable();
+ return table.store().newPartitionExpire("", table);
}
private FileStoreTable newExpireTable() {
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 872cb554f5..ba735594a9 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -96,7 +96,7 @@ public class TestCommitThread extends Thread {
this.write = safeStore.newWrite(commitUser);
this.commit =
retryArtificialException(
- () -> testStore.newCommit(commitUser).ignoreEmptyCommit(false));
+ () -> testStore.newCommit(commitUser, null).ignoreEmptyCommit(false));
this.commitIdentifier = 0;
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index 9427ed61c9..94bc52305d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -31,7 +31,6 @@ import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -89,7 +88,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
@@ -1023,8 +1021,8 @@ public class PrimaryKeyFileStoreTableTest extends FileStoreTableTestBase {
reader.close();
// truncate table
- FileStoreCommit truncateCommit = table.store().newCommit(UUID.randomUUID().toString());
- truncateCommit.truncateTable(2);
+ BatchTableCommit truncateCommit = table.newBatchWriteBuilder().newCommit();
+ truncateCommit.truncateTable();
truncateCommit.close();
// test parquet row ranges filtering
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index 36e2bd1f09..2b0e0d1c68 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -79,15 +79,13 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
PartitionExpire partitionExpire =
- new PartitionExpire(
+ fileStore.newPartitionExpire(
+ "",
+ fileStoreTable,
TimeUtils.parseDuration(expirationTime),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
- CoreOptions.fromMap(map), fileStore.partitionType()),
- fileStore.newScan(),
- fileStore.newCommit(""),
- fileStoreTable.catalogEnvironment().partitionHandler(),
- fileStore.options().partitionExpireMaxNum());
+ CoreOptions.fromMap(map), fileStore.partitionType()));
if (maxExpires != null) {
partitionExpire.withMaxExpireNum(maxExpires);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index 876c51244c..ac59c827b4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -30,7 +30,6 @@ import org.apache.paimon.flink.procedure.ProcedureUtil;
import org.apache.paimon.flink.utils.FlinkCatalogPropertiesUtil;
import org.apache.paimon.flink.utils.FlinkDescriptorProperties;
import org.apache.paimon.manifest.PartitionEntry;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
@@ -39,7 +38,7 @@ import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeRoot;
@@ -1550,12 +1549,12 @@ public class FlinkCatalog extends AbstractCatalog {
return;
}
- FileStoreTable storeTable = (FileStoreTable) table;
- Statistics tableStats = statistics.apply(storeTable);
+ Statistics tableStats = statistics.apply((FileStoreTable) table);
if (tableStats != null) {
- String commitUser = storeTable.coreOptions().createCommitUser();
- try (FileStoreCommit commit = storeTable.store().newCommit(commitUser)) {
- commit.commitStatistics(tableStats, BatchWriteBuilder.COMMIT_IDENTIFIER);
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+ commit.updateStatistics(tableStats);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
} catch (Catalog.TableNotExistException e) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
index 6f568dfe61..86a5a10b80 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/DropPartitionAction.java
@@ -18,20 +18,17 @@
package org.apache.paimon.flink.action;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
import java.util.List;
import java.util.Map;
-import static org.apache.paimon.CoreOptions.createCommitUser;
-
/** Table drop partition action for Flink. */
public class DropPartitionAction extends TableActionBase {
private final List<Map<String, String>> partitions;
- private final FileStoreCommit commit;
+ private final BatchTableCommit commit;
public DropPartitionAction(
String databaseName,
@@ -47,17 +44,11 @@ public class DropPartitionAction extends TableActionBase {
}
this.partitions = partitions;
-
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- this.commit =
- fileStoreTable
- .store()
- .newCommit(
- createCommitUser(fileStoreTable.coreOptions().toConfiguration()));
+ this.commit = table.newBatchWriteBuilder().newCommit();
}
@Override
public void run() throws Exception {
- commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER);
+ commit.truncatePartitions(partitions);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
index 7448d8f4d3..b025c3a954 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java
@@ -57,15 +57,13 @@ public class ExpirePartitionsAction extends TableActionBase {
FileStoreTable fileStoreTable = (FileStoreTable) table;
FileStore<?> fileStore = fileStoreTable.store();
this.partitionExpire =
- new PartitionExpire(
+ fileStore.newPartitionExpire(
+ "",
+ fileStoreTable,
TimeUtils.parseDuration(expirationTime),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
- CoreOptions.fromMap(map), fileStore.partitionType()),
- fileStore.newScan(),
- fileStore.newCommit(""),
- fileStoreTable.catalogEnvironment().partitionHandler(),
- fileStore.options().partitionExpireMaxNum());
+ CoreOptions.fromMap(map), fileStore.partitionType()));
}
@Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
index 3e52322a6f..c6b0a170b0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/CompactManifestProcedure.java
@@ -19,8 +19,8 @@
package org.apache.paimon.flink.procedure;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
@@ -42,18 +42,14 @@ public class CompactManifestProcedure extends ProcedureBase {
@ProcedureHint(argument = {@ArgumentHint(name = "table", type = @DataTypeHint("STRING"))})
public String[] call(ProcedureContext procedureContext, String tableId) throws Exception {
- FileStoreTable table =
- (FileStoreTable)
- table(tableId)
- .copy(
- Collections.singletonMap(
- CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
-
- try (FileStoreCommit commit =
- table.store()
- .newCommit(table.coreOptions().createCommitUser())
- .ignoreEmptyCommit(false)) {
- commit.compactManifest();
+ Table table =
+ table(tableId)
+ .copy(
+ Collections.singletonMap(
+ CoreOptions.COMMIT_USER_PREFIX.key(), COMMIT_USER));
+
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+ commit.compactManifests();
}
return new String[] {"success"};
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
index 3a231758f4..cef7c33f34 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropPartitionProcedure.java
@@ -20,14 +20,12 @@ package org.apache.paimon.flink.procedure;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.flink.table.procedure.ProcedureContext;
-import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.utils.Preconditions.checkArgument;
/**
@@ -50,16 +48,11 @@ public class DropPartitionProcedure extends ProcedureBase {
checkArgument(
partitionStrings.length > 0, "drop-partition procedure must specify partitions.");
- FileStoreTable fileStoreTable =
- (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
- try (FileStoreCommit commit =
- fileStoreTable
- .store()
- .newCommit(
- createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
- commit.dropPartitions(
- ParameterUtils.getPartitions(partitionStrings),
- BatchWriteBuilder.COMMIT_IDENTIFIER);
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+ commit.truncatePartitions(ParameterUtils.getPartitions(partitionStrings));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
return new String[] {"Success"};
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
index b9435e12ed..582b4711fd 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java
@@ -83,15 +83,13 @@ public class ExpirePartitionsProcedure extends ProcedureBase {
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
PartitionExpire partitionExpire =
- new PartitionExpire(
+ fileStore.newPartitionExpire(
+ "",
+ fileStoreTable,
TimeUtils.parseDuration(expirationTime),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
- CoreOptions.fromMap(map), fileStore.partitionType()),
- fileStore.newScan(),
- fileStore.newCommit(""),
- fileStoreTable.catalogEnvironment().partitionHandler(),
- fileStore.options().partitionExpireMaxNum());
+ CoreOptions.fromMap(map), fileStore.partitionType()));
if (maxExpires != null) {
partitionExpire.withMaxExpireNum(maxExpires);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index c0d19abff2..86be6f347a 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -23,14 +23,12 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.PredicateConverter;
import org.apache.paimon.flink.log.LogStoreTableFactory;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectIdentifier;
@@ -60,7 +58,6 @@ import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP;
-import static org.apache.paimon.CoreOptions.createCommitUser;
import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -163,20 +160,16 @@ public abstract class SupportsRowLevelOperationFlinkTableSink extends FlinkTable
@Override
public Optional<Long> executeDeletion() {
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- try (FileStoreCommit commit =
- fileStoreTable
- .store()
- .newCommit(
- createCommitUser(fileStoreTable.coreOptions().toConfiguration()))) {
- long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER;
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
if (deletePredicate == null) {
- commit.truncateTable(identifier);
+ commit.truncateTable();
} else {
checkArgument(deleteIsDropPartition());
- commit.dropPartitions(Collections.singletonList(deletePartitions()), identifier);
+ commit.truncatePartitions(Collections.singletonList(deletePartitions()));
}
return Optional.empty();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
index 42a47ea1e2..90e5e6e811 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java
@@ -18,19 +18,18 @@
package org.apache.paimon.flink.source.statistics;
-import org.apache.paimon.FileStore;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.stats.ColStats;
import org.apache.paimon.stats.Statistics;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.types.BigIntType;
@@ -83,14 +82,13 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null));
- FileStore<?> fileStore = table.store();
- FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
- Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+ BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+ Snapshot latestSnapshot = table.latestSnapshot().get();
Statistics colStats =
new Statistics(
latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap);
- fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
- fileStoreCommit.close();
+ commit.updateStatistics(colStats);
+ commit.close();
DataTableSource scanSource = new DataTableSource(identifier, table, false, null, null);
Assertions.assertThat(scanSource.reportStatistics().getRowCount()).isEqualTo(9L);
Map<String, ColumnStats> expectedColStats = new HashMap<>();
@@ -170,14 +168,13 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null));
- FileStore<?> fileStore = table.store();
- FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
- Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+ BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+ Snapshot latestSnapshot = table.latestSnapshot().get();
Statistics colStats =
new Statistics(
latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap);
- fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
- fileStoreCommit.close();
+ commit.updateStatistics(colStats);
+ commit.close();
Map<String, ColumnStats> expectedColStats = new HashMap<>();
expectedColStats.put(
@@ -249,14 +246,13 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null));
- FileStore<?> fileStore = table.store();
- FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
- Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+ BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+ Snapshot latestSnapshot = table.latestSnapshot().get();
Statistics colStats =
new Statistics(
latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap);
- fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
- fileStoreCommit.close();
+ commit.updateStatistics(colStats);
+ commit.close();
Map<String, ColumnStats> expectedColStats = new HashMap<>();
expectedColStats.put(
@@ -328,14 +324,13 @@ public abstract class FileStoreTableStatisticsTestBase {
null,
null));
- FileStore<?> fileStore = table.store();
- FileStoreCommit fileStoreCommit = fileStore.newCommit(commitUser);
- Snapshot latestSnapshot = fileStore.snapshotManager().latestSnapshot();
+ BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+ Snapshot latestSnapshot = table.latestSnapshot().get();
Statistics colStats =
new Statistics(
latestSnapshot.id(), latestSnapshot.schemaId(), 9L, null, colStatsMap);
- fileStoreCommit.commitStatistics(colStats, Long.MAX_VALUE);
- fileStoreCommit.close();
+ commit.updateStatistics(colStats);
+ commit.close();
Map<String, ColumnStats> expectedColStats = new HashMap<>();
expectedColStats.put(
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
index 5eb2bfd27a..faba89fffd 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/IcebergHiveMetadataCommitter.java
@@ -50,7 +50,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.stream.Collectors;
-import static org.apache.paimon.iceberg.AbstractIcebergCommitCallback.catalogDatabasePath;
+import static org.apache.paimon.iceberg.IcebergCommitCallback.catalogDatabasePath;
/**
* {@link IcebergMetadataCommitter} to commit Iceberg metadata to Hive metastore, so the table can
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
index dd064d892c..12a3a286f4 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactManifestProcedure.java
@@ -18,8 +18,8 @@
package org.apache.paimon.spark.procedure;
-import org.apache.paimon.operation.FileStoreCommit;
-import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.catalog.Identifier;
@@ -67,13 +67,12 @@ public class CompactManifestProcedure extends BaseProcedure {
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
- FileStoreTable table = (FileStoreTable) loadSparkTable(tableIdent).getTable();
+ Table table = loadSparkTable(tableIdent).getTable();
- try (FileStoreCommit commit =
- table.store()
- .newCommit(table.coreOptions().createCommitUser())
- .ignoreEmptyCommit(false)) {
- commit.compactManifest();
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+ commit.compactManifests();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
}
return new InternalRow[] {newInternalRow(true)};
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
index 4b9d50db8d..dd9388b67a 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java
@@ -93,15 +93,13 @@ public class ExpirePartitionsProcedure extends BaseProcedure {
map.put(CoreOptions.PARTITION_TIMESTAMP_PATTERN.key(), timestampPattern);
PartitionExpire partitionExpire =
- new PartitionExpire(
+ fileStore.newPartitionExpire(
+ "",
+ fileStoreTable,
TimeUtils.parseDuration(expirationTime),
Duration.ofMillis(0L),
createPartitionExpireStrategy(
- CoreOptions.fromMap(map), fileStore.partitionType()),
- fileStore.newScan(),
- fileStore.newCommit(""),
- fileStoreTable.catalogEnvironment().partitionHandler(),
- fileStore.options().partitionExpireMaxNum());
+ CoreOptions.fromMap(map), fileStore.partitionType()));
if (maxExpires != null) {
partitionExpire.withMaxExpireNum(maxExpires);
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
index 1817add879..64ebf0ed97 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/PaimonPartitionManagement.scala
@@ -75,9 +75,9 @@ trait PaimonPartitionManagement extends SupportsAtomicPartitionManagement {
partitionHandler.close()
}
} else {
- val commit: FileStoreCommit = fileStoreTable.store.newCommit(UUID.randomUUID.toString)
+ val commit = fileStoreTable.newBatchWriteBuilder().newCommit()
try {
- commit.dropPartitions(partitions, BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.truncatePartitions(partitions)
} finally {
commit.close()
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index 799551aa6b..222cbe31e7 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -54,9 +54,9 @@ case class DeleteFromPaimonTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
- val commit = fileStore.newCommit(UUID.randomUUID.toString)
+ val commit = table.newBatchWriteBuilder().newCommit()
if (condition == null || condition == TrueLiteral) {
- commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.truncateTable()
} else {
val (partitionCondition, otherCondition) = splitPruePartitionAndOtherPredicates(
condition,
@@ -94,7 +94,7 @@ case class DeleteFromPaimonTableCommand(
partition => rowDataPartitionComputer.generatePartValues(partition).asScala.asJava
}
if (dropPartitions.nonEmpty) {
- commit.dropPartitions(dropPartitions.asJava, BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.truncatePartitions(dropPartitions.asJava)
} else {
writer.commit(Seq.empty)
}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
index 9a88ca2e4c..c9a18bd9fd 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonAnalyzeTableColumnCommand.scala
@@ -92,8 +92,8 @@ case class PaimonAnalyzeTableColumnCommand(
colStatsMap)
// commit stats
- val commit = table.store.newCommit(UUID.randomUUID.toString)
- commit.commitStatistics(stats, BatchWriteBuilder.COMMIT_IDENTIFIER)
+ val commit = table.newBatchWriteBuilder().newCommit()
+ commit.updateStatistics(stats)
commit.close()
Seq.empty[Row]
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
index 9d501e11f9..f55c5011e2 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonTruncateTableCommand.scala
@@ -37,14 +37,13 @@ case class PaimonTruncateTableCommand(v2Table: SparkTable, partitionSpec: TableP
override def table: FileStoreTable = v2Table.getTable.asInstanceOf[FileStoreTable]
override def run(sparkSession: SparkSession): Seq[Row] = {
- val commit = table.store.newCommit(UUID.randomUUID.toString)
+ val commit = table.newBatchWriteBuilder().newCommit()
if (partitionSpec.isEmpty) {
- commit.truncateTable(BatchWriteBuilder.COMMIT_IDENTIFIER)
+ commit.truncateTable()
} else {
- commit.dropPartitions(
- Collections.singletonList(partitionSpec.asJava),
- BatchWriteBuilder.COMMIT_IDENTIFIER
+ commit.truncatePartitions(
+ Collections.singletonList(partitionSpec.asJava)
)
}