This is an automated email from the ASF dual-hosted git repository.

junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f702adb04 [core] Add conflict detection for data evolution row id (#7124)
6f702adb04 is described below

commit 6f702adb04c271f9a5d20024de87a49efdef308b
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 28 14:49:36 2026 +0800

    [core] Add conflict detection for data evolution row id (#7124)
---
 .../main/java/org/apache/paimon/utils/Range.java   |   4 +
 .../java/org/apache/paimon/AbstractFileStore.java  |  26 ++--
 .../apache/paimon/manifest/ExpireFileEntry.java    |  12 +-
 .../java/org/apache/paimon/manifest/FileEntry.java |   6 +
 .../apache/paimon/manifest/PojoManifestEntry.java  |  10 ++
 .../apache/paimon/manifest/SimpleFileEntry.java    |  46 +++++-
 .../paimon/manifest/SimpleFileEntryWithDV.java     |   4 +-
 .../apache/paimon/operation/FileStoreCommit.java   |   4 +
 .../paimon/operation/FileStoreCommitImpl.java      |  12 +-
 .../paimon/operation/commit/CommitScanner.java     |   9 ++
 .../paimon/operation/commit/ConflictDetection.java | 167 +++++++++++++++++++--
 .../operation/commit/ManifestEntryChanges.java     |   5 +-
 .../paimon/table/sink/BatchWriteBuilderImpl.java   |   9 +-
 .../apache/paimon/table/sink/InnerTableCommit.java |   2 +
 .../apache/paimon/table/sink/TableCommitImpl.java  |   6 +
 .../operation/commit/ConflictDetectionTest.java    |   2 +
 .../MergeIntoPaimonDataEvolutionTable.scala        |  16 +-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |   4 +
 .../paimon/spark/sql/RowTrackingTestBase.scala     | 129 ++++++++++++++++
 19 files changed, 426 insertions(+), 47 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 1cd80bc56b..461a68fbab 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -47,6 +47,10 @@ public class Range implements Serializable {
         return new Range(from + offset, to + offset);
     }
 
+    public boolean hasIntersection(Range range) {
+        return from <= range.to && to >= range.from;
+    }
+
     public boolean isBefore(Range other) {
         return to < other.from;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index edf7e16294..70108a1920 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -275,16 +275,20 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
         if (snapshotCommit == null) {
             snapshotCommit = new RenamingSnapshotCommit(snapshotManager, 
Lock.empty());
         }
-        ConflictDetection conflictDetection =
-                new ConflictDetection(
-                        tableName,
-                        commitUser,
-                        partitionType,
-                        pathFactory(),
-                        newKeyComparator(),
-                        bucketMode(),
-                        options.deletionVectorsEnabled(),
-                        newIndexFileHandler());
+        ConflictDetection.Factory conflictDetectFactory =
+                scanner ->
+                        new ConflictDetection(
+                                tableName,
+                                commitUser,
+                                partitionType,
+                                pathFactory(),
+                                newKeyComparator(),
+                                bucketMode(),
+                                options.deletionVectorsEnabled(),
+                                options.dataEvolutionEnabled(),
+                                newIndexFileHandler(),
+                                snapshotManager,
+                                scanner);
         StrictModeChecker strictModeChecker =
                 StrictModeChecker.create(
                         snapshotManager,
@@ -327,7 +331,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.commitMaxRetryWait(),
                 options.rowTrackingEnabled(),
                 options.commitDiscardDuplicateFiles(),
-                conflictDetection,
+                conflictDetectFactory,
                 strictModeChecker,
                 rollback);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
index 48ab6d5d5c..024eac2b20 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java
@@ -43,7 +43,9 @@ public class ExpireFileEntry extends SimpleFileEntry {
             BinaryRow minKey,
             BinaryRow maxKey,
             @Nullable FileSource fileSource,
-            @Nullable String externalPath) {
+            @Nullable String externalPath,
+            long rowCount,
+            @Nullable Long firstRowId) {
         super(
                 kind,
                 partition,
@@ -55,7 +57,9 @@ public class ExpireFileEntry extends SimpleFileEntry {
                 embeddedIndex,
                 minKey,
                 maxKey,
-                externalPath);
+                externalPath,
+                rowCount,
+                firstRowId);
         this.fileSource = fileSource;
     }
 
@@ -76,7 +80,9 @@ public class ExpireFileEntry extends SimpleFileEntry {
                 entry.minKey(),
                 entry.maxKey(),
                 entry.file().fileSource().orElse(null),
-                entry.externalPath());
+                entry.externalPath(),
+                entry.rowCount(),
+                entry.firstRowId());
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 9d649d6f39..3a80088255 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -67,11 +67,17 @@ public interface FileEntry {
 
     List<String> extraFiles();
 
+    long rowCount();
+
+    @Nullable
+    Long firstRowId();
+
     /**
      * The same {@link Identifier} indicates that the {@link ManifestEntry} 
refers to the same data
      * file.
      */
     class Identifier {
+
         public final BinaryRow partition;
         public final int bucket;
         public final int level;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
index 7329a5b6c7..5514bd4cd9 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/PojoManifestEntry.java
@@ -91,6 +91,16 @@ public class PojoManifestEntry implements ManifestEntry {
         return file.extraFiles();
     }
 
+    @Override
+    public long rowCount() {
+        return file.rowCount();
+    }
+
+    @Override
+    public @Nullable Long firstRowId() {
+        return file.firstRowId();
+    }
+
     @Override
     public int totalBuckets() {
         return totalBuckets;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index e0da3c8d53..c1f71c8bda 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** A simple {@link FileEntry} only contains identifier and min max key. */
 public class SimpleFileEntry implements FileEntry {
 
@@ -40,6 +42,8 @@ public class SimpleFileEntry implements FileEntry {
     private final BinaryRow minKey;
     private final BinaryRow maxKey;
     @Nullable private final String externalPath;
+    private final long rowCount;
+    @Nullable private final Long firstRowId;
 
     public SimpleFileEntry(
             FileKind kind,
@@ -52,7 +56,9 @@ public class SimpleFileEntry implements FileEntry {
             @Nullable byte[] embeddedIndex,
             BinaryRow minKey,
             BinaryRow maxKey,
-            @Nullable String externalPath) {
+            @Nullable String externalPath,
+            long rowCount,
+            @Nullable Long firstRowId) {
         this.kind = kind;
         this.partition = partition;
         this.bucket = bucket;
@@ -64,6 +70,8 @@ public class SimpleFileEntry implements FileEntry {
         this.minKey = minKey;
         this.maxKey = maxKey;
         this.externalPath = externalPath;
+        this.rowCount = rowCount;
+        this.firstRowId = firstRowId;
     }
 
     public static SimpleFileEntry from(ManifestEntry entry) {
@@ -78,7 +86,9 @@ public class SimpleFileEntry implements FileEntry {
                 entry.file().embeddedIndex(),
                 entry.minKey(),
                 entry.maxKey(),
-                entry.externalPath());
+                entry.externalPath(),
+                entry.file().rowCount(),
+                entry.firstRowId());
     }
 
     public SimpleFileEntry toDelete() {
@@ -93,7 +103,9 @@ public class SimpleFileEntry implements FileEntry {
                 embeddedIndex,
                 minKey,
                 maxKey,
-                externalPath);
+                externalPath,
+                rowCount,
+                firstRowId);
     }
 
     public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
@@ -162,6 +174,22 @@ public class SimpleFileEntry implements FileEntry {
         return extraFiles;
     }
 
+    @Override
+    public long rowCount() {
+        return rowCount;
+    }
+
+    @Override
+    public @Nullable Long firstRowId() {
+        return firstRowId;
+    }
+
+    public long nonNullFirstRowId() {
+        Long firstRowId = firstRowId();
+        checkArgument(firstRowId != null, "First row id of '%s' should not be 
null.", fileName());
+        return firstRowId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -180,7 +208,9 @@ public class SimpleFileEntry implements FileEntry {
                 && Objects.equals(extraFiles, that.extraFiles)
                 && Objects.equals(minKey, that.minKey)
                 && Objects.equals(maxKey, that.maxKey)
-                && Objects.equals(externalPath, that.externalPath);
+                && Objects.equals(externalPath, that.externalPath)
+                && rowCount == that.rowCount
+                && Objects.equals(firstRowId, that.firstRowId);
     }
 
     @Override
@@ -195,7 +225,9 @@ public class SimpleFileEntry implements FileEntry {
                 extraFiles,
                 minKey,
                 maxKey,
-                externalPath);
+                externalPath,
+                rowCount,
+                firstRowId);
     }
 
     @Override
@@ -221,6 +253,10 @@ public class SimpleFileEntry implements FileEntry {
                 + maxKey
                 + ", externalPath="
                 + externalPath
+                + ", rowCount="
+                + rowCount
+                + ", firstRowId="
+                + firstRowId
                 + '}';
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
index 75d73f345f..6b4c479157 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
@@ -39,7 +39,9 @@ public class SimpleFileEntryWithDV extends SimpleFileEntry {
                 entry.embeddedIndex(),
                 entry.minKey(),
                 entry.maxKey(),
-                entry.externalPath());
+                entry.externalPath(),
+                entry.rowCount(),
+                entry.firstRowId());
         this.dvFileName = dvFileName;
     }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index ecd4975858..31fb3c52ca 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -26,6 +26,8 @@ import org.apache.paimon.stats.Statistics;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.utils.FileStorePathFactory;
 
+import javax.annotation.Nullable;
+
 import java.util.List;
 import java.util.Map;
 
@@ -38,6 +40,8 @@ public interface FileStoreCommit extends AutoCloseable {
 
     FileStoreCommit appendCommitCheckConflict(boolean 
appendCommitCheckConflict);
 
+    FileStoreCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot);
+
     /** Find out which committables need to be retried when recovering from 
the failure. */
     List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 6494b61855..41ba1412c4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -197,7 +197,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             long commitMaxRetryWait,
             boolean rowTrackingEnabled,
             boolean discardDuplicateFiles,
-            ConflictDetection conflictDetection,
+            ConflictDetection.Factory conflictDetectFactory,
             @Nullable StrictModeChecker strictModeChecker,
             @Nullable CommitRollback rollback) {
         this.snapshotCommit = snapshotCommit;
@@ -239,7 +239,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.rowTrackingEnabled = rowTrackingEnabled;
         this.discardDuplicateFiles = discardDuplicateFiles;
         this.strictModeChecker = strictModeChecker;
-        this.conflictDetection = conflictDetection;
+        this.conflictDetection = conflictDetectFactory.create(scanner);
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, 
indexManifestFile);
     }
 
@@ -261,6 +261,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return this;
     }
 
+    @Override
+    public FileStoreCommit rowIdCheckConflict(@Nullable Long 
rowIdCheckFromSnapshot) {
+        
this.conflictDetection.setRowIdCheckFromSnapshot(rowIdCheckFromSnapshot);
+        return this;
+    }
+
     @Override
     public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables) {
         // nothing to filter, fast exit
@@ -320,7 +326,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 }
 
                 boolean allowRollback = false;
-                if (containsFileDeletionOrDeletionVectors(
+                if (conflictDetection.shouldBeOverwriteCommit(
                         appendSimpleEntries, changes.appendIndexFiles)) {
                     commitKind = CommitKind.OVERWRITE;
                     checkAppendFiles = true;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
index da8b5c7563..9afe4500a4 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/CommitScanner.java
@@ -68,6 +68,15 @@ public class CommitScanner {
         return entries;
     }
 
+    public List<ManifestEntry> readIncrementalEntries(
+            Snapshot snapshot, List<BinaryRow> changedPartitions) {
+        return scan.withSnapshot(snapshot)
+                .withKind(ScanMode.DELTA)
+                .withPartitionFilter(changedPartitions)
+                .plan()
+                .files();
+    }
+
     public List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
             Snapshot snapshot, List<BinaryRow> changedPartitions) {
         try {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 990b47f0f6..15f6359417 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -25,9 +25,11 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.manifest.SimpleFileEntryWithDV;
 import org.apache.paimon.operation.PartitionExpire;
@@ -35,6 +37,9 @@ import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,6 +61,9 @@ import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
+import static 
org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
 import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
@@ -71,9 +79,13 @@ public class ConflictDetection {
     private final @Nullable Comparator<InternalRow> keyComparator;
     private final BucketMode bucketMode;
     private final boolean deletionVectorsEnabled;
+    private final boolean dataEvolutionEnabled;
     private final IndexFileHandler indexFileHandler;
+    private final SnapshotManager snapshotManager;
+    private final CommitScanner commitScanner;
 
     private @Nullable PartitionExpire partitionExpire;
+    private @Nullable Long rowIdCheckFromSnapshot = null;
 
     public ConflictDetection(
             String tableName,
@@ -83,7 +95,10 @@ public class ConflictDetection {
             @Nullable Comparator<InternalRow> keyComparator,
             BucketMode bucketMode,
             boolean deletionVectorsEnabled,
-            IndexFileHandler indexFileHandler) {
+            boolean dataEvolutionEnabled,
+            IndexFileHandler indexFileHandler,
+            SnapshotManager snapshotManager,
+            CommitScanner commitScanner) {
         this.tableName = tableName;
         this.commitUser = commitUser;
         this.partitionType = partitionType;
@@ -91,7 +106,14 @@ public class ConflictDetection {
         this.keyComparator = keyComparator;
         this.bucketMode = bucketMode;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.dataEvolutionEnabled = dataEvolutionEnabled;
         this.indexFileHandler = indexFileHandler;
+        this.snapshotManager = snapshotManager;
+        this.commitScanner = commitScanner;
+    }
+
+    public void setRowIdCheckFromSnapshot(@Nullable Long 
rowIdCheckFromSnapshot) {
+        this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
     }
 
     @Nullable
@@ -103,14 +125,29 @@ public class ConflictDetection {
         this.partitionExpire = partitionExpire;
     }
 
+    public <T extends FileEntry> boolean shouldBeOverwriteCommit(
+            List<T> appendFileEntries, List<IndexManifestEntry> 
appendIndexFiles) {
+        for (T appendFileEntry : appendFileEntries) {
+            if (appendFileEntry.kind().equals(FileKind.DELETE)) {
+                return true;
+            }
+        }
+        for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+            if 
(appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+                return true;
+            }
+        }
+        return rowIdCheckFromSnapshot != null;
+    }
+
     public Optional<RuntimeException> checkConflicts(
-            Snapshot snapshot,
+            Snapshot latestSnapshot,
             List<SimpleFileEntry> baseEntries,
             List<SimpleFileEntry> deltaEntries,
             List<IndexManifestEntry> deltaIndexEntries,
             CommitKind commitKind) {
-        String baseCommitUser = snapshot.commitUser();
-        if (checkForDeletionVector()) {
+        String baseCommitUser = latestSnapshot.commitUser();
+        if (deletionVectorsEnabled && 
bucketMode.equals(BucketMode.BUCKET_UNAWARE)) {
             // Enrich dvName in fileEntry to checker for base ADD dv and delta 
DELETE dv.
             // For example:
             // If the base file is <ADD baseFile1, ADD dv1>,
@@ -121,9 +158,10 @@ public class ConflictDetection {
                 baseEntries =
                         buildBaseEntriesWithDV(
                                 baseEntries,
-                                snapshot.indexManifest() == null
+                                latestSnapshot.indexManifest() == null
                                         ? Collections.emptyList()
-                                        : 
indexFileHandler.readManifest(snapshot.indexManifest()));
+                                        : indexFileHandler.readManifest(
+                                                
latestSnapshot.indexManifest()));
                 deltaEntries =
                         buildDeltaEntriesWithDV(baseEntries, deltaEntries, 
deltaIndexEntries);
             } catch (Throwable e) {
@@ -165,7 +203,17 @@ public class ConflictDetection {
         if (exception.isPresent()) {
             return exception;
         }
-        return checkKeyRange(baseEntries, deltaEntries, mergedEntries, 
baseCommitUser);
+        exception = checkKeyRange(baseEntries, deltaEntries, mergedEntries, 
baseCommitUser);
+        if (exception.isPresent()) {
+            return exception;
+        }
+
+        exception = checkRowIdRangeConflicts(commitKind, mergedEntries);
+        if (exception.isPresent()) {
+            return exception;
+        }
+
+        return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, 
deltaIndexEntries);
     }
 
     private Optional<RuntimeException> checkBucketKeepSame(
@@ -279,10 +327,6 @@ public class ConflictDetection {
         };
     }
 
-    private boolean checkForDeletionVector() {
-        return deletionVectorsEnabled && 
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
-    }
-
     private Optional<RuntimeException> checkDeleteInEntries(
             Collection<SimpleFileEntry> mergedEntries,
             Function<Throwable, RuntimeException> exceptionFunction) {
@@ -295,7 +339,7 @@ public class ConflictDetection {
                         tableName);
             }
         } catch (Throwable e) {
-            Optional<RuntimeException> exception = 
assertConflictForPartitionExpire(mergedEntries);
+            Optional<RuntimeException> exception = 
checkConflictForPartitionExpire(mergedEntries);
             if (exception.isPresent()) {
                 return exception;
             }
@@ -304,7 +348,7 @@ public class ConflictDetection {
         return Optional.empty();
     }
 
-    private Optional<RuntimeException> assertConflictForPartitionExpire(
+    private Optional<RuntimeException> checkConflictForPartitionExpire(
             Collection<SimpleFileEntry> mergedEntries) {
         if (partitionExpire != null && partitionExpire.isValueExpiration()) {
             Set<BinaryRow> deletedPartitions = new HashSet<>();
@@ -332,6 +376,98 @@ public class ConflictDetection {
         return Optional.empty();
     }
 
+    private Optional<RuntimeException> checkRowIdRangeConflicts(
+            CommitKind commitKind, Collection<SimpleFileEntry> mergedEntries) {
+        if (!dataEvolutionEnabled) {
+            return Optional.empty();
+        }
+        if (rowIdCheckFromSnapshot == null && commitKind != 
CommitKind.COMPACT) {
+            return Optional.empty();
+        }
+
+        List<SimpleFileEntry> entries =
+                mergedEntries.stream()
+                        .filter(file -> file.firstRowId() != null)
+                        .collect(Collectors.toList());
+
+        RangeHelper<SimpleFileEntry> rangeHelper =
+                new RangeHelper<>(
+                        SimpleFileEntry::nonNullFirstRowId,
+                        f -> f.nonNullFirstRowId() + f.rowCount() - 1);
+        List<List<SimpleFileEntry>> merged = 
rangeHelper.mergeOverlappingRanges(entries);
+        for (List<SimpleFileEntry> group : merged) {
+            List<SimpleFileEntry> dataFiles = new ArrayList<>();
+            for (SimpleFileEntry f : group) {
+                if (!isBlobFile(f.fileName())) {
+                    dataFiles.add(f);
+                }
+            }
+            if (!rangeHelper.areAllRangesSame(dataFiles)) {
+                return Optional.of(
+                        new RuntimeException(
+                                "For Data Evolution table, multiple 'MERGE 
INTO' and 'COMPACT' operations "
+                                        + "have encountered conflicts, data 
files: "
+                                        + dataFiles));
+            }
+        }
+        return Optional.empty();
+    }
+
+    private Optional<RuntimeException> checkForRowIdFromSnapshot(
+            Snapshot latestSnapshot,
+            List<SimpleFileEntry> deltaEntries,
+            List<IndexManifestEntry> deltaIndexEntries) {
+        if (!dataEvolutionEnabled) {
+            return Optional.empty();
+        }
+        if (rowIdCheckFromSnapshot == null) {
+            return Optional.empty();
+        }
+
+        List<BinaryRow> changedPartitions = changedPartitions(deltaEntries, 
deltaIndexEntries);
+        // collect history row id ranges
+        List<Range> historyIdRanges = new ArrayList<>();
+        for (SimpleFileEntry entry : deltaEntries) {
+            Long firstRowId = entry.firstRowId();
+            long rowCount = entry.rowCount();
+            if (firstRowId != null) {
+                historyIdRanges.add(new Range(firstRowId, firstRowId + 
rowCount - 1));
+            }
+        }
+
+        // check history row id ranges
+        Long checkNextRowId = 
snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
+        checkState(
+                checkNextRowId != null,
+                "Next row id cannot be null for snapshot %s.",
+                rowIdCheckFromSnapshot);
+        for (long i = rowIdCheckFromSnapshot + 1; i <= latestSnapshot.id(); 
i++) {
+            Snapshot snapshot = snapshotManager.snapshot(i);
+            if (snapshot.commitKind() == CommitKind.COMPACT) {
+                continue;
+            }
+            List<ManifestEntry> changes =
+                    commitScanner.readIncrementalEntries(snapshot, 
changedPartitions);
+            for (ManifestEntry entry : changes) {
+                DataFileMeta file = entry.file();
+                long firstRowId = file.nonNullFirstRowId();
+                if (firstRowId < checkNextRowId) {
+                    Range fileRange = new Range(firstRowId, firstRowId + 
file.rowCount() - 1);
+                    for (Range range : historyIdRanges) {
+                        if (range.hasIntersection(fileRange)) {
+                            return Optional.of(
+                                    new RuntimeException(
+                                            "For Data Evolution table, 
multiple 'MERGE INTO' operations have encountered conflicts,"
+                                                    + " updating the same 
file, which can render some updates ineffective."));
+                        }
+                    }
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
     static List<SimpleFileEntry> buildBaseEntriesWithDV(
             List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> 
baseIndexEntries) {
         if (baseEntries.isEmpty()) {
@@ -560,4 +696,9 @@ public class ConflictDetection {
             return Objects.hash(partition, bucket, level);
         }
     }
+
+    /** Factory to create {@link ConflictDetection}. */
+    public interface Factory {
+        ConflictDetection create(CommitScanner scanner);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
index dcea67e4df..f32f6c9ef5 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java
@@ -20,6 +20,7 @@ package org.apache.paimon.operation.commit;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -163,9 +164,9 @@ public class ManifestEntryChanges {
     }
 
     public static List<BinaryRow> changedPartitions(
-            List<ManifestEntry> dataFileChanges, List<IndexManifestEntry> 
indexFileChanges) {
+            List<? extends FileEntry> dataFileChanges, 
List<IndexManifestEntry> indexFileChanges) {
         Set<BinaryRow> changedPartitions = new HashSet<>();
-        for (ManifestEntry file : dataFileChanges) {
+        for (FileEntry file : dataFileChanges) {
             changedPartitions.add(file.partition());
         }
         for (IndexManifestEntry file : indexFileChanges) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 66c67e8965..c06bd9de42 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -41,6 +41,7 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
 
     private Map<String, String> staticPartition;
     private boolean appendCommitCheckConflict = false;
+    private @Nullable Long rowIdCheckFromSnapshot = null;
 
     public BatchWriteBuilderImpl(InnerTable table) {
         this.table = table;
@@ -85,7 +86,8 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
         InnerTableCommit commit =
                 table.newCommit(commitUser)
                         .withOverwrite(staticPartition)
-                        .appendCommitCheckConflict(appendCommitCheckConflict);
+                        .appendCommitCheckConflict(appendCommitCheckConflict)
+                        .rowIdCheckConflict(rowIdCheckFromSnapshot);
         commit.ignoreEmptyCommit(
                 Options.fromMap(table.options())
                         .getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
@@ -101,4 +103,9 @@ public class BatchWriteBuilderImpl implements BatchWriteBuilder {
         this.appendCommitCheckConflict = appendCommitCheckConflict;
         return this;
     }
+
+    public BatchWriteBuilderImpl rowIdCheckConflict(@Nullable Long 
rowIdCheckFromSnapshot) {
+        this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
+        return this;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index 0a8fdd6742..a73771f218 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -48,6 +48,8 @@ public interface InnerTableCommit extends StreamTableCommit, BatchTableCommit {
 
     InnerTableCommit appendCommitCheckConflict(boolean 
appendCommitCheckConflict);
 
+    InnerTableCommit rowIdCheckConflict(@Nullable Long rowIdCheckFromSnapshot);
+
     @Override
     InnerTableCommit withMetricRegistry(MetricRegistry registry);
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 8bd65b05c3..0311c9bbe4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -164,6 +164,12 @@ public class TableCommitImpl implements InnerTableCommit {
         return this;
     }
 
+    @Override
+    public TableCommitImpl rowIdCheckConflict(@Nullable Long 
rowIdCheckFromSnapshot) {
+        commit.rowIdCheckConflict(rowIdCheckFromSnapshot);
+        return this;
+    }
+
     @Override
     public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
         commit.withMetrics(new CommitMetrics(registry, tableName));
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index e75e4d0358..522cb73e72 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -303,6 +303,8 @@ class ConflictDetectionTest {
                 null,
                 EMPTY_ROW,
                 EMPTY_ROW,
+                null,
+                0L,
                 null);
     }
 
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
index 82c046e621..f8c2b6c24f 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonDataEvolutionTable.scala
@@ -136,16 +136,13 @@ case class MergeIntoPaimonDataEvolutionTable(
   lazy val tableSchema: StructType = v2Table.schema
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // Avoid that more than one source rows match the same target row.
-    val commitMessages = invokeMergeInto(sparkSession)
-    writer.commit(commitMessages)
+    invokeMergeInto(sparkSession)
     Seq.empty[Row]
   }
 
-  private def invokeMergeInto(sparkSession: SparkSession): Seq[CommitMessage] 
= {
-    val tableSplits: Seq[DataSplit] = table
-      .newSnapshotReader()
-      .read()
+  private def invokeMergeInto(sparkSession: SparkSession): Unit = {
+    val plan = table.newSnapshotReader().read()
+    val tableSplits: Seq[DataSplit] = plan
       .splits()
       .asScala
       .map(_.asInstanceOf[DataSplit])
@@ -197,7 +194,10 @@ case class MergeIntoPaimonDataEvolutionTable(
         insertActionInvoke(sparkSession, touchedFileTargetRelation)
       else Nil
 
-    updateCommit ++ insertCommit
+    if (plan.snapshotId() != null) {
+      writer.rowIdCheckConflict(plan.snapshotId())
+    }
+    writer.commit(updateCommit ++ insertCommit)
   }
 
   private def targetRelatedSplits(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 721c09e2d5..d81d70f55b 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -403,6 +403,10 @@ case class PaimonSparkWriter(
       .map(deserializeCommitMessage(serializer, _))
   }
 
+  def rowIdCheckConflict(rowIdCheckFromSnapshot: Long): Unit = {
+    
writeBuilder.asInstanceOf[BatchWriteBuilderImpl].rowIdCheckConflict(rowIdCheckFromSnapshot)
+  }
+
   def commit(commitMessages: Seq[CommitMessage]): Unit = {
     val finalWriteBuilder = if (postponeBatchWriteFixedBucket) {
       writeBuilder
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
index 4079cf5eed..0ea9d21bcf 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/RowTrackingTestBase.scala
@@ -87,6 +87,135 @@ abstract class RowTrackingTestBase extends PaimonSparkTestBase {
     }
   }
 
+  test("Data Evolution: concurrent merge and merge") {
+    withTable("s", "t") {
+      sql(s"""
+            CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+                 'row-tracking.enabled' = 'true',
+                 'data-evolution.enabled' = 'true')
+          """)
+      sql("INSERT INTO t VALUES (1, 0, 0)")
+      Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
+
+      def doMerge(): Unit = {
+        var success = false
+        while (!success) {
+          try {
+            sql(s"""
+                   |MERGE INTO t
+                   |USING s
+                   |ON t.id = s.id
+                   |WHEN MATCHED THEN
+                   |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                   |""".stripMargin).collect()
+            success = true
+          } catch {
+            case e: Exception =>
+              if (
+                !e.getMessage.contains(
+                  "multiple 'MERGE INTO' operations have encountered 
conflicts")
+              ) {
+                throw e
+              }
+          }
+        }
+      }
+
+      val mergeInto1 = Future {
+        for (_ <- 1 to 10) {
+          doMerge()
+        }
+      }
+
+      val mergeInto2 = Future {
+        for (_ <- 1 to 10) {
+          doMerge()
+        }
+      }
+
+      Await.result(mergeInto1, 60.seconds)
+      Await.result(mergeInto2, 60.seconds)
+
+      checkAnswer(sql("SELECT * FROM t"), Seq(Row(1, 20, 20)))
+    }
+  }
+
+  test("Data Evolution: concurrent merge and small files compact") {
+    withTable("s", "t") {
+      sql(s"""
+            CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES (
+                 'row-tracking.enabled' = 'true',
+                 'compaction.min.file-num' = '2',
+                 'data-evolution.enabled' = 'true')
+          """)
+      sql("INSERT INTO t VALUES (1, 0, 0)")
+      Seq((1, 1, 1)).toDF("id", "b", "c").createOrReplaceTempView("s")
+
+      def doWithRetry(doAction: () => Unit): Unit = {
+        var success = false
+        while (!success) {
+          try {
+            doAction.apply()
+            success = true
+          } catch {
+            case e: Exception =>
+              if (!e.getMessage.contains("multiple 'MERGE INTO' and 'COMPACT' 
operations")) {
+                throw e
+              }
+          }
+        }
+      }
+
+      val mergeInto = Future {
+        for (i <- 1 to 10) {
+          doWithRetry(() => sql(s"""
+                                   |MERGE INTO t
+                                   |USING s
+                                   |ON t.id = s.id
+                                   |WHEN MATCHED THEN
+                                   |UPDATE SET t.id = s.id, t.b = s.b + t.b, 
t.c = s.c + t.c
+                                   |""".stripMargin).collect())
+          if (i > 1) {
+            sql(s"INSERT INTO t VALUES ($i, $i, $i)")
+          }
+        }
+      }
+
+      val t = loadTable("t")
+
+      def canBeCompacted: Boolean = {
+        val split = 
t.newSnapshotReader().read().splits().get(0).asInstanceOf[DataSplit]
+        split.dataFiles().size() > 1
+      }
+
+      val compact = Future {
+        for (_ <- 1 to 10) {
+          while (!canBeCompacted) {
+            Thread.sleep(1)
+          }
+          doWithRetry(() => sql("CALL sys.compact(table => 't')"))
+        }
+      }
+
+      Await.result(mergeInto, 60.seconds)
+      Await.result(compact, 60.seconds)
+
+      checkAnswer(
+        sql("SELECT * FROM t"),
+        Seq(
+          Row(1, 10, 10),
+          Row(2, 2, 2),
+          Row(3, 3, 3),
+          Row(4, 4, 4),
+          Row(5, 5, 5),
+          Row(6, 6, 6),
+          Row(7, 7, 7),
+          Row(8, 8, 8),
+          Row(9, 9, 9),
+          Row(10, 10, 10)))
+    }
+  }
+
   test("Row Tracking: read row Tracking") {
     withTable("t") {
       sql("CREATE TABLE t (id INT, data STRING) TBLPROPERTIES 
('row-tracking.enabled' = 'true')")


Reply via email to