This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bfdd516536 [core] Minor refactor conflicts check for deletion vectors files (#6369)
bfdd516536 is described below
commit bfdd516536f9201781490f49eae130156c8dd0ed
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Oct 9 19:06:07 2025 +0800
[core] Minor refactor conflicts check for deletion vectors files (#6369)
---
.../java/org/apache/paimon/AbstractFileStore.java | 1 -
.../paimon/manifest/IndexManifestFileHandler.java | 42 ++++++-
.../paimon/operation/FileStoreCommitImpl.java | 136 +++++++++++----------
.../paimon/operation/FileStoreCommitTest.java | 8 +-
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 50 ++++++++
5 files changed, 163 insertions(+), 74 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index bbeb0aa150..31918b84cf 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -299,7 +299,6 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
options.commitMaxRetryWait(),
options.commitStrictModeLastSafeSnapshot().orElse(null),
options.rowTrackingEnabled(),
- !schema.primaryKeys().isEmpty(),
options.deletionVectorsEnabled(),
newIndexFileHandler());
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
index bf334e4ad3..3ea24872bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/IndexManifestFileHandler.java
@@ -19,6 +19,7 @@
package org.apache.paimon.manifest;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.utils.Pair;
@@ -27,14 +28,18 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
/** IndexManifestFile Handler. */
public class IndexManifestFileHandler {
@@ -115,15 +120,48 @@ public class IndexManifestFileHandler {
public List<IndexManifestEntry> combine(
List<IndexManifestEntry> prevIndexFiles,
List<IndexManifestEntry> newIndexFiles) {
Map<String, IndexManifestEntry> indexEntries = new HashMap<>();
+ Set<String> dvDataFiles = new HashSet<>();
for (IndexManifestEntry entry : prevIndexFiles) {
indexEntries.put(entry.indexFile().fileName(), entry);
+            LinkedHashMap<String, DeletionVectorMeta> dvRanges = entry.indexFile().dvRanges();
+ if (dvRanges != null) {
+ dvDataFiles.addAll(dvRanges.keySet());
+ }
}
for (IndexManifestEntry entry : newIndexFiles) {
+ String fileName = entry.indexFile().fileName();
+            LinkedHashMap<String, DeletionVectorMeta> dvRanges = entry.indexFile().dvRanges();
if (entry.kind() == FileKind.ADD) {
- indexEntries.put(entry.indexFile().fileName(), entry);
+ checkState(
+ !indexEntries.containsKey(fileName),
+ "Trying to add file %s which is already added.",
+ fileName);
+ if (dvRanges != null) {
+ for (String dataFile : dvRanges.keySet()) {
+ checkState(
+ !dvDataFiles.contains(dataFile),
+                                "Trying to add dv for data file %s which is already added.",
+ dataFile);
+ dvDataFiles.add(dataFile);
+ }
+ }
+ indexEntries.put(fileName, entry);
} else {
- indexEntries.remove(entry.indexFile().fileName());
+ checkState(
+ indexEntries.containsKey(fileName),
+                        "Trying to delete file %s which does not exist.",
+ fileName);
+ if (dvRanges != null) {
+ for (String dataFile : dvRanges.keySet()) {
+ checkState(
+ dvDataFiles.contains(dataFile),
+                                    "Trying to delete dv for data file %s which does not exist.",
+ dataFile);
+ dvDataFiles.remove(dataFile);
+ }
+ }
+ indexEntries.remove(fileName);
}
}
return new ArrayList<>(indexEntries.values());
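The net effect of the stricter combine() above can be restated as a small
standalone sketch of the same merge-with-validation technique (illustrative
types only; the real code operates on IndexManifestEntry and
DeletionVectorMeta):

    import java.util.*;

    public class CombineSketch {
        /** Mirrors the ADD/DELETE merge in IndexManifestFileHandler#combine. */
        static Map<String, String> combine(
                Map<String, String> prev, // fileName -> payload
                List<Map.Entry<String, Boolean>> changes) { // fileName -> isAdd
            Map<String, String> merged = new LinkedHashMap<>(prev);
            for (Map.Entry<String, Boolean> change : changes) {
                String file = change.getKey();
                if (change.getValue()) { // ADD: must not clobber an existing entry
                    if (merged.containsKey(file)) {
                        throw new IllegalStateException(
                                "Trying to add file " + file + " which is already added.");
                    }
                    merged.put(file, "new");
                } else { // DELETE: must target an existing entry
                    if (!merged.containsKey(file)) {
                        throw new IllegalStateException(
                                "Trying to delete file " + file + " which does not exist.");
                    }
                    merged.remove(file);
                }
            }
            return merged;
        }

        public static void main(String[] args) {
            Map<String, String> prev = new LinkedHashMap<>();
            prev.put("dv-index-1", "old");
            // Rewriting a file only passes the checks if its DELETE is applied
            // before the replacement ADD - which is why the FileStoreCommitImpl
            // hunks below also swap the DELETE/ADD enqueue order.
            List<Map.Entry<String, Boolean>> changes = List.of(
                    Map.entry("dv-index-1", false), // DELETE old
                    Map.entry("dv-index-1", true)); // ADD replacement
            System.out.println(combine(prev, changes).keySet()); // [dv-index-1]
        }
    }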
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index e3e6140608..97e67ec5cc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -154,7 +154,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
- private final boolean isPkTable;
private final boolean deletionVectorsEnabled;
private final IndexFileHandler indexFileHandler;
@@ -194,7 +193,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot,
boolean rowTrackingEnabled,
- boolean isPkTable,
boolean deletionVectorsEnabled,
IndexFileHandler indexFileHandler) {
this.snapshotCommit = snapshotCommit;
@@ -240,7 +238,6 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
- this.isPkTable = isPkTable;
this.deletionVectorsEnabled = deletionVectorsEnabled;
this.indexFileHandler = indexFileHandler;
}
@@ -728,23 +725,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
.forEach(m -> appendChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
commitMessage
.newFilesIncrement()
- .newIndexFiles()
+ .deletedIndexFiles()
.forEach(
m ->
appendIndexFiles.add(
new IndexManifestEntry(
- FileKind.ADD,
+ FileKind.DELETE,
commitMessage.partition(),
commitMessage.bucket(),
m)));
commitMessage
.newFilesIncrement()
- .deletedIndexFiles()
+ .newIndexFiles()
.forEach(
m ->
appendIndexFiles.add(
new IndexManifestEntry(
- FileKind.DELETE,
+ FileKind.ADD,
commitMessage.partition(),
commitMessage.bucket(),
m)));
@@ -766,23 +763,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
.forEach(m -> compactChangelog.add(makeEntry(FileKind.ADD, commitMessage, m)));
commitMessage
.compactIncrement()
- .newIndexFiles()
+ .deletedIndexFiles()
.forEach(
m ->
compactIndexFiles.add(
new IndexManifestEntry(
- FileKind.ADD,
+ FileKind.DELETE,
commitMessage.partition(),
commitMessage.bucket(),
m)));
commitMessage
.compactIncrement()
- .deletedIndexFiles()
+ .newIndexFiles()
.forEach(
m ->
compactIndexFiles.add(
new IndexManifestEntry(
- FileKind.DELETE,
+ FileKind.ADD,
commitMessage.partition(),
commitMessage.bucket(),
m)));
@@ -1419,7 +1416,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<IndexManifestEntry> deltaIndexEntries,
CommitKind commitKind) {
String baseCommitUser = snapshot.commitUser();
- if (checkForDeletionVector(commitKind)) {
+ if (checkForDeletionVector()) {
// Enrich dvName in fileEntry to check for base ADD dv and delta DELETE dv.
// For example:
// If the base file is <ADD baseFile1, ADD dv1>,
@@ -1443,52 +1440,72 @@ public class FileStoreCommitImpl implements FileStoreCommit {
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
allEntries.addAll(deltaEntries);
- if (commitKind != CommitKind.OVERWRITE) {
- // total buckets within the same partition should remain the same
- Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
- for (SimpleFileEntry entry : allEntries) {
- if (entry.totalBuckets() <= 0) {
- continue;
- }
-
- if (!totalBuckets.containsKey(entry.partition())) {
- totalBuckets.put(entry.partition(), entry.totalBuckets());
- continue;
- }
-
- int old = totalBuckets.get(entry.partition());
- if (old == entry.totalBuckets()) {
- continue;
- }
-
- Pair<RuntimeException, RuntimeException> conflictException =
- createConflictException(
- "Total buckets of partition "
- + entry.partition()
- + " changed from "
- + old
- + " to "
- + entry.totalBuckets()
-                                    + " without overwrite. Give up committing.",
- baseCommitUser,
- baseEntries,
- deltaEntries,
- null);
- LOG.warn("", conflictException.getLeft());
- throw conflictException.getRight();
- }
- }
+        checkBucketKeepSame(baseEntries, deltaEntries, commitKind, allEntries, baseCommitUser);
+ Function<Throwable, RuntimeException> conflictException =
+ conflictException(baseCommitUser, baseEntries, deltaEntries);
Collection<SimpleFileEntry> mergedEntries;
try {
// merge manifest entries and also check if the files we want to delete are still there
mergedEntries = FileEntry.mergeEntries(allEntries);
} catch (Throwable e) {
-            throw conflictException(commitUser, baseEntries, deltaEntries).apply(e);
+ throw conflictException.apply(e);
+ }
+
+ checkNoDeleteInMergedEntries(mergedEntries, conflictException);
+        checkKeyRangeNoConflicts(baseEntries, deltaEntries, mergedEntries, baseCommitUser);
+ }
+
+ private void checkBucketKeepSame(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ CommitKind commitKind,
+ List<SimpleFileEntry> allEntries,
+ String baseCommitUser) {
+ if (commitKind == CommitKind.OVERWRITE) {
+ return;
}
-        assertNoDelete(mergedEntries, conflictException(commitUser, baseEntries, deltaEntries));
+ // total buckets within the same partition should remain the same
+ Map<BinaryRow, Integer> totalBuckets = new HashMap<>();
+ for (SimpleFileEntry entry : allEntries) {
+ if (entry.totalBuckets() <= 0) {
+ continue;
+ }
+
+ if (!totalBuckets.containsKey(entry.partition())) {
+ totalBuckets.put(entry.partition(), entry.totalBuckets());
+ continue;
+ }
+
+ int old = totalBuckets.get(entry.partition());
+ if (old == entry.totalBuckets()) {
+ continue;
+ }
+
+ Pair<RuntimeException, RuntimeException> conflictException =
+ createConflictException(
+ "Total buckets of partition "
+ + entry.partition()
+ + " changed from "
+ + old
+ + " to "
+ + entry.totalBuckets()
+                            + " without overwrite. Give up committing.",
+ baseCommitUser,
+ baseEntries,
+ deltaEntries,
+ null);
+ LOG.warn("", conflictException.getLeft());
+ throw conflictException.getRight();
+ }
+    }
+
+    private void checkKeyRangeNoConflicts(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ Collection<SimpleFileEntry> mergedEntries,
+ String baseCommitUser) {
// fast exit for file store without keys
if (keyComparator == null) {
return;
@@ -1548,26 +1565,11 @@ public class FileStoreCommitImpl implements FileStoreCommit {
};
}
- private boolean checkForDeletionVector(CommitKind commitKind) {
- if (!deletionVectorsEnabled) {
- return false;
- }
-
- // todo: Add them once contains DELETE type.
-        // PK table's compact dv index only contains ADD type, skip conflict detection.
- if (isPkTable && commitKind == CommitKind.COMPACT) {
- return false;
- }
-
-        // Non-PK table's hash fixed bucket mode only contains ADD type, skip conflict detection.
- if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) {
- return false;
- }
-
- return true;
+ private boolean checkForDeletionVector() {
+        return deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE);
}
- private void assertNoDelete(
+ private void checkNoDeleteInMergedEntries(
Collection<SimpleFileEntry> mergedEntries,
Function<Throwable, RuntimeException> exceptionFunction) {
try {
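The extracted checkBucketKeepSame above enforces that, outside of OVERWRITE
commits, all manifest entries of a partition agree on the total bucket count.
A condensed, self-contained restatement of that invariant scan (the record
type is illustrative; the real code keys by BinaryRow partitions over
SimpleFileEntry):

    import java.util.*;

    public class BucketInvariantSketch {
        record Entry(String partition, int totalBuckets) {}

        // Within one partition, every entry must agree on totalBuckets,
        // ignoring entries that carry no fixed bucket count (<= 0).
        static void checkBucketKeepSame(List<Entry> allEntries) {
            Map<String, Integer> totalBuckets = new HashMap<>();
            for (Entry entry : allEntries) {
                if (entry.totalBuckets() <= 0) {
                    continue;
                }
                Integer old = totalBuckets.putIfAbsent(entry.partition(), entry.totalBuckets());
                if (old != null && old != entry.totalBuckets()) {
                    throw new IllegalStateException(
                            "Total buckets of partition " + entry.partition()
                                    + " changed from " + old + " to " + entry.totalBuckets()
                                    + " without overwrite. Give up committing.");
                }
            }
        }

        public static void main(String[] args) {
            checkBucketKeepSame(List.of(new Entry("p=1", 4), new Entry("p=1", 4))); // passes
            try {
                checkBucketKeepSame(List.of(new Entry("p=1", 4), new Entry("p=1", 8)));
            } catch (IllegalStateException e) {
                System.out.println(e.getMessage()); // bucket count changed -> reject
            }
        }
    }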
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 505417a412..c4bcc74f56 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -912,13 +912,13 @@ public class FileStoreCommitTest {
assertThat(dvs.get("f2").isDeleted(4)).isTrue();
// commit 2
- CommitMessage commitMessage3 =
- store.writeDVIndexFiles(
-                        partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
List<IndexFileMeta> deleted =
new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
-        CommitMessage commitMessage4 = store.removeIndexFiles(partition, 0, deleted);
+        CommitMessage commitMessage3 = store.removeIndexFiles(partition, 0, deleted);
+ CommitMessageImpl commitMessage4 =
+ store.writeDVIndexFiles(
+                        partition, 0, Collections.singletonMap("f2", Arrays.asList(3)));
store.commit(commitMessage3, commitMessage4);
// assert 2
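The reordering in this test lines up with the new combine() checks: the index
manifest may hold at most one dv entry per data file, so rewriting f2's dv
requires removing the old entries (commitMessage3) before adding the
replacement (commitMessage4). A conceptual sketch of that constraint (a plain
Set stands in for the dv bookkeeping; Paimon stores these in dv index files):

    import java.util.*;

    public class DvRewriteSketch {
        public static void main(String[] args) {
            // Data files that already have a deletion vector registered.
            Set<String> dataFilesWithDv = new HashSet<>(Set.of("f1", "f2"));

            // commitMessage3: remove the stale dv index entries for f2.
            dataFilesWithDv.remove("f2");

            // commitMessage4: add the rewritten dv for f2; with the stale
            // entry gone, the uniqueness check passes.
            if (!dataFilesWithDv.add("f2")) {
                throw new IllegalStateException(
                        "Trying to add dv for data file f2 which is already added.");
            }
            System.out.println(dataFilesWithDv); // [f1, f2]
        }
    }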
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index bfbadc7624..0beadce6a3 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -23,6 +23,8 @@ import org.apache.paimon.spark.{PaimonAppendTable, PaimonPrimaryKeyTable, Paimon
import org.apache.spark.sql.Row
+import java.util.concurrent.Executors
+
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt
@@ -805,4 +807,52 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl
}
}
}
+
+ test("Paimon MergeInto: concurrent two merge") {
+ for (dvEnabled <- Seq("true", "false")) {
+ withTable("s", "t") {
+ sql("CREATE TABLE s (id INT, b INT, c INT)")
+ sql(
+          "INSERT INTO s VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
+
+ sql(
+          s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+ sql(
+          "INSERT INTO t VALUES (1, 1, 1), (2, 2, 2), (3, 3, 3), (4, 4, 4), (5, 5, 5), (6, 6, 6), (7, 7, 7), (8, 8, 8), (9, 9, 9)")
+
+ def doMergeInto(): Unit = {
+ for (i <- 1 to 9) {
+ try {
+ sql(s"""
+ |MERGE INTO t
+ |USING (SELECT * FROM s WHERE id = $i)
+ |ON t.id = s.id
+ |WHEN MATCHED THEN
+ |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+ |""".stripMargin)
+ } catch {
+ case a: Throwable =>
+ assert(
+                    a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
+ "Missing file"))
+ }
+ checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(9)))
+ }
+ }
+
+ val executor = Executors.newFixedThreadPool(2)
+ val runnable = new Runnable {
+ override def run(): Unit = doMergeInto()
+ }
+
+ val future1 = executor.submit(runnable)
+ val future2 = executor.submit(runnable)
+
+ future1.get()
+ future2.get()
+
+ executor.shutdown()
+ }
+ }
+ }
}
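The concurrent-merge test accepts two outcomes per attempt: success, or a
clean commit conflict ("Conflicts during commits" / "Missing file") with the
table left intact (row count stays 9). A caller that wants the losing side to
make progress would typically retry; a minimal hedged sketch of such a loop
(generic Java, not a Paimon API):

    public class RetrySketch {
        interface Commit {
            void run() throws Exception;
        }

        // Retry a commit a bounded number of times when it loses the
        // conflict-detection race; rethrow anything else unchanged.
        static void commitWithRetry(Commit commit, int maxRetries) throws Exception {
            for (int attempt = 0; ; attempt++) {
                try {
                    commit.run();
                    return;
                } catch (Exception e) {
                    String msg = String.valueOf(e.getMessage());
                    boolean retriable = msg.contains("Conflicts during commits")
                            || msg.contains("Missing file");
                    if (!retriable || attempt >= maxRetries) {
                        throw e;
                    }
                }
            }
        }

        public static void main(String[] args) throws Exception {
            int[] calls = {0};
            commitWithRetry(() -> {
                if (calls[0]++ == 0) {
                    throw new IllegalStateException("Conflicts during commits");
                }
            }, 3);
            System.out.println("committed after " + calls[0] + " attempts");
        }
    }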