This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new bfdd516536 [core] Minor refactor conflicts check for deletion vectors files (#6369)
bfdd516536 is described below

commit bfdd516536f9201781490f49eae130156c8dd0ed
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Oct 9 19:06:07 2025 +0800

    [core] Minor refactor conflicts check for deletion vectors files (#6369)
---
 .../java/org/apache/paimon/AbstractFileStore.java  |   1 -
 .../paimon/manifest/IndexManifestFileHandler.java  |  42 ++++++-
 .../paimon/operation/FileStoreCommitImpl.java      | 136 +++++++++++----------
 .../paimon/operation/FileStoreCommitTest.java      |   8 +-
 .../paimon/spark/sql/MergeIntoTableTestBase.scala  |  50 ++++++++
 5 files changed, 163 insertions(+), 74 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index bbeb0aa150..31918b84cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -299,7 +299,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.commitMaxRetryWait(),
                 options.commitStrictModeLastSafeSnapshot().orElse(null),
                 options.rowTrackingEnabled(),
-                !schema.primaryKeys().isEmpty(),
                 options.deletionVectorsEnabled(),
                 newIndexFileHandler());
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
index bf334e4ad3..3ea24872bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.manifest;
 
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.utils.Pair;
@@ -27,14 +28,18 @@ import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** IndexManifestFile Handler. */
 public class IndexManifestFileHandler {
@@ -115,15 +120,48 @@ public class IndexManifestFileHandler {
         public List<IndexManifestEntry> combine(
                 List<IndexManifestEntry> prevIndexFiles, List<IndexManifestEntry> newIndexFiles) {
             Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
+            Set<String> dvDataFiles = new HashSet<>();
             for (IndexManifestEntry entry : prevIndexFiles) {
                 indexEntries.put(entry.indexFile().fileName(), entry);
+                LinkedHashMap<String, DeletionVectorMeta> dvRanges = entry.indexFile().dvRanges();
+                if (dvRanges != null) {
+                    dvDataFiles.addAll(dvRanges.keySet());
+                }
             }
 
             for (IndexManifestEntry entry : newIndexFiles) {
+                String fileName = entry.indexFile().fileName();
+                LinkedHashMap<String, DeletionVectorMeta> dvRanges = entry.indexFile().dvRanges();
                 if (entry.kind() == FileKind.ADD) {
-                    indexEntries.put(entry.indexFile().fileName(), entry);
+                    checkState(
+                            !indexEntries.containsKey(fileName),
+                            "Trying to add file %s which is already added.",
+                            fileName);
+                    if (dvRanges != null) {
+                        for (String dataFile : dvRanges.keySet()) {
+                            checkState(
+                                    !dvDataFiles.contains(dataFile),
+                                    "Trying to add dv for data file %s which is already added.",
+                                    dataFile);
+                            dvDataFiles.add(dataFile);
+                        }
+                    }
+                    indexEntries.put(fileName, entry);
                 } else {
-                    indexEntries.remove(entry.indexFile().fileName());
+                    checkState(
+                            indexEntries.containsKey(fileName),
+                            "Trying to delete file %s which is not exists.",
+                            fileName);
+                    if (dvRanges != null) {
+                        for (String dataFile : dvRanges.keySet()) {
+                            checkState(
+                                    dvDataFiles.contains(dataFile),
+                                    "Trying to delete dv for data file %s which is not exists.",
+                                    dataFile);
+                            dvDataFiles.remove(dataFile);
+                        }
+                    }
+                    indexEntries.remove(fileName);
                 }
             }
             return new ArrayList<>(indexEntries.values());
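
For bucket-unaware tables, the combine() logic above now also tracks which data files already have a deletion vector registered, and it rejects a commit that would register a second dv for the same data file, or that would drop an index file or dv entry that is not present. The block below is a minimal, self-contained sketch of that bookkeeping using plain Java collections; the class and method names are invented for illustration and are not Paimon API.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    // Illustrative model of the dv-aware combine() bookkeeping (not Paimon API):
    // index files are keyed by name, and each one lists the data files it covers.
    public class DvIndexCombineSketch {

        private final Map<String, List<String>> indexFiles = new HashMap<>();
        private final Set<String> dvDataFiles = new HashSet<>();

        // add == true mirrors FileKind.ADD, add == false mirrors FileKind.DELETE.
        public void apply(boolean add, String indexFileName, List<String> coveredDataFiles) {
            if (add) {
                if (indexFiles.containsKey(indexFileName)) {
                    throw new IllegalStateException(
                            "Trying to add file " + indexFileName + " which is already added.");
                }
                for (String dataFile : coveredDataFiles) {
                    if (!dvDataFiles.add(dataFile)) {
                        throw new IllegalStateException(
                                "Trying to add dv for data file " + dataFile + " which is already added.");
                    }
                }
                indexFiles.put(indexFileName, coveredDataFiles);
            } else {
                if (!indexFiles.containsKey(indexFileName)) {
                    throw new IllegalStateException(
                            "Trying to delete file " + indexFileName + " which does not exist.");
                }
                for (String dataFile : coveredDataFiles) {
                    if (!dvDataFiles.remove(dataFile)) {
                        throw new IllegalStateException(
                                "Trying to delete dv for data file " + dataFile + " which does not exist.");
                    }
                }
                indexFiles.remove(indexFileName);
            }
        }

        public static void main(String[] args) {
            DvIndexCombineSketch sketch = new DvIndexCombineSketch();
            sketch.apply(true, "dv-index-1", Arrays.asList("f1", "f2"));
            // Fails fast: "f2" already has a deletion vector registered by dv-index-1,
            // so a second dv cannot be added without deleting dv-index-1 first.
            sketch.apply(true, "dv-index-2", Arrays.asList("f2"));
        }
    }

In the patch itself the equivalent state lives in indexEntries and dvDataFiles, and the failures come from the new checkState calls.
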
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index e3e6140608..97e67ec5cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -154,7 +154,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
     private final boolean rowTrackingEnabled;
-    private final boolean isPkTable;
     private final boolean deletionVectorsEnabled;
     private final IndexFileHandler indexFileHandler;
 
@@ -194,7 +193,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             long commitMaxRetryWait,
             @Nullable Long strictModeLastSafeSnapshot,
             boolean rowTrackingEnabled,
-            boolean isPkTable,
             boolean deletionVectorsEnabled,
             IndexFileHandler indexFileHandler) {
         this.snapshotCommit = snapshotCommit;
@@ -240,7 +238,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
         this.rowTrackingEnabled = rowTrackingEnabled;
-        this.isPkTable = isPkTable;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.indexFileHandler = indexFileHandler;
     }
@@ -728,23 +725,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     .forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
             commitMessage
                     .newFilesIncrement()
-                    .newIndexFiles()
+                    .deletedIndexFiles()
                     .forEach(
                             m ->
                                     appendIndexFiles.add(
                                             new IndexManifestEntry(
-                                                    FileKind.ADD,
+                                                    FileKind.DELETE,
                                                     commitMessage.partition(),
                                                     commitMessage.bucket(),
                                                     m)));
             commitMessage
                     .newFilesIncrement()
-                    .deletedIndexFiles()
+                    .newIndexFiles()
                     .forEach(
                             m ->
                                     appendIndexFiles.add(
                                             new IndexManifestEntry(
-                                                    FileKind.DELETE,
+                                                    FileKind.ADD,
                                                     commitMessage.partition(),
                                                     commitMessage.bucket(),
                                                     m)));
@@ -766,23 +763,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                     .forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
             commitMessage
                     .compactIncrement()
-                    .newIndexFiles()
+                    .deletedIndexFiles()
                     .forEach(
                             m ->
                                     compactIndexFiles.add(
                                             new IndexManifestEntry(
-                                                    FileKind.ADD,
+                                                    FileKind.DELETE,
                                                     commitMessage.partition(),
                                                     commitMessage.bucket(),
                                                     m)));
             commitMessage
                     .compactIncrement()
-                    .deletedIndexFiles()
+                    .newIndexFiles()
                     .forEach(
                             m ->
                                     compactIndexFiles.add(
                                             new IndexManifestEntry(
-                                                    FileKind.DELETE,
+                                                    FileKind.ADD,
                                                     commitMessage.partition(),
                                                     commitMessage.bucket(),
                                                     m)));
@@ -1419,7 +1416,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             List<IndexManifestEntry> deltaIndexEntries,
             CommitKind commitKind) {
         String baseCommitUser = snapshot.commitUser();
-        if (checkForDeletionVector(commitKind)) {
+        if (checkForDeletionVector()) {
             // Enrich dvName in fileEntry to checker for base ADD dv and delta DELETE dv.
             // For example:
             // If the base file is <ADD baseFile1, ADD dv1>,
@@ -1443,52 +1440,72 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(deltaEntries);
 
-        if (commitKind != CommitKind.OVERWRITE) {
-            // total buckets within the same partition should remain the same
-            Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
-            for (SimpleFileEntry entry : allEntries) {
-                if (entry.totalBuckets() <= 0) {
-                    continue;
-                }
-
-                if (!totalBuckets.containsKey(entry.partition())) {
-                    totalBuckets.put(entry.partition(), entry.totalBuckets());
-                    continue;
-                }
-
-                int old = totalBuckets.get(entry.partition());
-                if (old == entry.totalBuckets()) {
-                    continue;
-                }
-
-                Pair<RuntimeException, RuntimeException> conflictException =
-                        createConflictException(
-                                "Total buckets of partition "
-                                        + entry.partition()
-                                        + " changed from "
-                                        + old
-                                        + " to "
-                                        + entry.totalBuckets()
-                                        + " without overwrite. Give up committing.",
-                                baseCommitUser,
-                                baseEntries,
-                                deltaEntries,
-                                null);
-                LOG.warn("", conflictException.getLeft());
-                throw conflictException.getRight();
-            }
-        }
+        checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser);
 
+        Function<Throwable, RuntimeException> conflictException =
+                conflictException(baseCommitUser, baseEntries, deltaEntries);
         Collection<SimpleFileEntry> mergedEntries;
         try {
             // merge manifest entries and also check if the files we want to delete are still there
             mergedEntries = FileEntry.mergeEntries(allEntries);
         } catch (Throwable e) {
-            throw conflictException(commitUser, baseEntries, deltaEntries).apply(e);
+            throw conflictException.apply(e);
+        }
+
+        checkNoDeleteInMergedEntries(mergedEntries, conflictException);
+        checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, baseCommitUser);
+    }
+
+    private void checkBucketKeepSame(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            CommitKind commitKind,
+            List<SimpleFileEntry> allEntries,
+            String baseCommitUser) {
+        if (commitKind == CommitKind.OVERWRITE) {
+            return;
         }
 
-        assertNoDelete(mergedEntries, conflictException(commitUser, baseEntries, deltaEntries));
+        // total buckets within the same partition should remain the same
+        Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+        for (SimpleFileEntry entry : allEntries) {
+            if (entry.totalBuckets() <= 0) {
+                continue;
+            }
+
+            if (!totalBuckets.containsKey(entry.partition())) {
+                totalBuckets.put(entry.partition(), entry.totalBuckets());
+                continue;
+            }
+
+            int old = totalBuckets.get(entry.partition());
+            if (old == entry.totalBuckets()) {
+                continue;
+            }
+
+            Pair<RuntimeException, RuntimeException> conflictException =
+                    createConflictException(
+                            "Total buckets of partition "
+                                    + entry.partition()
+                                    + " changed from "
+                                    + old
+                                    + " to "
+                                    + entry.totalBuckets()
+                                    + " without overwrite. Give up committing.",
+                            baseCommitUser,
+                            baseEntries,
+                            deltaEntries,
+                            null);
+            LOG.warn("", conflictException.getLeft());
+            throw conflictException.getRight();
+        }
+    }
 
+    private void checkKeyRangeNoConflicts(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            Collection<SimpleFileEntry> mergedEntries,
+            String baseCommitUser) {
         // fast exit for file store without keys
         if (keyComparator == null) {
             return;
@@ -1548,26 +1565,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         };
     }
 
-    private boolean checkForDeletionVector(CommitKind commitKind) {
-        if (!deletionVectorsEnabled) {
-            return false;
-        }
-
-        // todo: Add them once contains DELETE type.
-        // PK table's compact dv index only contains ADD type, skip conflict detection.
-        if (isPkTable && commitKind == CommitKind.COMPACT) {
-            return false;
-        }
-
-        // Non-PK table's hash fixed bucket mode only contains ADD type, skip conflict detection.
-        if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) {
-            return false;
-        }
-
-        return true;
+    private boolean checkForDeletionVector() {
+        return deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE);
     }
 
-    private void assertNoDelete(
+    private void checkNoDeleteInMergedEntries(
             Collection<SimpleFileEntry> mergedEntries,
             Function<Throwable, RuntimeException> exceptionFunction) {
         try {
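
Two related adjustments sit in this file. First, collectChanges now turns deletedIndexFiles() into manifest entries before newIndexFiles(), for both the append and the compact increments, and the FileStoreCommitTest change below is reordered the same way (remove the old dv index files, then write their replacement). Second, checkForDeletionVector() drops the primary-key and bucket-mode case analysis and runs the dv conflict check only for deletion-vectors-enabled, bucket-unaware tables, while the old inline conflict logic is split into checkBucketKeepSame, checkNoDeleteInMergedEntries and checkKeyRangeNoConflicts. A plausible reading of the ordering change, sketched below with plain Java collections rather than Paimon classes, is that DELETE-before-ADD is what keeps the new one-dv-per-data-file check satisfiable when a single commit rewrites a data file's deletion vector.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Minimal sketch (not Paimon API) of the ordering concern: a commit that rewrites
    // the deletion vector of data file "f2" carries a DELETE of the old dv index entry
    // and an ADD of the new one. Applying the DELETE first keeps the per-data-file dv
    // registry consistent; applying the ADD first would trip the duplicate check.
    public class DvCommitOrderSketch {

        public static void main(String[] args) {
            Set<String> dvDataFiles = new HashSet<>(Arrays.asList("f1", "f2"));

            List<String> deletedDvDataFiles = Arrays.asList("f2"); // old dv index entry being dropped
            List<String> addedDvDataFiles = Arrays.asList("f2");   // rewritten dv index entry

            // DELETE first (new order): "f2" is released before it is re-claimed.
            deletedDvDataFiles.forEach(dvDataFiles::remove);
            for (String dataFile : addedDvDataFiles) {
                if (!dvDataFiles.add(dataFile)) {
                    throw new IllegalStateException(
                            "Trying to add dv for data file " + dataFile + " which is already added.");
                }
            }
            // With the ADD applied before the DELETE, the add above would have failed,
            // because "f2" was still registered by the old dv index entry.
            System.out.println("dv data files after commit: " + dvDataFiles);
        }
    }
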
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 505417a412..c4bcc74f56 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -912,13 +912,13 @@ public class FileStoreCommitTest {
         assertThat(dvs.get("f2").isDeleted(4)).isTrue();
 
         // commit 2
-        CommitMessage commitMessage3 =
-                store.writeDVIndexFiles(
-                        partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
         List<IndexFileMeta> deleted =
                 new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
         deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
-        CommitMessage commitMessage4 = store.removeIndexFiles(partition, 0, deleted);
+        CommitMessage commitMessage3 = store.removeIndexFiles(partition, 0, deleted);
+        CommitMessageImpl commitMessage4 =
+                store.writeDVIndexFiles(
+                        partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
         store.commit(commitMessage3, commitMessage4);
 
         // assert 2
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index bfbadc7624..0beadce6a3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -23,6 +23,8 @@ import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, Paimon
 
 import org.apache.spark.sql.Row
 
+import java.util.concurrent.Executors
+
 import scala.concurrent.{Await, Future}
 import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration.DurationInt
@@ -805,4 +807,52 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl
       }
     }
   }
+
+  test("Paimon MergeInto: concurrent two merge") {
+    for (dvEnabled <- Seq("true", "false")) {
+      withTable("s", "t") {
+        sql("CREATE TABLE s (id INT, b INT, c INT)")
+        sql(
+          "INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
+
+        sql(
+          s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+        sql(
+          "INSERT INTO t VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
+
+        def doMergeInto(): Unit = {
+          for (i <- 1 to 9) {
+            try {
+              sql(s"""
+                     |MERGE INTO t
+                     |USING (SELECT * FROM s WHERE id = $i)
+                     |ON t.id = s.id
+                     |WHEN MATCHED THEN
+                     |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                     |""".stripMargin)
+            } catch {
+              case a: Throwable =>
+                assert(
+                  a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
+                    "Missing file"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(9)))
+          }
+        }
+
+        val executor = Executors.newFixedThreadPool(2)
+        val runnable = new Runnable {
+          override def run(): Unit = doMergeInto()
+        }
+
+        val future1 = executor.submit(runnable)
+        val future2 = executor.submit(runnable)
+
+        future1.get()
+        future2.get()
+
+        executor.shutdown()
+      }
+    }
+  }
 }
