This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit ded6aebad92457dbf094076920ac19f5da253d74
Author: JingsongLi <[email protected]>
AuthorDate: Tue Dec 30 16:58:47 2025 +0800

    [core] Extract StrictModeChecker from ConflictDetection in Commit
---
 .../java/org/apache/paimon/AbstractFileStore.java  |  14 ++-
 .../paimon/operation/FileStoreCommitImpl.java      |  20 ++--
 .../paimon/operation/commit/ConflictDetection.java |  63 +-----------
 .../paimon/operation/commit/StrictModeChecker.java | 114 +++++++++++++++++++++
 .../apache/paimon/table/sink/TableCommitTest.java  |  54 +++++-----
 5 files changed, 159 insertions(+), 106 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 4eea07a7e9..6944c0283f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -44,6 +44,7 @@ import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.operation.SnapshotDeletion;
 import org.apache.paimon.operation.TagDeletion;
 import org.apache.paimon.operation.commit.ConflictDetection;
+import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.partition.PartitionExpireStrategy;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -279,8 +280,13 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                         newKeyComparator(),
                         bucketMode(),
                         options.deletionVectorsEnabled(),
-                        newIndexFileHandler(),
-                        newScan());
+                        newIndexFileHandler());
+        StrictModeChecker strictModeChecker =
+                StrictModeChecker.create(
+                        snapshotManager,
+                        commitUser,
+                        this::newScan,
+                        options.commitStrictModeLastSafeSnapshot().orElse(null));
         return new FileStoreCommitImpl(
                 snapshotCommit,
                 fileIO,
@@ -310,10 +316,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
                 options.commitTimeout(),
                 options.commitMinRetryWait(),
                 options.commitMaxRetryWait(),
-                options.commitStrictModeLastSafeSnapshot().orElse(null),
                 options.rowTrackingEnabled(),
                 options.commitDiscardDuplicateFiles(),
-                conflictDetection);
+                conflictDetection,
+                strictModeChecker);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index aa47e69d5e..9933e40250 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -50,6 +50,7 @@ import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
 import org.apache.paimon.operation.commit.ManifestEntryChanges;
 import org.apache.paimon.operation.commit.RetryCommitResult;
 import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
+import org.apache.paimon.operation.commit.StrictModeChecker;
 import org.apache.paimon.operation.commit.SuccessCommitResult;
 import org.apache.paimon.operation.metrics.CommitMetrics;
 import org.apache.paimon.operation.metrics.CommitStats;
@@ -158,10 +159,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final long commitMinRetryWait;
     private final long commitMaxRetryWait;
     private final int commitMaxRetries;
-    @Nullable private Long strictModeLastSafeSnapshot;
     private final InternalRowPartitionComputer partitionComputer;
     private final boolean rowTrackingEnabled;
     private final boolean discardDuplicateFiles;
+    @Nullable private final StrictModeChecker strictModeChecker;
     private final ConflictDetection conflictDetection;
     private final CommitCleaner commitCleaner;
 
@@ -198,10 +199,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             long commitTimeout,
             long commitMinRetryWait,
             long commitMaxRetryWait,
-            @Nullable Long strictModeLastSafeSnapshot,
             boolean rowTrackingEnabled,
             boolean discardDuplicateFiles,
-            ConflictDetection conflictDetection) {
+            ConflictDetection conflictDetection,
+            @Nullable StrictModeChecker strictModeChecker) {
         this.snapshotCommit = snapshotCommit;
         this.fileIO = fileIO;
         this.schemaManager = schemaManager;
@@ -228,7 +229,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.commitTimeout = commitTimeout;
         this.commitMinRetryWait = commitMinRetryWait;
         this.commitMaxRetryWait = commitMaxRetryWait;
-        this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
         this.partitionComputer =
                 new InternalRowPartitionComputer(
                         options.partitionDefaultName(),
@@ -241,6 +241,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         this.bucketMode = bucketMode;
         this.rowTrackingEnabled = rowTrackingEnabled;
         this.discardDuplicateFiles = discardDuplicateFiles;
+        this.strictModeChecker = strictModeChecker;
         this.conflictDetection = conflictDetection;
         this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
     }
@@ -873,10 +874,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             }
         }
 
-        if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) {
-            conflictDetection.commitStrictModeCheck(
-                    strictModeLastSafeSnapshot, newSnapshotId, commitKind, snapshotManager);
-            strictModeLastSafeSnapshot = newSnapshotId - 1;
+        if (strictModeChecker != null) {
+            strictModeChecker.check(newSnapshotId, commitKind);
+            strictModeChecker.update(newSnapshotId - 1);
         }
 
         if (LOG.isDebugEnabled()) {
@@ -1084,8 +1084,8 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                 commitUser,
                 identifier,
                 commitKind.name());
-        if (strictModeLastSafeSnapshot != null) {
-            strictModeLastSafeSnapshot = newSnapshot.id();
+        if (strictModeChecker != null) {
+            strictModeChecker.update(newSnapshotId);
         }
         final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
         final List<ManifestEntry> finalDeltaFiles = deltaFiles;
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index ae58cba819..801bba4817 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -18,7 +18,6 @@
 
 package org.apache.paimon.operation.commit;
 
-import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
@@ -29,17 +28,13 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
-import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.manifest.SimpleFileEntryWithDV;
-import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -52,7 +47,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -77,7 +71,6 @@ public class ConflictDetection {
     private final BucketMode bucketMode;
     private final boolean deletionVectorsEnabled;
     private final IndexFileHandler indexFileHandler;
-    private final FileStoreScan scan;
 
     private @Nullable PartitionExpire partitionExpire;
 
@@ -89,8 +82,7 @@ public class ConflictDetection {
             @Nullable Comparator<InternalRow> keyComparator,
             BucketMode bucketMode,
             boolean deletionVectorsEnabled,
-            IndexFileHandler indexFileHandler,
-            FileStoreScan scan) {
+            IndexFileHandler indexFileHandler) {
         this.tableName = tableName;
         this.commitUser = commitUser;
         this.partitionType = partitionType;
@@ -99,7 +91,6 @@ public class ConflictDetection {
         this.bucketMode = bucketMode;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.indexFileHandler = indexFileHandler;
-        this.scan = scan;
     }
 
     @Nullable
@@ -111,58 +102,6 @@ public class ConflictDetection {
         this.partitionExpire = partitionExpire;
     }
 
-    public void commitStrictModeCheck(
-            @Nullable Long strictModeLastSafeSnapshot,
-            long newSnapshotId,
-            CommitKind newCommitKind,
-            SnapshotManager snapshotManager) {
-        if (strictModeLastSafeSnapshot == null || strictModeLastSafeSnapshot < 0) {
-            return;
-        }
-
-        for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
-            Snapshot snapshot = snapshotManager.snapshot(id);
-            if (snapshot.commitUser().equals(commitUser)) {
-                continue;
-            }
-            if (snapshot.commitKind() == CommitKind.COMPACT
-                    || snapshot.commitKind() == CommitKind.OVERWRITE) {
-                throw new RuntimeException(
-                        String.format(
-                                "When trying to commit snapshot %d, "
-                                        + "commit user %s has found a %s snapshot (id: %d) by another user %s. "
-                                        + "Giving up committing as %s is set.",
-                                newSnapshotId,
-                                commitUser,
-                                snapshot.commitKind().name(),
-                                id,
-                                snapshot.commitUser(),
-                                CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
-            }
-            if (snapshot.commitKind() == CommitKind.APPEND
-                    && newCommitKind == CommitKind.OVERWRITE) {
-                Iterator<ManifestEntry> entries =
-                        scan.withSnapshot(snapshot)
-                                .withKind(ScanMode.DELTA)
-                                .onlyReadRealBuckets()
-                                .dropStats()
-                                .readFileIterator();
-                if (entries.hasNext()) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "When trying to commit snapshot %d, "
-                                            + "commit user %s has found a APPEND snapshot (id: %d) by another user %s "
-                                            + "which committed files to fixed bucket. Giving up committing as %s is set.",
-                                    newSnapshotId,
-                                    commitUser,
-                                    id,
-                                    snapshot.commitUser(),
-                                    CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
-                }
-            }
-        }
-    }
-
     public void checkNoConflictsOrFail(
             Snapshot snapshot,
             List<SimpleFileEntry> baseEntries,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
new file mode 100644
index 0000000000..e2d76c2327
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/StrictModeChecker.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation.commit;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.Snapshot.CommitKind;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.table.source.ScanMode;
+import org.apache.paimon.utils.SnapshotManager;
+
+import javax.annotation.Nullable;
+
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/** A checker to check strict mode based on last safe snapshot. */
+public class StrictModeChecker {
+
+    private final SnapshotManager snapshotManager;
+    private final String commitUser;
+    private final FileStoreScan scan;
+
+    private long strictModeLastSafeSnapshot;
+
+    public StrictModeChecker(
+            SnapshotManager snapshotManager,
+            String commitUser,
+            FileStoreScan scan,
+            long strictModeLastSafeSnapshot) {
+        this.snapshotManager = snapshotManager;
+        this.commitUser = commitUser;
+        this.scan = scan;
+        this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
+    }
+
+    @Nullable
+    public static StrictModeChecker create(
+            SnapshotManager snapshotManager,
+            String commitUser,
+            Supplier<FileStoreScan> scanSupplier,
+            @Nullable Long strictModeLastSafeSnapshot) {
+        if (strictModeLastSafeSnapshot == null) {
+            return null;
+        }
+        return new StrictModeChecker(
+                snapshotManager, commitUser, scanSupplier.get(), strictModeLastSafeSnapshot);
+    }
+
+    public void check(long newSnapshotId, CommitKind newCommitKind) {
+        for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
+            Snapshot snapshot = snapshotManager.snapshot(id);
+            if (snapshot.commitUser().equals(commitUser)) {
+                continue;
+            }
+            if (snapshot.commitKind() == CommitKind.COMPACT
+                    || snapshot.commitKind() == CommitKind.OVERWRITE) {
+                throw new RuntimeException(
+                        String.format(
+                                "When trying to commit snapshot %d, "
+                                        + "commit user %s has found a %s snapshot (id: %d) by another user %s. "
+                                        + "Giving up committing as %s is set.",
+                                newSnapshotId,
+                                commitUser,
+                                snapshot.commitKind().name(),
+                                id,
+                                snapshot.commitUser(),
+                                CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+            }
+            if (snapshot.commitKind() == CommitKind.APPEND
+                    && newCommitKind == CommitKind.OVERWRITE) {
+                Iterator<ManifestEntry> entries =
+                        scan.withSnapshot(snapshot)
+                                .withKind(ScanMode.DELTA)
+                                .onlyReadRealBuckets()
+                                .dropStats()
+                                .readFileIterator();
+                if (entries.hasNext()) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "When trying to commit snapshot %d, "
+                                            + "commit user %s has found a APPEND snapshot (id: %d) by another user %s "
+                                            + "which committed files to fixed bucket. Giving up committing as %s is set.",
+                                    newSnapshotId,
+                                    commitUser,
+                                    id,
+                                    snapshot.commitUser(),
+                                    CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+                }
+            }
+        }
+    }
+
+    public void update(long newSafeSnapshot) {
+        strictModeLastSafeSnapshot = newSafeSnapshot;
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 2b3625a3c3..7fc17ae792 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -67,6 +67,7 @@ import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
 import static java.util.Collections.singletonMap;
+import static org.apache.paimon.CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT;
 import static org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatCode;
@@ -421,19 +422,17 @@ public class TableCommitTest {
         TableWriteImpl<?> write1 = table.newWrite(user1);
         TableCommitImpl commit1 = table.newCommit(user1);
 
-        Map<String, String> newOptions = new HashMap<>();
-        newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1");
-        table = table.copy(newOptions);
-        String user2 = UUID.randomUUID().toString();
-        TableWriteImpl<?> write2 = table.newWrite(user2);
-        TableCommitImpl commit2 = table.newCommit(user2);
-
-        // by default, first commit is not checked
-
         write1.write(GenericRow.of(0, 0L));
         write1.compact(BinaryRow.EMPTY_ROW, 0, true);
         commit1.commit(1, write1.prepareCommit(true, 1));
 
+        // test skip this commit check
+
+        String user2 = UUID.randomUUID().toString();
+        table = table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = table.newWrite(user2);
+        TableCommitImpl commit2 = table.newCommit(user2);
+
         write2.write(GenericRow.of(1, 1L));
         commit2.commit(1, write2.prepareCommit(false, 1));
 
@@ -482,19 +481,17 @@ public class TableCommitTest {
         TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1);
         TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1);
 
-        Map<String, String> newOptions = new HashMap<>();
-        newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1");
-        table = table.copy(newOptions);
-        String user2 = UUID.randomUUID().toString();
-        TableWriteImpl<?> write2 = table.newWrite(user2);
-        TableCommitImpl commit2 = table.newCommit(user2).withOverwrite(Collections.emptyMap());
-
-        // by default, first commit is not checked
-
         write1.write(GenericRow.of(0, 0L));
         write1.compact(BinaryRow.EMPTY_ROW, 0, true);
         commit1.commit(1, write1.prepareCommit(true, 1));
 
+        // test skip this commit check
+
+        String user2 = UUID.randomUUID().toString();
+        table = table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = table.newWrite(user2);
+        TableCommitImpl commit2 = table.newCommit(user2).withOverwrite(Collections.emptyMap());
+
         write2.write(GenericRow.of(1, 1L));
         commit2.commit(1, write2.prepareCommit(false, 1));
 
@@ -560,19 +557,17 @@ public class TableCommitTest {
         TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1);
         TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1);
 
-        Map<String, String> newOptions = new HashMap<>();
-        newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1");
-        table = table.copy(newOptions);
-        String user2 = UUID.randomUUID().toString();
-        TableWriteImpl<?> write2 = table.newWrite(user2);
-        TableCommitImpl commit2 = table.newCommit(user2);
-
-        // by default, first commit is not checked
-
         write1.write(GenericRow.of(0, 0L));
         write1.compact(BinaryRow.EMPTY_ROW, 0, true);
         commit1.commit(1, write1.prepareCommit(true, 1));
 
+        // test skip this commit check
+
+        String user2 = UUID.randomUUID().toString();
+        table = table.copy(singletonMap(COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "2"));
+        TableWriteImpl<?> write2 = table.newWrite(user2);
+        TableCommitImpl commit2 = table.newCommit(user2);
+
         write2.write(GenericRow.of(1, 1L));
         commit2.commit(1, write2.prepareCommit(false, 1));
 
@@ -896,15 +891,14 @@ public class TableCommitTest {
                         tableSchema,
                         CatalogEnvironment.empty());
         String user1 = UUID.randomUUID().toString();
-        FileStoreTable overwriteUpgrade =
-                table.copy(Collections.singletonMap("write-only", "true"));
+        FileStoreTable overwriteUpgrade = table.copy(singletonMap("write-only", "true"));
         TableWriteImpl<?> write1 = overwriteUpgrade.newWrite(user1);
         TableCommitImpl commit1 =
                 overwriteUpgrade.newCommit(user1).withOverwrite(Collections.emptyMap());
 
         String user2 = UUID.randomUUID().toString();
         FileStoreTable compactTable =
-                table.copy(Collections.singletonMap("compaction.force-up-level-0", "true"));
+                table.copy(singletonMap("compaction.force-up-level-0", "true"));
         TableWriteImpl<?> write2 = compactTable.newWrite(user2);
         TableCommitImpl commit2 = compactTable.newCommit(user2);
 

Reply via email to