This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a1187e935 [core] Append commit should check bucket number if latest commit user is different (#6723)
7a1187e935 is described below

commit 7a1187e9354d656cebbc2b4dd1a42ac56e75a469
Author: yuzelin <[email protected]>
AuthorDate: Tue Dec 2 19:23:37 2025 +0800

    [core] Append commit should check bucket number if latest commit user is different (#6723)
---
 .../apache/paimon/operation/FileStoreCommit.java   |  2 +
 .../paimon/operation/FileStoreCommitImpl.java      |  9 +++
 .../paimon/table/sink/BatchWriteBuilderImpl.java   | 13 ++++-
 .../apache/paimon/table/sink/InnerTableCommit.java |  2 +
 .../apache/paimon/table/sink/TableCommitImpl.java  |  6 ++
 .../apache/paimon/table/sink/TableCommitTest.java  | 68 ++++++++++++++++++++++
 .../paimon/flink/sink/PostponeFixedBucketSink.java | 12 +++-
 .../paimon/spark/commands/PaimonSparkWriter.scala  |  2 +
 8 files changed, 110 insertions(+), 4 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index 6a00db6f0e..ecd4975858 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -36,6 +36,8 @@ public interface FileStoreCommit extends AutoCloseable {
 
     FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire);
 
+    FileStoreCommit appendCommitCheckConflict(boolean 
appendCommitCheckConflict);
+
     /** Find out which committables need to be retried when recovering from 
the failure. */
     List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index cc9890fc72..fc89ff1ecd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -154,6 +154,7 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
 
     private boolean ignoreEmptyCommit;
     private CommitMetrics commitMetrics;
+    private boolean appendCommitCheckConflict = false;
 
     public FileStoreCommitImpl(
             SnapshotCommit snapshotCommit,
@@ -246,6 +247,12 @@ public class FileStoreCommitImpl implements 
FileStoreCommit {
         return this;
     }
 
+    @Override
+    public FileStoreCommit appendCommitCheckConflict(boolean 
appendCommitCheckConflict) {
+        this.appendCommitCheckConflict = appendCommitCheckConflict;
+        return this;
+    }
+
     @Override
     public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables) {
         // nothing to filter, fast exit
@@ -327,6 +334,8 @@ public class FileStoreCommitImpl implements FileStoreCommit 
{
                 if (containsFileDeletionOrDeletionVectors(appendSimpleEntries, 
appendIndexFiles)) {
                     commitKind = CommitKind.OVERWRITE;
                     conflictCheck = mustConflictCheck();
+                } else if (latestSnapshot != null && 
appendCommitCheckConflict) {
+                    conflictCheck = mustConflictCheck();
                 }
 
                 boolean discardDuplicate = discardDuplicateFiles && commitKind 
== CommitKind.APPEND;
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
index 6c950a360a..66c67e8965 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/BatchWriteBuilderImpl.java
@@ -40,6 +40,7 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
     private final String commitUser;
 
     private Map<String, String> staticPartition;
+    private boolean appendCommitCheckConflict = false;
 
     public BatchWriteBuilderImpl(InnerTable table) {
         this.table = table;
@@ -81,7 +82,10 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
 
     @Override
     public BatchTableCommit newCommit() {
-        InnerTableCommit commit = 
table.newCommit(commitUser).withOverwrite(staticPartition);
+        InnerTableCommit commit =
+                table.newCommit(commitUser)
+                        .withOverwrite(staticPartition)
+                        .appendCommitCheckConflict(appendCommitCheckConflict);
         commit.ignoreEmptyCommit(
                 Options.fromMap(table.options())
                         .getOptional(CoreOptions.SNAPSHOT_IGNORE_EMPTY_COMMIT)
@@ -89,7 +93,12 @@ public class BatchWriteBuilderImpl implements 
BatchWriteBuilder {
         return commit;
     }
 
-    public BatchWriteBuilder copyWithNewTable(Table newTable) {
+    public BatchWriteBuilderImpl copyWithNewTable(Table newTable) {
         return new BatchWriteBuilderImpl((InnerTable) newTable, commitUser, 
staticPartition);
     }
+
+    public BatchWriteBuilderImpl appendCommitCheckConflict(boolean 
appendCommitCheckConflict) {
+        this.appendCommitCheckConflict = appendCommitCheckConflict;
+        return this;
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
index df6241086a..0a8fdd6742 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/InnerTableCommit.java
@@ -46,6 +46,8 @@ public interface InnerTableCommit extends StreamTableCommit, 
BatchTableCommit {
 
     InnerTableCommit expireForEmptyCommit(boolean expireForEmptyCommit);
 
+    InnerTableCommit appendCommitCheckConflict(boolean 
appendCommitCheckConflict);
+
     @Override
     InnerTableCommit withMetricRegistry(MetricRegistry registry);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 3e778c6bcc..9caf08ff28 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -158,6 +158,12 @@ public class TableCommitImpl implements InnerTableCommit {
         return this;
     }
 
+    @Override
+    public TableCommitImpl appendCommitCheckConflict(boolean 
appendCommitCheckConflict) {
+        commit.appendCommitCheckConflict(appendCommitCheckConflict);
+        return this;
+    }
+
     @Override
     public InnerTableCommit withMetricRegistry(MetricRegistry registry) {
         commit.withMetrics(new CommitMetrics(registry, tableName));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index e73695a209..bc0756fcd1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -50,6 +50,8 @@ import org.apache.paimon.utils.SnapshotManager;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -67,6 +69,7 @@ import java.util.stream.LongStream;
 import static java.util.Collections.singletonMap;
 import static 
org.apache.paimon.utils.FileStorePathFactoryTest.createNonPartFactory;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link TableCommit}. */
@@ -322,6 +325,71 @@ public class TableCommitTest {
         }
     }
 
+    @ParameterizedTest
+    @ValueSource(booleans = {true, false})
+    public void testGiveUpCommitWhenAppendFoundTotalBucketsChanged(boolean 
checkAppend)
+            throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 1);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new 
Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                options.toMap(),
+                                ""));
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        String commitUser1 = UUID.randomUUID().toString();
+        TableWriteImpl<?> write1 = table.newWrite(commitUser1);
+        TableCommitImpl commit1 = table.newCommit(commitUser1);
+        for (int i = 1; i < 10; i++) {
+            write1.write(GenericRow.of(i, (long) i));
+        }
+
+        // mock rescale
+        String commitUser2 = UUID.randomUUID().toString();
+        options = new Options(table.options());
+        options.set(CoreOptions.BUCKET, 2);
+        FileStoreTable rescaleTable = 
table.copy(tableSchema.copy(options.toMap()));
+        try (TableWriteImpl<?> write = rescaleTable.newWrite(commitUser2);
+                TableCommitImpl commit =
+                        
rescaleTable.newCommit(commitUser2).withOverwrite(Collections.emptyMap())) {
+            for (int i = 1; i < 10; i++) {
+                write.write(GenericRow.of(i, (long) i));
+            }
+            commit.commit(1, write.prepareCommit(false, 1));
+        }
+
+        if (checkAppend) {
+            commit1.appendCommitCheckConflict(true);
+            assertThatThrownBy(() -> commit1.commit(1, 
write1.prepareCommit(false, 1)))
+                    .isInstanceOf(RuntimeException.class)
+                    .hasMessageContaining("changed from 2 to 1 without 
overwrite");
+        } else {
+            // the commit result is error, but here verify that no check if
+            // appendCommitCheckConflict was not set
+            assertThatCode(() -> commit1.commit(1, write1.prepareCommit(false, 
1)))
+                    .doesNotThrowAnyException();
+        }
+        write1.close();
+        commit1.close();
+    }
+
     @Test
     public void testStrictModeForCompact() throws Exception {
         String path = tempDir.toString();
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
index 3223ac50d3..f81ec99866 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/PostponeFixedBucketSink.java
@@ -68,8 +68,16 @@ public class PostponeFixedBucketSink extends 
FlinkWriteSink<InternalRow> {
     @Override
     protected Committer.Factory<Committable, ManifestCommittable> 
createCommitterFactory() {
         if (overwritePartition == null) {
-            // The table has copied bucket option outside, no need to change 
anything
-            return super.createCommitterFactory();
+            // The table has copied bucket option outside, no need to change.
+            return context ->
+                    new StoreCommitter(
+                            table,
+                            table.newCommit(context.commitUser())
+                                    .withOverwrite(overwritePartition)
+                                    
.ignoreEmptyCommit(!context.streamingCheckpointEnabled())
+                                    // Need to check conflict
+                                    .appendCommitCheckConflict(true),
+                            context);
         } else {
             // When overwriting, the postpone bucket files need to be deleted, 
so using a postpone
             // bucket table commit here
diff --git 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 23690b46eb..c4219e4e63 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -410,6 +410,8 @@ case class PaimonSparkWriter(
       writeBuilder
         .asInstanceOf[BatchWriteBuilderImpl]
         .copyWithNewTable(PostponeUtils.tableForCommit(table))
+        // Need to check conflict
+        .appendCommitCheckConflict(true)
     } else {
       writeBuilder
     }

Reply via email to