This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new cf10c78410 [core] If overwrite an empty partition, change commit kind to APPEND (#6795)
cf10c78410 is described below

commit cf10c784103d82506400af794ffb75790c42d3e1
Author: yuzelin <[email protected]>
AuthorDate: Fri Dec 12 16:37:51 2025 +0800

    [core] If overwrite an empty partition, change commit kind to APPEND (#6795)
---
 .../paimon/operation/FileStoreCommitImpl.java      | 37 ++++++++++----
 .../apache/paimon/table/sink/TableCommitTest.java  | 57 ++++++++++++++++++++++
 2 files changed, 84 insertions(+), 10 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index fc89ff1ecd..dd4b1ba27a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -383,7 +383,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                commitKind,
+                                provider(commitKind),
                                 conflictCheck,
                                 null);
                 generatedSnapshot += 1;
@@ -418,7 +418,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                CommitKind.COMPACT,
+                                provider(CommitKind.COMPACT),
                                 hasConflictChecked(safeLatestSnapshotId),
                                 null);
                 generatedSnapshot += 1;
@@ -463,10 +463,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         commitMetrics.reportCommit(commitStats);
     }
 
-    private boolean containsFileDeletionOrDeletionVectors(
-            List<SimpleFileEntry> appendSimpleEntries, List<IndexManifestEntry> appendIndexFiles) {
-        for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
-            if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
+    private <T extends FileEntry> boolean containsFileDeletionOrDeletionVectors(
+            List<T> appendFileEntries, List<IndexManifestEntry> appendIndexFiles) {
+        for (T appendFileEntry : appendFileEntries) {
+            if (appendFileEntry.kind().equals(FileKind.DELETE)) {
                 return true;
             }
         }
@@ -587,7 +587,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                CommitKind.COMPACT,
+                                provider(CommitKind.COMPACT),
                                 mustConflictCheck(),
                                 null);
                 generatedSnapshot += 1;
@@ -695,7 +695,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 null,
                 Collections.emptyMap(),
                 Collections.emptyMap(),
-                CommitKind.ANALYZE,
+                provider(CommitKind.ANALYZE),
                 noConflictCheck(),
                 statsFileName);
     }
@@ -836,7 +836,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Map<String, String> properties,
-            CommitKind commitKind,
+            CommitKindProvider commitKindProvider,
             ConflictCheck conflictCheck,
             @Nullable String statsFileName) {
         int retryCount = 0;
@@ -845,6 +845,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         while (true) {
             Snapshot latestSnapshot = snapshotManager.latestSnapshot();
             CommitChanges changes = changesProvider.provide(latestSnapshot);
+            CommitKind commitKind = commitKindProvider.provide(changes);
             CommitResult result =
                     tryCommitOnce(
                             retryResult,
@@ -895,6 +896,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Map<String, String> properties) {
+        CommitKindProvider commitKindProvider =
+                commitChanges ->
+                        containsFileDeletionOrDeletionVectors(
+                                        commitChanges.tableFiles, commitChanges.indexFiles)
+                                ? CommitKind.OVERWRITE
+                                : CommitKind.APPEND;
         return tryCommit(
                 latestSnapshot ->
                         overwriteChanges(changes, indexFiles, latestSnapshot, partitionFilter),
@@ -902,7 +909,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 watermark,
                 logOffsets,
                 properties,
-                CommitKind.OVERWRITE,
+                commitKindProvider,
                 mustConflictCheck(),
                 null);
     }
@@ -1559,6 +1566,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles);
     }
 
+    @FunctionalInterface
     private interface ChangesProvider {
         CommitChanges provide(@Nullable Snapshot latestSnapshot);
     }
@@ -1578,4 +1586,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             this.indexFiles = indexFiles;
         }
     }
+
+    @FunctionalInterface
+    private interface CommitKindProvider {
+        CommitKind provide(CommitChanges changes);
+    }
+
+    private static CommitKindProvider provider(CommitKind kind) {
+        return changes -> kind;
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index bc0756fcd1..ff2151e28c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -812,4 +812,61 @@ public class TableCommitTest {
                                     + " snapshot that need to be resubmitted have been deleted");
         }
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testOverwriteEmptyTable(boolean partitioned) throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT(), DataTypes.INT()},
+                        new String[] {"k", "v", "pt"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.BUCKET_KEY, "k");
+        options.set(CoreOptions.WRITE_ONLY, true);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                partitioned
+                                        ? Collections.singletonList("pt")
+                                        : Collections.emptyList(),
+                                Collections.emptyList(),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+        SnapshotManager snapshotManager = table.snapshotManager();
+        String user = UUID.randomUUID().toString();
+        TableWriteImpl<?> write = table.newWrite(user);
+        TableCommitImpl commit = table.newCommit(user);
+
+        write.write(GenericRow.of(0, 0L, 1));
+        commit.withOverwrite(partitioned ? singletonMap("pt", "1") : Collections.emptyMap());
+        commit.commit(1, write.prepareCommit(false, 1));
+        assertThat(snapshotManager.latestSnapshot().commitKind())
+                .isEqualTo(Snapshot.CommitKind.APPEND);
+
+        write.write(GenericRow.of(1, 1L, 1));
+        commit.withOverwrite(partitioned ? singletonMap("pt", "1") : Collections.emptyMap());
+        commit.commit(2, write.prepareCommit(false, 2));
+        assertThat(snapshotManager.latestSnapshot().commitKind())
+                .isEqualTo(Snapshot.CommitKind.OVERWRITE);
+
+        if (partitioned) {
+            write.write(GenericRow.of(3, 3L, 2));
+            commit.withOverwrite(singletonMap("pt", "2"));
+            commit.commit(3, write.prepareCommit(false, 3));
+            assertThat(snapshotManager.latestSnapshot().commitKind())
+                    .isEqualTo(Snapshot.CommitKind.APPEND);
+        }
+    }
 }

Reply via email to