This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 37dbe90d35 [core] introduce v2 deletion vector (#5475)
37dbe90d35 is described below
commit 37dbe90d353427cfd3de1cc01751b160cba9e95c
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Apr 17 13:17:00 2025 +0800
[core] introduce v2 deletion vector (#5475)
---
LICENSE | 4 +
.../paimon/utils/OptimizedRoaringBitmap64.java | 317 +++++++++++++++++++++
.../org/apache/paimon/utils/RoaringBitmap32.java | 9 +
.../deletionvectors/Bitmap64DeletionVector.java | 167 +++++++++++
.../DeletionVectorIndexFileWriter.java | 30 +-
.../deletionvectors/DeletionVectorsIndexFile.java | 65 ++++-
.../paimon/deletionvectors/DeletionVectorTest.java | 72 +++++
.../DeletionVectorsIndexFileTest.java | 141 +++++++--
8 files changed, 767 insertions(+), 38 deletions(-)
diff --git a/LICENSE b/LICENSE
index 513e18cb88..c5e688b325 100644
--- a/LICENSE
+++ b/LICENSE
@@ -254,6 +254,10 @@
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/pa
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
from http://iceberg.apache.org/ version 1.3.0
+paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
+paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
+from http://iceberg.apache.org/ version 1.8.1
+
paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
from https://hive.apache.org/ version 3.1.0
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
b/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
new file mode 100644
index 0000000000..5b14e25c30
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+/* This file is based on source code from the Iceberg Project
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.function.LongConsumer;
+
+/**
+ * A bitmap that supports positive 64-bit positions (the most significant bit
must be 0), but is
+ * optimized for cases where most positions fit in 32 bits by using an array
of 32-bit Roaring
+ * bitmaps. The internal bitmap array is grown as needed to accommodate the
largest position.
+ *
+ * <p>Mostly copied from iceberg.
+ */
+public class OptimizedRoaringBitmap64 {
+
+ public static final long MAX_VALUE = toPosition(Integer.MAX_VALUE - 1,
Integer.MIN_VALUE);
+ private static final RoaringBitmap[] EMPTY_BITMAP_ARRAY = new
RoaringBitmap[0];
+ private static final long BITMAP_COUNT_SIZE_BYTES = 8L;
+ private static final long BITMAP_KEY_SIZE_BYTES = 4L;
+
+ private RoaringBitmap[] bitmaps;
+
+ public OptimizedRoaringBitmap64() {
+ this.bitmaps = EMPTY_BITMAP_ARRAY;
+ }
+
+ private OptimizedRoaringBitmap64(RoaringBitmap[] bitmaps) {
+ this.bitmaps = bitmaps;
+ }
+
+ public static OptimizedRoaringBitmap64 fromRoaringBitmap32(RoaringBitmap32
roaringBitmap32) {
+ RoaringBitmap roaringBitmap = roaringBitmap32.clone().get();
+ return new OptimizedRoaringBitmap64(new RoaringBitmap[]
{roaringBitmap});
+ }
+
+ /**
+ * Sets a position in the bitmap.
+ *
+ * @param pos the position
+ */
+ public void add(long pos) {
+ validatePosition(pos);
+ int key = key(pos);
+ int pos32Bits = pos32Bits(pos);
+ allocateBitmapsIfNeeded(key + 1 /* required bitmap array length */);
+ bitmaps[key].add(pos32Bits);
+ }
+
+ /**
+ * Sets a range of positions in the bitmap.
+ *
+ * @param posStartInclusive the start position of the range (inclusive)
+ * @param posEndExclusive the end position of the range (exclusive)
+ */
+ public void addRange(long posStartInclusive, long posEndExclusive) {
+ for (long pos = posStartInclusive; pos < posEndExclusive; pos++) {
+ add(pos);
+ }
+ }
+
+ /**
+ * Sets all positions from the other bitmap in this bitmap, modifying this
bitmap in place.
+ *
+ * @param that the other bitmap
+ */
+ public void or(OptimizedRoaringBitmap64 that) {
+ allocateBitmapsIfNeeded(that.bitmaps.length);
+ for (int key = 0; key < that.bitmaps.length; key++) {
+ bitmaps[key].or(that.bitmaps[key]);
+ }
+ }
+
+ /**
+ * Checks if a position is set in the bitmap.
+ *
+ * @param pos the position
+ * @return true if the position is set in this bitmap, false otherwise
+ */
+ public boolean contains(long pos) {
+ validatePosition(pos);
+ int key = key(pos);
+ int pos32Bits = pos32Bits(pos);
+ return key < bitmaps.length && bitmaps[key].contains(pos32Bits);
+ }
+
+ /**
+ * Indicates whether the bitmap has any positions set.
+ *
+ * @return true if the bitmap is empty, false otherwise
+ */
+ public boolean isEmpty() {
+ return cardinality() == 0;
+ }
+
+ /**
+ * Returns the number of set positions in the bitmap.
+ *
+ * @return the number of set positions
+ */
+ public long cardinality() {
+ long cardinality = 0L;
+ for (RoaringBitmap bitmap : bitmaps) {
+ cardinality += bitmap.getLongCardinality();
+ }
+ return cardinality;
+ }
+
+ /**
+ * Applies run-length encoding wherever it is more space efficient.
+ *
+ * @return whether the bitmap was changed
+ */
+ public boolean runLengthEncode() {
+ boolean changed = false;
+ for (RoaringBitmap bitmap : bitmaps) {
+ changed |= bitmap.runOptimize();
+ }
+ return changed;
+ }
+
+ /**
+ * Iterates over all positions in the bitmap.
+ *
+ * @param consumer a consumer for positions
+ */
+ public void forEach(LongConsumer consumer) {
+ for (int key = 0; key < bitmaps.length; key++) {
+ forEach(key, bitmaps[key], consumer);
+ }
+ }
+
+ @VisibleForTesting
+ int allocatedBitmapCount() {
+ return bitmaps.length;
+ }
+
+ private void allocateBitmapsIfNeeded(int requiredLength) {
+ if (bitmaps.length < requiredLength) {
+ if (bitmaps.length == 0 && requiredLength == 1) {
+ this.bitmaps = new RoaringBitmap[] {new RoaringBitmap()};
+ } else {
+ RoaringBitmap[] newBitmaps = new RoaringBitmap[requiredLength];
+ System.arraycopy(bitmaps, 0, newBitmaps, 0, bitmaps.length);
+ for (int key = bitmaps.length; key < requiredLength; key++) {
+ newBitmaps[key] = new RoaringBitmap();
+ }
+ this.bitmaps = newBitmaps;
+ }
+ }
+ }
+
+ /**
+ * Returns the number of bytes required to serialize the bitmap.
+ *
+ * @return the serialized size in bytes
+ */
+ public long serializedSizeInBytes() {
+ long size = BITMAP_COUNT_SIZE_BYTES;
+ for (RoaringBitmap bitmap : bitmaps) {
+ size += BITMAP_KEY_SIZE_BYTES + bitmap.serializedSizeInBytes();
+ }
+ return size;
+ }
+
+ /**
+ * Serializes the bitmap using the portable serialization format described
below.
+ *
+ * <ul>
+ * <li>The number of 32-bit Roaring bitmaps, serialized as 8 bytes
+ * <li>For each 32-bit Roaring bitmap, ordered by unsigned comparison of
the 32-bit keys:
+ * <ul>
+ * <li>The key stored as 4 bytes
+ * <li>Serialized 32-bit Roaring bitmap using the standard format
+ * </ul>
+ * </ul>
+ *
+ * <p>Note the byte order of the buffer must be little-endian.
+ *
+ * @param buffer the buffer to write to
+ * @see <a
href="https://github.com/RoaringBitmap/RoaringFormatSpec">Roaring bitmap
spec</a>
+ */
+ public void serialize(ByteBuffer buffer) {
+ validateByteOrder(buffer);
+ buffer.putLong(bitmaps.length);
+ for (int key = 0; key < bitmaps.length; key++) {
+ buffer.putInt(key);
+ bitmaps[key].serialize(buffer);
+ }
+ }
+
+ /**
+ * Deserializes a bitmap from a buffer, assuming the portable
serialization format.
+ *
+ * @param buffer the buffer to read from
+ * @return a new bitmap instance with the deserialized data
+ */
+ public static OptimizedRoaringBitmap64 deserialize(ByteBuffer buffer) {
+ validateByteOrder(buffer);
+
+ // the bitmap array may be sparse with more elements than the number
of read bitmaps
+ int remainingBitmapCount = readBitmapCount(buffer);
+ List<RoaringBitmap> bitmaps =
Lists.newArrayListWithExpectedSize(remainingBitmapCount);
+ int lastKey = -1;
+
+ while (remainingBitmapCount > 0) {
+ int key = readKey(buffer, lastKey);
+
+ // fill gaps as the bitmap array may be sparse
+ while (lastKey < key - 1) {
+ bitmaps.add(new RoaringBitmap());
+ lastKey++;
+ }
+
+ RoaringBitmap bitmap = readBitmap(buffer);
+ bitmaps.add(bitmap);
+
+ lastKey = key;
+ remainingBitmapCount--;
+ }
+
+ return new
OptimizedRoaringBitmap64(bitmaps.toArray(EMPTY_BITMAP_ARRAY));
+ }
+
+ private static void validateByteOrder(ByteBuffer buffer) {
+ Preconditions.checkArgument(
+ buffer.order() == ByteOrder.LITTLE_ENDIAN,
+ "Roaring bitmap serialization requires little-endian byte
order");
+ }
+
+ private static int readBitmapCount(ByteBuffer buffer) {
+ long bitmapCount = buffer.getLong();
+ Preconditions.checkArgument(
+ bitmapCount >= 0 && bitmapCount <= Integer.MAX_VALUE,
+ "Invalid bitmap count: %s",
+ bitmapCount);
+ return (int) bitmapCount;
+ }
+
+ private static int readKey(ByteBuffer buffer, int lastKey) {
+ int key = buffer.getInt();
+ Preconditions.checkArgument(key >= 0, "Invalid unsigned key: %s", key);
+ Preconditions.checkArgument(key <= Integer.MAX_VALUE - 1, "Key is too
large: %s", key);
+ Preconditions.checkArgument(key > lastKey, "Keys must be sorted in
ascending order");
+ return key;
+ }
+
+ private static RoaringBitmap readBitmap(ByteBuffer buffer) {
+ try {
+ RoaringBitmap bitmap = new RoaringBitmap();
+ bitmap.deserialize(buffer);
+ buffer.position(buffer.position() +
bitmap.serializedSizeInBytes());
+ return bitmap;
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+
+ // extracts high 32 bits from a 64-bit position (i.e. key)
+ private static int key(long pos) {
+ return (int) (pos >> 32);
+ }
+
+ // extracts low 32 bits from a 64-bit position (i.e. 32-bit position)
+ private static int pos32Bits(long pos) {
+ return (int) pos;
+ }
+
+ // combines high and low 32 bits into a 64-bit position
+ // the low 32 bits must be bit-masked to avoid sign extension
+ private static long toPosition(int key, int pos32Bits) {
+ return (((long) key) << 32) | (((long) pos32Bits) & 0xFFFFFFFFL);
+ }
+
+ // iterates over 64-bit positions, reconstructing them from keys and
32-bit positions
+ private static void forEach(int key, RoaringBitmap bitmap, LongConsumer
consumer) {
+ bitmap.forEach((int pos32Bits) -> consumer.accept(toPosition(key,
pos32Bits)));
+ }
+
+ private static void validatePosition(long pos) {
+ Preconditions.checkArgument(
+ pos >= 0 && pos <= MAX_VALUE,
+ "OptimizedRoaringBitmap64 supports positions that are >= 0 and
<= %s: %s",
+ MAX_VALUE,
+ pos);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index b1f58b47d0..dbcc4dfbe7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -44,6 +44,15 @@ public class RoaringBitmap32 {
this.roaringBitmap = roaringBitmap;
}
+ /**
+ * Exposes the wrapped bitmap itself rather than a copy.
+ *
+ * <p>Callers must treat the result as read-only: any mutation made through it would also
+ * change this instance.
+ *
+ * @return the underlying roaring bitmap
+ */
+ protected RoaringBitmap get() {
+ return this.roaringBitmap;
+ }
+
+
public void add(int x) {
roaringBitmap.add(x);
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
new file mode 100644
index 0000000000..b6ccd50c20
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.utils.OptimizedRoaringBitmap64;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.zip.CRC32;
+
+/**
+ * A {@link DeletionVector} based on {@link OptimizedRoaringBitmap64}, it only supports files with
+ * row count not exceeding {@link OptimizedRoaringBitmap64#MAX_VALUE}.
+ *
+ * <p>Serialized layout, as produced by {@link #serializeToBytes()}:
+ *
+ * <ul>
+ * <li>4 bytes: length of the bitmap data (magic number + bitmap)
+ * <li>4 bytes: magic number, written little-endian
+ * <li>the bitmap in the {@link OptimizedRoaringBitmap64} format, little-endian
+ * <li>4 bytes: CRC-32 of the bitmap data
+ * </ul>
+ *
+ * <p>The length and CRC go through a buffer whose byte order is left at the {@link ByteBuffer}
+ * default.
+ *
+ * <p>Mostly copied from iceberg.
+ */
+public class Bitmap64DeletionVector implements DeletionVector {
+
+ public static final int MAGIC_NUMBER = 1681511377;
+ public static final int LENGTH_SIZE_BYTES = 4;
+ public static final int CRC_SIZE_BYTES = 4;
+ private static final int MAGIC_NUMBER_SIZE_BYTES = 4;
+ private static final int BITMAP_DATA_OFFSET = 4;
+ // smallest blob that still holds the length, the magic number and the CRC
+ private static final int MIN_SERIALIZED_SIZE_BYTES =
+ LENGTH_SIZE_BYTES + MAGIC_NUMBER_SIZE_BYTES + CRC_SIZE_BYTES;
+
+ private final OptimizedRoaringBitmap64 roaringBitmap;
+
+ public Bitmap64DeletionVector() {
+ this.roaringBitmap = new OptimizedRoaringBitmap64();
+ }
+
+ private Bitmap64DeletionVector(OptimizedRoaringBitmap64 roaringBitmap) {
+ this.roaringBitmap = roaringBitmap;
+ }
+
+ /**
+ * Converts a 32-bit deletion vector into a 64-bit one.
+ *
+ * @param bitmapDeletionVector the deletion vector to convert
+ * @return a new 64-bit deletion vector built from the input's bitmap
+ */
+ public static Bitmap64DeletionVector fromBitmapDeletionVector(
+ BitmapDeletionVector bitmapDeletionVector) {
+ RoaringBitmap32 roaringBitmap32 = bitmapDeletionVector.get();
+ return new Bitmap64DeletionVector(
+ OptimizedRoaringBitmap64.fromRoaringBitmap32(roaringBitmap32));
+ }
+
+ @Override
+ public void delete(long position) {
+ roaringBitmap.add(position);
+ }
+
+ @Override
+ public void merge(DeletionVector deletionVector) {
+ if (deletionVector instanceof Bitmap64DeletionVector) {
+ roaringBitmap.or(((Bitmap64DeletionVector) deletionVector).roaringBitmap);
+ } else {
+ throw new RuntimeException("Only instance with the same class type can be merged.");
+ }
+ }
+
+ @Override
+ public boolean isDeleted(long position) {
+ return roaringBitmap.contains(position);
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return roaringBitmap.isEmpty();
+ }
+
+ @Override
+ public long getCardinality() {
+ return roaringBitmap.cardinality();
+ }
+
+ @Override
+ public byte[] serializeToBytes() {
+ roaringBitmap.runLengthEncode(); // run-length encode the bitmap before serializing
+ int bitmapDataLength = computeBitmapDataLength(roaringBitmap); // magic bytes + bitmap
+ byte[] bytes = new byte[LENGTH_SIZE_BYTES + bitmapDataLength + CRC_SIZE_BYTES];
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ buffer.putInt(bitmapDataLength);
+ serializeBitmapData(bytes, bitmapDataLength, roaringBitmap);
+ int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+ int crc = computeChecksum(bytes, bitmapDataLength);
+ // absolute put: does not depend on the buffer position
+ buffer.putInt(crcOffset, crc);
+ return bytes;
+ }
+
+ /**
+ * Rebuilds a deletion vector from the bytes produced by {@link #serializeToBytes()}.
+ *
+ * <p>The checksum is verified before the bitmap is parsed, so a corrupted payload is rejected
+ * up front rather than being handed to the bitmap deserializer.
+ *
+ * @param bytes the complete serialized blob (length + bitmap data + CRC)
+ * @return the deserialized deletion vector
+ * @throws IllegalArgumentException if the blob is too small, or the length, CRC or magic
+ * number does not match
+ */
+ public static DeletionVector deserializeFromBytes(byte[] bytes) {
+ Preconditions.checkArgument(
+ bytes.length >= MIN_SERIALIZED_SIZE_BYTES,
+ "Invalid deletion vector size: %s, expected at least %s",
+ bytes.length,
+ MIN_SERIALIZED_SIZE_BYTES);
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ int bitmapDataLength = readBitmapDataLength(buffer, bytes.length);
+ int crc = computeChecksum(bytes, bitmapDataLength);
+ int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+ int expectedCrc = buffer.getInt(crcOffset);
+ Preconditions.checkArgument(crc == expectedCrc, "Invalid CRC");
+ OptimizedRoaringBitmap64 bitmap = deserializeBitmap(bytes, bitmapDataLength);
+ return new Bitmap64DeletionVector(bitmap);
+ }
+
+ // computes and validates the length of the bitmap data (magic bytes + bitmap)
+ private static int computeBitmapDataLength(OptimizedRoaringBitmap64 bitmap) {
+ long length = MAGIC_NUMBER_SIZE_BYTES + bitmap.serializedSizeInBytes();
+ long bufferSize = LENGTH_SIZE_BYTES + length + CRC_SIZE_BYTES;
+ Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Can't serialize index > 2GB");
+ return (int) length;
+ }
+
+ // writes the magic number followed by the bitmap into the bitmap data region of the blob
+ private static void serializeBitmapData(
+ byte[] bytes, int bitmapDataLength, OptimizedRoaringBitmap64 bitmap) {
+ ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+ bitmapData.putInt(MAGIC_NUMBER);
+ bitmap.serialize(bitmapData);
+ }
+
+ // points to the bitmap data in the blob
+ private static ByteBuffer pointToBitmapData(byte[] bytes, int bitmapDataLength) {
+ ByteBuffer bitmapData = ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+ bitmapData.order(ByteOrder.LITTLE_ENDIAN);
+ return bitmapData;
+ }
+
+ // checks the size is equal to the bitmap data length + extra bytes for length and CRC
+ private static int readBitmapDataLength(ByteBuffer buffer, int size) {
+ int length = buffer.getInt();
+ int expectedLength = size - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES;
+ Preconditions.checkArgument(
+ length == expectedLength,
+ "Invalid bitmap data length: %s, expected %s",
+ length,
+ expectedLength);
+ return length;
+ }
+
+ // validates magic bytes and deserializes the bitmap
+ private static OptimizedRoaringBitmap64 deserializeBitmap(byte[] bytes, int bitmapDataLength) {
+ ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+ int magicNumber = bitmapData.getInt();
+ Preconditions.checkArgument(
+ magicNumber == MAGIC_NUMBER,
+ "Invalid magic number: %s, expected %s",
+ magicNumber,
+ MAGIC_NUMBER);
+ return OptimizedRoaringBitmap64.deserialize(bitmapData);
+ }
+
+ // generates a 32-bit unsigned checksum for the magic bytes and serialized bitmap
+ private static int computeChecksum(byte[] bytes, int bitmapDataLength) {
+ CRC32 crc = new CRC32();
+ crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+ return (int) crc.getValue();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 5246d35d4b..a4b45d1929 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -38,6 +38,7 @@ import java.util.Map;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V1;
+import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V2;
import static
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.calculateChecksum;
/** Writer for deletion vector index file. */
@@ -47,11 +48,18 @@ public class DeletionVectorIndexFileWriter {
private final FileIO fileIO;
private final long targetSizeInBytes;
+ private final int versionId;
+
public DeletionVectorIndexFileWriter(
- FileIO fileIO, PathFactory pathFactory, MemorySize
targetSizePerIndexFile) {
+ FileIO fileIO,
+ PathFactory pathFactory,
+ MemorySize targetSizePerIndexFile,
+ int versionId) {
this.indexPathFactory = pathFactory;
this.fileIO = fileIO;
this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
+
+ this.versionId = versionId;
}
/**
@@ -109,7 +117,7 @@ public class DeletionVectorIndexFileWriter {
private SingleIndexFileWriter() throws IOException {
this.path = indexPathFactory.newPath();
this.dataOutputStream = new
DataOutputStream(fileIO.newOutputStream(path, true));
- dataOutputStream.writeByte(VERSION_ID_V1);
+ dataOutputStream.writeByte(versionId);
this.dvMetas = new LinkedHashMap<>();
}
@@ -119,6 +127,14 @@ public class DeletionVectorIndexFileWriter {
private void write(String key, DeletionVector deletionVector) throws
IOException {
Preconditions.checkNotNull(dataOutputStream);
+ if (versionId == VERSION_ID_V1) {
+ writeV1(key, deletionVector);
+ } else if (versionId == VERSION_ID_V2) {
+ writeV2(key, deletionVector);
+ }
+ }
+
+ private void writeV1(String key, DeletionVector deletionVector) throws
IOException {
byte[] data = deletionVector.serializeToBytes();
int size = data.length;
dvMetas.put(
@@ -130,6 +146,16 @@ public class DeletionVectorIndexFileWriter {
dataOutputStream.writeInt(calculateChecksum(data));
}
+ /**
+ * Appends a deletion vector in the V2 layout: only the bytes returned by
+ * {@code serializeToBytes()} are written, and the stream offset at which they start is
+ * recorded in the vector's meta.
+ */
+ private void writeV2(String key, DeletionVector deletionVector) throws IOException {
+ byte[] payload = deletionVector.serializeToBytes();
+ // capture the offset before the payload is appended
+ int offset = dataOutputStream.size();
+ DeletionVectorMeta meta =
+ new DeletionVectorMeta(
+ key, offset, payload.length, deletionVector.getCardinality());
+ dvMetas.put(key, meta);
+ dataOutputStream.write(payload);
+ }
+
public IndexFileMeta writtenIndexFile() {
return new IndexFileMeta(
DELETION_VECTORS_INDEX,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index ff81ac3b52..85958e5625 100644
---
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -45,14 +45,26 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
public class DeletionVectorsIndexFile extends IndexFile {
public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
+ // V1 is the default write version; V2 must be requested through the four-argument constructor
public static final byte VERSION_ID_V1 = 1;
+ public static final byte VERSION_ID_V2 = 2;
+ private final byte writeVersionID;
private final MemorySize targetSizePerIndexFile;
+ /** Creates an index file accessor that writes with {@link #VERSION_ID_V1}. */
 public DeletionVectorsIndexFile(
 FileIO fileIO, PathFactory pathFactory, MemorySize targetSizePerIndexFile) {
+ this(fileIO, pathFactory, targetSizePerIndexFile, VERSION_ID_V1);
+ }
+
+ /**
+ * Creates an index file accessor with an explicit write version.
+ *
+ * @param writeVersionID the version id handed to the index file writer when writing
+ */
+ public DeletionVectorsIndexFile(
+ FileIO fileIO,
+ PathFactory pathFactory,
+ MemorySize targetSizePerIndexFile,
+ byte writeVersionID) {
 super(fileIO, pathFactory);
+ this.writeVersionID = writeVersionID;
 this.targetSizePerIndexFile = targetSizePerIndexFile;
 }
/**
@@ -71,12 +83,12 @@ public class DeletionVectorsIndexFile extends IndexFile {
Map<String, DeletionVector> deletionVectors = new HashMap<>();
Path filePath = pathFactory.toPath(indexFileName);
try (SeekableInputStream inputStream =
fileIO.newInputStream(filePath)) {
- checkVersion(inputStream);
+ int version = checkVersion(inputStream);
DataInputStream dataInputStream = new DataInputStream(inputStream);
for (DeletionVectorMeta deletionVectorMeta :
deletionVectorMetas.values()) {
deletionVectors.put(
deletionVectorMeta.dataFileName(),
- readDeletionVector(dataInputStream,
deletionVectorMeta.length()));
+ readDeletionVector(dataInputStream,
deletionVectorMeta.length(), version));
}
} catch (Exception e) {
throw new RuntimeException(
@@ -105,14 +117,15 @@ public class DeletionVectorsIndexFile extends IndexFile {
String indexFile =
dataFileToDeletionFiles.values().stream().findAny().get().path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
- checkVersion(inputStream);
+ int version = checkVersion(inputStream);
for (String dataFile : dataFileToDeletionFiles.keySet()) {
DeletionFile deletionFile =
dataFileToDeletionFiles.get(dataFile);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new
DataInputStream(inputStream);
deletionVectors.put(
- dataFile, readDeletionVector(dataInputStream, (int)
deletionFile.length()));
+ dataFile,
+ readDeletionVector(dataInputStream, (int)
deletionFile.length(), version));
}
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from
file: " + indexFile, e);
@@ -123,11 +136,11 @@ public class DeletionVectorsIndexFile extends IndexFile {
public DeletionVector readDeletionVector(DeletionFile deletionFile) {
String indexFile = deletionFile.path();
try (SeekableInputStream inputStream = fileIO.newInputStream(new
Path(indexFile))) {
- checkVersion(inputStream);
+ int version = checkVersion(inputStream);
checkArgument(deletionFile.path().equals(indexFile));
inputStream.seek(deletionFile.offset());
DataInputStream dataInputStream = new DataInputStream(inputStream);
- return readDeletionVector(dataInputStream, (int)
deletionFile.length());
+ return readDeletionVector(dataInputStream, (int)
deletionFile.length(), version);
} catch (Exception e) {
throw new RuntimeException("Unable to read deletion vector from
file: " + indexFile, e);
}
@@ -149,25 +162,42 @@ public class DeletionVectorsIndexFile extends IndexFile {
try {
DeletionVectorIndexFileWriter writer =
new DeletionVectorIndexFileWriter(
- this.fileIO, this.pathFactory,
this.targetSizePerIndexFile);
+ this.fileIO,
+ this.pathFactory,
+ this.targetSizePerIndexFile,
+ writeVersionID);
return writer.write(input);
} catch (IOException e) {
throw new RuntimeException("Failed to write deletion vectors.", e);
}
}
- private void checkVersion(InputStream in) throws IOException {
+ private int checkVersion(InputStream in) throws IOException {
int version = in.read();
- if (version != VERSION_ID_V1) {
+ if (version != VERSION_ID_V1 && version != VERSION_ID_V2) {
throw new RuntimeException(
"Version not match, actual version: "
+ version
- + ", expert version: "
- + VERSION_ID_V1);
+ + ", expected version: "
+ + VERSION_ID_V1
+ + " or "
+ + VERSION_ID_V2);
+ }
+ return version;
+ }
+
+ /**
+ * Reads one deletion vector using the layout that matches the version read from the file.
+ *
+ * @param inputStream the stream to read the deletion vector from
+ * @param size the length passed on to the version-specific reader
+ * @param readVersion the version id read from the index file
+ * @return the deserialized deletion vector
+ * @throws RuntimeException if the version id is not a known version
+ */
+ private DeletionVector readDeletionVector(
+ DataInputStream inputStream, int size, int readVersion) {
+ if (readVersion == VERSION_ID_V1) {
+ return readV1DeletionVector(inputStream, size);
+ } else if (readVersion == VERSION_ID_V2) {
+ return readV2DeletionVector(inputStream, size);
+ } else {
+ // report the version found in the file, not the version this instance writes
+ throw new RuntimeException("Unsupported DeletionVector version: " + readVersion);
+ }
+ }
- private DeletionVector readDeletionVector(DataInputStream inputStream, int
size) {
+ private DeletionVector readV1DeletionVector(DataInputStream inputStream,
int size) {
try {
// check size
int actualSize = inputStream.readInt();
@@ -196,6 +226,17 @@ public class DeletionVectorsIndexFile extends IndexFile {
}
}
+ /**
+ * Reads {@code size} bytes from the stream and deserializes them as a
+ * {@link Bitmap64DeletionVector}.
+ */
+ private DeletionVector readV2DeletionVector(DataInputStream inputStream, int size) {
+ byte[] payload = new byte[size];
+ try {
+ inputStream.readFully(payload);
+ } catch (IOException e) {
+ throw new UncheckedIOException("Unable to read deletion vector", e);
+ }
+ return Bitmap64DeletionVector.deserializeFromBytes(payload);
+ }
+
public static int calculateChecksum(byte[] bytes) {
CRC32 crc = new CRC32();
crc.update(bytes);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
index d2b42b7986..a742c1c309 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
@@ -22,6 +22,7 @@ import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -62,4 +63,75 @@ public class DeletionVectorTest {
assertThat(deserializedDeletionVector.isDeleted(i)).isFalse();
}
}
+
+ @Test
+ public void testBitmap64DeletionVector() {
+ HashSet<Long> toDelete = new HashSet<>();
+ for (int i = 0; i < 10000; i++) {
+ toDelete.add(ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE * 2L));
+ }
+ // Probe with long values: an int would be boxed to Integer, which never equals a Long,
+ // so positions that were actually deleted could end up in notDelete.
+ HashSet<Long> notDelete = new HashSet<>();
+ for (long i = 0; i < 10000; i++) {
+ if (!toDelete.contains(i)) {
+ notDelete.add(i);
+ }
+ }
+
+ DeletionVector deletionVector = new Bitmap64DeletionVector();
+ assertThat(deletionVector.isEmpty()).isTrue();
+
+ for (Long i : toDelete) {
+ assertThat(deletionVector.checkedDelete(i)).isTrue();
+ assertThat(deletionVector.checkedDelete(i)).isFalse();
+ }
+ DeletionVector deserializedDeletionVector =
+ Bitmap64DeletionVector.deserializeFromBytes(deletionVector.serializeToBytes());
+
+ assertThat(deletionVector.isEmpty()).isFalse();
+ assertThat(deserializedDeletionVector.isEmpty()).isFalse();
+ for (Long i : toDelete) {
+ assertThat(deletionVector.isDeleted(i)).isTrue();
+ assertThat(deserializedDeletionVector.isDeleted(i)).isTrue();
+ }
+ for (Long i : notDelete) {
+ assertThat(deletionVector.isDeleted(i)).isFalse();
+ assertThat(deserializedDeletionVector.isDeleted(i)).isFalse();
+ }
+ }
+
+ @Test
+ public void testBitmapDeletionVectorTo64() {
+ // random positions to delete, plus small positions known not to be among them
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ HashSet<Integer> deleted = new HashSet<>();
+ for (int n = 0; n < 10000; n++) {
+ deleted.add(random.nextInt(Integer.MAX_VALUE));
+ }
+ HashSet<Integer> untouched = new HashSet<>();
+ for (int candidate = 0; candidate < 10000; candidate++) {
+ if (!deleted.contains(candidate)) {
+ untouched.add(candidate);
+ }
+ }
+
+ BitmapDeletionVector source = new BitmapDeletionVector();
+ assertThat(source.isEmpty()).isTrue();
+
+ // the first delete of a position reports a change, the second does not
+ for (Integer position : deleted) {
+ assertThat(source.checkedDelete(position)).isTrue();
+ assertThat(source.checkedDelete(position)).isFalse();
+ }
+
+ DeletionVector converted = Bitmap64DeletionVector.fromBitmapDeletionVector(source);
+
+ assertThat(converted.isEmpty()).isFalse();
+
+ // the converted vector must agree with the source on every probed position
+ for (Integer position : deleted) {
+ assertThat(source.isDeleted(position)).isTrue();
+ assertThat(converted.isDeleted(position)).isTrue();
+ }
+ for (Integer position : untouched) {
+ assertThat(source.isDeleted(position)).isFalse();
+ assertThat(converted.isDeleted(position)).isFalse();
+ }
+ }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index a4dd8f92c1..a023e82639 100644
---
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -26,12 +26,16 @@ import org.apache.paimon.utils.PathFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import static org.assertj.core.api.Assertions.assertThat;
@@ -40,26 +44,26 @@ public class DeletionVectorsIndexFileTest {
@TempDir java.nio.file.Path tempPath;
- @Test
- public void testReadDvIndex() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testReadDvIndex(boolean isV2) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(
- LocalFileIO.create(), pathFactory,
MemorySize.ofBytes(Long.MAX_VALUE));
+ deletionVectorsIndexFile(pathFactory, isV2);
// write
HashMap<String, DeletionVector> deleteMap = new HashMap<>();
- BitmapDeletionVector index1 = new BitmapDeletionVector();
+ DeletionVector index1 = createEmptyDV(isV2);
index1.delete(1);
deleteMap.put("file1.parquet", index1);
- BitmapDeletionVector index2 = new BitmapDeletionVector();
+ DeletionVector index2 = createEmptyDV(isV2);
index2.delete(2);
index2.delete(3);
deleteMap.put("file2.parquet", index2);
- BitmapDeletionVector index3 = new BitmapDeletionVector();
+ DeletionVector index3 = createEmptyDV(isV2);
index3.delete(3);
deleteMap.put("file33.parquet", index3);
@@ -81,20 +85,23 @@ public class DeletionVectorsIndexFileTest {
assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
}
- @Test
- public void testReadDvIndexWithCopiousDv() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testReadDvIndexWithCopiousDv(boolean isV2) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(
- LocalFileIO.create(), pathFactory, MemorySize.ofBytes(Long.MAX_VALUE));
+ deletionVectorsIndexFile(pathFactory, isV2);
// write
Random random = new Random();
HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+ HashMap<String, Integer> deleteInteger = new HashMap<>();
for (int i = 0; i < 100000; i++) {
- BitmapDeletionVector index = new BitmapDeletionVector();
- index.delete(random.nextInt(1000000));
+ DeletionVector index = createEmptyDV(isV2);
+ int num = random.nextInt(1000000);
+ index.delete(num);
deleteMap.put(String.format("file%s.parquet", i), index);
+ deleteInteger.put(String.format("file%s.parquet", i), num);
}
// read
@@ -103,21 +110,26 @@ public class DeletionVectorsIndexFileTest {
Map<String, DeletionVector> dvs =
deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
assertThat(dvs.size()).isEqualTo(100000);
+ for (String file : dvs.keySet()) {
+ int delete = deleteInteger.get(file);
+ assertThat(dvs.get(file).isDeleted(delete)).isTrue();
+ assertThat(dvs.get(file).isDeleted(delete + 1)).isFalse();
+ }
}
- @Test
- public void testReadDvIndexWithEnormousDv() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testReadDvIndexWithEnormousDv(boolean isV2) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(
- LocalFileIO.create(), pathFactory, MemorySize.ofBytes(Long.MAX_VALUE));
+ deletionVectorsIndexFile(pathFactory, isV2);
// write
Random random = new Random();
Map<String, DeletionVector> fileToDV = new HashMap<>();
Map<String, Long> fileToCardinality = new HashMap<>();
for (int i = 0; i < 5; i++) {
- BitmapDeletionVector index = new BitmapDeletionVector();
+ DeletionVector index = createEmptyDV(isV2);
// the size of dv index file is about 20M
for (int j = 0; j < 10000000; j++) {
index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -137,19 +149,19 @@ public class DeletionVectorsIndexFileTest {
}
}
- @Test
- public void testWriteDVIndexWithLimitedTargetSizePerIndexFile() {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testWriteDVIndexWithLimitedTargetSizePerIndexFile(boolean isV2) {
PathFactory pathFactory = getPathFactory();
DeletionVectorsIndexFile deletionVectorsIndexFile =
- new DeletionVectorsIndexFile(
- LocalFileIO.create(), pathFactory, MemorySize.parse("2MB"));
+ deletionVectorsIndexFile(pathFactory, MemorySize.parse("2MB"), isV2);
// write1
Random random = new Random();
Map<String, DeletionVector> fileToDV = new HashMap<>();
Map<String, Long> fileToCardinality = new HashMap<>();
for (int i = 0; i < 5; i++) {
- BitmapDeletionVector index = new BitmapDeletionVector();
+ DeletionVector index = createEmptyDV(isV2);
// the size of dv index file is about 1.7M
for (int j = 0; j < 750000; j++) {
index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -171,7 +183,7 @@ public class DeletionVectorsIndexFileTest {
fileToDV.clear();
fileToCardinality.clear();
for (int i = 0; i < 10; i++) {
- BitmapDeletionVector index = new BitmapDeletionVector();
+ DeletionVector index = createEmptyDV(isV2);
// the size of dv index file is about 0.42M
for (int j = 0; j < 100000; j++) {
index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -189,6 +201,87 @@ public class DeletionVectorsIndexFileTest {
}
}
+ @Test
+ public void testReadV1AndV2() {
+ PathFactory pathFactory = getPathFactory();
+ DeletionVectorsIndexFile v1DeletionVectorsIndexFile =
+ deletionVectorsIndexFile(pathFactory, false);
+ DeletionVectorsIndexFile v2DeletionVectorsIndexFile =
+ deletionVectorsIndexFile(pathFactory, true);
+
+ // write v1 dv
+ Random random = new Random();
+ HashMap<String, Integer> deleteInteger = new HashMap<>();
+
+ HashMap<String, DeletionVector> deleteMap1 = new HashMap<>();
+ for (int i = 0; i < 50000; i++) {
+ DeletionVector index = createEmptyDV(false);
+ int num = random.nextInt(1000000);
+ index.delete(num);
+ deleteMap1.put(String.format("file%s.parquet", i), index);
+ deleteInteger.put(String.format("file%s.parquet", i), num);
+ }
+ List<IndexFileMeta> indexFiles1 = v1DeletionVectorsIndexFile.write(deleteMap1);
+ assertThat(indexFiles1.size()).isEqualTo(1);
+
+ // write v2 dv
+ HashMap<String, DeletionVector> deleteMap2 = new HashMap<>();
+ for (int i = 50000; i < 100000; i++) {
+ DeletionVector index = createEmptyDV(true);
+ int num = random.nextInt(1000000);
+ index.delete(num);
+ deleteMap2.put(String.format("file%s.parquet", i), index);
+ deleteInteger.put(String.format("file%s.parquet", i), num);
+ }
+ List<IndexFileMeta> indexFiles2 = v2DeletionVectorsIndexFile.write(deleteMap2);
+ assertThat(indexFiles2.size()).isEqualTo(1);
+
+ List<IndexFileMeta> totalIndexFiles =
+ Stream.concat(indexFiles1.stream(), indexFiles2.stream())
+ .collect(Collectors.toList());
+ // read when writeVersionID is V1
+ Map<String, DeletionVector> dvs1 =
+ v1DeletionVectorsIndexFile.readAllDeletionVectors(totalIndexFiles);
+ assertThat(dvs1.size()).isEqualTo(100000);
+ for (String file : dvs1.keySet()) {
+ int delete = deleteInteger.get(file);
+ assertThat(dvs1.get(file).isDeleted(delete)).isTrue();
+ assertThat(dvs1.get(file).isDeleted(delete + 1)).isFalse();
+ }
+
+ // read when writeVersionID is V2
+ Map<String, DeletionVector> dvs2 =
+ v2DeletionVectorsIndexFile.readAllDeletionVectors(totalIndexFiles);
+ assertThat(dvs2.size()).isEqualTo(100000);
+ }
+
+ private DeletionVector createEmptyDV(boolean isV2) {
+ if (isV2) {
+ return new Bitmap64DeletionVector();
+ }
+ return new BitmapDeletionVector();
+ }
+
+ private DeletionVectorsIndexFile deletionVectorsIndexFile(
+ PathFactory pathFactory, boolean isV2) {
+
+ return deletionVectorsIndexFile(pathFactory, MemorySize.ofBytes(Long.MAX_VALUE), isV2);
+ }
+
+ private DeletionVectorsIndexFile deletionVectorsIndexFile(
+ PathFactory pathFactory, MemorySize targetSizePerIndexFile, boolean isV2) {
+ if (isV2) {
+ return new DeletionVectorsIndexFile(
+ LocalFileIO.create(),
+ pathFactory,
+ targetSizePerIndexFile,
+ DeletionVectorsIndexFile.VERSION_ID_V2);
+ } else {
+ return new DeletionVectorsIndexFile(
+ LocalFileIO.create(), pathFactory, targetSizePerIndexFile);
+ }
+ }
+
private PathFactory getPathFactory() {
Path dir = new Path(tempPath.toUri());
return new PathFactory() {