This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 22bf676b7a [core] Overwrite commit should never be conflicted by delete files (#6687)
22bf676b7a is described below

commit 22bf676b7a80d8c0e88e73e8a9967d935d068162
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 26 21:33:50 2025 +0800

    [core] Overwrite commit should never be conflicted by delete files (#6687)
---
 .../paimon/operation/FileStoreCommitImpl.java      | 86 ++++++++++++--------
 .../paimon/table/AppendOnlySimpleTableTest.java    | 91 ++++++++++++++++++++++
 2 files changed, 145 insertions(+), 32 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index d0f18760ec..cc9890fc72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -369,9 +369,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
                 attempts +=
                         tryCommit(
-                                appendTableFiles,
-                                appendChangelog,
-                                appendIndexFiles,
+                                provider(appendTableFiles, appendChangelog, appendIndexFiles),
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -406,9 +404,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
 
                 attempts +=
                         tryCommit(
-                                compactTableFiles,
-                                compactChangelog,
-                                compactIndexFiles,
+                                provider(compactTableFiles, compactChangelog, compactIndexFiles),
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -577,9 +573,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
                 attempts +=
                         tryCommit(
-                                compactTableFiles,
-                                emptyList(),
-                                compactIndexFiles,
+                                provider(compactTableFiles, emptyList(), compactIndexFiles),
                                 committable.identifier(),
                                 committable.watermark(),
                                 committable.logOffsets(),
@@ -687,9 +681,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     public void commitStatistics(Statistics stats, long commitIdentifier) {
         String statsFileName = statsFileHandler.writeStats(stats);
         tryCommit(
-                emptyList(),
-                emptyList(),
-                emptyList(),
+                provider(emptyList(), emptyList(), emptyList()),
                 commitIdentifier,
                 null,
                 Collections.emptyMap(),
@@ -830,9 +822,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     }
 
     private int tryCommit(
-            List<ManifestEntry> tableFiles,
-            List<ManifestEntry> changelogFiles,
-            List<IndexManifestEntry> indexFiles,
+            ChangesProvider changesProvider,
             long identifier,
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
@@ -845,12 +835,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         long startMillis = System.currentTimeMillis();
         while (true) {
             Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+            CommitChanges changes = changesProvider.provide(latestSnapshot);
             CommitResult result =
                     tryCommitOnce(
                             retryResult,
-                            tableFiles,
-                            changelogFiles,
-                            indexFiles,
+                            changes.tableFiles,
+                            changes.changelogFiles,
+                            changes.indexFiles,
                             identifier,
                             watermark,
                             logOffsets,
@@ -895,8 +886,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             @Nullable Long watermark,
             Map<Integer, Long> logOffsets,
             Map<String, String> properties) {
-        // collect all files with overwrite
-        Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+        return tryCommit(
+                latestSnapshot ->
+                        overwriteChanges(changes, indexFiles, latestSnapshot, partitionFilter),
+                identifier,
+                watermark,
+                logOffsets,
+                properties,
+                CommitKind.OVERWRITE,
+                mustConflictCheck(),
+                null);
+    }
+
+    private CommitChanges overwriteChanges(
+            List<ManifestEntry> changes,
+            List<IndexManifestEntry> indexFiles,
+            @Nullable Snapshot latestSnapshot,
+            @Nullable PartitionPredicate partitionFilter) {
         List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
         List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
         if (latestSnapshot != null) {
@@ -931,18 +937,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
         changesWithOverwrite.addAll(changes);
         indexChangesWithOverwrite.addAll(indexFiles);
-
-        return tryCommit(
-                changesWithOverwrite,
-                emptyList(),
-                indexChangesWithOverwrite,
-                identifier,
-                watermark,
-                logOffsets,
-                properties,
-                CommitKind.OVERWRITE,
-                mustConflictCheck(),
-                null);
+        return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite);
     }
 
     @VisibleForTesting
@@ -1547,4 +1542,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
             return false;
         }
     }
+
+    private static ChangesProvider provider(
+            List<ManifestEntry> tableFiles,
+            List<ManifestEntry> changelogFiles,
+            List<IndexManifestEntry> indexFiles) {
+        return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles);
+    }
+
+    private interface ChangesProvider {
+        CommitChanges provide(@Nullable Snapshot latestSnapshot);
+    }
+
+    private static class CommitChanges {
+
+        private final List<ManifestEntry> tableFiles;
+        private final List<ManifestEntry> changelogFiles;
+        private final List<IndexManifestEntry> indexFiles;
+
+        private CommitChanges(
+                List<ManifestEntry> tableFiles,
+                List<ManifestEntry> changelogFiles,
+                List<IndexManifestEntry> indexFiles) {
+            this.tableFiles = tableFiles;
+            this.changelogFiles = changelogFiles;
+            this.indexFiles = indexFiles;
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index dd356e77fd..2470fdf749 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.bucket.DefaultBucketFunction;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.BinaryString;
@@ -37,6 +38,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.BundleRecords;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Equal;
@@ -57,6 +60,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWriteImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.ScanMode;
@@ -93,6 +97,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -116,6 +121,92 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Tests for {@link AppendOnlyFileStoreTable}. */
 public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
 
+    @Test
+    public void testOverwriteNeverFail() throws Exception {
+        FileStoreTable table = createFileStoreTable();
+
+        Runnable writeRecord =
+                () -> {
+                    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+                    try (BatchTableWrite write = writeBuilder.newWrite();
+                            BatchTableCommit commit = writeBuilder.newCommit()) {
+                        write.write(rowData(1, 10, 100L));
+                        commit.commit(write.prepareCommit());
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        Runnable overwrite =
+                () -> {
+                    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
+                    try (BatchTableWrite write = writeBuilder.newWrite();
+                            BatchTableCommit commit = writeBuilder.newCommit()) {
+                        write.write(rowData(1, 10, 100L));
+                        commit.commit(write.prepareCommit());
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        Runnable compact =
+                () -> {
+                    BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
+                    try (BatchTableWrite write = writeBuilder.newWrite();
+                            BatchTableCommit commit = writeBuilder.newCommit()) {
+                        List<DataSplit> splits =
+                                (List) table.newReadBuilder().newScan().plan().splits();
+                        List<DataFileMeta> files =
+                                splits.stream()
+                                        .flatMap(s -> s.dataFiles().stream())
+                                        .collect(Collectors.toList());
+                        FileStoreWrite fileStoreWrite = ((TableWriteImpl) write).getWrite();
+                        CommitMessage commitMessage =
+                                new AppendCompactTask(splits.get(0).partition(), files)
+                                        .doCompact(
+                                                table, (BaseAppendFileStoreWrite) fileStoreWrite);
+                        commit.commit(Collections.singletonList(commitMessage));
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                };
+
+        AtomicReference<Exception> exception = new AtomicReference<>();
+
+        Thread thread1 =
+                new Thread(
+                        () -> {
+                            for (int i = 0; i < 10; i++) {
+                                try {
+                                    writeRecord.run();
+                                    overwrite.run();
+                                } catch (Exception e) {
+                                    exception.set(e);
+                                }
+                            }
+                        });
+
+        Thread thread2 =
+                new Thread(
+                        () -> {
+                            for (int i = 0; i < 10; i++) {
+                                try {
+                                    writeRecord.run();
+                                    compact.run();
+                                } catch (Exception ignored) {
+                                }
+                            }
+                        });
+
+        thread1.start();
+        thread2.start();
+
+        thread1.join();
+        thread2.join();
+
+        assertThat(exception.get()).isNull();
+    }
+
     @Test
     public void testDiscardDuplicateFiles() throws Exception {
         FileStoreTable table =

Reply via email to