This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9eb6305b79 [core] Strict-mode for OVERWRITE check APPEND snapshot with 
fixed bucket files (#6670)
9eb6305b79 is described below

commit 9eb6305b796c2750b3487d5f8789814bfbaa4480
Author: yuzelin <[email protected]>
AuthorDate: Wed Nov 26 10:23:19 2025 +0800

    [core] Strict-mode for OVERWRITE check APPEND snapshot with fixed bucket 
files (#6670)
---
 .../shortcodes/generated/core_configuration.html   |  2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |  5 +-
 .../java/org/apache/paimon/AbstractFileStore.java  |  3 +-
 .../paimon/operation/FileStoreCommitImpl.java      | 20 +-----
 .../paimon/operation/commit/ConflictDetection.java | 63 ++++++++++++++++-
 .../apache/paimon/table/sink/TableCommitTest.java  | 80 ++++++++++++++++++++--
 6 files changed, 145 insertions(+), 28 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index c6101e0a85..b50b9f2c9a 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -240,7 +240,7 @@ under the License.
             <td><h5>commit.strict-mode.last-safe-snapshot</h5></td>
             <td style="word-wrap: break-word;">(none)</td>
             <td>Long</td>
-            <td>If set, committer will check if there are other commit user's 
COMPACT / OVERWRITE snapshot, starting from the snapshot after this one. If 
found, commit will be aborted. If the value of this option is -1, committer 
will not check for its first commit.</td>
+            <td>If set, committer will check if there are other commit user's 
snapshot starting from the snapshot after this one. If found a COMPACT / 
OVERWRITE snapshot, or found an APPEND snapshot which committed files to fixed 
bucket, commit will be aborted. If the value of this option is -1, committer 
will not check for its first commit.</td>
         </tr>
         <tr>
             <td><h5>commit.timeout</h5></td>
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index fa5b133a3c..e9077ff589 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1949,8 +1949,9 @@ public class CoreOptions implements Serializable {
                     .longType()
                     .noDefaultValue()
                     .withDescription(
-                            "If set, committer will check if there are other 
commit user's COMPACT / OVERWRITE snapshot, "
-                                    + "starting from the snapshot after this 
one. If found, commit will be aborted. "
+                            "If set, committer will check if there are other 
commit user's snapshot starting from the "
+                                    + "snapshot after this one. If found a 
COMPACT / OVERWRITE snapshot, or found an "
+                                    + "APPEND snapshot which committed files 
to fixed bucket, commit will be aborted. "
                                     + "If the value of this option is -1, 
committer will not check for its first commit.");
 
     public static final ConfigOption<String> CLUSTERING_COLUMNS =
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5fa81383ce..a7e15b31d6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -277,7 +277,8 @@ abstract class AbstractFileStore<T> implements FileStore<T> 
{
                         newKeyComparator(),
                         bucketMode(),
                         options.deletionVectorsEnabled(),
-                        newIndexFileHandler());
+                        newIndexFileHandler(),
+                        newScan());
         return new FileStoreCommitImpl(
                 snapshotCommit,
                 fileIO,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index ceb68ed6ba..d0f18760ec 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -992,24 +992,8 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         }
 
         if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot 
>= 0) {
-            for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; 
id++) {
-                Snapshot snapshot = snapshotManager.snapshot(id);
-                if ((snapshot.commitKind() == CommitKind.COMPACT
-                                || snapshot.commitKind() == 
CommitKind.OVERWRITE)
-                        && !snapshot.commitUser().equals(commitUser)) {
-                    throw new RuntimeException(
-                            String.format(
-                                    "When trying to commit snapshot %d, "
-                                            + "commit user %s has found a %s 
snapshot (id: %d) by another user %s. "
-                                            + "Giving up committing as %s is 
set.",
-                                    newSnapshotId,
-                                    commitUser,
-                                    snapshot.commitKind().name(),
-                                    id,
-                                    snapshot.commitUser(),
-                                    
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
-                }
-            }
+            conflictDetection.commitStrictModeCheck(
+                    strictModeLastSafeSnapshot, newSnapshotId, commitKind, 
snapshotManager);
             strictModeLastSafeSnapshot = newSnapshotId - 1;
         }
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
index 9739beae88..0261a3d7de 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.operation.commit;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.Snapshot.CommitKind;
 import org.apache.paimon.data.BinaryRow;
@@ -28,13 +29,17 @@ import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.SimpleFileEntry;
 import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.PartitionExpire;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.source.ScanMode;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.SnapshotManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,6 +52,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +77,7 @@ public class ConflictDetection {
     private final BucketMode bucketMode;
     private final boolean deletionVectorsEnabled;
     private final IndexFileHandler indexFileHandler;
+    private final FileStoreScan scan;
 
     private @Nullable PartitionExpire partitionExpire;
 
@@ -82,7 +89,8 @@ public class ConflictDetection {
             @Nullable Comparator<InternalRow> keyComparator,
             BucketMode bucketMode,
             boolean deletionVectorsEnabled,
-            IndexFileHandler indexFileHandler) {
+            IndexFileHandler indexFileHandler,
+            FileStoreScan scan) {
         this.tableName = tableName;
         this.commitUser = commitUser;
         this.partitionType = partitionType;
@@ -91,12 +99,65 @@ public class ConflictDetection {
         this.bucketMode = bucketMode;
         this.deletionVectorsEnabled = deletionVectorsEnabled;
         this.indexFileHandler = indexFileHandler;
+        this.scan = scan;
     }
 
     public void withPartitionExpire(PartitionExpire partitionExpire) {
         this.partitionExpire = partitionExpire;
     }
 
+    public void commitStrictModeCheck(
+            @Nullable Long strictModeLastSafeSnapshot,
+            long newSnapshotId,
+            CommitKind newCommitKind,
+            SnapshotManager snapshotManager) {
+        if (strictModeLastSafeSnapshot == null || strictModeLastSafeSnapshot < 
0) {
+            return;
+        }
+
+        for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; 
id++) {
+            Snapshot snapshot = snapshotManager.snapshot(id);
+            if (snapshot.commitUser().equals(commitUser)) {
+                continue;
+            }
+            if (snapshot.commitKind() == CommitKind.COMPACT
+                    || snapshot.commitKind() == CommitKind.OVERWRITE) {
+                throw new RuntimeException(
+                        String.format(
+                                "When trying to commit snapshot %d, "
+                                        + "commit user %s has found a %s 
snapshot (id: %d) by another user %s. "
+                                        + "Giving up committing as %s is set.",
+                                newSnapshotId,
+                                commitUser,
+                                snapshot.commitKind().name(),
+                                id,
+                                snapshot.commitUser(),
+                                
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+            }
+            if (snapshot.commitKind() == CommitKind.APPEND
+                    && newCommitKind == CommitKind.OVERWRITE) {
+                Iterator<ManifestEntry> entries =
+                        scan.withSnapshot(snapshot)
+                                .withKind(ScanMode.DELTA)
+                                .onlyReadRealBuckets()
+                                .dropStats()
+                                .readFileIterator();
+                if (entries.hasNext()) {
+                    throw new RuntimeException(
+                            String.format(
+                                    "When trying to commit snapshot %d, "
+                                            + "commit user %s has found a 
APPEND snapshot (id: %d) by another user %s "
+                                            + "which committed files to fixed 
bucket. Giving up committing as %s is set.",
+                                    newSnapshotId,
+                                    commitUser,
+                                    id,
+                                    snapshot.commitUser(),
+                                    
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
+                }
+            }
+        }
+    }
+
     public void checkNoConflictsOrFail(
             Snapshot snapshot,
             List<SimpleFileEntry> baseEntries,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index 0a39ca7ef8..10373c0377 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -323,7 +323,7 @@ public class TableCommitTest {
     }
 
     @Test
-    public void testStrictMode() throws Exception {
+    public void testStrictModeForCompact() throws Exception {
         String path = tempDir.toString();
         RowType rowType =
                 RowType.of(
@@ -369,18 +369,88 @@ public class TableCommitTest {
         write2.write(GenericRow.of(1, 1L));
         commit2.commit(1, write2.prepareCommit(false, 1));
 
-        // APPEND commit is ignored
+        // COMPACT commit should be checked
+
+        write1.write(GenericRow.of(4, 4L));
+        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit1.commit(3, write1.prepareCommit(true, 3));
+
+        write2.write(GenericRow.of(5, 5L));
+        assertThatThrownBy(() -> commit2.commit(3, write2.prepareCommit(false, 
3)))
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining(
+                        "Giving up committing as 
commit.strict-mode.last-safe-snapshot is set.");
+    }
+
+    @Test
+    public void testStrictModeForAppend() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        options.set(CoreOptions.NUM_SORTED_RUNS_COMPACTION_TRIGGER, 10);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+        String user1 = UUID.randomUUID().toString();
+        FileStoreTable fixedBucketWriteTable = table;
+        TableWriteImpl<?> write1 = fixedBucketWriteTable.newWrite(user1);
+        TableCommitImpl commit1 = fixedBucketWriteTable.newCommit(user1);
 
+        Map<String, String> newOptions = new HashMap<>();
+        
newOptions.put(CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key(), "-1");
+        table = table.copy(newOptions);
+        String user2 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write2 = table.newWrite(user2);
+        TableCommitImpl commit2 = 
table.newCommit(user2).withOverwrite(Collections.emptyMap());
+
+        // by default, first commit is not checked
+
+        write1.write(GenericRow.of(0, 0L));
+        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
+        commit1.commit(1, write1.prepareCommit(true, 1));
+
+        write2.write(GenericRow.of(1, 1L));
+        commit2.commit(1, write2.prepareCommit(false, 1));
+
+        // APPEND with postpone bucket files should be ignored
+        write1.close();
+        commit1.close();
+        Map<String, String> postponeWriteOptions = new HashMap<>();
+        postponeWriteOptions.put(CoreOptions.BUCKET.key(), "-2");
+        
postponeWriteOptions.put(CoreOptions.POSTPONE_BATCH_WRITE_FIXED_BUCKET.key(), 
"false");
+        FileStoreTable postponeWriteTable = 
fixedBucketWriteTable.copy(postponeWriteOptions);
+        write1 = postponeWriteTable.newWrite(user1);
+        commit1 = postponeWriteTable.newCommit(user1);
         write1.write(GenericRow.of(2, 2L));
         commit1.commit(2, write1.prepareCommit(false, 2));
 
         write2.write(GenericRow.of(3, 3L));
         commit2.commit(2, write2.prepareCommit(false, 2));
 
-        // COMPACT commit should be checked
-
+        // APPEND with fixed bucket files should be checked
+        write1.close();
+        commit1.close();
+        write1 = fixedBucketWriteTable.newWrite(user1);
+        commit1 = fixedBucketWriteTable.newCommit(user1);
         write1.write(GenericRow.of(4, 4L));
-        write1.compact(BinaryRow.EMPTY_ROW, 0, true);
         commit1.commit(3, write1.prepareCommit(true, 3));
 
         write2.write(GenericRow.of(5, 5L));

Reply via email to