This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 3b1f7593ccdd14979125040b5b74ef1125e59008
Author: Zouxxyy <[email protected]>
AuthorDate: Fri Sep 19 15:39:01 2025 +0800

    [core] Fix the commit kind when performing row-level changes on non-pk 
table (#6025)
---
 .../src/main/java/org/apache/paimon/Snapshot.java  |   7 +-
 .../apache/paimon/operation/FileStoreCommit.java   |   2 +-
 .../paimon/operation/FileStoreCommitImpl.java      | 181 ++++++++++++---------
 .../apache/paimon/table/sink/TableCommitImpl.java  |   3 +-
 .../test/java/org/apache/paimon/TestFileStore.java |   2 +-
 .../apache/paimon/operation/FileDeletionTest.java  |   4 +-
 .../apache/paimon/operation/TestCommitThread.java  |   2 +-
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/MergeIntoTableTest.scala      |   2 +
 .../paimon/spark/sql/DeleteFromTableTestBase.scala |  38 ++++-
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  |  91 ++++++++++-
 .../paimon/spark/sql/UpdateTableTestBase.scala     |  38 ++++-
 15 files changed, 287 insertions(+), 91 deletions(-)

diff --git a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java 
b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
index 31defa3bfb..a318a72e40 100644
--- a/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
+++ b/paimon-api/src/main/java/org/apache/paimon/Snapshot.java
@@ -496,13 +496,16 @@ public class Snapshot implements Serializable {
     /** Type of changes in this snapshot. */
     public enum CommitKind {
 
-        /** Changes flushed from the mem table. */
+        /** New data files are appended to the table and no data file is 
deleted. */
         APPEND,
 
         /** Changes by compacting existing data files. */
         COMPACT,
 
-        /** Changes that clear up the whole partition and then add new 
records. */
+        /**
+         * New data files are added to overwrite existing data files or just 
delete existing data
+         * files.
+         */
         OVERWRITE,
 
         /** Collect statistics. */
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 4156ce0a83..6a00db6f0e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -50,7 +50,7 @@ public interface FileStoreCommit extends AutoCloseable {
      *     note that this partition does not necessarily equal to the 
partitions of the newly added
      *     key-values. This is just the partition to be cleaned up.
      */
-    int overwrite(
+    int overwritePartition(
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties);
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index a11339fe7d..300cc9c926 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -25,7 +25,6 @@ import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
@@ -88,7 +87,6 @@ import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
@@ -296,24 +294,24 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
 
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
-        List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> compactIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
+                appendIndexFiles,
                 compactTableFiles,
                 compactChangelog,
-                appendHashIndexFiles,
-                compactDvIndexFiles);
+                compactIndexFiles);
         try {
             List<SimpleFileEntry> appendSimpleEntries = 
SimpleFileEntry.from(appendTableFiles);
             if (!ignoreEmptyCommit
                     || !appendTableFiles.isEmpty()
                     || !appendChangelog.isEmpty()
-                    || !appendHashIndexFiles.isEmpty()) {
+                    || !appendIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 1:
                 // Read manifest entries from changed partitions here and 
check for conflicts.
@@ -322,6 +320,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 // This optimization is mainly used to decrease the number of 
times we read from
                 // files.
                 latestSnapshot = snapshotManager.latestSnapshot();
+                boolean hasDelete = hasDelete(appendSimpleEntries, 
appendIndexFiles);
+                Snapshot.CommitKind commitKind =
+                        hasDelete ? Snapshot.CommitKind.OVERWRITE : 
Snapshot.CommitKind.APPEND;
+
                 if (latestSnapshot != null && checkAppendFiles) {
                     // it is possible that some partitions only have compact 
changes,
                     // so we need to contain all changes
@@ -332,7 +334,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                             latestSnapshot.commitUser(),
                             baseEntries,
                             appendSimpleEntries,
-                            Snapshot.CommitKind.APPEND);
+                            commitKind);
                     safeLatestSnapshotId = latestSnapshot.id();
                 }
 
@@ -340,20 +342,20 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                         tryCommit(
                                 appendTableFiles,
                                 appendChangelog,
-                                appendHashIndexFiles,
+                                appendIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
                                 committable.properties(),
-                                Snapshot.CommitKind.APPEND,
-                                noConflictCheck(),
+                                commitKind,
+                                hasDelete ? mustConflictCheck() : 
noConflictCheck(),
                                 null);
                 generatedSnapshot += 1;
             }
 
             if (!compactTableFiles.isEmpty()
                     || !compactChangelog.isEmpty()
-                    || !compactDvIndexFiles.isEmpty()) {
+                    || !compactIndexFiles.isEmpty()) {
                 // Optimization for common path.
                 // Step 2:
                 // Add appendChanges to the manifest entries read above and 
check for conflicts.
@@ -376,7 +378,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                         tryCommit(
                                 compactTableFiles,
                                 compactChangelog,
-                                compactDvIndexFiles,
+                                compactIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -426,8 +428,23 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         commitMetrics.reportCommit(commitStats);
     }
 
+    private boolean hasDelete(
+            List<SimpleFileEntry> appendSimpleEntries, 
List<IndexManifestEntry> appendIndexFiles) {
+        for (SimpleFileEntry appendSimpleEntry : appendSimpleEntries) {
+            if (appendSimpleEntry.kind().equals(FileKind.DELETE)) {
+                return true;
+            }
+        }
+        for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+            if 
(appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
     @Override
-    public int overwrite(
+    public int overwritePartition(
             Map<String, String> partition,
             ManifestCommittable committable,
             Map<String, String> properties) {
@@ -448,18 +465,18 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         int attempts = 0;
         List<ManifestEntry> appendTableFiles = new ArrayList<>();
         List<ManifestEntry> appendChangelog = new ArrayList<>();
+        List<IndexManifestEntry> appendIndexFiles = new ArrayList<>();
         List<ManifestEntry> compactTableFiles = new ArrayList<>();
         List<ManifestEntry> compactChangelog = new ArrayList<>();
-        List<IndexManifestEntry> appendHashIndexFiles = new ArrayList<>();
-        List<IndexManifestEntry> compactDvIndexFiles = new ArrayList<>();
+        List<IndexManifestEntry> compactIndexFiles = new ArrayList<>();
         collectChanges(
                 committable.fileCommittables(),
                 appendTableFiles,
                 appendChangelog,
+                appendIndexFiles,
                 compactTableFiles,
                 compactChangelog,
-                appendHashIndexFiles,
-                compactDvIndexFiles);
+                compactIndexFiles);
 
         if (!appendChangelog.isEmpty() || !compactChangelog.isEmpty()) {
             StringBuilder warnMessage =
@@ -493,7 +510,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                     partitionFilter = 
PartitionPredicate.fromMultiple(partitionType, partitions);
                 }
             } else {
-                // partition may be partial partition fields, so here must to 
use predicate way.
+                // partition may be partial partition fields, so here must use 
predicate way.
                 Predicate partitionPredicate =
                         createPartitionPredicate(partition, partitionType, 
partitionDefaultName);
                 partitionFilter =
@@ -516,10 +533,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             // overwrite new files
             if (!skipOverwrite) {
                 attempts +=
-                        tryOverwrite(
+                        tryOverwritePartition(
                                 partitionFilter,
                                 appendTableFiles,
-                                appendHashIndexFiles,
+                                appendIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -527,12 +544,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 generatedSnapshot += 1;
             }
 
-            if (!compactTableFiles.isEmpty() || 
!compactDvIndexFiles.isEmpty()) {
+            if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
                 attempts +=
                         tryCommit(
                                 compactTableFiles,
                                 emptyList(),
-                                compactDvIndexFiles,
+                                compactIndexFiles,
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -590,7 +607,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             partitionFilter = PartitionPredicate.fromPredicate(partitionType, 
predicate);
         }
 
-        tryOverwrite(
+        tryOverwritePartition(
                 partitionFilter,
                 emptyList(),
                 emptyList(),
@@ -602,7 +619,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     @Override
     public void truncateTable(long commitIdentifier) {
-        tryOverwrite(
+        tryOverwritePartition(
                 null,
                 emptyList(),
                 emptyList(),
@@ -666,10 +683,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             List<CommitMessage> commitMessages,
             List<ManifestEntry> appendTableFiles,
             List<ManifestEntry> appendChangelog,
+            List<IndexManifestEntry> appendIndexFiles,
             List<ManifestEntry> compactTableFiles,
             List<ManifestEntry> compactChangelog,
-            List<IndexManifestEntry> appendHashIndexFiles,
-            List<IndexManifestEntry> compactDvIndexFiles) {
+            List<IndexManifestEntry> compactIndexFiles) {
         for (CommitMessage message : commitMessages) {
             CommitMessageImpl commitMessage = (CommitMessageImpl) message;
             commitMessage
@@ -687,6 +704,29 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     .newFilesIncrement()
                     .changelogFiles()
                     .forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, 
commitMessage, m)));
+            commitMessage
+                    .newFilesIncrement()
+                    .newIndexFiles()
+                    .forEach(
+                            m ->
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
+            commitMessage
+                    .newFilesIncrement()
+                    .deletedIndexFiles()
+                    .forEach(
+                            m ->
+                                    appendIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.DELETE,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
+
             commitMessage
                     .compactIncrement()
                     .compactBefore()
@@ -702,53 +742,28 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                     .compactIncrement()
                     .changelogFiles()
                     .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, 
commitMessage, m)));
-
-            // todo: split them
-            List<IndexFileMeta> newIndexFiles =
-                    new 
ArrayList<>(commitMessage.newFilesIncrement().newIndexFiles());
-            
newIndexFiles.addAll(commitMessage.compactIncrement().newIndexFiles());
-            newIndexFiles.forEach(
-                    f -> {
-                        switch (f.indexType()) {
-                            case HASH_INDEX:
-                                appendHashIndexFiles.add(
-                                        new IndexManifestEntry(
-                                                FileKind.ADD,
-                                                commitMessage.partition(),
-                                                commitMessage.bucket(),
-                                                f));
-                                break;
-                            case DELETION_VECTORS_INDEX:
-                                compactDvIndexFiles.add(
-                                        new IndexManifestEntry(
-                                                FileKind.ADD,
-                                                commitMessage.partition(),
-                                                commitMessage.bucket(),
-                                                f));
-                                break;
-                            default:
-                                throw new RuntimeException("Unknown index 
type: " + f.indexType());
-                        }
-                    });
-
-            // todo: split them
-            List<IndexFileMeta> deletedIndexFiles =
-                    new 
ArrayList<>(commitMessage.newFilesIncrement().deletedIndexFiles());
-            
deletedIndexFiles.addAll(commitMessage.compactIncrement().deletedIndexFiles());
-            deletedIndexFiles.forEach(
-                    f -> {
-                        if (f.indexType().equals(DELETION_VECTORS_INDEX)) {
-                            compactDvIndexFiles.add(
-                                    new IndexManifestEntry(
-                                            FileKind.DELETE,
-                                            commitMessage.partition(),
-                                            commitMessage.bucket(),
-                                            f));
-                        } else {
-                            throw new RuntimeException(
-                                    "This index type is not supported to 
delete: " + f.indexType());
-                        }
-                    });
+            commitMessage
+                    .compactIncrement()
+                    .newIndexFiles()
+                    .forEach(
+                            m ->
+                                    compactIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.ADD,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
+            commitMessage
+                    .compactIncrement()
+                    .deletedIndexFiles()
+                    .forEach(
+                            m ->
+                                    compactIndexFiles.add(
+                                            new IndexManifestEntry(
+                                                    FileKind.DELETE,
+                                                    commitMessage.partition(),
+                                                    commitMessage.bucket(),
+                                                    m)));
         }
         if (!commitMessages.isEmpty()) {
             List<String> msg = new ArrayList<>();
@@ -758,17 +773,17 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
             if (!appendChangelog.isEmpty()) {
                 msg.add(appendChangelog.size() + " append Changelogs");
             }
+            if (!appendIndexFiles.isEmpty()) {
+                msg.add(appendIndexFiles.size() + " append index files");
+            }
             if (!compactTableFiles.isEmpty()) {
                 msg.add(compactTableFiles.size() + " compact table files");
             }
             if (!compactChangelog.isEmpty()) {
                 msg.add(compactChangelog.size() + " compact Changelogs");
             }
-            if (!appendHashIndexFiles.isEmpty()) {
-                msg.add(appendHashIndexFiles.size() + " append hash index 
files");
-            }
-            if (!compactDvIndexFiles.isEmpty()) {
-                msg.add(compactDvIndexFiles.size() + " compact dv index 
files");
+            if (!compactIndexFiles.isEmpty()) {
+                msg.add(compactIndexFiles.size() + " compact index files");
             }
             LOG.info("Finished collecting changes, including: {}", 
String.join(", ", msg));
         }
@@ -836,7 +851,13 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return retryCount + 1;
     }
 
-    private int tryOverwrite(
+    /**
+     * Try to overwrite partition.
+     *
+     * @param partitionFilter Partition filter indicating which partitions to 
overwrite, if {@code
+     *     null}, overwrites the entire table.
+     */
+    private int tryOverwritePartition(
             @Nullable PartitionPredicate partitionFilter,
             List<ManifestEntry> changes,
             List<IndexManifestEntry> indexFiles,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index e549bd2e2c..7f19c7443c 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -248,7 +248,8 @@ public class TableCommitImpl implements InnerTableCommit {
                 committable = new ManifestCommittable(Long.MAX_VALUE);
             }
             int newSnapshots =
-                    commit.overwrite(overwritePartition, committable, 
Collections.emptyMap());
+                    commit.overwritePartition(
+                            overwritePartition, committable, 
Collections.emptyMap());
             maintain(
                     committable.identifier(),
                     maintainExecutor,
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index dc1702c264..aa405060f4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -263,7 +263,7 @@ public class TestFileStore extends KeyValueFileStore {
                 null,
                 Collections.emptyList(),
                 (commit, committable) ->
-                        commit.overwrite(partition, committable, 
Collections.emptyMap()));
+                        commit.overwritePartition(partition, committable, 
Collections.emptyMap()));
     }
 
     public Snapshot dropPartitions(List<Map<String, String>> partitions) {
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 8e794fe749..0bc6e041ab 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -154,13 +154,13 @@ public class FileDeletionTest {
         FileStoreCommitImpl commit = store.newCommit();
         Map<String, String> partitionSpec = new HashMap<>();
         partitionSpec.put("dt", "0401");
-        commit.overwrite(
+        commit.overwritePartition(
                 partitionSpec, new ManifestCommittable(commitIdentifier++), 
Collections.emptyMap());
 
         // step 3: generate snapshot 3 by cleaning partition dt=0402/hr=10
         partitionSpec.put("dt", "0402");
         partitionSpec.put("hr", "8");
-        commit.overwrite(
+        commit.overwritePartition(
                 partitionSpec, new ManifestCommittable(commitIdentifier++), 
Collections.emptyMap());
         commit.close();
 
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
index 2f409a9176..cdd2d8fba1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/TestCommitThread.java
@@ -169,7 +169,7 @@ public class TestCommitThread extends Thread {
         runWithRetry(
                 committable,
                 () ->
-                        commit.overwrite(
+                        commit.overwritePartition(
                                 
TestKeyValueGenerator.toPartitionMap(partition, MULTI_PARTITIONED),
                                 committable,
                                 Collections.emptyMap()));
diff --git 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index f1f0d8c065..5ab343c41c 100644
--- 
a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendNonBucketTableTest {}
diff --git 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index f1f0d8c065..5ab343c41c 100644
--- 
a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -32,8 +32,10 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with PaimonAppendNonBucketTableTest {}
diff --git 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- 
a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendNonBucketTableTest {}
diff --git 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- 
a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ 
b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendNonBucketTableTest {}
diff --git 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
index e1cfe3a396..b9a85b147e 100644
--- 
a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
+++ 
b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTest.scala
@@ -34,10 +34,12 @@ class MergeIntoPrimaryKeyNonBucketTableTest
 
 class MergeIntoAppendBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendBucketedTableTest {}
 
 class MergeIntoAppendNonBucketedTableTest
   extends MergeIntoTableTestBase
+  with MergeIntoAppendTableTest
   with MergeIntoNotMatchedBySourceTest
   with PaimonAppendNonBucketTableTest {}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
index 6b38c1f5f0..ea05d94dd3 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeleteFromTableTestBase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.CoreOptions.MergeEngine
 import org.apache.paimon.spark.PaimonSparkTestBase
 
@@ -435,4 +435,40 @@ abstract class DeleteFromTableTestBase extends 
PaimonSparkTestBase {
         }
       }
   }
+
+  test("Paimon delete: non pk table commit kind") {
+    for (dvEnabled <- Seq(true, false)) {
+      withTable("t") {
+        sql(
+          s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+
+        sql("DELETE FROM t WHERE id = 1")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(2, 2), Row(3, 
3)))
+        val table = loadTable("t")
+        var latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 2)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+        sql("DELETE FROM t WHERE id = 2")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(3, 3)))
+        latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 3)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+      }
+    }
+  }
+
+  test("Paimon delete: pk dv table commit kind") {
+    withTable("t") {
+      sql(
+        s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+      sql("DELETE FROM t WHERE id = 1")
+      val table = loadTable("t")
+      val latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 4)
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+    }
+  }
 }
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index 4564e1646c..c867aed9f0 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -18,10 +18,15 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.spark.{PaimonPrimaryKeyTable, PaimonSparkTestBase, 
PaimonTableTest}
+import org.apache.paimon.Snapshot
+import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, 
PaimonSparkTestBase, PaimonTableTest}
 
 import org.apache.spark.sql.Row
 
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+
 abstract class MergeIntoTableTestBase extends PaimonSparkTestBase with 
PaimonTableTest {
 
   import testImplicits._
@@ -714,3 +719,87 @@ trait MergeIntoPrimaryKeyTableTest extends 
PaimonSparkTestBase with PaimonPrimar
     }
   }
 }
+
+trait MergeIntoAppendTableTest extends PaimonSparkTestBase with 
PaimonAppendTable {
+
+  test("Paimon MergeInto: non pk table commit kind") {
+    withTable("s", "t") {
+      createTable("s", "id INT, b INT, c INT", Seq("id"))
+      sql("INSERT INTO s VALUES (1, 1, 1)")
+
+      createTable("t", "id INT, b INT, c INT", Seq("id"))
+      sql("INSERT INTO t VALUES (2, 2, 2)")
+
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN NOT MATCHED THEN
+            |INSERT (id, b, c) VALUES (s.id, s.b, s.c);
+            |""".stripMargin)
+
+      val table = loadTable("t")
+      var latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 2)
+      // no old data is deleted, so the commit kind is APPEND
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.APPEND))
+
+      sql("INSERT INTO s VALUES (2, 22, 22)")
+      sql("""
+            |MERGE INTO t
+            |USING s
+            |ON t.id = s.id
+            |WHEN MATCHED THEN
+            |UPDATE SET id = s.id, b = s.b, c = s.c
+            |""".stripMargin)
+      latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 3)
+      // matched rows are rewritten, deleting their old data files, so the commit kind is OVERWRITE
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+    }
+  }
+
+  test("Paimon MergeInto: concurrent merge and compact") {
+    withTable("s", "t") {
+      sql("CREATE TABLE s (id INT, b INT, c INT)")
+      sql("INSERT INTO s VALUES (1, 1, 1)")
+
+      sql("CREATE TABLE t (id INT, b INT, c INT)")
+      sql("INSERT INTO t VALUES (1, 1, 1)")
+
+      val mergeInto = Future {
+        for (_ <- 1 to 10) {
+          try {
+            sql("""
+                  |MERGE INTO t
+                  |USING s
+                  |ON t.id = s.id
+                  |WHEN MATCHED THEN
+                  |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                  |""".stripMargin)
+          } catch {
+            case a: Throwable =>
+              assert(
+                a.getMessage.contains("Conflicts during commits") || 
a.getMessage.contains(
+                  "Missing file"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
+        }
+      }
+
+      val compact = Future {
+        for (_ <- 1 to 10) {
+          try {
+            sql("CALL sys.compact(table => 't', order_strategy => 'order', 
order_by => 'id')")
+          } catch {
+            case a: Throwable => assert(a.getMessage.contains("Conflicts 
during commits"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
+        }
+      }
+
+      Await.result(mergeInto, 60.seconds)
+      Await.result(compact, 60.seconds)
+    }
+  }
+}
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 5cbefe9faa..097d2c4e14 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.spark.sql
 
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.spark.PaimonSparkTestBase
 import org.apache.paimon.spark.catalyst.analysis.Update
 
@@ -357,4 +357,40 @@ abstract class UpdateTableTestBase extends 
PaimonSparkTestBase {
     sql("UPDATE T SET c = 'b' WHERE id = 1")
     checkAnswer(sql("SELECT * FROM T"), Seq(Row(1, "s", "b")))
   }
+
+  test("Paimon update: non pk table commit kind") {
+    for (dvEnabled <- Seq(true, false)) {
+      withTable("t") {
+        sql(
+          s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+
+        sql("UPDATE t SET data = 111 WHERE id = 1")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 111), 
Row(2, 2), Row(3, 3)))
+        val table = loadTable("t")
+        var latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 2)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+
+        sql("UPDATE t SET data = 222 WHERE id = 2")
+        checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 111), 
Row(2, 222), Row(3, 3)))
+        latestSnapshot = table.latestSnapshot().get()
+        assert(latestSnapshot.id == 3)
+        assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.OVERWRITE))
+      }
+    }
+  }
+
+  test("Paimon update: pk dv table commit kind") {
+    withTable("t") {
+      sql(
+        s"CREATE TABLE t (id INT, data INT) TBLPROPERTIES 
('deletion-vectors.enabled' = 'true', 'primary-key' = 'id')")
+      sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ id, id AS data FROM 
range(1, 4)")
+      sql("UPDATE t SET data = 111 WHERE id = 1")
+      val table = loadTable("t")
+      // NOTE: each write on a pk table with deletion vectors presumably yields an APPEND
+      // snapshot plus a dv-maintenance COMPACT snapshot, so after one insert and one
+      // update the latest snapshot is the 4th and its kind is COMPACT
+      val latestSnapshot = table.latestSnapshot().get()
+      assert(latestSnapshot.id == 4)
+      assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
+    }
+  }
 }


Reply via email to