This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1c269968c2 [core] Simplify AppendDeleteFileMaintainer to maintainer free (#6123)
1c269968c2 is described below
commit 1c269968c2b32d2eb4987435b98dc143f76df1ca
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Aug 22 19:57:32 2025 +0800
[core] Simplify AppendDeleteFileMaintainer to maintainer free (#6123)
---
.../main/java/org/apache/paimon/CoreOptions.java | 2 +-
.../java/org/apache/paimon/AbstractFileStore.java | 6 +-
.../org/apache/paimon/AppendOnlyFileStore.java | 11 ++-
.../java/org/apache/paimon/KeyValueFileStore.java | 9 +--
.../append/BucketedAppendCompactManager.java | 16 ++--
.../apache/paimon/compact/CompactDeletionFile.java | 21 ++---
...rsMaintainer.java => BucketedDvMaintainer.java} | 27 +++----
.../paimon/deletionvectors/DeletionVector.java | 2 +-
.../DeletionVectorIndexFileWriter.java | 38 +++++----
.../deletionvectors/DeletionVectorsIndexFile.java | 32 ++++----
.../append/AppendDeleteFileMaintainer.java | 90 ++++++++--------------
.../append/BaseAppendDeleteFileMaintainer.java | 36 +++++++--
.../append/BucketedAppendDeleteFileMaintainer.java | 15 ++--
.../org/apache/paimon/index/IndexFileHandler.java | 55 +------------
.../paimon/mergetree/compact/CompactStrategy.java | 4 +-
.../LookupChangelogMergeFunctionWrapper.java | 6 +-
.../compact/LookupMergeTreeCompactRewriter.java | 12 +--
.../mergetree/compact/MergeTreeCompactManager.java | 6 +-
.../paimon/operation/AbstractFileStoreWrite.java | 14 ++--
.../paimon/operation/AppendFileStoreWrite.java | 7 +-
.../paimon/operation/BaseAppendFileStoreWrite.java | 8 +-
.../operation/BucketedAppendFileStoreWrite.java | 6 +-
.../apache/paimon/operation/FileStoreWrite.java | 6 +-
.../paimon/operation/KeyValueFileStoreWrite.java | 10 +--
.../paimon/operation/MemoryFileStoreWrite.java | 4 +-
.../postpone/PostponeBucketFileStoreWrite.java | 4 +-
.../org/apache/paimon/TestAppendFileStore.java | 14 ++--
...inerTest.java => BucketedDvMaintainerTest.java} | 58 ++++++--------
.../DeletionVectorsIndexFileTest.java | 16 ++--
.../append/AppendDeletionFileMaintainerHelper.java | 19 ++++-
.../paimon/operation/FileStoreCommitTest.java | 5 +-
.../paimon/spark/sql/DeletionVectorTest.scala | 20 ++---
32 files changed, 266 insertions(+), 313 deletions(-)
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index e0f9af088d..8d86219cfb 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -2863,7 +2863,7 @@ public class CoreOptions implements Serializable {
return deletionVectorsEnabled() || mergeEngine() == FIRST_ROW;
}
- public MemorySize deletionVectorIndexFileTargetSize() {
+ public MemorySize dvIndexFileTargetSize() {
return options.get(DELETION_VECTOR_INDEX_FILE_TARGET_SIZE);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 11f67a2c10..4d2b7726de 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -43,14 +43,12 @@ import org.apache.paimon.operation.ManifestsReader;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.SnapshotDeletion;
import org.apache.paimon.operation.TagDeletion;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.partition.PartitionExpireStrategy;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.stats.StatsFile;
import org.apache.paimon.stats.StatsFileHandler;
-import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PartitionHandler;
@@ -230,9 +228,7 @@ abstract class AbstractFileStore<T> implements FileStore<T>
{
new DeletionVectorsIndexFile(
fileIO,
pathFactory().indexFileFactory(),
- bucketMode() == BucketMode.BUCKET_UNAWARE
- ? options.deletionVectorIndexFileTargetSize()
- : MemorySize.ofBytes(Long.MAX_VALUE),
+ options.dvIndexFileTargetSize(),
options.deletionVectorBitmap64()));
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 0d9ad3b121..2f552d799b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -19,7 +19,7 @@
package org.apache.paimon;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.operation.AppendFileStoreWrite;
@@ -107,10 +107,6 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
@Override
public BaseAppendFileStoreWrite newWrite(String commitUser, @Nullable
Integer writeId) {
- DeletionVectorsMaintainer.Factory dvMaintainerFactory =
- options.deletionVectorsEnabled()
- ?
DeletionVectorsMaintainer.factory(newIndexFileHandler())
- : null;
if (bucketMode() == BucketMode.BUCKET_UNAWARE) {
RawFileSplitRead readForCompact = newRead();
if (options.rowTrackingEnabled()) {
@@ -126,9 +122,12 @@ public class AppendOnlyFileStore extends
AbstractFileStore<InternalRow> {
snapshotManager(),
newScan(),
options,
- dvMaintainerFactory,
tableName);
} else {
+ BucketedDvMaintainer.Factory dvMaintainerFactory =
+ options.deletionVectorsEnabled()
+ ?
BucketedDvMaintainer.factory(newIndexFileHandler())
+ : null;
return new BucketedAppendFileStoreWrite(
fileIO,
newRead(),
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index f7a743ace1..5e5b354e65 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -20,7 +20,7 @@ package org.apache.paimon;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -177,10 +177,9 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
if (bucketMode() == BucketMode.HASH_DYNAMIC) {
indexFactory = new
DynamicBucketIndexMaintainer.Factory(newIndexFileHandler());
}
- DeletionVectorsMaintainer.Factory deletionVectorsMaintainerFactory =
null;
+ BucketedDvMaintainer.Factory dvMaintainerFactory = null;
if (options.deletionVectorsEnabled()) {
- deletionVectorsMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(newIndexFileHandler());
+ dvMaintainerFactory =
BucketedDvMaintainer.factory(newIndexFileHandler());
}
return new KeyValueFileStoreWrite(
fileIO,
@@ -199,7 +198,7 @@ public class KeyValueFileStore extends
AbstractFileStore<KeyValue> {
snapshotManager(),
newScan(),
indexFactory,
- deletionVectorsMaintainerFactory,
+ dvMaintainerFactory,
options,
keyValueFieldsExtractor,
tableName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
index 666704b836..4a057f9576 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/BucketedAppendCompactManager.java
@@ -24,7 +24,7 @@ import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.compact.CompactFutureManager;
import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.operation.metrics.CompactionMetrics;
import org.apache.paimon.operation.metrics.MetricUtils;
@@ -56,7 +56,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
private static final int FULL_COMPACT_MIN_FILE = 3;
private final ExecutorService executor;
- private final DeletionVectorsMaintainer dvMaintainer;
+ private final BucketedDvMaintainer dvMaintainer;
private final PriorityQueue<DataFileMeta> toCompact;
private final int minFileNum;
private final long targetFileSize;
@@ -70,7 +70,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
public BucketedAppendCompactManager(
ExecutorService executor,
List<DataFileMeta> restored,
- @Nullable DeletionVectorsMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
int minFileNum,
long targetFileSize,
boolean forceRewriteAllFiles,
@@ -241,14 +241,14 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
/** A {@link CompactTask} impl for full compaction of append-only table. */
public static class FullCompactTask extends CompactTask {
- private final DeletionVectorsMaintainer dvMaintainer;
+ private final BucketedDvMaintainer dvMaintainer;
private final LinkedList<DataFileMeta> toCompact;
private final long targetFileSize;
private final boolean forceRewriteAllFiles;
private final CompactRewriter rewriter;
public FullCompactTask(
- DeletionVectorsMaintainer dvMaintainer,
+ BucketedDvMaintainer dvMaintainer,
Collection<DataFileMeta> inputs,
long targetFileSize,
boolean forceRewriteAllFiles,
@@ -314,12 +314,12 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
*/
public static class AutoCompactTask extends CompactTask {
- private final DeletionVectorsMaintainer dvMaintainer;
+ private final BucketedDvMaintainer dvMaintainer;
private final List<DataFileMeta> toCompact;
private final CompactRewriter rewriter;
public AutoCompactTask(
- DeletionVectorsMaintainer dvMaintainer,
+ BucketedDvMaintainer dvMaintainer,
List<DataFileMeta> toCompact,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter) {
@@ -336,7 +336,7 @@ public class BucketedAppendCompactManager extends
CompactFutureManager {
}
private static CompactResult compact(
- @Nullable DeletionVectorsMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
List<DataFileMeta> toCompact,
CompactRewriter rewriter)
throws Exception {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
index 390ab7af90..f52c3a41d8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/compact/CompactDeletionFile.java
@@ -18,13 +18,12 @@
package org.apache.paimon.compact;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import javax.annotation.Nullable;
-import java.util.List;
import java.util.Optional;
/** Deletion File from compaction. */
@@ -41,19 +40,13 @@ public interface CompactDeletionFile {
* immediately, so when updateCompactResult, we need to merge old deletion
files (just delete
* them).
*/
- static CompactDeletionFile generateFiles(DeletionVectorsMaintainer
maintainer) {
- List<IndexFileMeta> files = maintainer.writeDeletionVectorsIndex();
- if (files.size() > 1) {
- throw new IllegalStateException(
- "Should only generate one compact deletion file, this is a
bug.");
- }
-
- return new GeneratedDeletionFile(
- files.isEmpty() ? null : files.get(0),
maintainer.indexFileHandler());
+ static CompactDeletionFile generateFiles(BucketedDvMaintainer maintainer) {
+ Optional<IndexFileMeta> file = maintainer.writeDeletionVectorsIndex();
+ return new GeneratedDeletionFile(file.orElse(null),
maintainer.indexFileHandler());
}
/** For sync compaction, only create deletion files when prepareCommit. */
- static CompactDeletionFile lazyGeneration(DeletionVectorsMaintainer
maintainer) {
+ static CompactDeletionFile lazyGeneration(BucketedDvMaintainer maintainer)
{
return new LazyCompactDeletionFile(maintainer);
}
@@ -107,11 +100,11 @@ public interface CompactDeletionFile {
/** A lazy generation implementation of {@link CompactDeletionFile}. */
class LazyCompactDeletionFile implements CompactDeletionFile {
- private final DeletionVectorsMaintainer maintainer;
+ private final BucketedDvMaintainer maintainer;
private boolean generated = false;
- public LazyCompactDeletionFile(DeletionVectorsMaintainer maintainer) {
+ public LazyCompactDeletionFile(BucketedDvMaintainer maintainer) {
this.maintainer = maintainer;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
similarity index 86%
rename from
paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
rename to
paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
index f19a236a14..1feb135efc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BucketedDvMaintainer.java
@@ -31,14 +31,14 @@ import java.util.Map;
import java.util.Optional;
/** Maintainer of deletionVectors index. */
-public class DeletionVectorsMaintainer {
+public class BucketedDvMaintainer {
private final IndexFileHandler indexFileHandler;
private final Map<String, DeletionVector> deletionVectors;
protected final boolean bitmap64;
private boolean modified;
- private DeletionVectorsMaintainer(
+ private BucketedDvMaintainer(
IndexFileHandler fileHandler, Map<String, DeletionVector>
deletionVectors) {
this.indexFileHandler = fileHandler;
this.deletionVectors = deletionVectors;
@@ -108,15 +108,16 @@ public class DeletionVectorsMaintainer {
/**
* Write new deletion vectors index file if any modifications have been
made.
*
- * @return A list containing the metadata of the deletion vectors index
file, or an empty list
- * if no changes need to be committed.
+ * @return None if no modifications have been made, otherwise the new
deletion vectors index
+ * file.
*/
- public List<IndexFileMeta> writeDeletionVectorsIndex() {
+ public Optional<IndexFileMeta> writeDeletionVectorsIndex() {
if (modified) {
modified = false;
- return indexFileHandler.writeDeletionVectorsIndex(deletionVectors);
+ return Optional.of(
+
indexFileHandler.deletionVectorsIndex().writeSingleFile(deletionVectors));
}
- return Collections.emptyList();
+ return Optional.empty();
}
/**
@@ -147,12 +148,12 @@ public class DeletionVectorsMaintainer {
return new Factory(handler);
}
- /** Factory to restore {@link DeletionVectorsMaintainer}. */
+ /** Factory to restore {@link BucketedDvMaintainer}. */
public static class Factory {
private final IndexFileHandler handler;
- public Factory(IndexFileHandler handler) {
+ private Factory(IndexFileHandler handler) {
this.handler = handler;
}
@@ -160,7 +161,7 @@ public class DeletionVectorsMaintainer {
return handler;
}
- public DeletionVectorsMaintainer create(@Nullable List<IndexFileMeta>
restoredFiles) {
+ public BucketedDvMaintainer create(@Nullable List<IndexFileMeta>
restoredFiles) {
if (restoredFiles == null) {
restoredFiles = Collections.emptyList();
}
@@ -169,12 +170,12 @@ public class DeletionVectorsMaintainer {
return create(deletionVectors);
}
- public DeletionVectorsMaintainer create() {
+ public BucketedDvMaintainer create() {
return create(new HashMap<>());
}
- public DeletionVectorsMaintainer create(Map<String, DeletionVector>
deletionVectors) {
- return new DeletionVectorsMaintainer(handler, deletionVectors);
+ public BucketedDvMaintainer create(Map<String, DeletionVector>
deletionVectors) {
+ return new BucketedDvMaintainer(handler, deletionVectors);
}
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
index ff48c7d218..e06dc767a9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVector.java
@@ -149,7 +149,7 @@ public interface DeletionVector extends
DeletionVectorJudger {
return fileName -> Optional.empty();
}
- static Factory factory(@Nullable DeletionVectorsMaintainer dvMaintainer) {
+ static Factory factory(@Nullable BucketedDvMaintainer dvMaintainer) {
if (dvMaintainer == null) {
return emptyFactory();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 4647b55fb1..4104305e05 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -45,12 +45,29 @@ public class DeletionVectorIndexFileWriter {
}
/**
- * For unaware-bucket mode, this method will write out multiple index
files, else, it will write
- * out only one index file.
+ * The deletion file of the bucketed table is updated according to the
bucket. If a compaction
+ * occurs and there is no longer a deletion file, an empty deletion file
needs to be generated
+ * to overwrite the old file.
+ *
+ * <p>TODO: We can consider sending a message to delete the deletion file
in the future.
*/
- public List<IndexFileMeta> write(Map<String, DeletionVector> input) throws
IOException {
+ public IndexFileMeta writeSingleFile(Map<String, DeletionVector> input)
throws IOException {
+
+ DeletionFileWriter writer = new
DeletionFileWriter(indexPathFactory.newPath(), fileIO);
+ try {
+ for (Map.Entry<String, DeletionVector> entry : input.entrySet()) {
+ writer.write(entry.getKey(), entry.getValue());
+ }
+ } finally {
+ writer.close();
+ }
+ return writer.result();
+ }
+
+ public List<IndexFileMeta> writeWithRolling(Map<String, DeletionVector>
input)
+ throws IOException {
if (input.isEmpty()) {
- return emptyIndexFile();
+ return Collections.emptyList();
}
List<IndexFileMeta> result = new ArrayList<>();
Iterator<Map.Entry<String, DeletionVector>> iterator =
input.entrySet().iterator();
@@ -76,17 +93,4 @@ public class DeletionVectorIndexFileWriter {
}
return writer.result();
}
-
- /**
- * The deletion file of the bucketed table is updated according to the
bucket. If a compaction
- * occurs and there is no longer a deletion file, an empty deletion file
needs to be generated
- * to overwrite the old file.
- *
- * <p>TODO: We can consider sending a message to delete the deletion file
in the future.
- */
- private List<IndexFileMeta> emptyIndexFile() throws IOException {
- DeletionFileWriter writer = new
DeletionFileWriter(indexPathFactory.newPath(), fileIO);
- writer.close();
- return Collections.singletonList(writer.result());
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index fa9ed56417..cc554b816a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -141,29 +141,27 @@ public class DeletionVectorsIndexFile extends IndexFile {
}
}
- /**
- * Write deletion vectors to a new file, the format of this file can be
referenced at: <a
- * href="https://cwiki.apache.org/confluence/x/Tws4EQ">PIP-16</a>.
- *
- * @param input A map where the key represents which file the
DeletionVector belongs to, and the
- * value is the corresponding DeletionVector object.
- * @return A Pair object specifying the name of the written new file and a
map where the key
- * represents which file the DeletionVector belongs to and the value
is a Pair object
- * specifying the range (start position and size) within the file
where the deletion vector
- * data is located.
- * @throws UncheckedIOException If an I/O error occurs while writing to
the file.
- */
- public List<IndexFileMeta> write(Map<String, DeletionVector> input) {
+ public IndexFileMeta writeSingleFile(Map<String, DeletionVector> input) {
+ try {
+ return createWriter().writeSingleFile(input);
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to write deletion vectors.", e);
+ }
+ }
+
+ public List<IndexFileMeta> writeWithRolling(Map<String, DeletionVector>
input) {
try {
- DeletionVectorIndexFileWriter writer =
- new DeletionVectorIndexFileWriter(
- this.fileIO, this.pathFactory,
this.targetSizePerIndexFile);
- return writer.write(input);
+ return createWriter().writeWithRolling(input);
} catch (IOException e) {
throw new RuntimeException("Failed to write deletion vectors.", e);
}
}
+ private DeletionVectorIndexFileWriter createWriter() {
+ return new DeletionVectorIndexFileWriter(
+ this.fileIO, this.pathFactory, this.targetSizePerIndexFile);
+ }
+
private void checkVersion(InputStream in) throws IOException {
int version = in.read();
if (version != VERSION_ID_V1) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
index 38e1176de7..70e00bc4ad 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/AppendDeleteFileMaintainer.java
@@ -22,9 +22,7 @@ import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -36,58 +34,48 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
/** A {@link BaseAppendDeleteFileMaintainer} of unaware bucket append table. */
public class AppendDeleteFileMaintainer implements
BaseAppendDeleteFileMaintainer {
- private final IndexFileHandler indexFileHandler;
+ private final DeletionVectorsIndexFile dvIndexFile;
private final BinaryRow partition;
private final Map<String, DeletionFile> dataFileToDeletionFile;
- private final Map<String, IndexManifestEntry> indexNameToEntry = new
HashMap<>();
-
- private final Map<String, Map<String, DeletionFile>>
indexFileToDeletionFiles = new HashMap<>();
- private final Map<String, String> dataFileToIndexFile = new HashMap<>();
-
- private final Set<String> touchedIndexFiles = new HashSet<>();
-
- private final DeletionVectorsMaintainer maintainer;
+ private final Map<String, IndexManifestEntry> indexNameToEntry;
+ private final Map<String, Map<String, DeletionFile>>
indexFileToDeletionFiles;
+ private final Map<String, String> dataFileToIndexFile;
+ private final Set<String> touchedIndexFiles;
+ private final Map<String, DeletionVector> deletionVectors;
AppendDeleteFileMaintainer(
- IndexFileHandler indexFileHandler,
+ DeletionVectorsIndexFile dvIndexFile,
BinaryRow partition,
+ List<IndexManifestEntry> manifestEntries,
Map<String, DeletionFile> deletionFiles) {
- this.indexFileHandler = indexFileHandler;
+ this.dvIndexFile = dvIndexFile;
this.partition = partition;
this.dataFileToDeletionFile = new HashMap<>(deletionFiles);
- // the deletion of data files is independent
- // just create an empty maintainer
- this.maintainer = new
DeletionVectorsMaintainer.Factory(indexFileHandler).create();
-
- List<String> touchedIndexFileNames =
- deletionFiles.values().stream()
- .map(deletionFile -> new
Path(deletionFile.path()).getName())
- .distinct()
- .collect(Collectors.toList());
- indexFileHandler.scanEntries().stream()
- .filter(
- indexManifestEntry ->
- touchedIndexFileNames.contains(
-
indexManifestEntry.indexFile().fileName()))
- .forEach(entry ->
indexNameToEntry.put(entry.indexFile().fileName(), entry));
+ this.deletionVectors = new HashMap<>();
+
+ this.indexNameToEntry = new HashMap<>();
+ for (IndexManifestEntry entry : manifestEntries) {
+ indexNameToEntry.put(entry.indexFile().fileName(), entry);
+ }
+ this.indexFileToDeletionFiles = new HashMap<>();
+ this.dataFileToIndexFile = new HashMap<>();
for (String dataFile : deletionFiles.keySet()) {
DeletionFile deletionFile = deletionFiles.get(dataFile);
String indexFileName = new Path(deletionFile.path()).getName();
- if (!indexFileToDeletionFiles.containsKey(indexFileName)) {
- indexFileToDeletionFiles.put(indexFileName, new HashMap<>());
- }
- indexFileToDeletionFiles.get(indexFileName).put(dataFile,
deletionFile);
+ indexFileToDeletionFiles
+ .computeIfAbsent(indexFileName, k -> new HashMap<>())
+ .put(dataFile, deletionFile);
dataFileToIndexFile.put(dataFile, indexFileName);
}
+ this.touchedIndexFiles = new HashSet<>();
}
@Override
@@ -111,7 +99,7 @@ public class AppendDeleteFileMaintainer implements
BaseAppendDeleteFileMaintaine
public DeletionVector getDeletionVector(String dataFile) {
DeletionFile deletionFile = getDeletionFile(dataFile);
if (deletionFile != null) {
- return
indexFileHandler.deletionVectorsIndex().readDeletionVector(deletionFile);
+ return dvIndexFile.readDeletionVector(deletionFile);
}
return null;
}
@@ -129,28 +117,26 @@ public class AppendDeleteFileMaintainer implements
BaseAppendDeleteFileMaintaine
@Override
public void notifyNewDeletionVector(String dataFile, DeletionVector
deletionVector) {
- DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
DeletionFile previous = notifyRemovedDeletionVector(dataFile);
if (previous != null) {
-
deletionVector.merge(deletionVectorsIndexFile.readDeletionVector(previous));
+ deletionVector.merge(dvIndexFile.readDeletionVector(previous));
}
- maintainer.notifyNewDeletion(dataFile, deletionVector);
+ deletionVectors.put(dataFile, deletionVector);
}
@Override
public List<IndexManifestEntry> persist() {
List<IndexManifestEntry> result = writeUnchangedDeletionVector();
- List<IndexManifestEntry> newIndexFileEntries =
- maintainer.writeDeletionVectorsIndex().stream()
- .map(
- fileMeta ->
- new IndexManifestEntry(
- FileKind.ADD, partition,
UNAWARE_BUCKET, fileMeta))
- .collect(Collectors.toList());
- result.addAll(newIndexFileEntries);
+ dvIndexFile.writeWithRolling(deletionVectors).stream()
+ .map(this::toAddEntry)
+ .forEach(result::add);
return result;
}
+ private IndexManifestEntry toAddEntry(IndexFileMeta file) {
+ return new IndexManifestEntry(FileKind.ADD, partition, UNAWARE_BUCKET,
file);
+ }
+
public String getIndexFilePath(String dataFile) {
DeletionFile deletionFile = getDeletionFile(dataFile);
return deletionFile == null ? null : deletionFile.path();
@@ -158,7 +144,6 @@ public class AppendDeleteFileMaintainer implements
BaseAppendDeleteFileMaintaine
@VisibleForTesting
List<IndexManifestEntry> writeUnchangedDeletionVector() {
- DeletionVectorsIndexFile deletionVectorsIndexFile =
indexFileHandler.deletionVectorsIndex();
List<IndexManifestEntry> newIndexEntries = new ArrayList<>();
for (String indexFile : indexFileToDeletionFiles.keySet()) {
if (touchedIndexFiles.contains(indexFile)) {
@@ -169,17 +154,10 @@ public class AppendDeleteFileMaintainer implements
BaseAppendDeleteFileMaintaine
indexFileToDeletionFiles.get(indexFile);
if (!dataFileToDeletionFiles.isEmpty()) {
List<IndexFileMeta> newIndexFiles =
- indexFileHandler.writeDeletionVectorsIndex(
-
deletionVectorsIndexFile.readDeletionVector(
- dataFileToDeletionFiles));
+ dvIndexFile.writeWithRolling(
+
dvIndexFile.readDeletionVector(dataFileToDeletionFiles));
newIndexFiles.forEach(
- newIndexFile ->
- newIndexEntries.add(
- new IndexManifestEntry(
- FileKind.ADD,
- oldEntry.partition(),
- oldEntry.bucket(),
- newIndexFile)));
+ newIndexFile ->
newIndexEntries.add(toAddEntry(newIndexFile)));
}
// mark the touched index file as removed.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
index 6afffda970..ae3d3e8a97 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BaseAppendDeleteFileMaintainer.java
@@ -20,8 +20,9 @@ package org.apache.paimon.deletionvectors.append;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.index.DeletionVectorMeta;
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
@@ -29,11 +30,14 @@ import org.apache.paimon.table.source.DeletionFile;
import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
-import static org.apache.paimon.table.BucketMode.UNAWARE_BUCKET;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/**
* A maintainer to maintain deletion files for append table, the core methods:
@@ -63,15 +67,33 @@ public interface BaseAppendDeleteFileMaintainer {
// overwrite the entire deletion file of the bucket when writing
deletes.
List<IndexFileMeta> indexFiles =
indexFileHandler.scan(snapshot, DELETION_VECTORS_INDEX,
partition, bucket);
- DeletionVectorsMaintainer maintainer =
- new
DeletionVectorsMaintainer.Factory(indexFileHandler).create(indexFiles);
+ BucketedDvMaintainer maintainer =
+
BucketedDvMaintainer.factory(indexFileHandler).create(indexFiles);
return new BucketedAppendDeleteFileMaintainer(partition, bucket,
maintainer);
}
static AppendDeleteFileMaintainer forUnawareAppend(
IndexFileHandler indexFileHandler, @Nullable Snapshot snapshot,
BinaryRow partition) {
- Map<String, DeletionFile> deletionFiles =
- indexFileHandler.scanDVIndex(snapshot, partition,
UNAWARE_BUCKET);
- return new AppendDeleteFileMaintainer(indexFileHandler, partition,
deletionFiles);
+ List<IndexManifestEntry> manifestEntries =
+ indexFileHandler.scan(snapshot,
DELETION_VECTORS_INDEX).stream()
+ .filter(e -> e.partition().equals(partition))
+ .collect(Collectors.toList());
+ Map<String, DeletionFile> deletionFiles = new HashMap<>();
+ for (IndexManifestEntry file : manifestEntries) {
+ IndexFileMeta meta = file.indexFile();
+ LinkedHashMap<String, DeletionVectorMeta> dvMetas =
meta.deletionVectorMetas();
+ checkNotNull(dvMetas);
+ for (DeletionVectorMeta dvMeta : dvMetas.values()) {
+ deletionFiles.put(
+ dvMeta.dataFileName(),
+ new DeletionFile(
+ indexFileHandler.filePath(meta).toString(),
+ dvMeta.offset(),
+ dvMeta.length(),
+ dvMeta.cardinality()));
+ }
+ }
+ return new AppendDeleteFileMaintainer(
+ indexFileHandler.deletionVectorsIndex(), partition,
manifestEntries, deletionFiles);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
index 847caae499..7245ebfe12 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/append/BucketedAppendDeleteFileMaintainer.java
@@ -19,23 +19,23 @@
package org.apache.paimon.deletionvectors.append;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.IndexManifestEntry;
+import java.util.ArrayList;
import java.util.List;
-import java.util.stream.Collectors;
/** A {@link BaseAppendDeleteFileMaintainer} of bucketed append table. */
public class BucketedAppendDeleteFileMaintainer implements
BaseAppendDeleteFileMaintainer {
private final BinaryRow partition;
private final int bucket;
- private final DeletionVectorsMaintainer maintainer;
+ private final BucketedDvMaintainer maintainer;
BucketedAppendDeleteFileMaintainer(
- BinaryRow partition, int bucket, DeletionVectorsMaintainer
maintainer) {
+ BinaryRow partition, int bucket, BucketedDvMaintainer maintainer) {
this.partition = partition;
this.bucket = bucket;
this.maintainer = maintainer;
@@ -58,8 +58,11 @@ public class BucketedAppendDeleteFileMaintainer implements
BaseAppendDeleteFileM
@Override
public List<IndexManifestEntry> persist() {
- return maintainer.writeDeletionVectorsIndex().stream()
+ List<IndexManifestEntry> result = new ArrayList<>();
+ maintainer
+ .writeDeletionVectorsIndex()
.map(fileMeta -> new IndexManifestEntry(FileKind.ADD,
partition, bucket, fileMeta))
- .collect(Collectors.toList());
+ .ifPresent(result::add);
+ return result;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
index 1d18bde05f..38c4df0ae9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/index/IndexFileHandler.java
@@ -25,20 +25,16 @@ import
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.IndexManifestFile;
-import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.utils.IntIterator;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.SnapshotManager;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -47,7 +43,6 @@ import java.util.Set;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
-import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Handle index files. */
public class IndexFileHandler {
@@ -85,37 +80,6 @@ public class IndexFileHandler {
return result.isEmpty() ? Optional.empty() :
Optional.of(result.get(0));
}
- public Map<String, DeletionFile> scanDVIndex(
- @Nullable Snapshot snapshot, BinaryRow partition, int bucket) {
- if (snapshot == null) {
- return Collections.emptyMap();
- }
- String indexManifest = snapshot.indexManifest();
- if (indexManifest == null) {
- return Collections.emptyMap();
- }
- Map<String, DeletionFile> result = new HashMap<>();
- for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
- IndexFileMeta meta = file.indexFile();
- if (meta.indexType().equals(DELETION_VECTORS_INDEX)
- && file.partition().equals(partition)
- && file.bucket() == bucket) {
- LinkedHashMap<String, DeletionVectorMeta> dvMetas =
meta.deletionVectorMetas();
- checkNotNull(dvMetas);
- for (DeletionVectorMeta dvMeta : dvMetas.values()) {
- result.put(
- dvMeta.dataFileName(),
- new DeletionFile(
- filePath(meta).toString(),
- dvMeta.offset(),
- dvMeta.length(),
- dvMeta.cardinality()));
- }
- }
- }
- return result;
- }
-
public List<IndexManifestEntry> scan(String indexType) {
return scan(snapshotManager.latestSnapshot(), indexType);
}
@@ -189,18 +153,10 @@ public class IndexFileHandler {
public List<IndexManifestEntry> scanEntries(
Snapshot snapshot, String indexType, Set<BinaryRow> partitions) {
- if (snapshot == null) {
- return Collections.emptyList();
- }
- String indexManifest = snapshot.indexManifest();
- if (indexManifest == null) {
- return Collections.emptyList();
- }
-
+ List<IndexManifestEntry> manifestEntries = scan(snapshot, indexType);
List<IndexManifestEntry> result = new ArrayList<>();
- for (IndexManifestEntry file : indexManifestFile.read(indexManifest)) {
- if (file.indexFile().indexType().equals(indexType)
- && partitions.contains(file.partition())) {
+ for (IndexManifestEntry file : manifestEntries) {
+ if (partitions.contains(file.partition())) {
result.add(file);
}
}
@@ -289,9 +245,4 @@ public class IndexFileHandler {
}
return deletionVectorsIndex.readAllDeletionVectors(fileMetas);
}
-
- public List<IndexFileMeta> writeDeletionVectorsIndex(
- Map<String, DeletionVector> deletionVectors) {
- return deletionVectorsIndex.write(deletionVectors);
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
index 0ab0981963..48d0a0ca24 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/CompactStrategy.java
@@ -19,7 +19,7 @@
package org.apache.paimon.mergetree.compact;
import org.apache.paimon.compact.CompactUnit;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.mergetree.LevelSortedRun;
@@ -54,7 +54,7 @@ public interface CompactStrategy {
int numLevels,
List<LevelSortedRun> runs,
@Nullable RecordLevelExpire recordLevelExpire,
- @Nullable DeletionVectorsMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
boolean forceRewriteAllFiles) {
int maxLevel = numLevels - 1;
if (runs.isEmpty()) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
index ba427f7e92..6c9376ae33 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapper.java
@@ -21,7 +21,7 @@ package org.apache.paimon.mergetree.compact;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.LookupLevels.PositionedKeyValue;
import org.apache.paimon.types.RowKind;
@@ -64,7 +64,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
private final KeyValue reusedAfter = new KeyValue();
@Nullable private final RecordEqualiser valueEqualiser;
private final LookupStrategy lookupStrategy;
- private final @Nullable DeletionVectorsMaintainer
deletionVectorsMaintainer;
+ private final @Nullable BucketedDvMaintainer deletionVectorsMaintainer;
private final Comparator<KeyValue> comparator;
public LookupChangelogMergeFunctionWrapper(
@@ -72,7 +72,7 @@ public class LookupChangelogMergeFunctionWrapper<T>
Function<InternalRow, T> lookup,
@Nullable RecordEqualiser valueEqualiser,
LookupStrategy lookupStrategy,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer,
@Nullable UserDefinedSeqComparator userDefinedSeqComparator) {
MergeFunction<KeyValue> mergeFunction = mergeFunctionFactory.create();
checkArgument(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
index feb6d0b10b..896fba0d60 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -23,7 +23,7 @@ import org.apache.paimon.CoreOptions.MergeEngine;
import org.apache.paimon.KeyValue;
import org.apache.paimon.codegen.RecordEqualiser;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
@@ -56,7 +56,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
private final LookupLevels<T> lookupLevels;
private final MergeFunctionWrapperFactory<T> wrapperFactory;
private final boolean noSequenceField;
- @Nullable private final DeletionVectorsMaintainer dvMaintainer;
+ @Nullable private final BucketedDvMaintainer dvMaintainer;
private final IntFunction<String> level2FileFormat;
public LookupMergeTreeCompactRewriter(
@@ -71,7 +71,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
MergeSorter mergeSorter,
MergeFunctionWrapperFactory<T> wrapperFactory,
boolean produceChangelog,
- @Nullable DeletionVectorsMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
CoreOptions options) {
super(
maxLevel,
@@ -156,7 +156,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
MergeFunctionFactory<KeyValue> mfFactory,
int outputLevel,
LookupLevels<T> lookupLevels,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer);
}
/** A normal {@link MergeFunctionWrapperFactory} to create lookup wrapper.
*/
@@ -181,7 +181,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
MergeFunctionFactory<KeyValue> mfFactory,
int outputLevel,
LookupLevels<T> lookupLevels,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer)
{
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
return new LookupChangelogMergeFunctionWrapper<>(
mfFactory,
key -> {
@@ -207,7 +207,7 @@ public class LookupMergeTreeCompactRewriter<T> extends
ChangelogMergeTreeRewrite
MergeFunctionFactory<KeyValue> mfFactory,
int outputLevel,
LookupLevels<Boolean> lookupLevels,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer)
{
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
return new FirstRowMergeFunctionWrapper(
mfFactory,
key -> {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
index 987fa06a30..f0909565ee 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactManager.java
@@ -26,7 +26,7 @@ import org.apache.paimon.compact.CompactResult;
import org.apache.paimon.compact.CompactTask;
import org.apache.paimon.compact.CompactUnit;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.RecordLevelExpire;
import org.apache.paimon.mergetree.LevelSortedRun;
@@ -63,7 +63,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
private final CompactRewriter rewriter;
@Nullable private final CompactionMetrics.Reporter metricsReporter;
- @Nullable private final DeletionVectorsMaintainer dvMaintainer;
+ @Nullable private final BucketedDvMaintainer dvMaintainer;
private final boolean lazyGenDeletionFile;
private final boolean needLookup;
private final boolean forceRewriteAllFiles;
@@ -79,7 +79,7 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
int numSortedRunStopTrigger,
CompactRewriter rewriter,
@Nullable CompactionMetrics.Reporter metricsReporter,
- @Nullable DeletionVectorsMaintainer dvMaintainer,
+ @Nullable BucketedDvMaintainer dvMaintainer,
boolean lazyGenDeletionFile,
boolean needLookup,
@Nullable RecordLevelExpire recordLevelExpire,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 2e61c3d2f1..15af7eb4fd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -24,7 +24,7 @@ import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.compact.CompactDeletionFile;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.index.IndexFileHandler;
@@ -73,7 +73,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
private final int writerNumberMax;
@Nullable private final DynamicBucketIndexMaintainer.Factory
dbMaintainerFactory;
- @Nullable private final DeletionVectorsMaintainer.Factory
dvMaintainerFactory;
+ @Nullable private final BucketedDvMaintainer.Factory dvMaintainerFactory;
private final int numBuckets;
private final RowType partitionType;
@@ -95,7 +95,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
- @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
String tableName,
CoreOptions options,
RowType partitionType) {
@@ -430,7 +430,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
dbMaintainerFactory == null
? null
:
dbMaintainerFactory.create(restored.dynamicBucketIndex());
- DeletionVectorsMaintainer dvMaintainer =
+ BucketedDvMaintainer dvMaintainer =
dvMaintainerFactory == null
? null
:
dvMaintainerFactory.create(restored.deleteVectorsIndex());
@@ -524,7 +524,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer);
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer);
// force buffer spill to avoid out of memory in batch mode
protected void forceBufferSpill() throws Exception {}
@@ -538,7 +538,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
public final RecordWriter<T> writer;
public final int totalBuckets;
@Nullable public final DynamicBucketIndexMaintainer
dynamicBucketMaintainer;
- @Nullable public final DeletionVectorsMaintainer
deletionVectorsMaintainer;
+ @Nullable public final BucketedDvMaintainer deletionVectorsMaintainer;
protected final long baseSnapshotId;
protected long lastModifiedCommitIdentifier;
@@ -546,7 +546,7 @@ public abstract class AbstractFileStoreWrite<T> implements
FileStoreWrite<T> {
RecordWriter<T> writer,
int totalBuckets,
@Nullable DynamicBucketIndexMaintainer dynamicBucketMaintainer,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer,
Long baseSnapshotId) {
this.writer = writer;
this.totalBuckets = totalBuckets;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
index 3f1e701255..2df7c36934 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendFileStoreWrite.java
@@ -23,7 +23,7 @@ import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.types.RowType;
@@ -51,7 +51,6 @@ public class AppendFileStoreWrite extends
BaseAppendFileStoreWrite {
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
- @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
String tableName) {
super(
fileIO,
@@ -63,7 +62,7 @@ public class AppendFileStoreWrite extends
BaseAppendFileStoreWrite {
snapshotManager,
scan,
options,
- dvMaintainerFactory,
+ null,
tableName);
super.withIgnorePreviousFiles(true);
}
@@ -74,7 +73,7 @@ public class AppendFileStoreWrite extends
BaseAppendFileStoreWrite {
int bucket,
List<DataFileMeta> restoredFiles,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable BucketedDvMaintainer dvMaintainer) {
return new NoopCompactManager();
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
index dca05a6522..3bd8264831 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BaseAppendFileStoreWrite.java
@@ -24,8 +24,8 @@ import org.apache.paimon.append.AppendOnlyWriter;
import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.FileIO;
@@ -88,7 +88,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
- @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
String tableName) {
super(snapshotManager, scan, options, partitionType, null,
dvMaintainerFactory, tableName);
this.fileIO = fileIO;
@@ -111,7 +111,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable BucketedDvMaintainer dvMaintainer) {
return new AppendOnlyWriter(
fileIO,
ioManager,
@@ -161,7 +161,7 @@ public abstract class BaseAppendFileStoreWrite extends
MemoryFileStoreWrite<Inte
int bucket,
List<DataFileMeta> restoredFiles,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer dvMaintainer);
+ @Nullable BucketedDvMaintainer dvMaintainer);
public List<DataFileMeta> compactRewrite(
BinaryRow partition,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
index 9fffc0ce7b..71f10d4b3d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketedAppendFileStoreWrite.java
@@ -24,8 +24,8 @@ import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.types.RowType;
@@ -54,7 +54,7 @@ public class BucketedAppendFileStoreWrite extends
BaseAppendFileStoreWrite {
SnapshotManager snapshotManager,
FileStoreScan scan,
CoreOptions options,
- @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
String tableName) {
super(
fileIO,
@@ -77,7 +77,7 @@ public class BucketedAppendFileStoreWrite extends
BaseAppendFileStoreWrite {
int bucket,
List<DataFileMeta> restoredFiles,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable BucketedDvMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
index 4c75fdbb73..b268f7b7a5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreWrite.java
@@ -20,7 +20,7 @@ package org.apache.paimon.operation;
import org.apache.paimon.FileStore;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.io.DataFileMeta;
@@ -156,7 +156,7 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
protected final List<DataFileMeta> dataFiles;
protected final long maxSequenceNumber;
@Nullable protected final DynamicBucketIndexMaintainer indexMaintainer;
- @Nullable protected final DeletionVectorsMaintainer
deletionVectorsMaintainer;
+ @Nullable protected final BucketedDvMaintainer
deletionVectorsMaintainer;
protected final CommitIncrement commitIncrement;
protected State(
@@ -168,7 +168,7 @@ public interface FileStoreWrite<T> extends
Restorable<List<FileStoreWrite.State<
Collection<DataFileMeta> dataFiles,
long maxSequenceNumber,
@Nullable DynamicBucketIndexMaintainer indexMaintainer,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer,
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer,
CommitIncrement commitIncrement) {
this.partition = partition;
this.bucket = bucket;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 4e9e558d04..afd3fb906f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -29,8 +29,8 @@ import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
@@ -131,7 +131,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
SnapshotManager snapshotManager,
FileStoreScan scan,
@Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
- @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
CoreOptions options,
KeyValueFieldsExtractor extractor,
String tableName) {
@@ -185,7 +185,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable BucketedDvMaintainer dvMaintainer) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Creating merge tree writer for partition {} bucket {}
from restored files {}",
@@ -260,7 +260,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
CompactStrategy compactStrategy,
ExecutorService compactExecutor,
Levels levels,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable BucketedDvMaintainer dvMaintainer) {
if (options.writeOnly()) {
return new NoopCompactManager();
} else {
@@ -299,7 +299,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
Comparator<InternalRow> keyComparator,
@Nullable FieldsComparator userDefinedSeqComparator,
Levels levels,
- @Nullable DeletionVectorsMaintainer dvMaintainer) {
+ @Nullable BucketedDvMaintainer dvMaintainer) {
DeletionVector.Factory dvFactory =
DeletionVector.factory(dvMaintainer);
FileReaderFactory<KeyValue> readerFactory =
readerFactoryBuilder.build(partition, bucket, dvFactory);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
index c91a664fad..06263d4ec6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/MemoryFileStoreWrite.java
@@ -19,7 +19,7 @@
package org.apache.paimon.operation;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.index.DynamicBucketIndexMaintainer;
import org.apache.paimon.io.cache.CacheManager;
import org.apache.paimon.memory.HeapMemorySegmentPool;
@@ -65,7 +65,7 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
CoreOptions options,
RowType partitionType,
@Nullable DynamicBucketIndexMaintainer.Factory dbMaintainerFactory,
- @Nullable DeletionVectorsMaintainer.Factory dvMaintainerFactory,
+ @Nullable BucketedDvMaintainer.Factory dvMaintainerFactory,
String tableName) {
super(
snapshotManager,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
index da7287ad96..ac9a851cb2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/postpone/PostponeBucketFileStoreWrite.java
@@ -21,7 +21,7 @@ package org.apache.paimon.postpone;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.avro.AvroSchemaConverter;
import org.apache.paimon.fs.FileIO;
@@ -180,7 +180,7 @@ public class PostponeBucketFileStoreWrite extends
MemoryFileStoreWrite<KeyValue>
long restoredMaxSeqNumber,
@Nullable CommitIncrement restoreIncrement,
ExecutorService compactExecutor,
- @Nullable DeletionVectorsMaintainer deletionVectorsMaintainer) {
+ @Nullable BucketedDvMaintainer deletionVectorsMaintainer) {
Preconditions.checkArgument(bucket == BucketMode.POSTPONE_BUCKET);
Preconditions.checkArgument(
restoreFiles.isEmpty(),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
index a67b5c0a97..55ffdcbe53 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestAppendFileStore.java
@@ -19,7 +19,7 @@
package org.apache.paimon;
import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.append.AppendDeleteFileMaintainer;
import
org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainerHelper;
import org.apache.paimon.fs.FileIO;
@@ -44,6 +44,7 @@ import org.apache.paimon.table.source.DeletionFile;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -126,10 +127,9 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
fileHandler, partition, dataFileToDeletionFiles);
}
- public DeletionVectorsMaintainer createOrRestoreDVMaintainer(BinaryRow
partition, int bucket) {
+ public BucketedDvMaintainer createOrRestoreDVMaintainer(BinaryRow
partition, int bucket) {
Snapshot latestSnapshot = snapshotManager().latestSnapshot();
- DeletionVectorsMaintainer.Factory factory =
- new DeletionVectorsMaintainer.Factory(fileHandler);
+ BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
List<IndexFileMeta> indexFiles =
fileHandler.scan(latestSnapshot, DELETION_VECTORS_INDEX,
partition, bucket);
return factory.create(indexFiles);
@@ -137,19 +137,21 @@ public class TestAppendFileStore extends
AppendOnlyFileStore {
public CommitMessageImpl writeDVIndexFiles(
BinaryRow partition, int bucket, Map<String, List<Integer>>
dataFileToPositions) {
- DeletionVectorsMaintainer dvMaintainer =
createOrRestoreDVMaintainer(partition, bucket);
+ BucketedDvMaintainer dvMaintainer =
createOrRestoreDVMaintainer(partition, bucket);
for (Map.Entry<String, List<Integer>> entry :
dataFileToPositions.entrySet()) {
for (Integer pos : entry.getValue()) {
dvMaintainer.notifyNewDeletion(entry.getKey(), pos);
}
}
+ List<IndexFileMeta> indexFiles = new ArrayList<>();
+ dvMaintainer.writeDeletionVectorsIndex().ifPresent(indexFiles::add);
return new CommitMessageImpl(
partition,
bucket,
options().bucket(),
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
- new IndexIncrement(dvMaintainer.writeDeletionVectorsIndex()));
+ new IndexIncrement(indexFiles));
}
public static TestAppendFileStore createAppendStore(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
similarity index 83%
rename from
paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
rename to
paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
index 7e82ad8d65..2128ae8714 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsMaintainerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/BucketedDvMaintainerTest.java
@@ -51,8 +51,8 @@ import static java.util.Collections.emptyList;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static org.assertj.core.api.Assertions.assertThat;
-/** Test for {@link DeletionVectorsMaintainer}. */
-public class DeletionVectorsMaintainerTest extends PrimaryKeyTableTestBase {
+/** Test for {@link BucketedDvMaintainer}. */
+public class BucketedDvMaintainerTest extends PrimaryKeyTableTestBase {
private IndexFileHandler fileHandler;
@ParameterizedTest
@@ -60,9 +60,8 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
public void test0(boolean bitmap64) {
initIndexHandler(bitmap64);
- DeletionVectorsMaintainer.Factory factory =
- new DeletionVectorsMaintainer.Factory(fileHandler);
- DeletionVectorsMaintainer dvMaintainer = factory.create(emptyList());
+ BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
+ BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
assertThat(dvMaintainer.bitmap64).isEqualTo(bitmap64);
dvMaintainer.notifyNewDeletion("f1", 1);
@@ -72,9 +71,10 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
assertThat(dvMaintainer.deletionVectorOf("f1")).isPresent();
assertThat(dvMaintainer.deletionVectorOf("f3")).isEmpty();
- List<IndexFileMeta> fileMetas =
dvMaintainer.writeDeletionVectorsIndex();
+ IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
- Map<String, DeletionVector> deletionVectors =
fileHandler.readAllDeletionVectors(fileMetas);
+ Map<String, DeletionVector> deletionVectors =
+
fileHandler.readAllDeletionVectors(Collections.singletonList(file));
assertThat(deletionVectors.get("f1").isDeleted(1)).isTrue();
assertThat(deletionVectors.get("f1").isDeleted(2)).isFalse();
assertThat(deletionVectors.get("f2").isDeleted(1)).isFalse();
@@ -87,10 +87,9 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
public void test1(boolean bitmap64) {
initIndexHandler(bitmap64);
- DeletionVectorsMaintainer.Factory factory =
- new DeletionVectorsMaintainer.Factory(fileHandler);
+ BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
- DeletionVectorsMaintainer dvMaintainer = factory.create();
+ BucketedDvMaintainer dvMaintainer = factory.create();
DeletionVector deletionVector1 = createDeletionVector(bitmap64);
deletionVector1.delete(1);
deletionVector1.delete(3);
@@ -98,8 +97,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
dvMaintainer.notifyNewDeletion("f1", deletionVector1);
assertThat(dvMaintainer.bitmap64()).isEqualTo(bitmap64);
- List<IndexFileMeta> fileMetas1 =
dvMaintainer.writeDeletionVectorsIndex();
- assertThat(fileMetas1.size()).isEqualTo(1);
+ IndexFileMeta file = dvMaintainer.writeDeletionVectorsIndex().get();
CommitMessage commitMessage =
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
@@ -107,7 +105,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
1,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
- new IndexIncrement(fileMetas1));
+ new IndexIncrement(Collections.singletonList(file)));
BatchTableCommit commit = table.newBatchWriteBuilder().newCommit();
commit.commit(Collections.singletonList(commitMessage));
@@ -122,8 +120,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
deletionVector2.delete(2);
dvMaintainer.notifyNewDeletion("f1", deletionVector2);
- List<IndexFileMeta> fileMetas2 =
dvMaintainer.writeDeletionVectorsIndex();
- assertThat(fileMetas2.size()).isEqualTo(1);
+ file = dvMaintainer.writeDeletionVectorsIndex().get();
commitMessage =
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
@@ -131,7 +128,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
1,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
- new IndexIncrement(fileMetas2));
+ new IndexIncrement(Collections.singletonList(file)));
commit = table.newBatchWriteBuilder().newCommit();
commit.commit(Collections.singletonList(commitMessage));
@@ -149,9 +146,8 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
public void testCompactDeletion(boolean bitmap64) throws IOException {
initIndexHandler(bitmap64);
- DeletionVectorsMaintainer.Factory factory =
- new DeletionVectorsMaintainer.Factory(fileHandler);
- DeletionVectorsMaintainer dvMaintainer = factory.create(emptyList());
+ BucketedDvMaintainer.Factory factory =
BucketedDvMaintainer.factory(fileHandler);
+ BucketedDvMaintainer dvMaintainer = factory.create(emptyList());
File indexDir = new File(tempPath.toFile(), "/default.db/T/index");
@@ -191,17 +187,15 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
public void testReadAndWriteMixedDv(boolean bitmap64) {
// write first kind dv
initIndexHandler(bitmap64);
- DeletionVectorsMaintainer.Factory factory1 =
- new DeletionVectorsMaintainer.Factory(fileHandler);
- DeletionVectorsMaintainer dvMaintainer1 = factory1.create();
+ BucketedDvMaintainer.Factory factory1 =
BucketedDvMaintainer.factory(fileHandler);
+ BucketedDvMaintainer dvMaintainer1 = factory1.create();
dvMaintainer1.notifyNewDeletion("f1", 1);
dvMaintainer1.notifyNewDeletion("f1", 3);
dvMaintainer1.notifyNewDeletion("f2", 1);
dvMaintainer1.notifyNewDeletion("f2", 3);
assertThat(dvMaintainer1.bitmap64()).isEqualTo(bitmap64);
- List<IndexFileMeta> fileMetas1 =
dvMaintainer1.writeDeletionVectorsIndex();
- assertThat(fileMetas1.size()).isEqualTo(1);
+ IndexFileMeta file = dvMaintainer1.writeDeletionVectorsIndex().get();
CommitMessage commitMessage1 =
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
@@ -209,21 +203,20 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
1,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
- new IndexIncrement(fileMetas1));
+ new IndexIncrement(Collections.singletonList(file)));
BatchTableCommit commit1 = table.newBatchWriteBuilder().newCommit();
commit1.commit(Collections.singletonList(commitMessage1));
// write second kind dv
initIndexHandler(!bitmap64);
- DeletionVectorsMaintainer.Factory factory2 =
- new DeletionVectorsMaintainer.Factory(fileHandler);
+ BucketedDvMaintainer.Factory factory2 =
BucketedDvMaintainer.factory(fileHandler);
List<IndexFileMeta> indexFiles =
fileHandler.scan(
table.latestSnapshot().get(),
DELETION_VECTORS_INDEX,
BinaryRow.EMPTY_ROW,
0);
- DeletionVectorsMaintainer dvMaintainer2 = factory2.create(indexFiles);
+ BucketedDvMaintainer dvMaintainer2 = factory2.create(indexFiles);
dvMaintainer2.notifyNewDeletion("f1", 10);
dvMaintainer2.notifyNewDeletion("f3", 1);
dvMaintainer2.notifyNewDeletion("f3", 3);
@@ -238,8 +231,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
assertThat(dvs.get("f3"))
.isInstanceOf(bitmap64 ? BitmapDeletionVector.class :
Bitmap64DeletionVector.class);
- List<IndexFileMeta> fileMetas2 =
dvMaintainer2.writeDeletionVectorsIndex();
- assertThat(fileMetas2.size()).isEqualTo(1);
+ file = dvMaintainer2.writeDeletionVectorsIndex().get();
CommitMessage commitMessage2 =
new CommitMessageImpl(
BinaryRow.EMPTY_ROW,
@@ -247,7 +239,7 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
1,
DataIncrement.emptyIncrement(),
CompactIncrement.emptyIncrement(),
- new IndexIncrement(fileMetas2));
+ new IndexIncrement(Collections.singletonList(file)));
BatchTableCommit commit2 = table.newBatchWriteBuilder().newCommit();
commit2.commit(Collections.singletonList(commitMessage2));
@@ -278,8 +270,8 @@ public class DeletionVectorsMaintainerTest extends
PrimaryKeyTableTestBase {
fileHandler = table.store().newIndexFileHandler();
}
- public static DeletionVectorsMaintainer createOrRestore(
- DeletionVectorsMaintainer.Factory factory,
+ public static BucketedDvMaintainer createOrRestore(
+ BucketedDvMaintainer.Factory factory,
@Nullable Snapshot snapshot,
BinaryRow partition) {
IndexFileHandler handler = factory.indexFileHandler();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index 2b55cba306..d30462c25d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -72,7 +72,7 @@ public class DeletionVectorsIndexFileTest {
index3.delete(3);
deleteMap.put("file33.parquet", index3);
- List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(deleteMap);
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.writeWithRolling(deleteMap);
assertThat(indexFiles.size()).isEqualTo(1);
// read
@@ -110,7 +110,7 @@ public class DeletionVectorsIndexFileTest {
}
// read
- List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(deleteMap);
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.writeWithRolling(deleteMap);
assertThat(indexFiles.size()).isEqualTo(1);
Map<String, DeletionVector> dvs =
deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
@@ -142,7 +142,7 @@ public class DeletionVectorsIndexFileTest {
fileToCardinality.put("f" + i, index.getCardinality());
fileToDV.put("f" + i, index);
}
- List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(fileToDV);
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.writeWithRolling(fileToDV);
// read
assertThat(indexFiles.size()).isEqualTo(1);
@@ -174,7 +174,7 @@ public class DeletionVectorsIndexFileTest {
fileToCardinality.put("f" + i, index.getCardinality());
fileToDV.put("f" + i, index);
}
- List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(fileToDV);
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.writeWithRolling(fileToDV);
// assert 1
assertThat(indexFiles.size()).isEqualTo(3);
@@ -196,7 +196,7 @@ public class DeletionVectorsIndexFileTest {
fileToCardinality.put("f" + i, index.getCardinality());
fileToDV.put("f" + i, index);
}
- indexFiles = deletionVectorsIndexFile.write(fileToDV);
+ indexFiles = deletionVectorsIndexFile.writeWithRolling(fileToDV);
// assert 2
assertThat(indexFiles.size()).isGreaterThan(1);
@@ -226,7 +226,7 @@ public class DeletionVectorsIndexFileTest {
deleteMap1.put(String.format("file%s.parquet", i), index);
deleteInteger.put(String.format("file%s.parquet", i), num);
}
- List<IndexFileMeta> indexFiles1 =
v1DeletionVectorsIndexFile.write(deleteMap1);
+ List<IndexFileMeta> indexFiles1 =
v1DeletionVectorsIndexFile.writeWithRolling(deleteMap1);
assertThat(indexFiles1.size()).isEqualTo(1);
// write v2 dv
@@ -238,7 +238,7 @@ public class DeletionVectorsIndexFileTest {
deleteMap2.put(String.format("file%s.parquet", i), index);
deleteInteger.put(String.format("file%s.parquet", i), num);
}
- List<IndexFileMeta> indexFiles2 =
v2DeletionVectorsIndexFile.write(deleteMap2);
+ List<IndexFileMeta> indexFiles2 =
v2DeletionVectorsIndexFile.writeWithRolling(deleteMap2);
assertThat(indexFiles2.size()).isEqualTo(1);
List<IndexFileMeta> totalIndexFiles =
@@ -274,7 +274,7 @@ public class DeletionVectorsIndexFileTest {
index1.delete(100);
deleteMap.put("file1.parquet", index1);
- List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.write(deleteMap);
+ List<IndexFileMeta> indexFiles =
deletionVectorsIndexFile.writeWithRolling(deleteMap);
assertThat(indexFiles.size()).isEqualTo(1);
IndexFileMeta indexFileMeta = indexFiles.get(0);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
index c6048b95f4..892d219278 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/append/AppendDeletionFileMaintainerHelper.java
@@ -19,10 +19,14 @@
package org.apache.paimon.deletionvectors.append;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.table.source.DeletionFile;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/** Helper for {@link BaseAppendDeleteFileMaintainer}. */
public class AppendDeletionFileMaintainerHelper {
@@ -31,6 +35,19 @@ public class AppendDeletionFileMaintainerHelper {
IndexFileHandler indexFileHandler,
BinaryRow partition,
Map<String, DeletionFile> deletionFiles) {
- return new AppendDeleteFileMaintainer(indexFileHandler, partition,
deletionFiles);
+ List<String> touchedIndexFileNames =
+ deletionFiles.values().stream()
+ .map(deletionFile -> new
Path(deletionFile.path()).getName())
+ .distinct()
+ .collect(Collectors.toList());
+ List<IndexManifestEntry> manifests =
+ indexFileHandler.scanEntries().stream()
+ .filter(
+ indexManifestEntry ->
+ touchedIndexFileNames.contains(
+
indexManifestEntry.indexFile().fileName()))
+ .collect(Collectors.toList());
+ return new AppendDeleteFileMaintainer(
+ indexFileHandler.deletionVectorsIndex(), partition, manifests,
deletionFiles);
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
index 23e942d3f4..5701106be4 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java
@@ -25,8 +25,8 @@ import org.apache.paimon.TestAppendFileStore;
import org.apache.paimon.TestFileStore;
import org.apache.paimon.TestKeyValueGenerator;
import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.deletionvectors.BucketedDvMaintainer;
import org.apache.paimon.deletionvectors.DeletionVector;
-import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.index.IndexFileHandler;
@@ -890,8 +890,7 @@ public class FileStoreCommitTest {
// assert 1
assertThat(store.scanDVIndexFiles(BinaryRow.EMPTY_ROW,
0).size()).isEqualTo(2);
- DeletionVectorsMaintainer maintainer =
- store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
+ BucketedDvMaintainer maintainer =
store.createOrRestoreDVMaintainer(BinaryRow.EMPTY_ROW, 0);
Map<String, DeletionVector> dvs = maintainer.deletionVectors();
assertThat(dvs.size()).isEqualTo(2);
assertThat(dvs.get("f2").isDeleted(2)).isTrue();
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f85213da7c..445d1507d9 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -19,7 +19,7 @@
package org.apache.paimon.spark.sql
import org.apache.paimon.data.BinaryRow
-import org.apache.paimon.deletionvectors.{DeletionVector,
DeletionVectorsMaintainer, DeletionVectorsMaintainerTest}
+import org.apache.paimon.deletionvectors.{BucketedDvMaintainer,
BucketedDvMaintainerTest, DeletionVector}
import org.apache.paimon.fs.Path
import org.apache.paimon.spark.{PaimonSparkTestBase, PaimonSplitScan}
import org.apache.paimon.spark.schema.PaimonMetadataColumn
@@ -117,7 +117,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val table = loadTable("target")
val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
runAndCheckSplitScan(s"""
|MERGE INTO target
|USING source
@@ -167,7 +167,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val table = loadTable("T")
val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b'), (3, 'c')")
val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
@@ -239,7 +239,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val table = loadTable("T")
val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
spark.sql(
"INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c',
'2025'), (4, 'd', '2025')")
@@ -326,7 +326,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val table = loadTable("T")
val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
spark.sql("INSERT INTO T VALUES (1, 'a'), (2, 'b')")
val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
@@ -383,7 +383,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
val table = loadTable("T")
val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
def getDeletionVectors(ptValues: Seq[String]): Map[String,
DeletionVector] = {
getLatestDeletionVectors(
@@ -467,7 +467,7 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
Row(1, "a_new1") :: Row(2, "b") :: Row(3, "c_new1") :: Nil)
val dvMaintainerFactory =
- new
DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+ BucketedDvMaintainer.factory(table.store().newIndexFileHandler())
val deletionVectors1 = getAllLatestDeletionVectors(table,
dvMaintainerFactory)
// 1, 3 deleted, their row positions are 0, 2
@@ -705,17 +705,17 @@ class DeletionVectorTest extends PaimonSparkTestBase with
AdaptiveSparkPlanHelpe
private def getAllLatestDeletionVectors(
table: FileStoreTable,
- dvMaintainerFactory: DeletionVectorsMaintainer.Factory): Map[String,
DeletionVector] = {
+ dvMaintainerFactory: BucketedDvMaintainer.Factory): Map[String,
DeletionVector] = {
getLatestDeletionVectors(table, dvMaintainerFactory,
Seq(BinaryRow.EMPTY_ROW))
}
private def getLatestDeletionVectors(
table: FileStoreTable,
- dvMaintainerFactory: DeletionVectorsMaintainer.Factory,
+ dvMaintainerFactory: BucketedDvMaintainer.Factory,
partitions: Seq[BinaryRow]): Map[String, DeletionVector] = {
partitions.flatMap {
partition =>
- DeletionVectorsMaintainerTest
+ BucketedDvMaintainerTest
.createOrRestore(
dvMaintainerFactory,
table.snapshotManager().latestSnapshot(),