This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new cf10c78410 [core] If overwrite an empty partition, change commit kind
to APPEND (#6795)
cf10c78410 is described below
commit cf10c784103d82506400af794ffb75790c42d3e1
Author: yuzelin <[email protected]>
AuthorDate: Fri Dec 12 16:37:51 2025 +0800
[core] If overwrite an empty partition, change commit kind to APPEND (#6795)
---
.../paimon/operation/FileStoreCommitImpl.java | 37 ++++++++++----
.../apache/paimon/table/sink/TableCommitTest.java | 57 ++++++++++++++++++++++
2 files changed, 84 insertions(+), 10 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index fc89ff1ecd..dd4b1ba27a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -383,7 +383,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- commitKind,
+ provider(commitKind),
conflictCheck,
null);
generatedSnapshot += 1;
@@ -418,7 +418,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- CommitKind.COMPACT,
+ provider(CommitKind.COMPACT),
hasConflictChecked(safeLatestSnapshotId),
null);
generatedSnapshot += 1;
@@ -463,10 +463,10 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
commitMetrics.reportCommit(commitStats);
}
- private boolean containsFileDeletionOrDeletionVectors(
- List<SimpleFileEntry> appendSimpleEntries,
List<IndexManifestEntry> appendIndexFiles) {
- for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
- if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
+ private <T extends FileEntry> boolean
containsFileDeletionOrDeletionVectors(
+ List<T> appendFileEntries, List<IndexManifestEntry>
appendIndexFiles) {
+ for (T appendFileEntry : appendFileEntries) {
+ if (appendFileEntry.kind().equals(FileKind.DELETE)) {
return true;
}
}
@@ -587,7 +587,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
committable.watermark(),
committable.logOffsets(),
committable.properties(),
- CommitKind.COMPACT,
+ provider(CommitKind.COMPACT),
mustConflictCheck(),
null);
generatedSnapshot += 1;
@@ -695,7 +695,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
null,
Collections.emptyMap(),
Collections.emptyMap(),
- CommitKind.ANALYZE,
+ provider(CommitKind.ANALYZE),
noConflictCheck(),
statsFileName);
}
@@ -836,7 +836,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties,
- CommitKind commitKind,
+ CommitKindProvider commitKindProvider,
ConflictCheck conflictCheck,
@Nullable String statsFileName) {
int retryCount = 0;
@@ -845,6 +845,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
CommitChanges changes = changesProvider.provide(latestSnapshot);
+ CommitKind commitKind = commitKindProvider.provide(changes);
CommitResult result =
tryCommitOnce(
retryResult,
@@ -895,6 +896,12 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties) {
+ CommitKindProvider commitKindProvider =
+ commitChanges ->
+ containsFileDeletionOrDeletionVectors(
+ commitChanges.tableFiles,
commitChanges.indexFiles)
+ ? CommitKind.OVERWRITE
+ : CommitKind.APPEND;
return tryCommit(
latestSnapshot ->
overwriteChanges(changes, indexFiles, latestSnapshot,
partitionFilter),
@@ -902,7 +909,7 @@ public class FileStoreCommitImpl implements FileStoreCommit
{
watermark,
logOffsets,
properties,
- CommitKind.OVERWRITE,
+ commitKindProvider,
mustConflictCheck(),
null);
}
@@ -1559,6 +1566,7 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles);
}
+ @FunctionalInterface
private interface ChangesProvider {
CommitChanges provide(@Nullable Snapshot latestSnapshot);
}
@@ -1578,4 +1586,13 @@ public class FileStoreCommitImpl implements
FileStoreCommit {
this.indexFiles = indexFiles;
}
}
+
+ @FunctionalInterface
+ private interface CommitKindProvider {
+ CommitKind provide(CommitChanges changes);
+ }
+
+ private static CommitKindProvider provider(CommitKind kind) {
+ return changes -> kind;
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index bc0756fcd1..ff2151e28c 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -812,4 +812,61 @@ public class TableCommitTest {
+ " snapshot that need to be resubmitted
have been deleted");
}
}
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testOverwriteEmptyTable(boolean partitioned) throws Exception {
+ String path = tempDir.toString();
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.BIGINT(),
DataTypes.INT()},
+ new String[] {"k", "v", "pt"});
+
+ Options options = new Options();
+ options.set(CoreOptions.PATH, path);
+ options.set(CoreOptions.BUCKET, 1);
+ options.set(CoreOptions.BUCKET_KEY, "k");
+ options.set(CoreOptions.WRITE_ONLY, true);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), new
Path(path)),
+ new Schema(
+ rowType.getFields(),
+ partitioned
+ ? Collections.singletonList("pt")
+ : Collections.emptyList(),
+ Collections.emptyList(),
+ options.toMap(),
+ ""));
+ FileStoreTable table =
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(path),
+ tableSchema,
+ CatalogEnvironment.empty());
+ SnapshotManager snapshotManager = table.snapshotManager();
+ String user = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(user);
+ TableCommitImpl commit = table.newCommit(user);
+
+ write.write(GenericRow.of(0, 0L, 1));
+ commit.withOverwrite(partitioned ? singletonMap("pt", "1") :
Collections.emptyMap());
+ commit.commit(1, write.prepareCommit(false, 1));
+ assertThat(snapshotManager.latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+
+ write.write(GenericRow.of(1, 1L, 1));
+ commit.withOverwrite(partitioned ? singletonMap("pt", "1") :
Collections.emptyMap());
+ commit.commit(2, write.prepareCommit(false, 2));
+ assertThat(snapshotManager.latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+ if (partitioned) {
+ write.write(GenericRow.of(3, 3L, 2));
+ commit.withOverwrite(singletonMap("pt", "2"));
+ commit.commit(3, write.prepareCommit(false, 3));
+ assertThat(snapshotManager.latestSnapshot().commitKind())
+ .isEqualTo(Snapshot.CommitKind.APPEND);
+ }
+ }
}