This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 22bf676b7a [core] Overwrite commit should never be conflicted by delete files (#6687)
22bf676b7a is described below
commit 22bf676b7a80d8c0e88e73e8a9967d935d068162
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Nov 26 21:33:50 2025 +0800
[core] Overwrite commit should never be conflicted by delete files (#6687)
---
.../paimon/operation/FileStoreCommitImpl.java | 86 ++++++++++++--------
.../paimon/table/AppendOnlySimpleTableTest.java | 91 ++++++++++++++++++++++
2 files changed, 145 insertions(+), 32 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index d0f18760ec..cc9890fc72 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -369,9 +369,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
attempts +=
tryCommit(
- appendTableFiles,
- appendChangelog,
- appendIndexFiles,
+ provider(appendTableFiles, appendChangelog, appendIndexFiles),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -406,9 +404,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
attempts +=
tryCommit(
- compactTableFiles,
- compactChangelog,
- compactIndexFiles,
+ provider(compactTableFiles, compactChangelog, compactIndexFiles),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -577,9 +573,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
attempts +=
tryCommit(
- compactTableFiles,
- emptyList(),
- compactIndexFiles,
+ provider(compactTableFiles, emptyList(), compactIndexFiles),
committable.identifier(),
committable.watermark(),
committable.logOffsets(),
@@ -687,9 +681,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
public void commitStatistics(Statistics stats, long commitIdentifier) {
String statsFileName = statsFileHandler.writeStats(stats);
tryCommit(
- emptyList(),
- emptyList(),
- emptyList(),
+ provider(emptyList(), emptyList(), emptyList()),
commitIdentifier,
null,
Collections.emptyMap(),
@@ -830,9 +822,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
private int tryCommit(
- List<ManifestEntry> tableFiles,
- List<ManifestEntry> changelogFiles,
- List<IndexManifestEntry> indexFiles,
+ ChangesProvider changesProvider,
long identifier,
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
@@ -845,12 +835,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long startMillis = System.currentTimeMillis();
while (true) {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ CommitChanges changes = changesProvider.provide(latestSnapshot);
CommitResult result =
tryCommitOnce(
retryResult,
- tableFiles,
- changelogFiles,
- indexFiles,
+ changes.tableFiles,
+ changes.changelogFiles,
+ changes.indexFiles,
identifier,
watermark,
logOffsets,
@@ -895,8 +886,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable Long watermark,
Map<Integer, Long> logOffsets,
Map<String, String> properties) {
- // collect all files with overwrite
- Snapshot latestSnapshot = snapshotManager.latestSnapshot();
+ return tryCommit(
+ latestSnapshot ->
+ overwriteChanges(changes, indexFiles, latestSnapshot, partitionFilter),
+ identifier,
+ watermark,
+ logOffsets,
+ properties,
+ CommitKind.OVERWRITE,
+ mustConflictCheck(),
+ null);
+ }
+
+ private CommitChanges overwriteChanges(
+ List<ManifestEntry> changes,
+ List<IndexManifestEntry> indexFiles,
+ @Nullable Snapshot latestSnapshot,
+ @Nullable PartitionPredicate partitionFilter) {
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
if (latestSnapshot != null) {
@@ -931,18 +937,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
changesWithOverwrite.addAll(changes);
indexChangesWithOverwrite.addAll(indexFiles);
-
- return tryCommit(
- changesWithOverwrite,
- emptyList(),
- indexChangesWithOverwrite,
- identifier,
- watermark,
- logOffsets,
- properties,
- CommitKind.OVERWRITE,
- mustConflictCheck(),
- null);
+ return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite);
}
@VisibleForTesting
@@ -1547,4 +1542,31 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return false;
}
}
+
+ private static ChangesProvider provider(
+ List<ManifestEntry> tableFiles,
+ List<ManifestEntry> changelogFiles,
+ List<IndexManifestEntry> indexFiles) {
+ return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles);
+ }
+
+ private interface ChangesProvider {
+ CommitChanges provide(@Nullable Snapshot latestSnapshot);
+ }
+
+ private static class CommitChanges {
+
+ private final List<ManifestEntry> tableFiles;
+ private final List<ManifestEntry> changelogFiles;
+ private final List<IndexManifestEntry> indexFiles;
+
+ private CommitChanges(
+ List<ManifestEntry> tableFiles,
+ List<ManifestEntry> changelogFiles,
+ List<IndexManifestEntry> indexFiles) {
+ this.tableFiles = tableFiles;
+ this.changelogFiles = changelogFiles;
+ this.indexFiles = indexFiles;
+ }
+ }
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
index dd356e77fd..2470fdf749 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.append.AppendCompactTask;
import org.apache.paimon.bucket.DefaultBucketFunction;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
@@ -37,6 +38,8 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.ManifestFileMeta;
+import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.operation.FileStoreWrite;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Equal;
@@ -57,6 +60,7 @@ import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.ScanMode;
@@ -93,6 +97,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -116,6 +121,92 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/** Tests for {@link AppendOnlyFileStoreTable}. */
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
+ @Test
+ public void testOverwriteNeverFail() throws Exception {
+ FileStoreTable table = createFileStoreTable();
+
+ Runnable writeRecord =
+ () -> {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ Runnable overwrite =
+ () -> {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(rowData(1, 10, 100L));
+ commit.commit(write.prepareCommit());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ Runnable compact =
+ () -> {
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ List<DataSplit> splits =
+ (List) table.newReadBuilder().newScan().plan().splits();
+ List<DataFileMeta> files =
+ splits.stream()
+ .flatMap(s -> s.dataFiles().stream())
+ .collect(Collectors.toList());
+ FileStoreWrite fileStoreWrite = ((TableWriteImpl) write).getWrite();
+ CommitMessage commitMessage =
+ new AppendCompactTask(splits.get(0).partition(), files)
+ .doCompact(
+ table, (BaseAppendFileStoreWrite) fileStoreWrite);
+ commit.commit(Collections.singletonList(commitMessage));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ };
+
+ AtomicReference<Exception> exception = new AtomicReference<>();
+
+ Thread thread1 =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10; i++) {
+ try {
+ writeRecord.run();
+ overwrite.run();
+ } catch (Exception e) {
+ exception.set(e);
+ }
+ }
+ });
+
+ Thread thread2 =
+ new Thread(
+ () -> {
+ for (int i = 0; i < 10; i++) {
+ try {
+ writeRecord.run();
+ compact.run();
+ } catch (Exception ignored) {
+ }
+ }
+ });
+
+ thread1.start();
+ thread2.start();
+
+ thread1.join();
+ thread2.join();
+
+ assertThat(exception.get()).isNull();
+ }
+
@Test
public void testDiscardDuplicateFiles() throws Exception {
FileStoreTable table =