This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 60454a1b19 [core] Add dv conflict detection during commit (#6303)
60454a1b19 is described below
commit 60454a1b190513f4be42554f0f6acdf16bb9e611
Author: Zouxxyy <[email protected]>
AuthorDate: Thu Oct 9 12:58:30 2025 +0800
[core] Add dv conflict detection during commit (#6303)
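
With deletion vectors enabled, the commit-time conflict check now pairs each data
file entry with the name of its dv index file (SimpleFileEntryWithDV) before running
the existing FileEntry.mergeEntries check: a base <ADD file, dv> must be matched by a
delta <DELETE file, dv> before a new dv can be attached, otherwise the merge leaves an
unmatched DELETE entry and the commit is rejected.

A minimal sketch of the detection, mirroring the new ConflictDeletionUtilsTest
(illustrative only, not part of this patch; the size/count literals are placeholders):

    // Base snapshot: data file f1 already carries dv index file "dv1".
    SimpleFileEntry f1 = new SimpleFileEntry(
            FileKind.ADD, BinaryRow.EMPTY_ROW, 0, 1, 0, "f1",
            Collections.emptyList(), null, BinaryRow.EMPTY_ROW, BinaryRow.EMPTY_ROW, null);
    List<SimpleFileEntry> base =
            Collections.singletonList(new SimpleFileEntryWithDV(f1, "dv1"));

    // A concurrent commit attaches "dv2" to f1 without having seen "dv1".
    LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>();
    dvRanges.put("f1", new DeletionVectorMeta("f1", 1, 1, 1L));
    IndexManifestEntry dv2 = new IndexManifestEntry(
            FileKind.ADD, BinaryRow.EMPTY_ROW, 0,
            new IndexFileMeta(DELETION_VECTORS_INDEX, "dv2", 11, 1, dvRanges, null));

    // buildDeltaEntriesWithDV expands this into <DELETE f1, no dv> + <ADD f1, dv2>.
    List<SimpleFileEntry> delta = ConflictDeletionUtils.buildDeltaEntriesWithDV(
            base, Collections.emptyList(), Collections.singletonList(dv2));

    // Merging base + delta leaves a DELETE whose identifier (f1 without a dv) was
    // never added, so assertNoDelete rejects the commit as a conflict.
    List<SimpleFileEntry> all = new ArrayList<>(base);
    all.addAll(delta);
    Collection<SimpleFileEntry> merged = FileEntry.mergeEntries(all);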
---
.../java/org/apache/paimon/AbstractFileStore.java | 5 +-
.../java/org/apache/paimon/manifest/FileEntry.java | 6 +-
.../apache/paimon/manifest/SimpleFileEntry.java | 20 ++
.../paimon/manifest/SimpleFileEntryWithDV.java | 124 ++++++++
.../paimon/operation/FileStoreCommitImpl.java | 154 +++++++---
.../apache/paimon/utils/ConflictDeletionUtils.java | 149 +++++++++
.../org/apache/paimon/TestAppendFileStore.java | 35 +++
.../deletionvectors/BucketedDvMaintainerTest.java | 87 ++++--
.../paimon/operation/FileStoreCommitTest.java | 26 +-
.../paimon/utils/ConflictDeletionUtilsTest.java | 342 +++++++++++++++++++++
.../commands/DeleteFromPaimonTableCommand.scala | 3 +-
.../spark/commands/MergeIntoPaimonTable.scala | 4 +-
.../paimon/spark/commands/PaimonSparkWriter.scala | 7 +-
.../spark/commands/UpdatePaimonTableCommand.scala | 3 +-
.../paimon/spark/sql/MergeIntoTableTestBase.scala | 71 +++--
.../paimon/spark/sql/UpdateTableTestBase.scala | 39 +++
16 files changed, 955 insertions(+), 120 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index f4a77f877d..bbeb0aa150 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -298,7 +298,10 @@ abstract class AbstractFileStore<T> implements FileStore<T> {
options.commitMinRetryWait(),
options.commitMaxRetryWait(),
options.commitStrictModeLastSafeSnapshot().orElse(null),
- options.rowTrackingEnabled());
+ options.rowTrackingEnabled(),
+ !schema.primaryKeys().isEmpty(),
+ options.deletionVectorsEnabled(),
+ newIndexFileHandler());
}
@Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 52b029563e..b400258b84 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -22,7 +22,6 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
import javax.annotation.Nullable;
@@ -40,6 +39,7 @@ import java.util.stream.Collectors;
import static org.apache.paimon.utils.ManifestReadThreadPool.randomlyExecuteSequentialReturn;
import static org.apache.paimon.utils.ManifestReadThreadPool.sequentialBatchedExecute;
+import static org.apache.paimon.utils.Preconditions.checkState;
/** Entry representing a file. */
public interface FileEntry {
@@ -77,7 +77,7 @@ public interface FileEntry {
public final int level;
public final String fileName;
public final List<String> extraFiles;
- @Nullable private final byte[] embeddedIndex;
+ @Nullable public final byte[] embeddedIndex;
@Nullable public final String externalPath;
/* Cache the hash code for the string */
@@ -190,7 +190,7 @@ public interface FileEntry {
Identifier identifier = entry.identifier();
switch (entry.kind()) {
case ADD:
- Preconditions.checkState(
+ checkState(
!map.containsKey(identifier),
"Trying to add file %s which is already added.",
identifier);
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
index 14ae43c349..e0da3c8d53 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java
@@ -81,6 +81,21 @@ public class SimpleFileEntry implements FileEntry {
entry.externalPath());
}
+ public SimpleFileEntry toDelete() {
+ return new SimpleFileEntry(
+ FileKind.DELETE,
+ partition,
+ bucket,
+ totalBuckets,
+ level,
+ fileName,
+ extraFiles,
+ embeddedIndex,
+ minKey,
+ maxKey,
+ externalPath);
+ }
+
public static List<SimpleFileEntry> from(List<ManifestEntry> entries) {
return entries.stream().map(SimpleFileEntry::from).collect(Collectors.toList());
}
@@ -115,6 +130,11 @@ public class SimpleFileEntry implements FileEntry {
return fileName;
}
+ @Nullable
+ public byte[] embeddedIndex() {
+ return embeddedIndex;
+ }
+
@Nullable
@Override
public String externalPath() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
new file mode 100644
index 0000000000..75d73f345f
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntryWithDV.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.manifest;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** A {@link FileEntry} that contains a {@link SimpleFileEntry} and a dv file name. */
+public class SimpleFileEntryWithDV extends SimpleFileEntry {
+
+ @Nullable private final String dvFileName;
+
+ public SimpleFileEntryWithDV(SimpleFileEntry entry, @Nullable String dvFileName) {
+ super(
+ entry.kind(),
+ entry.partition(),
+ entry.bucket(),
+ entry.totalBuckets(),
+ entry.level(),
+ entry.fileName(),
+ entry.extraFiles(),
+ entry.embeddedIndex(),
+ entry.minKey(),
+ entry.maxKey(),
+ entry.externalPath());
+ this.dvFileName = dvFileName;
+ }
+
+ public Identifier identifier() {
+ return new IdentifierWithDv(super.identifier(), dvFileName);
+ }
+
+ @Nullable
+ public String dvFileName() {
+ return dvFileName;
+ }
+
+ public SimpleFileEntry toDelete() {
+ return new SimpleFileEntryWithDV(super.toDelete(), dvFileName);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ SimpleFileEntryWithDV that = (SimpleFileEntryWithDV) o;
+ return Objects.equals(dvFileName, that.dvFileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), dvFileName);
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() + ", {dvFileName=" + dvFileName + '}';
+ }
+
+ /**
+ * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data
+ * file.
+ */
+ static class IdentifierWithDv extends Identifier {
+
+ private final String dvFileName;
+
+ public IdentifierWithDv(Identifier identifier, String dvFileName) {
+ super(
+ identifier.partition,
+ identifier.bucket,
+ identifier.level,
+ identifier.fileName,
+ identifier.extraFiles,
+ identifier.embeddedIndex,
+ identifier.externalPath);
+ this.dvFileName = dvFileName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ IdentifierWithDv that = (IdentifierWithDv) o;
+ return Objects.equals(dvFileName, that.dvFileName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), dvFileName);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index f41de87aa9..e3e6140608 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -26,6 +26,7 @@ import org.apache.paimon.catalog.SnapshotCommit;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.FileEntry;
@@ -61,7 +62,6 @@ import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.Pair;
-import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
@@ -70,7 +70,6 @@ import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
@@ -94,8 +93,11 @@ import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
import static org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkState;
/**
* Default implementation of {@link FileStoreCommit}.
@@ -152,6 +154,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
@Nullable private Long strictModeLastSafeSnapshot;
private final InternalRowPartitionComputer partitionComputer;
private final boolean rowTrackingEnabled;
+ private final boolean isPkTable;
+ private final boolean deletionVectorsEnabled;
+ private final IndexFileHandler indexFileHandler;
private boolean ignoreEmptyCommit;
private CommitMetrics commitMetrics;
@@ -188,7 +193,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
long commitMinRetryWait,
long commitMaxRetryWait,
@Nullable Long strictModeLastSafeSnapshot,
- boolean rowTrackingEnabled) {
+ boolean rowTrackingEnabled,
+ boolean isPkTable,
+ boolean deletionVectorsEnabled,
+ IndexFileHandler indexFileHandler) {
this.snapshotCommit = snapshotCommit;
this.fileIO = fileIO;
this.schemaManager = schemaManager;
@@ -232,6 +240,9 @@ public class FileStoreCommitImpl implements FileStoreCommit {
this.statsFileHandler = statsFileHandler;
this.bucketMode = bucketMode;
this.rowTrackingEnabled = rowTrackingEnabled;
+ this.isPkTable = isPkTable;
+ this.deletionVectorsEnabled = deletionVectorsEnabled;
+ this.indexFileHandler = indexFileHandler;
}
@Override
@@ -334,11 +345,16 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// so we need to contain all changes
baseEntries.addAll(
readAllEntriesFromChangedPartitions(
- latestSnapshot, appendTableFiles, compactTableFiles));
+ latestSnapshot,
+ changedPartitions(
+ appendTableFiles,
+ compactTableFiles,
+ appendIndexFiles)));
noConflictsOrFail(
- latestSnapshot.commitUser(),
+ latestSnapshot,
baseEntries,
appendSimpleEntries,
+ appendIndexFiles,
commitKind);
safeLatestSnapshotId = latestSnapshot.id();
}
@@ -371,9 +387,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
if (safeLatestSnapshotId != null) {
baseEntries.addAll(appendSimpleEntries);
noConflictsOrFail(
- latestSnapshot.commitUser(),
+ latestSnapshot,
baseEntries,
SimpleFileEntry.from(compactTableFiles),
+ compactIndexFiles,
CommitKind.COMPACT);
// assume this compact commit follows just after the append commit created above
safeLatestSnapshotId += 1;
@@ -1004,10 +1021,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
// so we have to check again
List<BinaryRow> changedPartitions =
- deltaFiles.stream()
- .map(ManifestEntry::partition)
- .distinct()
- .collect(Collectors.toList());
+ changedPartitions(deltaFiles, Collections.emptyList(), indexFiles);
if (retryResult != null && retryResult.latestSnapshot != null) {
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
List<SimpleFileEntry> incremental =
@@ -1022,9 +1036,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
}
noConflictsOrFail(
- latestSnapshot.commitUser(),
+ latestSnapshot,
baseDataFiles,
SimpleFileEntry.from(deltaFiles),
+ indexFiles,
commitKind);
}
@@ -1366,16 +1381,23 @@ public class FileStoreCommitImpl implements FileStoreCommit {
return entries;
}
- @SafeVarargs
- private final List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
- Snapshot snapshot, List<ManifestEntry>... changes) {
- List<BinaryRow> changedPartitions =
- Arrays.stream(changes)
- .flatMap(Collection::stream)
- .map(ManifestEntry::partition)
- .distinct()
- .collect(Collectors.toList());
- return readAllEntriesFromChangedPartitions(snapshot, changedPartitions);
+ private List<BinaryRow> changedPartitions(
+ List<ManifestEntry> appendTableFiles,
+ List<ManifestEntry> compactTableFiles,
+ List<IndexManifestEntry> appendIndexFiles) {
+ Set<BinaryRow> changedPartitions = new HashSet<>();
+ for (ManifestEntry appendTableFile : appendTableFiles) {
+ changedPartitions.add(appendTableFile.partition());
+ }
+ for (ManifestEntry compactTableFile : compactTableFiles) {
+ changedPartitions.add(compactTableFile.partition());
+ }
+ for (IndexManifestEntry appendIndexFile : appendIndexFiles) {
+ if (appendIndexFile.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
+ changedPartitions.add(appendIndexFile.partition());
+ }
+ }
+ return new ArrayList<>(changedPartitions);
}
private List<SimpleFileEntry> readAllEntriesFromChangedPartitions(
@@ -1391,12 +1413,35 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
private void noConflictsOrFail(
- String baseCommitUser,
+ Snapshot snapshot,
List<SimpleFileEntry> baseEntries,
- List<SimpleFileEntry> changes,
+ List<SimpleFileEntry> deltaEntries,
+ List<IndexManifestEntry> deltaIndexEntries,
CommitKind commitKind) {
+ String baseCommitUser = snapshot.commitUser();
+ if (checkForDeletionVector(commitKind)) {
+ // Enrich dvName in fileEntry to check for base ADD dv and delta DELETE dv.
+ // For example:
+ // If the base file is <ADD baseFile1, ADD dv1>,
+ // then the delta file must be <DELETE deltaFile1, DELETE dv1>; and vice versa,
+ // If the delta file is <DELETE deltaFile2, DELETE dv2>,
+ // then the base file must be <ADD baseFile2, ADD dv2>.
+ try {
+ baseEntries =
+ buildBaseEntriesWithDV(
+ baseEntries,
+ snapshot.indexManifest() == null
+ ? Collections.emptyList()
+ : indexFileHandler.readManifest(snapshot.indexManifest()));
+ deltaEntries =
+ buildDeltaEntriesWithDV(baseEntries, deltaEntries, deltaIndexEntries);
+ } catch (Throwable e) {
+ throw conflictException(commitUser, baseEntries, deltaEntries).apply(e);
+ }
+ }
+
List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
- allEntries.addAll(changes);
+ allEntries.addAll(deltaEntries);
if (commitKind != CommitKind.OVERWRITE) {
// total buckets within the same partition should remain the same
@@ -1427,37 +1472,22 @@ public class FileStoreCommitImpl implements FileStoreCommit {
+ " without overwrite. Give up committing.",
baseCommitUser,
baseEntries,
- changes,
+ deltaEntries,
null);
LOG.warn("", conflictException.getLeft());
throw conflictException.getRight();
}
}
- Function<Throwable, RuntimeException> exceptionFunction =
- e -> {
- Pair<RuntimeException, RuntimeException> conflictException =
- createConflictException(
- "File deletion conflicts detected! Give up committing.",
- baseCommitUser,
- baseEntries,
- changes,
- e);
- LOG.warn("", conflictException.getLeft());
- return conflictException.getRight();
- };
-
Collection<SimpleFileEntry> mergedEntries;
try {
// merge manifest entries and also check if the files we want to delete are still there
mergedEntries = FileEntry.mergeEntries(allEntries);
} catch (Throwable e) {
- throw exceptionFunction.apply(e);
+ throw conflictException(commitUser, baseEntries, deltaEntries).apply(e);
}
- assertNoDelete(mergedEntries, exceptionFunction);
-
- // TODO check for deletion vectors
+ assertNoDelete(mergedEntries, conflictException(commitUser, baseEntries, deltaEntries));
// fast exit for file store without keys
if (keyComparator == null) {
@@ -1491,7 +1521,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
+ b.identifier().toString(pathFactory),
baseCommitUser,
baseEntries,
- changes,
+ deltaEntries,
null);
LOG.warn("", conflictException.getLeft());
@@ -1501,12 +1531,48 @@ public class FileStoreCommitImpl implements FileStoreCommit {
}
}
+ private Function<Throwable, RuntimeException> conflictException(
+ String baseCommitUser,
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries) {
+ return e -> {
+ Pair<RuntimeException, RuntimeException> conflictException =
+ createConflictException(
+ "File deletion conflicts detected! Give up
committing.",
+ baseCommitUser,
+ baseEntries,
+ deltaEntries,
+ e);
+ LOG.warn("", conflictException.getLeft());
+ return conflictException.getRight();
+ };
+ }
+
+ private boolean checkForDeletionVector(CommitKind commitKind) {
+ if (!deletionVectorsEnabled) {
+ return false;
+ }
+
+ // todo: Add this check once the compact dv index contains DELETE type.
+ // PK table's compact dv index only contains ADD type, skip conflict detection.
+ if (isPkTable && commitKind == CommitKind.COMPACT) {
+ return false;
+ }
+
+ // Non-PK table's hash fixed bucket mode only contains ADD type, skip conflict detection.
+ if (!isPkTable && bucketMode.equals(BucketMode.HASH_FIXED)) {
+ return false;
+ }
+
+ return true;
+ }
+
private void assertNoDelete(
Collection<SimpleFileEntry> mergedEntries,
Function<Throwable, RuntimeException> exceptionFunction) {
try {
for (SimpleFileEntry entry : mergedEntries) {
- Preconditions.checkState(
+ checkState(
entry.kind() != FileKind.DELETE,
"Trying to delete file %s for table %s which is not
previously added.",
entry.fileName(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
new file mode 100644
index 0000000000..00942ea048
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ConflictDeletionUtils.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/** Utils for conflict deletion. */
+public class ConflictDeletionUtils {
+
+ public static List<SimpleFileEntry> buildBaseEntriesWithDV(
+ List<SimpleFileEntry> baseEntries, List<IndexManifestEntry> baseIndexEntries) {
+ if (baseEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ Map<String, String> fileNameToDVFileName = new HashMap<>();
+ for (IndexManifestEntry indexManifestEntry : baseIndexEntries) {
+ // Should not attach DELETE type dv index for base file.
+ if (!indexManifestEntry.kind().equals(FileKind.DELETE)) {
+ IndexFileMeta indexFile = indexManifestEntry.indexFile();
+ if (indexFile.dvRanges() != null) {
+ for (DeletionVectorMeta value : indexFile.dvRanges().values()) {
+ checkState(
+ !fileNameToDVFileName.containsKey(value.dataFileName()),
+ "One file should correspond to only one dv entry.");
+ fileNameToDVFileName.put(value.dataFileName(), indexFile.fileName());
+ }
+ }
+ }
+ }
+
+ // Attach dv name to file entries.
+ List<SimpleFileEntry> entriesWithDV = new ArrayList<>(baseEntries.size());
+ for (SimpleFileEntry fileEntry : baseEntries) {
+ entriesWithDV.add(
+ new SimpleFileEntryWithDV(
+ fileEntry, fileNameToDVFileName.get(fileEntry.fileName())));
+ }
+ return entriesWithDV;
+ }
+
+ public static List<SimpleFileEntry> buildDeltaEntriesWithDV(
+ List<SimpleFileEntry> baseEntries,
+ List<SimpleFileEntry> deltaEntries,
+ List<IndexManifestEntry> deltaIndexEntries) {
+ if (deltaEntries.isEmpty() && deltaIndexEntries.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<SimpleFileEntry> entriesWithDV = new ArrayList<>(deltaEntries.size());
+
+ // One file may correspond to more than one dv entry, for example, delete the old dv and
+ // create a new one.
+ Map<String, List<IndexManifestEntry>> fileNameToDVEntry = new HashMap<>();
+ for (IndexManifestEntry deltaIndexEntry : deltaIndexEntries) {
+ if (deltaIndexEntry.indexFile().dvRanges() != null) {
+ for (DeletionVectorMeta meta : deltaIndexEntry.indexFile().dvRanges().values()) {
+ fileNameToDVEntry.putIfAbsent(meta.dataFileName(), new ArrayList<>());
+ fileNameToDVEntry.get(meta.dataFileName()).add(deltaIndexEntry);
+ }
+ }
+ }
+
+ Set<String> fileNotInDeltaEntries = new HashSet<>(fileNameToDVEntry.keySet());
+ // 1. Attach dv name to delta file entries.
+ for (SimpleFileEntry fileEntry : deltaEntries) {
+ if (fileNameToDVEntry.containsKey(fileEntry.fileName())) {
+ List<IndexManifestEntry> dvs = fileNameToDVEntry.get(fileEntry.fileName());
+ checkState(dvs.size() == 1, "Delta entry can only have one dv file");
+ entriesWithDV.add(
+ new SimpleFileEntryWithDV(fileEntry, dvs.get(0).indexFile().fileName()));
+ fileNotInDeltaEntries.remove(fileEntry.fileName());
+ } else {
+ entriesWithDV.add(new SimpleFileEntryWithDV(fileEntry, null));
+ }
+ }
+
+ // 2. For files not in delta entries, build entries with dv from baseEntries.
+ if (!fileNotInDeltaEntries.isEmpty()) {
+ Map<String, SimpleFileEntry> fileNameToFileEntry = new HashMap<>();
+ for (SimpleFileEntry baseEntry : baseEntries) {
+ if (baseEntry.kind().equals(FileKind.ADD)) {
+ fileNameToFileEntry.put(baseEntry.fileName(), baseEntry);
+ }
+ }
+
+ for (String fileName : fileNotInDeltaEntries) {
+ SimpleFileEntryWithDV simpleFileEntry =
+ (SimpleFileEntryWithDV) fileNameToFileEntry.get(fileName);
+ checkState(
+ simpleFileEntry != null,
+ String.format(
+ "Trying to create deletion vector on file %s
which is not previously added.",
+ fileName));
+ List<IndexManifestEntry> dvEntries = fileNameToDVEntry.get(fileName);
+ // If dv entry's type is DELETE, add DELETE<f, dv>
+ // If dv entry's type is ADD, add ADD<f, dv>
+ for (IndexManifestEntry dvEntry : dvEntries) {
+ entriesWithDV.add(
+ new SimpleFileEntryWithDV(
+ dvEntry.kind().equals(FileKind.ADD)
+ ? simpleFileEntry
+ : simpleFileEntry.toDelete(),
+ dvEntry.indexFile().fileName()));
+ }
+
+ // If one file corresponds to only one dv entry and the type is ADD,
+ // we need to add a DELETE<f, null>.
+ // This happens when creating a dv for a file that didn't have a dv before.
+ if (dvEntries.size() == 1 && dvEntries.get(0).kind().equals(FileKind.ADD)) {
+ entriesWithDV.add(new SimpleFileEntryWithDV(simpleFileEntry.toDelete(), null));
+ }
+ }
+ }
+
+ return entriesWithDV;
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index 3ca33a8222..2754022dd7 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -29,6 +29,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.operation.FileStoreCommitImpl;
@@ -36,6 +37,7 @@ import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
@@ -43,6 +45,7 @@ import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -142,6 +145,38 @@ public class TestAppendFileStore extends AppendOnlyFileStore {
return factory.create(partition, bucket, indexFiles);
}
+ public CommitMessageImpl writeDataFiles(
+ BinaryRow partition, int bucket, List<String> dataFileNames) throws IOException {
+ List<DataFileMeta> fileMetas = new ArrayList<>();
+ Path bucketPath = pathFactory().bucketPath(partition, bucket);
+ for (String dataFileName : dataFileNames) {
+ Path path = new Path(bucketPath, dataFileName);
+ fileIO.newOutputStream(path, false).close();
+ fileMetas.add(
+ DataFileMeta.forAppend(
+ path.getName(),
+ 10L,
+ 10L,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ schema.id(),
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null));
+ }
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ options().bucket(),
+ new DataIncrement(fileMetas, Collections.emptyList(), Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
+ }
+
public CommitMessageImpl writeDVIndexFiles(
BinaryRow partition, int bucket, Map<String, List<Integer>> dataFileToPositions) {
BucketedDvMaintainer dvMaintainer = createOrRestoreDVMaintainer(partition, bucket);
diff --git a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 10c6f45200..018d55e933 100644
--- a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -23,16 +23,20 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.PrimaryKeyTableTestBase;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.FileIOUtils;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -40,6 +44,8 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -47,13 +53,22 @@ import java.util.Map;
import java.util.stream.Collectors;
import static java.util.Collections.emptyList;
-import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
/** Test for {@link BucketedDvMaintainer}. */
public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
private IndexFileHandler fileHandler;
+ private final BinaryRow partition = BinaryRow.singleColumn(1);
+
+ @BeforeEach
+ public void setUp() throws Exception {
+ // write files
+ CommitMessageImpl commitMessage =
+ writeDataFiles(partition, 0, Arrays.asList("f1", "f2", "f3"));
+ BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
+ commit.commit(Collections.singletonList(commitMessage));
+ }
@ParameterizedTest
@ValueSource(booleans = {true, false})
@@ -61,7 +76,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList());
+ BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, emptyList());
assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
dvMaintainer.notifyNewDeletion("f1", 1);
@@ -74,7 +89,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
Map<String, DeletionVector> deletionVectors =
- fileHandler.readAllDeletionVectors(EMPTY_ROW, 0, Collections.singletonList(file));
+ fileHandler.readAllDeletionVectors(partition, 0, Collections.singletonList(file));
assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
@@ -89,7 +104,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, new HashMap<>());
+ BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, new HashMap<>());
DeletionVector deletionVector1 = createDeletionVector(bitmap64);
deletionVector1.delete(1);
deletionVector1.delete(3);
@@ -100,7 +115,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
CommitMessage commitMessage =
new CommitMessageImpl(
- EMPTY_ROW,
+ partition,
0,
1,
new DataIncrement(
@@ -115,8 +130,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
Snapshot latestSnapshot = table.snapshotManager().latestSnapshot();
List<IndexFileMeta> indexFiles =
- fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0);
- dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
+ fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, 0);
+ dvMaintainer = factory.create(partition, 0, indexFiles);
DeletionVector deletionVector2 = dvMaintainer.deletionVectorOf("f1").get();
assertThat(deletionVector2.isDeleted(1)).isTrue();
assertThat(deletionVector2.isDeleted(2)).isFalse();
@@ -127,7 +142,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
file = dvMaintainer.writeDeletionVectorsIndex().get();
commitMessage =
new CommitMessageImpl(
- EMPTY_ROW,
+ partition,
0,
1,
new DataIncrement(
@@ -141,8 +156,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
commit.commit(Collections.singletonList(commitMessage));
latestSnapshot = table.snapshotManager().latestSnapshot();
- indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, EMPTY_ROW, 0);
- dvMaintainer = factory.create(EMPTY_ROW, 0, indexFiles);
+ indexFiles = fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX, partition, 0);
+ dvMaintainer = factory.create(partition, 0, indexFiles);
DeletionVector deletionVector3 = dvMaintainer.deletionVectorOf("f1").get();
assertThat(deletionVector3.isDeleted(1)).isTrue();
assertThat(deletionVector3.isDeleted(2)).isTrue();
@@ -154,7 +169,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory = BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer = factory.create(EMPTY_ROW, 0, emptyList());
+ BucketedDvMaintainer dvMaintainer = factory.create(partition, 0, emptyList());
File indexDir = new File(tempPath.toFile(), "/default.db/T/index");
@@ -195,7 +210,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
// write first kind dv
initIndexHandler(bitmap64);
BucketedDvMaintainer.Factory factory1 = BucketedDvMaintainer.factory(fileHandler);
- BucketedDvMaintainer dvMaintainer1 = factory1.create(EMPTY_ROW, 0, new HashMap<>());
+ BucketedDvMaintainer dvMaintainer1 = factory1.create(partition, 0, new HashMap<>());
dvMaintainer1.notifyNewDeletion("f1", 1);
dvMaintainer1.notifyNewDeletion("f1", 3);
dvMaintainer1.notifyNewDeletion("f2", 1);
@@ -205,7 +220,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get();
CommitMessage commitMessage1 =
new CommitMessageImpl(
- EMPTY_ROW,
+ partition,
0,
1,
DataIncrement.emptyIncrement(),
@@ -223,8 +238,8 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
BucketedDvMaintainer.Factory factory2 = BucketedDvMaintainer.factory(fileHandler);
List<IndexFileMeta> indexFiles =
fileHandler.scan(
- table.latestSnapshot().get(), DELETION_VECTORS_INDEX, EMPTY_ROW, 0);
- BucketedDvMaintainer dvMaintainer2 = factory2.create(EMPTY_ROW, 0, indexFiles);
+ table.latestSnapshot().get(), DELETION_VECTORS_INDEX, partition, 0);
+ BucketedDvMaintainer dvMaintainer2 = factory2.create(partition, 0, indexFiles);
dvMaintainer2.notifyNewDeletion("f1", 10);
dvMaintainer2.notifyNewDeletion("f3", 1);
dvMaintainer2.notifyNewDeletion("f3", 3);
@@ -242,7 +257,7 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
file = dvMaintainer2.writeDeletionVectorsIndex().get();
CommitMessage commitMessage2 =
new CommitMessageImpl(
- EMPTY_ROW,
+ partition,
0,
1,
DataIncrement.emptyIncrement(),
@@ -258,10 +273,10 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
// test read dv index file which contains two kinds of dv
Map<String, DeletionVector> readDvs =
fileHandler.readAllDeletionVectors(
- EMPTY_ROW,
+ partition,
0,
fileHandler.scan(
- table.latestSnapshot().get(), "DELETION_VECTORS", EMPTY_ROW, 0));
+ table.latestSnapshot().get(), "DELETION_VECTORS", partition, 0));
assertThat(readDvs.size()).isEqualTo(3);
assertThat(dvs.get("f1").getCardinality()).isEqualTo(3);
assertThat(dvs.get("f2").getCardinality()).isEqualTo(2);
@@ -293,7 +308,39 @@ public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
.map(IndexManifestEntry::indexFile)
.collect(Collectors.toList());
Map<String, DeletionVector> deletionVectors =
- new HashMap<>(handler.readAllDeletionVectors(EMPTY_ROW, 0, indexFiles));
- return factory.create(EMPTY_ROW, 0, deletionVectors);
+ new HashMap<>(handler.readAllDeletionVectors(partition, 0, indexFiles));
+ return factory.create(partition, 0, deletionVectors);
+ }
+
+ private CommitMessageImpl writeDataFiles(
+ BinaryRow partition, int bucket, List<String> dataFileNames) throws IOException {
+ List<DataFileMeta> fileMetas = new ArrayList<>();
+ Path bucketPath = table.store().pathFactory().bucketPath(partition, bucket);
+ for (String dataFileName : dataFileNames) {
+ Path path = new Path(bucketPath, dataFileName);
+ table.fileIO().newOutputStream(path, false).close();
+ fileMetas.add(
+ DataFileMeta.forAppend(
+ path.getName(),
+ 10L,
+ 10L,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ table.schema().id(),
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null));
+ }
+ return new CommitMessageImpl(
+ partition,
+ bucket,
+ null,
+ new DataIncrement(fileMetas, Collections.emptyList(), Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 35843e0c2f..505417a412 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -886,23 +886,25 @@ public class FileStoreCommitTest {
Map<String, String> options = new HashMap<>();
options.put(CoreOptions.DELETION_VECTOR_BITMAP64.key(), String.valueOf(bitmap64));
TestAppendFileStore store = TestAppendFileStore.createAppendStore(tempDir, options);
+ BinaryRow partition = gen.getPartition(gen.next());
+
+ // create files
+ CommitMessageImpl commitMessage0 =
+ store.writeDataFiles(partition, 0, Arrays.asList("f1", "f2"));
+ store.commit(commitMessage0);
// commit 1
CommitMessageImpl commitMessage1 =
store.writeDVIndexFiles(
- BinaryRow.EMPTY_ROW,
- 0,
- Collections.singletonMap("f1", Arrays.asList(1, 3)));
+ partition, 0, Collections.singletonMap("f1",
Arrays.asList(1, 3)));
CommitMessageImpl commitMessage2 =
store.writeDVIndexFiles(
- BinaryRow.EMPTY_ROW,
- 0,
- Collections.singletonMap("f2", Arrays.asList(2, 4)));
+ partition, 0, Collections.singletonMap("f2",
Arrays.asList(2, 4)));
store.commit(commitMessage1, commitMessage2);
// assert 1
- assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(2);
- BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+ assertThat(store.scanDVIndexFiles(partition, 0).size()).isEqualTo(2);
+ BucketedDvMaintainer maintainer = store.createOrRestoreDVMaintainer(partition, 0);
Map<String, DeletionVector> dvs = maintainer.deletionVectors();
assertThat(dvs.size()).isEqualTo(2);
assertThat(dvs.get("f2").isDeleted(2)).isTrue();
@@ -912,16 +914,16 @@ public class FileStoreCommitTest {
// commit 2
CommitMessage commitMessage3 =
store.writeDVIndexFiles(
- BinaryRow.EMPTY_ROW, 0, Collections.singletonMap("f2",
Arrays.asList(3)));
+ partition, 0, Collections.singletonMap("f2",
Arrays.asList(3)));
List<IndexFileMeta> deleted =
new ArrayList<>(commitMessage1.newFilesIncrement().newIndexFiles());
deleted.addAll(commitMessage2.newFilesIncrement().newIndexFiles());
- CommitMessage commitMessage4 = store.removeIndexFiles(BinaryRow.EMPTY_ROW, 0, deleted);
+ CommitMessage commitMessage4 = store.removeIndexFiles(partition, 0, deleted);
store.commit(commitMessage3, commitMessage4);
// assert 2
- assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW, 0).size()).isEqualTo(1);
- maintainer = store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+ assertThat(store.scanDVIndexFiles(partition, 0).size()).isEqualTo(1);
+ maintainer = store.createOrRestoreDVMaintainer(partition, 0);
dvs = maintainer.deletionVectors();
assertThat(dvs.size()).isEqualTo(2);
assertThat(dvs.get("f1").isDeleted(3)).isTrue();
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
new file mode 100644
index 0000000000..34e6e59e7c
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ConflictDeletionUtilsTest.java
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.index.DeletionVectorMeta;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.manifest.FileEntry;
+import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.manifest.SimpleFileEntry;
+import org.apache.paimon.manifest.SimpleFileEntryWithDV;
+
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
+import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
+import static org.apache.paimon.manifest.FileKind.ADD;
+import static org.apache.paimon.manifest.FileKind.DELETE;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildBaseEntriesWithDV;
+import static org.apache.paimon.utils.ConflictDeletionUtils.buildDeltaEntriesWithDV;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ConflictDeletionUtils}. */
+public class ConflictDeletionUtilsTest {
+
+ @Test
+ public void testBuildBaseEntriesWithDV() {
+ {
+ // Scene 1
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntry("f1", ADD));
+ baseEntries.add(createFileEntry("f2", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv1", ADD,
Arrays.asList("f2")));
+
+ assertThat(buildBaseEntriesWithDV(baseEntries, deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", ADD, null),
+ createFileEntryWithDV("f2", ADD, "dv1"));
+ }
+
+ {
+ // Scene 2: skip delete dv
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntry("f1", ADD));
+ baseEntries.add(createFileEntry("f2", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE,
Arrays.asList("f2")));
+
+ assertThat(buildBaseEntriesWithDV(baseEntries, deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", ADD, null),
+ createFileEntryWithDV("f2", ADD, null));
+ }
+ }
+
+ @Test
+ public void testBuildDeltaEntriesWithDV() {
+ {
+ // Scene 1: update f2's dv
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f2", ADD, null));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntry("f2", DELETE));
+ deltaEntries.add(createFileEntry("f2_new", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv2", ADD,
Arrays.asList("f2_new")));
+
+ assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f2", DELETE, null),
+ createFileEntryWithDV("f2_new", ADD, "dv2"));
+ }
+
+ {
+ // Scene 2: update f2 and merge f1's dv
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f2", ADD, null));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntry("f2", DELETE));
+ deltaEntries.add(createFileEntry("f2_new", ADD));
+ deltaEntries.add(createFileEntry("f3", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE,
Arrays.asList("f1")));
+ deltaIndexEntries.add(createDvIndexEntry("dv2", ADD,
Arrays.asList("f1", "f2_new")));
+
+ assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, "dv1"),
+ createFileEntryWithDV("f1", ADD, "dv2"),
+ createFileEntryWithDV("f2", DELETE, null),
+ createFileEntryWithDV("f2_new", ADD, "dv2"),
+ createFileEntryWithDV("f3", ADD, null));
+ }
+
+ {
+ // Scene 3: update f2 (with dv) and merge f1's dv
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f2", ADD, "dv2"));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntry("f2", DELETE));
+ deltaEntries.add(createFileEntry("f2_new", ADD));
+ deltaEntries.add(createFileEntry("f3", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE,
Arrays.asList("f1")));
+ deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE,
Arrays.asList("f2")));
+ deltaIndexEntries.add(createDvIndexEntry("dv3", ADD,
Arrays.asList("f1", "f2_new")));
+
+ assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, "dv1"),
+ createFileEntryWithDV("f1", ADD, "dv3"),
+ createFileEntryWithDV("f2", DELETE, "dv2"),
+ createFileEntryWithDV("f2_new", ADD, "dv3"),
+ createFileEntryWithDV("f3", ADD, null));
+ }
+
+ {
+ // Scene 4: full compact
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithDV("f1", ADD, null));
+ baseEntries.add(createFileEntryWithDV("f2", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f3", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f4", ADD, "dv2"));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntry("f1", DELETE));
+ deltaEntries.add(createFileEntry("f2", DELETE));
+ deltaEntries.add(createFileEntry("f3", DELETE));
+ deltaEntries.add(createFileEntry("f4", DELETE));
+ deltaEntries.add(createFileEntry("f5_compact", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE,
Arrays.asList("f2", "f3")));
+ deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE,
Arrays.asList("f4")));
+
+ assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, null),
+ createFileEntryWithDV("f2", DELETE, "dv1"),
+ createFileEntryWithDV("f3", DELETE, "dv1"),
+ createFileEntryWithDV("f4", DELETE, "dv2"),
+ createFileEntryWithDV("f5_compact", ADD, null));
+ }
+
+ {
+ // Scene 5: merge into with update, delete and insert
+ List<SimpleFileEntry> baseEntries = new ArrayList<>();
+ baseEntries.add(createFileEntryWithDV("f1", ADD, null));
+ baseEntries.add(createFileEntryWithDV("f2", ADD, null));
+ baseEntries.add(createFileEntryWithDV("f3", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f4", ADD, "dv1"));
+ baseEntries.add(createFileEntryWithDV("f5", ADD, "dv2"));
+
+ List<SimpleFileEntry> deltaEntries = new ArrayList<>();
+ deltaEntries.add(createFileEntry("f2", DELETE));
+ deltaEntries.add(createFileEntry("f3", DELETE));
+ deltaEntries.add(createFileEntry("f3_new", ADD));
+ deltaEntries.add(createFileEntry("f7", ADD));
+
+ List<IndexManifestEntry> deltaIndexEntries = new ArrayList<>();
+ deltaIndexEntries.add(createDvIndexEntry("dv1", DELETE,
Arrays.asList("f3", "f4")));
+ deltaIndexEntries.add(createDvIndexEntry("dv2", DELETE,
Arrays.asList("f5")));
+ deltaIndexEntries.add(createDvIndexEntry("dv3", ADD,
Arrays.asList("f1", "f4", "f5")));
+
+ assertThat(buildDeltaEntriesWithDV(baseEntries, deltaEntries,
deltaIndexEntries))
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, null),
+ createFileEntryWithDV("f1", ADD, "dv3"),
+ createFileEntryWithDV("f2", DELETE, null),
+ createFileEntryWithDV("f3", DELETE, "dv1"),
+ createFileEntryWithDV("f3_new", ADD, null),
+ createFileEntryWithDV("f4", DELETE, "dv1"),
+ createFileEntryWithDV("f4", ADD, "dv3"),
+ createFileEntryWithDV("f5", DELETE, "dv2"),
+ createFileEntryWithDV("f5", ADD, "dv3"),
+ createFileEntryWithDV("f7", ADD, null));
+ }
+ }
+
+ @Test
+ public void testConflictDeletionWithDV() {
+ {
+ // Scene 1: base -------------> update2 (conflict)
+ // f1 ^ <f1, +dv2>
+ // |
+ // update1 (finished)
+ // <f1, +dv1>
+ List<SimpleFileEntry> update1Entries = new ArrayList<>();
+ update1Entries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+
+ List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>();
+
+ List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>();
+ update2DeltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1")));
+
+ List<SimpleFileEntry> update2DeltaEntriesWithDV =
+ buildDeltaEntriesWithDV(
+ update1Entries, update2DeltaEntries, update2DeltaIndexEntries);
+ assertThat(update2DeltaEntriesWithDV)
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, null),
+ createFileEntryWithDV("f1", ADD, "dv2"));
+ assertConflict(update1Entries, update2DeltaEntriesWithDV);
+ }
+
+ {
+ // Scene 2: base -------------> update2 (conflict)
+ // <f1, dv0> ^ <f1, +dv2>
+ // |
+ // update1 (finished)
+ // <f1, +dv1>
+ List<SimpleFileEntry> update1Entries = new ArrayList<>();
+ update1Entries.add(createFileEntryWithDV("f1", ADD, "dv1"));
+
+ List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>();
+
+ List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>();
+ update2DeltaIndexEntries.add(createDvIndexEntry("dv0", DELETE, Arrays.asList("f1")));
+ update2DeltaIndexEntries.add(createDvIndexEntry("dv2", ADD, Arrays.asList("f1")));
+
+ List<SimpleFileEntry> update2DeltaEntriesWithDV =
+ buildDeltaEntriesWithDV(
+ update1Entries, update2DeltaEntries, update2DeltaIndexEntries);
+ assertThat(update2DeltaEntriesWithDV)
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, "dv0"),
+ createFileEntryWithDV("f1", ADD, "dv2"));
+ assertConflict(update1Entries, update2DeltaEntriesWithDV);
+ }
+
+ {
+ // Scene 3: base -------------> update2 (conflict)
+ // <f1, dv0> ^ <-f1, -dv0>, <+f3, null>
+ // |
+ // update1 (finished)
+ // <-f1, -dv0>, <+f2, dv1>
+ List<SimpleFileEntry> update1Entries = new ArrayList<>();
+ update1Entries.add(createFileEntryWithDV("f2", ADD, "dv1"));
+
+ List<SimpleFileEntry> update2DeltaEntries = new ArrayList<>();
+ update2DeltaEntries.add(createFileEntry("f1", DELETE));
+ update2DeltaEntries.add(createFileEntry("f3", ADD));
+
+ List<IndexManifestEntry> update2DeltaIndexEntries = new ArrayList<>();
+ update2DeltaIndexEntries.add(createDvIndexEntry("dv0", DELETE, Arrays.asList("f1")));
+
+ List<SimpleFileEntry> update2DeltaEntriesWithDV =
+ buildDeltaEntriesWithDV(
+ update1Entries, update2DeltaEntries, update2DeltaIndexEntries);
+ assertThat(update2DeltaEntriesWithDV)
+ .containsExactlyInAnyOrder(
+ createFileEntryWithDV("f1", DELETE, "dv0"),
+ createFileEntryWithDV("f3", ADD, null));
+ assertConflict(update1Entries, update2DeltaEntriesWithDV);
+ }
+ }
+
+ private SimpleFileEntry createFileEntry(String fileName, FileKind kind) {
+ return new SimpleFileEntry(
+ kind,
+ EMPTY_ROW,
+ 0,
+ 1,
+ 0,
+ fileName,
+ Collections.emptyList(),
+ null,
+ EMPTY_ROW,
+ EMPTY_ROW,
+ null);
+ }
+
+ private SimpleFileEntryWithDV createFileEntryWithDV(
+ String fileName, FileKind kind, @Nullable String dvFileName) {
+ return new SimpleFileEntryWithDV(createFileEntry(fileName, kind), dvFileName);
+ }
+
+ private IndexManifestEntry createDvIndexEntry(
+ String fileName, FileKind kind, List<String> fileNames) {
+ LinkedHashMap<String, DeletionVectorMeta> dvRanges = new LinkedHashMap<>();
+ for (String name : fileNames) {
+ dvRanges.put(name, new DeletionVectorMeta(name, 1, 1, 1L));
+ }
+ return new IndexManifestEntry(
+ kind,
+ EMPTY_ROW,
+ 0,
+ new IndexFileMeta(
+ DELETION_VECTORS_INDEX, fileName, 11, dvRanges.size(), dvRanges, null));
+ }
+
+ private void assertConflict(
+ List<SimpleFileEntry> baseEntries, List<SimpleFileEntry> deltaEntries) {
+ ArrayList<SimpleFileEntry> simpleFileEntryWithDVS = new ArrayList<>(baseEntries);
+ simpleFileEntryWithDVS.addAll(deltaEntries);
+ Collection<SimpleFileEntry> merged = FileEntry.mergeEntries(simpleFileEntryWithDVS);
+ int deleteCount = 0;
+ for (SimpleFileEntry simpleFileEntryWithDV : merged) {
+ if (simpleFileEntryWithDV.kind().equals(FileKind.DELETE)) {
+ deleteCount++;
+ }
+ }
+ assert (deleteCount > 0);
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
index c85315d876..461aca90ef 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/DeleteFromPaimonTableCommand.scala
@@ -119,6 +119,7 @@ case class DeleteFromPaimonTableCommand(
}
private def performNonPrimaryKeyDelete(sparkSession: SparkSession): Seq[CommitMessage] = {
+ val readSnapshot = table.snapshotManager().latestSnapshot()
// Step1: the candidate data splits which are filtered by Paimon Predicate.
val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -133,7 +134,7 @@ case class DeleteFromPaimonTableCommand(
sparkSession)
// Step3: update the touched deletion vectors and index files
- writer.persistDeletionVectors(deletionVectors)
+ writer.persistDeletionVectors(deletionVectors, readSnapshot)
} else {
// Step2: extract out the exact files, which must have at least one record to be updated.
val touchedFilePaths =
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
index 5f12d1110d..1ae894dbae 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/MergeIntoPaimonTable.scala
@@ -88,6 +88,8 @@ case class MergeIntoPaimonTable(
}
private def performMergeForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+ // todo: find a more universal way to make read snapshot consistent.
+ val readSnapshot = table.snapshotManager().latestSnapshot()
val targetDS = createDataset(sparkSession, filteredTargetPlan)
val sourceDS = createDataset(sparkSession, sourceTable)
@@ -113,7 +115,7 @@ case class MergeIntoPaimonTable(
val dvDS = ds.where(
s"$ROW_KIND_COL = ${RowKind.DELETE.toByteValue} or $ROW_KIND_COL = ${RowKind.UPDATE_AFTER.toByteValue}")
val deletionVectors = collectDeletionVectors(dataFilePathToMeta, dvDS, sparkSession)
- val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+ val indexCommitMsg = writer.persistDeletionVectors(deletionVectors, readSnapshot)
// Step4: filter rows that should be written as the inserted/updated data.
val toWriteDS = ds
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
index 246245c052..6d0563b364 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala
@@ -18,7 +18,7 @@
package org.apache.paimon.spark.commands
-import org.apache.paimon.CoreOptions
+import org.apache.paimon.{CoreOptions, Snapshot}
 import org.apache.paimon.CoreOptions.{PartitionSinkStrategy, WRITE_ONLY}
 import org.apache.paimon.codegen.CodeGenUtils
 import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow}
@@ -304,10 +304,11 @@ case class PaimonSparkWriter(table: FileStoreTable, writeRowTracking: Boolean =
    * deletion vectors; else, one index file will contain all deletion vectors with the same partition
    * and bucket.
    */
-  def persistDeletionVectors(deletionVectors: Dataset[SparkDeletionVector]): Seq[CommitMessage] = {
+  def persistDeletionVectors(
+      deletionVectors: Dataset[SparkDeletionVector],
+      snapshot: Snapshot): Seq[CommitMessage] = {
     val sparkSession = deletionVectors.sparkSession
     import sparkSession.implicits._
-    val snapshot = table.snapshotManager().latestSnapshotFromFileSystem()
     val serializedCommits = deletionVectors
       .groupByKey(_.partitionAndBucket)
       .mapGroups {
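This signature change removes a race: the old body re-resolved the latest snapshot from the file system at persist time, which could already be newer than the snapshot the deletion vectors were computed against. A simplified before/after sketch with stand-in types, not the real signatures:

final case class Snapshot(id: Long)

// Before (simplified): the baseline is whatever is latest when persist runs,
// so a commit landing between read and persist silently shifts it.
def persistOld(latestFromFileSystem: () => Snapshot): Snapshot =
  latestFromFileSystem()

// After (simplified): the caller passes the snapshot it actually read from,
// giving commit-time dv conflict detection a stable baseline to validate.
def persistNew(readSnapshot: Snapshot): Snapshot =
  readSnapshot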
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
index 1babd7b5c3..8839d5c8ac 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/UpdatePaimonTableCommand.scala
@@ -71,6 +71,7 @@ case class UpdatePaimonTableCommand(
/** Update for table without primary keys */
   private def performUpdateForNonPkTable(sparkSession: SparkSession): Seq[CommitMessage] = {
+    val readSnapshot = table.snapshotManager().latestSnapshot()
     // Step1: the candidate data splits which are filtered by Paimon Predicate.
     val candidateDataSplits = findCandidateDataSplits(condition, relation.output)
     val dataFilePathToMeta = candidateFileMap(candidateDataSplits)
@@ -100,7 +101,7 @@ case class UpdatePaimonTableCommand(
       val addCommitMessage = writeOnlyUpdatedData(sparkSession, touchedDataSplits)
       // Step4: write these deletion vectors.
-      val indexCommitMsg = writer.persistDeletionVectors(deletionVectors)
+      val indexCommitMsg = writer.persistDeletionVectors(deletionVectors, readSnapshot)
       addCommitMessage ++ indexCommitMsg
     } finally {
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
index c867aed9f0..bfbadc7624 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/MergeIntoTableTestBase.scala
@@ -760,46 +760,49 @@ trait MergeIntoAppendTableTest extends PaimonSparkTestBase with PaimonAppendTabl
   }

   test("Paimon MergeInto: concurrent merge and compact") {
-    withTable("s", "t") {
-      sql("CREATE TABLE s (id INT, b INT, c INT)")
-      sql("INSERT INTO s VALUES (1, 1, 1)")
-
-      sql("CREATE TABLE t (id INT, b INT, c INT)")
-      sql("INSERT INTO t VALUES (1, 1, 1)")
-
-      val mergeInto = Future {
-        for (_ <- 1 to 10) {
-          try {
-            sql("""
-                  |MERGE INTO t
-                  |USING s
-                  |ON t.id = s.id
-                  |WHEN MATCHED THEN
-                  |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
-                  |""".stripMargin)
-          } catch {
-            case a: Throwable =>
-              assert(
-                a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
-                  "Missing file"))
+    for (dvEnabled <- Seq("true", "false")) {
+      withTable("s", "t") {
+        sql("CREATE TABLE s (id INT, b INT, c INT)")
+        sql("INSERT INTO s VALUES (1, 1, 1)")
+
+        sql(
+          s"CREATE TABLE t (id INT, b INT, c INT) TBLPROPERTIES ('deletion-vectors.enabled' = '$dvEnabled')")
+        sql("INSERT INTO t VALUES (1, 1, 1)")
+
+        val mergeInto = Future {
+          for (_ <- 1 to 10) {
+            try {
+              sql("""
+                    |MERGE INTO t
+                    |USING s
+                    |ON t.id = s.id
+                    |WHEN MATCHED THEN
+                    |UPDATE SET t.id = s.id, t.b = s.b + t.b, t.c = s.c + t.c
+                    |""".stripMargin)
+            } catch {
+              case a: Throwable =>
+                assert(
+                  a.getMessage.contains("Conflicts during commits") || a.getMessage.contains(
+                    "Missing file"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
           }
-          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
         }
-      }
-      val compact = Future {
-        for (_ <- 1 to 10) {
-          try {
-            sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
-          } catch {
-            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+        val compact = Future {
+          for (_ <- 1 to 10) {
+            try {
+              sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'id')")
+            } catch {
+              case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+            }
+            checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
           }
-          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(1)))
         }
-      }
-      Await.result(mergeInto, 60.seconds)
-      Await.result(compact, 60.seconds)
+        Await.result(mergeInto, 60.seconds)
+        Await.result(compact, 60.seconds)
+      }
     }
   }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
index 097d2c4e14..072324ce3a 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/UpdateTableTestBase.scala
@@ -25,6 +25,11 @@ import org.apache.paimon.spark.catalyst.analysis.Update
import org.apache.spark.sql.Row
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import scala.concurrent.{Await, Future}
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration.DurationInt
+import scala.util.Random
+
abstract class UpdateTableTestBase extends PaimonSparkTestBase {
import testImplicits._
@@ -393,4 +398,38 @@ abstract class UpdateTableTestBase extends PaimonSparkTestBase {
       assert(latestSnapshot.commitKind.equals(Snapshot.CommitKind.COMPACT))
     }
   }
+
+  test("Paimon update: random concurrent updates on a dv table") {
+    withTable("t") {
+      val recordCount = 10000
+      val maxConcurrent = Random.nextInt(2) + 1
+
+      sql("CREATE TABLE t (a INT, b INT) TBLPROPERTIES ('deletion-vectors.enabled' = 'true')")
+      sql(s"INSERT INTO t SELECT id AS a, 0 AS b FROM range(0, $recordCount)")
+
+      def run(): Future[Unit] = Future {
+        for (_ <- 1 to 20) {
+          try {
+            val i = 20 + Random.nextInt(100)
+            // nextInt(3), so that all three branches, including the ordered compact, are reachable
+            Random.nextInt(3) match {
+              case 0 => sql(s"UPDATE t SET b = b + 1 WHERE (a % $i) = ${Random.nextInt(i)}")
+              case 1 =>
+                sql("CALL sys.compact(table => 't', options => 'compaction.min.file-num=1')")
+              case 2 =>
+                sql("CALL sys.compact(table => 't', order_strategy => 'order', order_by => 'a')")
+            }
+          } catch {
+            case a: Throwable => assert(a.getMessage.contains("Conflicts during commits"))
+          }
+          checkAnswer(sql("SELECT count(*) FROM t"), Seq(Row(recordCount)))
+        }
+      }
+
+      (1 to maxConcurrent)
+        .map(_ => run())
+        .foreach(Await.result(_, 600.seconds))
+    }
+  }
 }
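Both concurrency tests above rely on one invariant: each writer either commits or fails with a detected conflict, and the table reads consistently after every attempt. A compact sketch of that retry-and-verify loop, with illustrative attempt/verify hooks in place of the real test helpers:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt

// Each writer retries a fixed number of times; a "Conflicts during commits"
// failure is the expected outcome under contention, never silent corruption.
def retryingWriter(attempt: () => Unit, verify: () => Unit): Future[Unit] =
  Future {
    for (_ <- 1 to 10) {
      try attempt()
      catch {
        case t: Throwable =>
          assert(t.getMessage.contains("Conflicts during commits"))
      }
      verify() // the row count must be unchanged after every attempt
    }
  }

// Usage: race two writers against the same table, then wait for both, e.g.
// Await.result(retryingWriter(update, check), 60.seconds)
// Await.result(retryingWriter(compact, check), 60.seconds)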