This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 671762a14b [core] Extract ConflictDetection for FileStoreCommitImpl
671762a14b is described below

commit 671762a14b65e36f36cb8415cee7194f44098d63
Author: JingsongLi <[email protected]>
AuthorDate: Fri Oct 10 14:30:42 2025 +0800

    [core] Extract ConflictDetection for FileStoreCommitImpl
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  15 +-
 .../paimon/operation/FileStoreCommitImpl.java      | 372 +-------------
 .../paimon/operation/commit/ConflictDetection.java | 547 +++++++++++++++++++++
 .../apache/paimon/utils/ConflictDeletionUtils.java | 149 ------
 .../apache/paimon/operation/FileDeletionTest.java  |   2 +-
 .../paimon/operation/FileStoreCommitTest.java      |   2 +-
 .../commit/ConflictDetectionTest.java}             |   9 +-
 7 files changed, 577 insertions(+), 519 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 31918b84cf..2e209fba45 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -41,6 +41,7 @@ import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
+import org.apache.paimon.operation.commit.ConflictDetection;
 import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -267,6 +268,16 @@ abstract class AbstractFileStore<T> implements 
FileStore<T> {
         if (snapshotCommit == null) {
             snapshotCommit = new RenamingSnapshotCommit(snapshotManager, 
Lock.empty());
         }
+        ConflictDetection conflictDetection =
+                new ConflictDetection(
+                        tableName,
+                        commitUser,
+                        partitionType,
+                        pathFactory(),
+                        newKeyComparator(),
+                        bucketMode(),
+                        options.deletionVectorsEnabled(),
+                        newIndexFileHandler());
         return new FileStoreCommitImpl(
                 snapshotCommit,
                 fileIO,
@@ -287,7 +298,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.manifestFullCompactionThresholdSize(),
                 options.manifestMergeMinCount(),
                 partitionType.getFieldCount() > 0 && 
options.dynamicPartitionOverwrite(),
-                newKeyComparator(),
                 options.branch(),
                 newStatsFileHandler(),
                 bucketMode(),
@@ -299,8 +309,7 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                 options.commitMaxRetryWait(),
                 options.commitStrictModeLastSafeSnapshot().orElse(null),
                 options.rowTrackingEnabled(),
-                options.deletionVectorsEnabled(),
-                newIndexFileHandler());
+                conflictDetection);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 97e67ec5cc..365317a07d 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -24,9 +24,7 @@ import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.SnapshotCommit;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.FileEntry;
@@ -41,6 +39,8 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.operation.commit.ConflictDetection;
+import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
 import org.apache.paimon.options.MemorySize;
@@ -70,9 +70,7 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -82,7 +80,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
@@ -91,13 +88,12 @@ import static 
org.apache.paimon.format.blob.BlobFileFormat.isBlobFile;
 import static org.apache.paimon.manifest.ManifestEntry.recordCount;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
 import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.hasConflictChecked;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck;
 import static 
org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
-import static 
org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
-import static 
org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
-import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkState;
 
 /**
  * Default implementation of {@link FileStoreCommit}.
@@ -141,7 +137,6 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
     private final MemorySize manifestFullCompactionSize;
     private final int manifestMergeMinCount;
     private final boolean dynamicPartitionOverwrite;
-    @Nullable private final Comparator<InternalRow> keyComparator;
     private final String branchName;
     @Nullable private final Integer manifestReadParallelism;
     private final List<CommitCallback> commitCallbacks;
@@ -154,12 +149,10 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
     @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
     private final boolean rowTrackingEnabled;
-    private final boolean deletionVectorsEnabled;
-    private final IndexFileHandler indexFileHandler;
+    private final ConflictDetection conflictDetection;
 
     private boolean ignoreEmptyCommit;
     private CommitMetrics commitMetrics;
-    @Nullable private PartitionExpire partitionExpire;
 
     public FileStoreCommitImpl(
             SnapshotCommit snapshotCommit,
@@ -181,7 +174,6 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             MemorySize manifestFullCompactionSize,
             int manifestMergeMinCount,
             boolean dynamicPartitionOverwrite,
-            @Nullable Comparator<InternalRow> keyComparator,
             String branchName,
             StatsFileHandler statsFileHandler,
             BucketMode bucketMode,
@@ -193,8 +185,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
             long commitMaxRetryWait,
             @Nullable Long strictModeLastSafeSnapshot,
             boolean rowTrackingEnabled,
-            boolean deletionVectorsEnabled,
-            IndexFileHandler indexFileHandler) {
+            ConflictDetection conflictDetection) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -217,7 +208,6 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.manifestFullCompactionSize = manifestFullCompactionSize;
         this.manifestMergeMinCount = manifestMergeMinCount;
         this.dynamicPartitionOverwrite = dynamicPartitionOverwrite;
-        this.keyComparator = keyComparator;
         this.branchName = branchName;
         this.manifestReadParallelism = manifestReadParallelism;
         this.commitCallbacks = commitCallbacks;
@@ -238,8 +228,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
         this.statsFileHandler = statsFileHandler;
         this.bucketMode = bucketMode;
         this.rowTrackingEnabled = rowTrackingEnabled;
-        this.deletionVectorsEnabled = deletionVectorsEnabled;
-        this.indexFileHandler = indexFileHandler;
+        this.conflictDetection = conflictDetection;
     }
 
     @Override
@@ -250,7 +239,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     @Override
     public FileStoreCommit withPartitionExpire(PartitionExpire 
partitionExpire) {
-        this.partitionExpire = partitionExpire;
+        this.conflictDetection.withPartitionExpire(partitionExpire);
         return this;
     }
 
@@ -347,7 +336,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                                             appendTableFiles,
                                             compactTableFiles,
                                             appendIndexFiles)));
-                    noConflictsOrFail(
+                    conflictDetection.checkNoConflictsOrFail(
                             latestSnapshot,
                             baseEntries,
                             appendSimpleEntries,
@@ -383,7 +372,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 // files.
                 if (safeLatestSnapshotId != null) {
                     baseEntries.addAll(appendSimpleEntries);
-                    noConflictsOrFail(
+                    conflictDetection.checkNoConflictsOrFail(
                             latestSnapshot,
                             baseEntries,
                             SimpleFileEntry.from(compactTableFiles),
@@ -1032,7 +1021,7 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
                 baseDataFiles =
                         readAllEntriesFromChangedPartitions(latestSnapshot, 
changedPartitions);
             }
-            noConflictsOrFail(
+            conflictDetection.checkNoConflictsOrFail(
                     latestSnapshot,
                     baseDataFiles,
                     SimpleFileEntry.from(deltaFiles),
@@ -1409,296 +1398,6 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
     }
 
-    private void noConflictsOrFail(
-            Snapshot snapshot,
-            List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> deltaEntries,
-            List<IndexManifestEntry> deltaIndexEntries,
-            CommitKind commitKind) {
-        String baseCommitUser = snapshot.commitUser();
-        if (checkForDeletionVector()) {
-            // Enrich dvName in fileEntry to checker for base ADD dv and delta 
DELETE dv.
-            // For example:
-            // If the base file is <ADD baseFile1, ADD dv1>,
-            // then the delta file must be <DELETE deltaFile1, DELETE dv1>; 
and vice versa,
-            // If the delta file is <DELETE deltaFile2, DELETE dv2>,
-            // then the base file must be <ADD baseFile2, ADD dv2>.
-            try {
-                baseEntries =
-                        buildBaseEntriesWithDV(
-                                baseEntries,
-                                snapshot.indexManifest() == null
-                                        ? Collections.emptyList()
-                                        : 
indexFileHandler.readManifest(snapshot.indexManifest()));
-                deltaEntries =
-                        buildDeltaEntriesWithDV(baseEntries, deltaEntries, 
deltaIndexEntries);
-            } catch (Throwable e) {
-                throw conflictException(commitUser, baseEntries, 
deltaEntries).apply(e);
-            }
-        }
-
-        List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
-        allEntries.addAll(deltaEntries);
-
-        checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, 
baseCommitUser);
-
-        Function<Throwable, RuntimeException> conflictException =
-                conflictException(baseCommitUser, baseEntries, deltaEntries);
-        Collection<SimpleFileEntry> mergedEntries;
-        try {
-            // merge manifest entries and also check if the files we want to 
delete are still there
-            mergedEntries = FileEntry.mergeEntries(allEntries);
-        } catch (Throwable e) {
-            throw conflictException.apply(e);
-        }
-
-        checkNoDeleteInMergedEntries(mergedEntries, conflictException);
-        checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, 
baseCommitUser);
-    }
-
-    private void checkBucketKeepSame(
-            List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> deltaEntries,
-            CommitKind commitKind,
-            List<SimpleFileEntry> allEntries,
-            String baseCommitUser) {
-        if (commitKind == CommitKind.OVERWRITE) {
-            return;
-        }
-
-        // total buckets within the same partition should remain the same
-        Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
-        for (SimpleFileEntry entry : allEntries) {
-            if (entry.totalBuckets() <= 0) {
-                continue;
-            }
-
-            if (!totalBuckets.containsKey(entry.partition())) {
-                totalBuckets.put(entry.partition(), entry.totalBuckets());
-                continue;
-            }
-
-            int old = totalBuckets.get(entry.partition());
-            if (old == entry.totalBuckets()) {
-                continue;
-            }
-
-            Pair<RuntimeException, RuntimeException> conflictException =
-                    createConflictException(
-                            "Total buckets of partition "
-                                    + entry.partition()
-                                    + " changed from "
-                                    + old
-                                    + " to "
-                                    + entry.totalBuckets()
-                                    + " without overwrite. Give up 
committing.",
-                            baseCommitUser,
-                            baseEntries,
-                            deltaEntries,
-                            null);
-            LOG.warn("", conflictException.getLeft());
-            throw conflictException.getRight();
-        }
-    }
-
-    private void checkKeyRangeNoConflicts(
-            List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> deltaEntries,
-            Collection<SimpleFileEntry> mergedEntries,
-            String baseCommitUser) {
-        // fast exit for file store without keys
-        if (keyComparator == null) {
-            return;
-        }
-
-        // group entries by partitions, buckets and levels
-        Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
-        for (SimpleFileEntry entry : mergedEntries) {
-            int level = entry.level();
-            if (level >= 1) {
-                levels.computeIfAbsent(
-                                new LevelIdentifier(entry.partition(), 
entry.bucket(), level),
-                                lv -> new ArrayList<>())
-                        .add(entry);
-            }
-        }
-
-        // check for all LSM level >= 1, key ranges of files do not intersect
-        for (List<SimpleFileEntry> entries : levels.values()) {
-            entries.sort((a, b) -> keyComparator.compare(a.minKey(), 
b.minKey()));
-            for (int i = 0; i + 1 < entries.size(); i++) {
-                SimpleFileEntry a = entries.get(i);
-                SimpleFileEntry b = entries.get(i + 1);
-                if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
-                    Pair<RuntimeException, RuntimeException> conflictException 
=
-                            createConflictException(
-                                    "LSM conflicts detected! Give up 
committing. Conflict files are:\n"
-                                            + 
a.identifier().toString(pathFactory)
-                                            + "\n"
-                                            + 
b.identifier().toString(pathFactory),
-                                    baseCommitUser,
-                                    baseEntries,
-                                    deltaEntries,
-                                    null);
-
-                    LOG.warn("", conflictException.getLeft());
-                    throw conflictException.getRight();
-                }
-            }
-        }
-    }
-
-    private Function<Throwable, RuntimeException> conflictException(
-            String baseCommitUser,
-            List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> deltaEntries) {
-        return e -> {
-            Pair<RuntimeException, RuntimeException> conflictException =
-                    createConflictException(
-                            "File deletion conflicts detected! Give up 
committing.",
-                            baseCommitUser,
-                            baseEntries,
-                            deltaEntries,
-                            e);
-            LOG.warn("", conflictException.getLeft());
-            return conflictException.getRight();
-        };
-    }
-
-    private boolean checkForDeletionVector() {
-        return deletionVectorsEnabled && 
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
-    }
-
-    private void checkNoDeleteInMergedEntries(
-            Collection<SimpleFileEntry> mergedEntries,
-            Function<Throwable, RuntimeException> exceptionFunction) {
-        try {
-            for (SimpleFileEntry entry : mergedEntries) {
-                checkState(
-                        entry.kind() != FileKind.DELETE,
-                        "Trying to delete file %s for table %s which is not 
previously added.",
-                        entry.fileName(),
-                        tableName);
-            }
-        } catch (Throwable e) {
-            assertConflictForPartitionExpire(mergedEntries);
-            throw exceptionFunction.apply(e);
-        }
-    }
-
-    private void assertConflictForPartitionExpire(Collection<SimpleFileEntry> 
mergedEntries) {
-        if (partitionExpire != null && partitionExpire.isValueExpiration()) {
-            Set<BinaryRow> deletedPartitions = new HashSet<>();
-            for (SimpleFileEntry entry : mergedEntries) {
-                if (entry.kind() == FileKind.DELETE) {
-                    deletedPartitions.add(entry.partition());
-                }
-            }
-            if (partitionExpire.isValueAllExpired(deletedPartitions)) {
-                List<String> expiredPartitions =
-                        deletedPartitions.stream()
-                                .map(
-                                        partition ->
-                                                partToSimpleString(
-                                                        partitionType, 
partition, "-", 200))
-                                .collect(Collectors.toList());
-                throw new RuntimeException(
-                        "You are writing data to expired partitions, and you 
can filter this data to avoid job failover."
-                                + " Otherwise, continuous expired records will 
cause the job to failover restart continuously."
-                                + " Expired partitions are: "
-                                + expiredPartitions);
-            }
-        }
-    }
-
-    /**
-     * Construct detailed conflict exception. The returned exception is formed 
of (full exception,
-     * simplified exception), The simplified exception is generated when the 
entry length is larger
-     * than the max limit.
-     */
-    private Pair<RuntimeException, RuntimeException> createConflictException(
-            String message,
-            String baseCommitUser,
-            List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> changes,
-            Throwable cause) {
-        String possibleCauses =
-                String.join(
-                        "\n",
-                        "Don't panic!",
-                        "Conflicts during commits are normal and this failure 
is intended to resolve the conflicts.",
-                        "Conflicts are mainly caused by the following 
scenarios:",
-                        "1. Multiple jobs are writing into the same partition 
at the same time, "
-                                + "or you use STATEMENT SET to execute 
multiple INSERT statements into the same Paimon table.",
-                        "   You'll probably see different base commit user and 
current commit user below.",
-                        "   You can use "
-                                + 
"https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-job";
-                                + " to support multiple writing.",
-                        "2. You're recovering from an old savepoint, or you're 
creating multiple jobs from a savepoint.",
-                        "   The job will fail continuously in this scenario to 
protect metadata from corruption.",
-                        "   You can either recover from the latest savepoint, "
-                                + "or you can revert the table to the snapshot 
corresponding to the old savepoint.");
-        String commitUserString =
-                "Base commit user is: "
-                        + baseCommitUser
-                        + "; Current commit user is: "
-                        + commitUser;
-        String baseEntriesString =
-                "Base entries are:\n"
-                        + baseEntries.stream()
-                                .map(Object::toString)
-                                .collect(Collectors.joining("\n"));
-        String changesString =
-                "Changes are:\n"
-                        + 
changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
-
-        RuntimeException fullException =
-                new RuntimeException(
-                        message
-                                + "\n\n"
-                                + possibleCauses
-                                + "\n\n"
-                                + commitUserString
-                                + "\n\n"
-                                + baseEntriesString
-                                + "\n\n"
-                                + changesString,
-                        cause);
-
-        RuntimeException simplifiedException;
-        int maxEntry = 50;
-        if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
-            baseEntriesString =
-                    "Base entries are:\n"
-                            + baseEntries.subList(0, 
Math.min(baseEntries.size(), maxEntry))
-                                    .stream()
-                                    .map(Object::toString)
-                                    .collect(Collectors.joining("\n"));
-            changesString =
-                    "Changes are:\n"
-                            + changes.subList(0, Math.min(changes.size(), 
maxEntry)).stream()
-                                    .map(Object::toString)
-                                    .collect(Collectors.joining("\n"));
-            simplifiedException =
-                    new RuntimeException(
-                            message
-                                    + "\n\n"
-                                    + possibleCauses
-                                    + "\n\n"
-                                    + commitUserString
-                                    + "\n\n"
-                                    + baseEntriesString
-                                    + "\n\n"
-                                    + changesString
-                                    + "\n\n"
-                                    + "The entry list above are not fully 
displayed, please refer to taskmanager.log for more information.",
-                            cause);
-            return Pair.of(fullException, simplifiedException);
-        } else {
-            return Pair.of(fullException, fullException);
-        }
-    }
-
     private void cleanUpNoReuseTmpManifests(
             Pair<String, Long> baseManifestList,
             List<ManifestFileMeta> mergeBeforeManifests,
@@ -1766,53 +1465,6 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         IOUtils.closeQuietly(snapshotCommit);
     }
 
-    private static class LevelIdentifier {
-
-        private final BinaryRow partition;
-        private final int bucket;
-        private final int level;
-
-        private LevelIdentifier(BinaryRow partition, int bucket, int level) {
-            this.partition = partition;
-            this.bucket = bucket;
-            this.level = level;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (!(o instanceof LevelIdentifier)) {
-                return false;
-            }
-            LevelIdentifier that = (LevelIdentifier) o;
-            return Objects.equals(partition, that.partition)
-                    && bucket == that.bucket
-                    && level == that.level;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(partition, bucket, level);
-        }
-    }
-
-    /** Should do conflict check. */
-    interface ConflictCheck {
-        boolean shouldCheck(long latestSnapshot);
-    }
-
-    static ConflictCheck hasConflictChecked(@Nullable Long 
checkedLatestSnapshotId) {
-        return latestSnapshot -> !Objects.equals(latestSnapshot, 
checkedLatestSnapshotId);
-    }
-
-    static ConflictCheck noConflictCheck() {
-        return latestSnapshot -> false;
-    }
-
-    @VisibleForTesting
-    static ConflictCheck mustConflictCheck() {
-        return latestSnapshot -> true;
-    }
-
     private interface CommitResult {
         boolean isSuccess();
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
new file mode 100644
index 0000000000..9739beae88
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+import org.apache.paimon.operation.PartitionExpire;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Util class for detecting conflicts between base and delta files. */
+public class ConflictDetection {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ConflictDetection.class);
+
+    private final String tableName;
+    private final String commitUser;
+    private final RowType partitionType;
+    private final FileStorePathFactory pathFactory;
+    private final @Nullable Comparator<InternalRow> keyComparator;
+    private final BucketMode bucketMode;
+    private final boolean deletionVectorsEnabled;
+    private final IndexFileHandler indexFileHandler;
+
+    private @Nullable PartitionExpire partitionExpire;
+
+    public ConflictDetection(
+            String tableName,
+            String commitUser,
+            RowType partitionType,
+            FileStorePathFactory pathFactory,
+            @Nullable Comparator<InternalRow> keyComparator,
+            BucketMode bucketMode,
+            boolean deletionVectorsEnabled,
+            IndexFileHandler indexFileHandler) {
+        this.tableName = tableName;
+        this.commitUser = commitUser;
+        this.partitionType = partitionType;
+        this.pathFactory = pathFactory;
+        this.keyComparator = keyComparator;
+        this.bucketMode = bucketMode;
+        this.deletionVectorsEnabled = deletionVectorsEnabled;
+        this.indexFileHandler = indexFileHandler;
+    }
+
+    public void withPartitionExpire(PartitionExpire partitionExpire) {
+        this.partitionExpire = partitionExpire;
+    }
+
+    public void checkNoConflictsOrFail(
+            Snapshot snapshot,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            List<IndexManifestEntry> deltaIndexEntries,
+            CommitKind commitKind) {
+        String baseCommitUser = snapshot.commitUser();
+        if (checkForDeletionVector()) {
+            // Enrich dvName in fileEntry to check for base ADD dv and delta 
DELETE dv.
+            // For example:
+            // If the base file is <ADD baseFile1, ADD dv1>,
+            // then the delta file must be <DELETE deltaFile1, DELETE dv1>; 
and vice versa,
+            // If the delta file is <DELETE deltaFile2, DELETE dv2>,
+            // then the base file must be <ADD baseFile2, ADD dv2>.
+            try {
+                baseEntries =
+                        buildBaseEntriesWithDV(
+                                baseEntries,
+                                snapshot.indexManifest() == null
+                                        ? Collections.emptyList()
+                                        : 
indexFileHandler.readManifest(snapshot.indexManifest()));
+                deltaEntries =
+                        buildDeltaEntriesWithDV(baseEntries, deltaEntries, 
deltaIndexEntries);
+            } catch (Throwable e) {
+                throw conflictException(commitUser, baseEntries, 
deltaEntries).apply(e);
+            }
+        }
+
+        List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
+        allEntries.addAll(deltaEntries);
+
+        checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, 
baseCommitUser);
+
+        Function<Throwable, RuntimeException> conflictException =
+                conflictException(baseCommitUser, baseEntries, deltaEntries);
+        Collection<SimpleFileEntry> mergedEntries;
+        try {
+            // merge manifest entries and also check if the files we want to 
delete are still there
+            mergedEntries = FileEntry.mergeEntries(allEntries);
+        } catch (Throwable e) {
+            throw conflictException.apply(e);
+        }
+
+        checkNoDeleteInMergedEntries(mergedEntries, conflictException);
+        checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, 
baseCommitUser);
+    }
+
+    private void checkBucketKeepSame(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            CommitKind commitKind,
+            List<SimpleFileEntry> allEntries,
+            String baseCommitUser) {
+        if (commitKind == CommitKind.OVERWRITE) {
+            return;
+        }
+
+        // total buckets within the same partition should remain the same
+        Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+        for (SimpleFileEntry entry : allEntries) {
+            if (entry.totalBuckets() <= 0) {
+                continue;
+            }
+
+            if (!totalBuckets.containsKey(entry.partition())) {
+                totalBuckets.put(entry.partition(), entry.totalBuckets());
+                continue;
+            }
+
+            int old = totalBuckets.get(entry.partition());
+            if (old == entry.totalBuckets()) {
+                continue;
+            }
+
+            Pair<RuntimeException, RuntimeException> conflictException =
+                    createConflictException(
+                            "Total buckets of partition "
+                                    + entry.partition()
+                                    + " changed from "
+                                    + old
+                                    + " to "
+                                    + entry.totalBuckets()
+                                    + " without overwrite. Give up 
committing.",
+                            baseCommitUser,
+                            baseEntries,
+                            deltaEntries,
+                            null);
+            LOG.warn("", conflictException.getLeft());
+            throw conflictException.getRight();
+        }
+    }
+
+    private void checkKeyRangeNoConflicts(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            Collection<SimpleFileEntry> mergedEntries,
+            String baseCommitUser) {
+        // fast exit for file store without keys
+        if (keyComparator == null) {
+            return;
+        }
+
+        // group entries by partitions, buckets and levels
+        Map<LevelIdentifier, List<SimpleFileEntry>> levels = new HashMap<>();
+        for (SimpleFileEntry entry : mergedEntries) {
+            int level = entry.level();
+            if (level >= 1) {
+                levels.computeIfAbsent(
+                                new LevelIdentifier(entry.partition(), 
entry.bucket(), level),
+                                lv -> new ArrayList<>())
+                        .add(entry);
+            }
+        }
+
+        // check for all LSM level >= 1, key ranges of files do not intersect
+        for (List<SimpleFileEntry> entries : levels.values()) {
+            entries.sort((a, b) -> keyComparator.compare(a.minKey(), 
b.minKey()));
+            for (int i = 0; i + 1 < entries.size(); i++) {
+                SimpleFileEntry a = entries.get(i);
+                SimpleFileEntry b = entries.get(i + 1);
+                if (keyComparator.compare(a.maxKey(), b.minKey()) >= 0) {
+                    Pair<RuntimeException, RuntimeException> conflictException 
=
+                            createConflictException(
+                                    "LSM conflicts detected! Give up 
committing. Conflict files are:\n"
+                                            + 
a.identifier().toString(pathFactory)
+                                            + "\n"
+                                            + 
b.identifier().toString(pathFactory),
+                                    baseCommitUser,
+                                    baseEntries,
+                                    deltaEntries,
+                                    null);
+
+                    LOG.warn("", conflictException.getLeft());
+                    throw conflictException.getRight();
+                }
+            }
+        }
+    }
+
+    private Function<Throwable, RuntimeException> conflictException(
+            String baseCommitUser,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries) {
+        return e -> {
+            Pair<RuntimeException, RuntimeException> conflictException =
+                    createConflictException(
+                            "File deletion conflicts detected! Give up 
committing.",
+                            baseCommitUser,
+                            baseEntries,
+                            deltaEntries,
+                            e);
+            LOG.warn("", conflictException.getLeft());
+            return conflictException.getRight();
+        };
+    }
+
+    private boolean checkForDeletionVector() {
+        return deletionVectorsEnabled && 
bucketMode.equals(BucketMode.BUCKET_UNAWARE);
+    }
+
+    private void checkNoDeleteInMergedEntries(
+            Collection<SimpleFileEntry> mergedEntries,
+            Function<Throwable, RuntimeException> exceptionFunction) {
+        try {
+            for (SimpleFileEntry entry : mergedEntries) {
+                checkState(
+                        entry.kind() != FileKind.DELETE,
+                        "Trying to delete file %s for table %s which is not 
previously added.",
+                        entry.fileName(),
+                        tableName);
+            }
+        } catch (Throwable e) {
+            assertConflictForPartitionExpire(mergedEntries);
+            throw exceptionFunction.apply(e);
+        }
+    }
+
+    private void assertConflictForPartitionExpire(Collection<SimpleFileEntry> 
mergedEntries) {
+        if (partitionExpire != null && partitionExpire.isValueExpiration()) {
+            Set<BinaryRow> deletedPartitions = new HashSet<>();
+            for (SimpleFileEntry entry : mergedEntries) {
+                if (entry.kind() == FileKind.DELETE) {
+                    deletedPartitions.add(entry.partition());
+                }
+            }
+            if (partitionExpire.isValueAllExpired(deletedPartitions)) {
+                List<String> expiredPartitions =
+                        deletedPartitions.stream()
+                                .map(
+                                        partition ->
+                                                partToSimpleString(
+                                                        partitionType, 
partition, "-", 200))
+                                .collect(Collectors.toList());
+                throw new RuntimeException(
+                        "You are writing data to expired partitions, and you 
can filter this data to avoid job failover."
+                                + " Otherwise, continuous expired records will 
cause the job to failover restart continuously."
+                                + " Expired partitions are: "
+                                + expiredPartitions);
+            }
+        }
+    }
+
+    static List<SimpleFileEntry> buildBaseEntriesWithDV(
+            List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> 
baseIndexEntries) {
+        if (baseEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        Map<String, String> fileNameToDVFileName = new HashMap<>();
+        for (IndexManifestEntry indexManifestEntry : baseIndexEntries) {
+            // Should not attach DELETE type dv index for base file.
+            if (!indexManifestEntry.kind().equals(FileKind.DELETE)) {
+                IndexFileMeta indexFile = indexManifestEntry.indexFile();
+                LinkedHashMap<String, DeletionVectorMeta> dvRanges = 
indexFile.dvRanges();
+                if (dvRanges != null) {
+                    for (DeletionVectorMeta value : dvRanges.values()) {
+                        checkState(
+                                
!fileNameToDVFileName.containsKey(value.dataFileName()),
+                                "One file should correspond to only one dv 
entry.");
+                        fileNameToDVFileName.put(value.dataFileName(), 
indexFile.fileName());
+                    }
+                }
+            }
+        }
+
+        // Attach dv name to file entries.
+        List<SimpleFileEntry> entriesWithDV = new 
ArrayList<>(baseEntries.size());
+        for (SimpleFileEntry fileEntry : baseEntries) {
+            entriesWithDV.add(
+                    new SimpleFileEntryWithDV(
+                            fileEntry, 
fileNameToDVFileName.get(fileEntry.fileName())));
+        }
+        return entriesWithDV;
+    }
+
+    static List<SimpleFileEntry> buildDeltaEntriesWithDV(
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> deltaEntries,
+            List<IndexManifestEntry> deltaIndexEntries) {
+        if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        List<SimpleFileEntry> entriesWithDV = new 
ArrayList<>(deltaEntries.size());
+
+        // One file may correspond to more than one dv entry, for example, 
delete the old dv, and
+        // create a new one.
+        Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new 
HashMap<>();
+        for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) {
+            LinkedHashMap<String, DeletionVectorMeta> dvRanges =
+                    deltaIndexEntry.indexFile().dvRanges();
+            if (dvRanges != null) {
+                for (DeletionVectorMeta meta : dvRanges.values()) {
+                    fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new 
ArrayList<>());
+                    
fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry);
+                }
+            }
+        }
+
+        Set<String> fileNotInDeltaEntries = new 
HashSet<>(fileNameToDVEntry.keySet());
+        // 1. Attach dv name to delta file entries.
+        for (SimpleFileEntry fileEntry : deltaEntries) {
+            if (fileNameToDVEntry.containsKey(fileEntry.fileName())) {
+                List<IndexManifestEntry> dvs = 
fileNameToDVEntry.get(fileEntry.fileName());
+                checkState(dvs.size() == 1, "Delta entry only can have one dv 
file");
+                entriesWithDV.add(
+                        new SimpleFileEntryWithDV(fileEntry, 
dvs.get(0).indexFile().fileName()));
+                fileNotInDeltaEntries.remove(fileEntry.fileName());
+            } else {
+                entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null));
+            }
+        }
+
+        // 2. For file not in delta entries, build entry with dv with 
baseEntries.
+        if (!fileNotInDeltaEntries.isEmpty()) {
+            Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>();
+            for (SimpleFileEntry baseEntry : baseEntries) {
+                if (baseEntry.kind().equals(FileKind.ADD)) {
+                    fileNameToFileEntry.put(baseEntry.fileName(), baseEntry);
+                }
+            }
+
+            for (String fileName : fileNotInDeltaEntries) {
+                SimpleFileEntryWithDV simpleFileEntry =
+                        (SimpleFileEntryWithDV) 
fileNameToFileEntry.get(fileName);
+                checkState(
+                        simpleFileEntry != null,
+                        String.format(
+                                "Trying to create deletion vector on file %s 
which is not previously added.",
+                                fileName));
+                List<IndexManifestEntry> dvEntries = 
fileNameToDVEntry.get(fileName);
+                // If dv entry's type is DELETE, add DELETE<f, dv>
+                // If dv entry's type is ADD, add ADD<f, dv>
+                for (IndexManifestEntry dvEntry : dvEntries) {
+                    entriesWithDV.add(
+                            new SimpleFileEntryWithDV(
+                                    dvEntry.kind().equals(FileKind.ADD)
+                                            ? simpleFileEntry
+                                            : simpleFileEntry.toDelete(),
+                                    dvEntry.indexFile().fileName()));
+                }
+
+                // If one file corresponds to only one dv entry and the type is 
ADD,
+                // we need to add a DELETE<f, null>.
+                // This happens when creating a dv for a file that doesn't have 
dv before.
+                if (dvEntries.size() == 1 && 
dvEntries.get(0).kind().equals(FileKind.ADD)) {
+                    entriesWithDV.add(new 
SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null));
+                }
+            }
+        }
+
+        return entriesWithDV;
+    }
+
+    /**
+     * Construct detailed conflict exception. The returned exception is formed 
of (full exception,
+     * simplified exception), The simplified exception is generated when the 
entry length is larger
+     * than the max limit.
+     */
+    private Pair<RuntimeException, RuntimeException> createConflictException(
+            String message,
+            String baseCommitUser,
+            List<SimpleFileEntry> baseEntries,
+            List<SimpleFileEntry> changes,
+            Throwable cause) {
+        String possibleCauses =
+                String.join(
+                        "\n",
+                        "Don't panic!",
+                        "Conflicts during commits are normal and this failure 
is intended to resolve the conflicts.",
+                        "Conflicts are mainly caused by the following 
scenarios:",
+                        "1. Multiple jobs are writing into the same partition 
at the same time, "
+                                + "or you use STATEMENT SET to execute 
multiple INSERT statements into the same Paimon table.",
+                        "   You'll probably see different base commit user and 
current commit user below.",
+                        "   You can use "
+                                + 
"https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-job";
+                                + " to support multiple writing.",
+                        "2. You're recovering from an old savepoint, or you're 
creating multiple jobs from a savepoint.",
+                        "   The job will fail continuously in this scenario to 
protect metadata from corruption.",
+                        "   You can either recover from the latest savepoint, "
+                                + "or you can revert the table to the snapshot 
corresponding to the old savepoint.");
+        String commitUserString =
+                "Base commit user is: "
+                        + baseCommitUser
+                        + "; Current commit user is: "
+                        + commitUser;
+        String baseEntriesString =
+                "Base entries are:\n"
+                        + baseEntries.stream()
+                                .map(Object::toString)
+                                .collect(Collectors.joining("\n"));
+        String changesString =
+                "Changes are:\n"
+                        + 
changes.stream().map(Object::toString).collect(Collectors.joining("\n"));
+
+        RuntimeException fullException =
+                new RuntimeException(
+                        message
+                                + "\n\n"
+                                + possibleCauses
+                                + "\n\n"
+                                + commitUserString
+                                + "\n\n"
+                                + baseEntriesString
+                                + "\n\n"
+                                + changesString,
+                        cause);
+
+        RuntimeException simplifiedException;
+        int maxEntry = 50;
+        if (baseEntries.size() > maxEntry || changes.size() > maxEntry) {
+            baseEntriesString =
+                    "Base entries are:\n"
+                            + baseEntries.subList(0, 
Math.min(baseEntries.size(), maxEntry))
+                                    .stream()
+                                    .map(Object::toString)
+                                    .collect(Collectors.joining("\n"));
+            changesString =
+                    "Changes are:\n"
+                            + changes.subList(0, Math.min(changes.size(), 
maxEntry)).stream()
+                                    .map(Object::toString)
+                                    .collect(Collectors.joining("\n"));
+            simplifiedException =
+                    new RuntimeException(
+                            message
+                                    + "\n\n"
+                                    + possibleCauses
+                                    + "\n\n"
+                                    + commitUserString
+                                    + "\n\n"
+                                    + baseEntriesString
+                                    + "\n\n"
+                                    + changesString
+                                    + "\n\n"
+                                    + "The entry list above are not fully 
displayed, please refer to taskmanager.log for more information.",
+                            cause);
+            return Pair.of(fullException, simplifiedException);
+        } else {
+            return Pair.of(fullException, fullException);
+        }
+    }
+
+    private static class LevelIdentifier {
+
+        private final BinaryRow partition;
+        private final int bucket;
+        private final int level;
+
+        private LevelIdentifier(BinaryRow partition, int bucket, int level) {
+            this.partition = partition;
+            this.bucket = bucket;
+            this.level = level;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LevelIdentifier)) {
+                return false;
+            }
+            LevelIdentifier that = (LevelIdentifier) o;
+            return Objects.equals(partition, that.partition)
+                    && bucket == that.bucket
+                    && level == that.level;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(partition, bucket, level);
+        }
+    }
+
+    /** Should do conflict check. */
+    public interface ConflictCheck {
+        boolean shouldCheck(long latestSnapshot);
+    }
+
+    public static ConflictCheck hasConflictChecked(@Nullable Long 
checkedLatestSnapshotId) {
+        return latestSnapshot -> !Objects.equals(latestSnapshot, 
checkedLatestSnapshotId);
+    }
+
+    public static ConflictCheck noConflictCheck() {
+        return latestSnapshot -> false;
+    }
+
+    public static ConflictCheck mustConflictCheck() {
+        return latestSnapshot -> true;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
deleted file mode 100644
index 00942ea048..0000000000
--- 
a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.utils;
-
-import org.apache.paimon.index.DeletionVectorMeta;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.manifest.FileKind;
-import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.SimpleFileEntry;
-import org.apache.paimon.manifest.SimpleFileEntryWithDV;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.apache.paimon.utils.Preconditions.checkState;
-
-/** Utils for conflict deletion. */
-public class ConflictDeletionUtils {
-
-    public static List<SimpleFileEntry> buildBaseEntriesWithDV(
-            List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> 
baseIndexEntries) {
-        if (baseEntries.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        Map<String, String> fileNameToDVFileName = new HashMap<>();
-        for (IndexManifestEntry indexManifestEntry : baseIndexEntries) {
-            // Should not attach DELETE type dv index for base file.
-            if (!indexManifestEntry.kind().equals(FileKind.DELETE)) {
-                IndexFileMeta indexFile = indexManifestEntry.indexFile();
-                if (indexFile.dvRanges() != null) {
-                    for (DeletionVectorMeta value : 
indexFile.dvRanges().values()) {
-                        checkState(
-                                
!fileNameToDVFileName.containsKey(value.dataFileName()),
-                                "One file should correspond to only one dv 
entry.");
-                        fileNameToDVFileName.put(value.dataFileName(), 
indexFile.fileName());
-                    }
-                }
-            }
-        }
-
-        // Attach dv name to file entries.
-        List<SimpleFileEntry> entriesWithDV = new 
ArrayList<>(baseEntries.size());
-        for (SimpleFileEntry fileEntry : baseEntries) {
-            entriesWithDV.add(
-                    new SimpleFileEntryWithDV(
-                            fileEntry, 
fileNameToDVFileName.get(fileEntry.fileName())));
-        }
-        return entriesWithDV;
-    }
-
-    public static List<SimpleFileEntry> buildDeltaEntriesWithDV(
-            List<SimpleFileEntry> baseEntries,
-            List<SimpleFileEntry> deltaEntries,
-            List<IndexManifestEntry> deltaIndexEntries) {
-        if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) {
-            return Collections.emptyList();
-        }
-
-        List<SimpleFileEntry> entriesWithDV = new 
ArrayList<>(deltaEntries.size());
-
-        // One file may correspond to more than one dv entries, for example, 
delete the old dv, and
-        // create a new one.
-        Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new 
HashMap<>();
-        for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) {
-            if (deltaIndexEntry.indexFile().dvRanges() != null) {
-                for (DeletionVectorMeta meta : 
deltaIndexEntry.indexFile().dvRanges().values()) {
-                    fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new 
ArrayList<>());
-                    
fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry);
-                }
-            }
-        }
-
-        Set<String> fileNotInDeltaEntries = new 
HashSet<>(fileNameToDVEntry.keySet());
-        // 1. Attach dv name to delta file entries.
-        for (SimpleFileEntry fileEntry : deltaEntries) {
-            if (fileNameToDVEntry.containsKey(fileEntry.fileName())) {
-                List<IndexManifestEntry> dvs = 
fileNameToDVEntry.get(fileEntry.fileName());
-                checkState(dvs.size() == 1, "Delta entry only can have one dv 
file");
-                entriesWithDV.add(
-                        new SimpleFileEntryWithDV(fileEntry, 
dvs.get(0).indexFile().fileName()));
-                fileNotInDeltaEntries.remove(fileEntry.fileName());
-            } else {
-                entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null));
-            }
-        }
-
-        // 2. For file not in delta entries, build entry with dv with 
baseEntries.
-        if (!fileNotInDeltaEntries.isEmpty()) {
-            Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>();
-            for (SimpleFileEntry baseEntry : baseEntries) {
-                if (baseEntry.kind().equals(FileKind.ADD)) {
-                    fileNameToFileEntry.put(baseEntry.fileName(), baseEntry);
-                }
-            }
-
-            for (String fileName : fileNotInDeltaEntries) {
-                SimpleFileEntryWithDV simpleFileEntry =
-                        (SimpleFileEntryWithDV) 
fileNameToFileEntry.get(fileName);
-                checkState(
-                        simpleFileEntry != null,
-                        String.format(
-                                "Trying to create deletion vector on file %s 
which is not previously added.",
-                                fileName));
-                List<IndexManifestEntry> dvEntries = 
fileNameToDVEntry.get(fileName);
-                // If dv entry's type id DELETE, add DELETE<f, dv>
-                // If dv entry's type id ADD, add ADD<f, dv>
-                for (IndexManifestEntry dvEntry : dvEntries) {
-                    entriesWithDV.add(
-                            new SimpleFileEntryWithDV(
-                                    dvEntry.kind().equals(FileKind.ADD)
-                                            ? simpleFileEntry
-                                            : simpleFileEntry.toDelete(),
-                                    dvEntry.indexFile().fileName()));
-                }
-
-                // If one file correspond to only one dv entry and the type is 
ADD,
-                // we need to add a DELETE<f, null>.
-                // This happens when create a dv for a file that doesn't have 
dv before.
-                if (dvEntries.size() == 1 && 
dvEntries.get(0).kind().equals(FileKind.ADD)) {
-                    entriesWithDV.add(new 
SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null));
-                }
-            }
-        }
-
-        return entriesWithDV;
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
index 9d17593c11..984424610d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java
@@ -72,12 +72,12 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.SNAPSHOT_CLEAN_EMPTY_DIRECTORIES;
-import static 
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
 import static 
org.apache.paimon.operation.FileStoreTestUtils.assertNFilesExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.assertPathExists;
 import static 
org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
 import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
 import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /**
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index c4bcc74f56..2c1f6c50ea 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -84,7 +84,7 @@ import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
-import static 
org.apache.paimon.operation.FileStoreCommitImpl.mustConflictCheck;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
 import static 
org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
similarity index 98%
rename from 
paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
rename to 
paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
index 34e6e59e7c..e75e4d0358 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/commit/ConflictDetectionTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.utils;
+package org.apache.paimon.operation.commit;
 
 import org.apache.paimon.index.DeletionVectorMeta;
 import org.apache.paimon.index.IndexFileMeta;
@@ -41,12 +41,11 @@ import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static org.apache.paimon.manifest.FileKind.ADD;
 import static org.apache.paimon.manifest.FileKind.DELETE;
-import static 
org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
-import static 
org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.buildBaseEntriesWithDV;
+import static 
org.apache.paimon.operation.commit.ConflictDetection.buildDeltaEntriesWithDV;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link ConflictDeletionUtils}. */
-public class ConflictDeletionUtilsTest {
+class ConflictDetectionTest {
 
     @Test
     public void testBuildBaseEntriesWithDV() {

Reply via email to