This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 24d28dd960 [hotfix] Minor refactor for FileStoreCommitImpl
24d28dd960 is described below
commit 24d28dd9609c578ad484df628395c94eb57e50e3
Author: JingsongLi <[email protected]>
AuthorDate: Fri Sep 19 15:43:41 2025 +0800
[hotfix] Minor refactor for FileStoreCommitImpl
---
.../paimon/operation/FileStoreCommitImpl.java | 40 +++++++++++++---------
1 file changed, 23 insertions(+), 17 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 300cc9c926..466df201a3 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
@@ -320,9 +321,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
// This optimization is mainly used to decrease the number of
times we read from
// files.
latestSnapshot = snapshotManager.latestSnapshot();
- boolean hasDelete = hasDelete(appendSimpleEntries,
appendIndexFiles);
- Snapshot.CommitKind commitKind =
- hasDelete ? Snapshot.CommitKind.OVERWRITE :
Snapshot.CommitKind.APPEND;
+ CommitKind commitKind = CommitKind.APPEND;
+ ConflictCheck conflictCheck = noConflictCheck();
+ if (containsFileDeletionOrDeletionVectors(appendSimpleEntries,
appendIndexFiles)) {
+ commitKind = CommitKind.OVERWRITE;
+ conflictCheck = mustConflictCheck();
+ }
if (latestSnapshot != null && checkAppendFiles) {
// it is possible that some partitions only have compact
changes,
@@ -348,7 +352,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.logOffsets(),
committable.properties(),
commitKind,
- hasDelete ? mustConflictCheck() :
noConflictCheck(),
+ conflictCheck,
null);
generatedSnapshot += 1;
}
@@ -369,7 +373,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
latestSnapshot.commitUser(),
baseEntries,
SimpleFileEntry.from(compactTableFiles),
- Snapshot.CommitKind.COMPACT);
+ CommitKind.COMPACT);
// assume this compact commit follows just after the
append commit created above
safeLatestSnapshotId += 1;
}
@@ -383,7 +387,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- Snapshot.CommitKind.COMPACT,
+ CommitKind.COMPACT,
hasConflictChecked(safeLatestSnapshotId),
null);
generatedSnapshot += 1;
@@ -428,7 +432,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
commitMetrics.reportCommit(commitStats);
}
- private boolean hasDelete(
+ private boolean containsFileDeletionOrDeletionVectors(
List<SimpleFileEntry> appendSimpleEntries,
List<IndexManifestEntry> appendIndexFiles) {
for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
@@ -554,7 +558,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- Snapshot.CommitKind.COMPACT,
+ CommitKind.COMPACT,
mustConflictCheck(),
null);
generatedSnapshot += 1;
@@ -664,7 +668,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
null,
Collections.emptyMap(),
Collections.emptyMap(),
- Snapshot.CommitKind.ANALYZE,
+ CommitKind.ANALYZE,
noConflictCheck(),
statsFileName);
}
@@ -807,7 +811,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties,
- Snapshot.CommitKind commitKind,
+ CommitKind commitKind,
ConflictCheck conflictCheck,
@Nullable String statsFileName) {
int retryCount = 0;
@@ -910,7 +914,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
watermark,
logOffsets,
properties,
- Snapshot.CommitKind.OVERWRITE,
+ CommitKind.OVERWRITE,
mustConflictCheck(),
null);
}
@@ -925,7 +929,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties,
- Snapshot.CommitKind commitKind,
+ CommitKind commitKind,
@Nullable Snapshot latestSnapshot,
ConflictCheck conflictCheck,
@Nullable String newStatsFileName) {
@@ -964,8 +968,8 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot
>= 0) {
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId;
id++) {
Snapshot snapshot = snapshotManager.snapshot(id);
- if ((snapshot.commitKind() == Snapshot.CommitKind.COMPACT
- || snapshot.commitKind() ==
Snapshot.CommitKind.OVERWRITE)
+ if ((snapshot.commitKind() == CommitKind.COMPACT
+ || snapshot.commitKind() ==
CommitKind.OVERWRITE)
&& !snapshot.commitUser().equals(commitUser)) {
throw new RuntimeException(
String.format(
@@ -1296,7 +1300,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
latestSnapshot.indexManifest(),
commitUser,
Long.MAX_VALUE,
- Snapshot.CommitKind.COMPACT,
+ CommitKind.COMPACT,
System.currentTimeMillis(),
latestSnapshot.logOffsets(),
latestSnapshot.totalRecordCount(),
@@ -1375,11 +1379,11 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
String baseCommitUser,
List<SimpleFileEntry> baseEntries,
List<SimpleFileEntry> changes,
- Snapshot.CommitKind commitKind) {
+ CommitKind commitKind) {
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(changes);
- if (commitKind != Snapshot.CommitKind.OVERWRITE) {
+ if (commitKind != CommitKind.OVERWRITE) {
// total buckets within the same partition should remain the same
Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
for (SimpleFileEntry entry : allEntries) {
@@ -1438,6 +1442,8 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
assertNoDelete(mergedEntries, exceptionFunction);
+ // TODO check for deletion vectors
+
// fast exit for file store without keys
if (keyComparator == null) {
return;