This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 37dbe90d35 [core] introduce v2 deletion vector (#5475)
37dbe90d35 is described below

commit 37dbe90d353427cfd3de1cc01751b160cba9e95c
Author: LsomeYeah <[email protected]>
AuthorDate: Thu Apr 17 13:17:00 2025 +0800

    [core] introduce v2 deletion vector (#5475)
---
 LICENSE                                            |   4 +
 .../paimon/utils/OptimizedRoaringBitmap64.java     | 317 +++++++++++++++++++++
 .../org/apache/paimon/utils/RoaringBitmap32.java   |   9 +
 .../deletionvectors/Bitmap64DeletionVector.java    | 167 +++++++++++
 .../DeletionVectorIndexFileWriter.java             |  30 +-
 .../deletionvectors/DeletionVectorsIndexFile.java  |  65 ++++-
 .../paimon/deletionvectors/DeletionVectorTest.java |  72 +++++
 .../DeletionVectorsIndexFileTest.java              | 141 +++++++--
 8 files changed, 767 insertions(+), 38 deletions(-)

diff --git a/LICENSE b/LICENSE
index 513e18cb88..c5e688b325 100644
--- a/LICENSE
+++ b/LICENSE
@@ -254,6 +254,10 @@ 
paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/pa
 
paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/sort/SparkZOrderUDF.java
 from http://iceberg.apache.org/ version 1.3.0
 
+paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
+paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
+from http://iceberg.apache.org/ version 1.8.1
+
 paimon-hive/paimon-hive-common/src/test/resources/hive-schema-3.1.0.derby.sql
 from https://hive.apache.org/ version 3.1.0
 
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
 
b/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
new file mode 100644
index 0000000000..5b14e25c30
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/OptimizedRoaringBitmap64.java
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+/* This file is based on source code from the Iceberg Project 
(http://iceberg.apache.org/), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import org.roaringbitmap.RoaringBitmap;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.function.LongConsumer;
+
+/**
+ * A bitmap that supports positive 64-bit positions (the most significant bit 
must be 0), but is
+ * optimized for cases where most positions fit in 32 bits by using an array 
of 32-bit Roaring
+ * bitmaps. The internal bitmap array is grown as needed to accommodate the 
largest position.
+ *
+ * <p>Mostly copied from iceberg.
+ */
+public class OptimizedRoaringBitmap64 {
+
+    public static final long MAX_VALUE = toPosition(Integer.MAX_VALUE - 1, 
Integer.MIN_VALUE);
+    private static final RoaringBitmap[] EMPTY_BITMAP_ARRAY = new 
RoaringBitmap[0];
+    private static final long BITMAP_COUNT_SIZE_BYTES = 8L;
+    private static final long BITMAP_KEY_SIZE_BYTES = 4L;
+
+    private RoaringBitmap[] bitmaps;
+
+    public OptimizedRoaringBitmap64() {
+        this.bitmaps = EMPTY_BITMAP_ARRAY;
+    }
+
+    private OptimizedRoaringBitmap64(RoaringBitmap[] bitmaps) {
+        this.bitmaps = bitmaps;
+    }
+
+    public static OptimizedRoaringBitmap64 fromRoaringBitmap32(RoaringBitmap32 
roaringBitmap32) {
+        RoaringBitmap roaringBitmap = roaringBitmap32.clone().get();
+        return new OptimizedRoaringBitmap64(new RoaringBitmap[] 
{roaringBitmap});
+    }
+
+    /**
+     * Sets a position in the bitmap.
+     *
+     * @param pos the position
+     */
+    public void add(long pos) {
+        validatePosition(pos);
+        int key = key(pos);
+        int pos32Bits = pos32Bits(pos);
+        allocateBitmapsIfNeeded(key + 1 /* required bitmap array length */);
+        bitmaps[key].add(pos32Bits);
+    }
+
+    /**
+     * Sets a range of positions in the bitmap.
+     *
+     * @param posStartInclusive the start position of the range (inclusive)
+     * @param posEndExclusive the end position of the range (exclusive)
+     */
+    public void addRange(long posStartInclusive, long posEndExclusive) {
+        for (long pos = posStartInclusive; pos < posEndExclusive; pos++) {
+            add(pos);
+        }
+    }
+
+    /**
+     * Sets all positions from the other bitmap in this bitmap, modifying this 
bitmap in place.
+     *
+     * @param that the other bitmap
+     */
+    public void or(OptimizedRoaringBitmap64 that) {
+        allocateBitmapsIfNeeded(that.bitmaps.length);
+        for (int key = 0; key < that.bitmaps.length; key++) {
+            bitmaps[key].or(that.bitmaps[key]);
+        }
+    }
+
+    /**
+     * Checks if a position is set in the bitmap.
+     *
+     * @param pos the position
+     * @return true if the position is set in this bitmap, false otherwise
+     */
+    public boolean contains(long pos) {
+        validatePosition(pos);
+        int key = key(pos);
+        int pos32Bits = pos32Bits(pos);
+        return key < bitmaps.length && bitmaps[key].contains(pos32Bits);
+    }
+
+    /**
+     * Indicates whether the bitmap has any positions set.
+     *
+     * @return true if the bitmap is empty, false otherwise
+     */
+    public boolean isEmpty() {
+        return cardinality() == 0;
+    }
+
+    /**
+     * Returns the number of set positions in the bitmap.
+     *
+     * @return the number of set positions
+     */
+    public long cardinality() {
+        long cardinality = 0L;
+        for (RoaringBitmap bitmap : bitmaps) {
+            cardinality += bitmap.getLongCardinality();
+        }
+        return cardinality;
+    }
+
+    /**
+     * Applies run-length encoding wherever it is more space efficient.
+     *
+     * @return whether the bitmap was changed
+     */
+    public boolean runLengthEncode() {
+        boolean changed = false;
+        for (RoaringBitmap bitmap : bitmaps) {
+            changed |= bitmap.runOptimize();
+        }
+        return changed;
+    }
+
+    /**
+     * Iterates over all positions in the bitmap.
+     *
+     * @param consumer a consumer for positions
+     */
+    public void forEach(LongConsumer consumer) {
+        for (int key = 0; key < bitmaps.length; key++) {
+            forEach(key, bitmaps[key], consumer);
+        }
+    }
+
+    @VisibleForTesting
+    int allocatedBitmapCount() {
+        return bitmaps.length;
+    }
+
+    private void allocateBitmapsIfNeeded(int requiredLength) {
+        if (bitmaps.length < requiredLength) {
+            if (bitmaps.length == 0 && requiredLength == 1) {
+                this.bitmaps = new RoaringBitmap[] {new RoaringBitmap()};
+            } else {
+                RoaringBitmap[] newBitmaps = new RoaringBitmap[requiredLength];
+                System.arraycopy(bitmaps, 0, newBitmaps, 0, bitmaps.length);
+                for (int key = bitmaps.length; key < requiredLength; key++) {
+                    newBitmaps[key] = new RoaringBitmap();
+                }
+                this.bitmaps = newBitmaps;
+            }
+        }
+    }
+
+    /**
+     * Returns the number of bytes required to serialize the bitmap.
+     *
+     * @return the serialized size in bytes
+     */
+    public long serializedSizeInBytes() {
+        long size = BITMAP_COUNT_SIZE_BYTES;
+        for (RoaringBitmap bitmap : bitmaps) {
+            size += BITMAP_KEY_SIZE_BYTES + bitmap.serializedSizeInBytes();
+        }
+        return size;
+    }
+
+    /**
+     * Serializes the bitmap using the portable serialization format described 
below.
+     *
+     * <ul>
+     *   <li>The number of 32-bit Roaring bitmaps, serialized as 8 bytes
+     *   <li>For each 32-bit Roaring bitmap, ordered by unsigned comparison of 
the 32-bit keys:
+     *       <ul>
+     *         <li>The key stored as 4 bytes
+     *         <li>Serialized 32-bit Roaring bitmap using the standard format
+     *       </ul>
+     * </ul>
+     *
+     * <p>Note the byte order of the buffer must be little-endian.
+     *
+     * @param buffer the buffer to write to
+     * @see <a 
href="https://github.com/RoaringBitmap/RoaringFormatSpec">Roaring bitmap 
spec</a>
+     */
+    public void serialize(ByteBuffer buffer) {
+        validateByteOrder(buffer);
+        buffer.putLong(bitmaps.length);
+        for (int key = 0; key < bitmaps.length; key++) {
+            buffer.putInt(key);
+            bitmaps[key].serialize(buffer);
+        }
+    }
+
+    /**
+     * Deserializes a bitmap from a buffer, assuming the portable 
serialization format.
+     *
+     * @param buffer the buffer to read from
+     * @return a new bitmap instance with the deserialized data
+     */
+    public static OptimizedRoaringBitmap64 deserialize(ByteBuffer buffer) {
+        validateByteOrder(buffer);
+
+        // the bitmap array may be sparse with more elements than the number 
of read bitmaps
+        int remainingBitmapCount = readBitmapCount(buffer);
+        List<RoaringBitmap> bitmaps = 
Lists.newArrayListWithExpectedSize(remainingBitmapCount);
+        int lastKey = -1;
+
+        while (remainingBitmapCount > 0) {
+            int key = readKey(buffer, lastKey);
+
+            // fill gaps as the bitmap array may be sparse
+            while (lastKey < key - 1) {
+                bitmaps.add(new RoaringBitmap());
+                lastKey++;
+            }
+
+            RoaringBitmap bitmap = readBitmap(buffer);
+            bitmaps.add(bitmap);
+
+            lastKey = key;
+            remainingBitmapCount--;
+        }
+
+        return new 
OptimizedRoaringBitmap64(bitmaps.toArray(EMPTY_BITMAP_ARRAY));
+    }
+
+    private static void validateByteOrder(ByteBuffer buffer) {
+        Preconditions.checkArgument(
+                buffer.order() == ByteOrder.LITTLE_ENDIAN,
+                "Roaring bitmap serialization requires little-endian byte 
order");
+    }
+
+    private static int readBitmapCount(ByteBuffer buffer) {
+        long bitmapCount = buffer.getLong();
+        Preconditions.checkArgument(
+                bitmapCount >= 0 && bitmapCount <= Integer.MAX_VALUE,
+                "Invalid bitmap count: %s",
+                bitmapCount);
+        return (int) bitmapCount;
+    }
+
+    private static int readKey(ByteBuffer buffer, int lastKey) {
+        int key = buffer.getInt();
+        Preconditions.checkArgument(key >= 0, "Invalid unsigned key: %s", key);
+        Preconditions.checkArgument(key <= Integer.MAX_VALUE - 1, "Key is too 
large: %s", key);
+        Preconditions.checkArgument(key > lastKey, "Keys must be sorted in 
ascending order");
+        return key;
+    }
+
+    private static RoaringBitmap readBitmap(ByteBuffer buffer) {
+        try {
+            RoaringBitmap bitmap = new RoaringBitmap();
+            bitmap.deserialize(buffer);
+            buffer.position(buffer.position() + 
bitmap.serializedSizeInBytes());
+            return bitmap;
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    // extracts high 32 bits from a 64-bit position (i.e. key)
+    private static int key(long pos) {
+        return (int) (pos >> 32);
+    }
+
+    // extracts low 32 bits from a 64-bit position (i.e. 32-bit position)
+    private static int pos32Bits(long pos) {
+        return (int) pos;
+    }
+
+    // combines high and low 32 bits into a 64-bit position
+    // the low 32 bits must be bit-masked to avoid sign extension
+    private static long toPosition(int key, int pos32Bits) {
+        return (((long) key) << 32) | (((long) pos32Bits) & 0xFFFFFFFFL);
+    }
+
+    // iterates over 64-bit positions, reconstructing them from keys and 
32-bit positions
+    private static void forEach(int key, RoaringBitmap bitmap, LongConsumer 
consumer) {
+        bitmap.forEach((int pos32Bits) -> consumer.accept(toPosition(key, 
pos32Bits)));
+    }
+
+    private static void validatePosition(long pos) {
+        Preconditions.checkArgument(
+                pos >= 0 && pos <= MAX_VALUE,
+                "OptimizedRoaringBitmap64 supports positions that are >= 0 and 
<= %s: %s",
+                MAX_VALUE,
+                pos);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
index b1f58b47d0..dbcc4dfbe7 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java
@@ -44,6 +44,15 @@ public class RoaringBitmap32 {
         this.roaringBitmap = roaringBitmap;
     }
 
+    /**
+     * Note: the result must be treated as read-only; do not call any mutating operation on it.
+     *
+     * @return the roaringBitmap
+     */
+    protected RoaringBitmap get() {
+        return roaringBitmap;
+    }
+
     public void add(int x) {
         roaringBitmap.add(x);
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
new file mode 100644
index 0000000000..b6ccd50c20
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/Bitmap64DeletionVector.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.deletionvectors;
+
+import org.apache.paimon.utils.OptimizedRoaringBitmap64;
+import org.apache.paimon.utils.Preconditions;
+import org.apache.paimon.utils.RoaringBitmap32;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.zip.CRC32;
+
+/**
+ * A {@link DeletionVector} based on {@link OptimizedRoaringBitmap64}, it only 
supports files with
+ * row count not exceeding {@link OptimizedRoaringBitmap64#MAX_VALUE}.
+ *
+ * <p>Mostly copied from iceberg.
+ */
+public class Bitmap64DeletionVector implements DeletionVector {
+
+    public static final int MAGIC_NUMBER = 1681511377;
+    public static final int LENGTH_SIZE_BYTES = 4;
+    public static final int CRC_SIZE_BYTES = 4;
+    private static final int MAGIC_NUMBER_SIZE_BYTES = 4;
+    private static final int BITMAP_DATA_OFFSET = 4;
+
+    private final OptimizedRoaringBitmap64 roaringBitmap;
+
+    public Bitmap64DeletionVector() {
+        this.roaringBitmap = new OptimizedRoaringBitmap64();
+    }
+
+    private Bitmap64DeletionVector(OptimizedRoaringBitmap64 roaringBitmap) {
+        this.roaringBitmap = roaringBitmap;
+    }
+
+    public static Bitmap64DeletionVector fromBitmapDeletionVector(
+            BitmapDeletionVector bitmapDeletionVector) {
+        RoaringBitmap32 roaringBitmap32 = bitmapDeletionVector.get();
+        return new Bitmap64DeletionVector(
+                OptimizedRoaringBitmap64.fromRoaringBitmap32(roaringBitmap32));
+    }
+
+    @Override
+    public void delete(long position) {
+        roaringBitmap.add(position);
+    }
+
+    @Override
+    public void merge(DeletionVector deletionVector) {
+        if (deletionVector instanceof Bitmap64DeletionVector) {
+            roaringBitmap.or(((Bitmap64DeletionVector) 
deletionVector).roaringBitmap);
+        } else {
+            throw new RuntimeException("Only instance with the same class type 
can be merged.");
+        }
+    }
+
+    @Override
+    public boolean isDeleted(long position) {
+        return roaringBitmap.contains(position);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return roaringBitmap.isEmpty();
+    }
+
+    @Override
+    public long getCardinality() {
+        return roaringBitmap.cardinality();
+    }
+
+    @Override
+    public byte[] serializeToBytes() {
+        roaringBitmap.runLengthEncode(); // run-length encode the bitmap 
before serializing
+        int bitmapDataLength = computeBitmapDataLength(roaringBitmap); // 
magic bytes + bitmap
+        byte[] bytes = new byte[LENGTH_SIZE_BYTES + bitmapDataLength + 
CRC_SIZE_BYTES];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        buffer.putInt(bitmapDataLength);
+        serializeBitmapData(bytes, bitmapDataLength, roaringBitmap);
+        int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+        int crc = computeChecksum(bytes, bitmapDataLength);
+        buffer.putInt(crcOffset, crc);
+        buffer.rewind();
+        return bytes;
+    }
+
+    public static DeletionVector deserializeFromBytes(byte[] bytes) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        int bitmapDataLength = readBitmapDataLength(buffer, bytes.length);
+        OptimizedRoaringBitmap64 bitmap = deserializeBitmap(bytes, 
bitmapDataLength);
+        int crc = computeChecksum(bytes, bitmapDataLength);
+        int crcOffset = LENGTH_SIZE_BYTES + bitmapDataLength;
+        int expectedCrc = buffer.getInt(crcOffset);
+        Preconditions.checkArgument(crc == expectedCrc, "Invalid CRC");
+        return new Bitmap64DeletionVector(bitmap);
+    }
+
+    // computes and validates the length of the bitmap data (magic bytes + 
bitmap)
+    private static int computeBitmapDataLength(OptimizedRoaringBitmap64 
bitmap) {
+        long length = MAGIC_NUMBER_SIZE_BYTES + bitmap.serializedSizeInBytes();
+        long bufferSize = LENGTH_SIZE_BYTES + length + CRC_SIZE_BYTES;
+        Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Can't 
serialize index > 2GB");
+        return (int) length;
+    }
+
+    private static void serializeBitmapData(
+            byte[] bytes, int bitmapDataLength, OptimizedRoaringBitmap64 
bitmap) {
+        ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+        bitmapData.putInt(MAGIC_NUMBER);
+        bitmap.serialize(bitmapData);
+    }
+
+    // points to the bitmap data in the blob
+    private static ByteBuffer pointToBitmapData(byte[] bytes, int 
bitmapDataLength) {
+        ByteBuffer bitmapData = ByteBuffer.wrap(bytes, BITMAP_DATA_OFFSET, 
bitmapDataLength);
+        bitmapData.order(ByteOrder.LITTLE_ENDIAN);
+        return bitmapData;
+    }
+
+    // checks the size is equal to the bitmap data length + extra bytes for 
length and CRC
+    private static int readBitmapDataLength(ByteBuffer buffer, int size) {
+        int length = buffer.getInt();
+        int expectedLength = size - LENGTH_SIZE_BYTES - CRC_SIZE_BYTES;
+        Preconditions.checkArgument(
+                length == expectedLength,
+                "Invalid bitmap data length: %s, expected %s",
+                length,
+                expectedLength);
+        return length;
+    }
+
+    // validates magic bytes and deserializes the bitmap
+    private static OptimizedRoaringBitmap64 deserializeBitmap(byte[] bytes, 
int bitmapDataLength) {
+        ByteBuffer bitmapData = pointToBitmapData(bytes, bitmapDataLength);
+        int magicNumber = bitmapData.getInt();
+        Preconditions.checkArgument(
+                magicNumber == MAGIC_NUMBER,
+                "Invalid magic number: %s, expected %s",
+                magicNumber,
+                MAGIC_NUMBER);
+        return OptimizedRoaringBitmap64.deserialize(bitmapData);
+    }
+
+    // generates a 32-bit unsigned checksum for the magic bytes and serialized 
bitmap
+    private static int computeChecksum(byte[] bytes, int bitmapDataLength) {
+        CRC32 crc = new CRC32();
+        crc.update(bytes, BITMAP_DATA_OFFSET, bitmapDataLength);
+        return (int) crc.getValue();
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
index 5246d35d4b..a4b45d1929 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorIndexFileWriter.java
@@ -38,6 +38,7 @@ import java.util.Map;
 
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V1;
+import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.VERSION_ID_V2;
 import static 
org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.calculateChecksum;
 
 /** Writer for deletion vector index file. */
@@ -47,11 +48,18 @@ public class DeletionVectorIndexFileWriter {
     private final FileIO fileIO;
     private final long targetSizeInBytes;
 
+    private final int versionId;
+
     public DeletionVectorIndexFileWriter(
-            FileIO fileIO, PathFactory pathFactory, MemorySize 
targetSizePerIndexFile) {
+            FileIO fileIO,
+            PathFactory pathFactory,
+            MemorySize targetSizePerIndexFile,
+            int versionId) {
         this.indexPathFactory = pathFactory;
         this.fileIO = fileIO;
         this.targetSizeInBytes = targetSizePerIndexFile.getBytes();
+
+        this.versionId = versionId;
     }
 
     /**
@@ -109,7 +117,7 @@ public class DeletionVectorIndexFileWriter {
         private SingleIndexFileWriter() throws IOException {
             this.path = indexPathFactory.newPath();
             this.dataOutputStream = new 
DataOutputStream(fileIO.newOutputStream(path, true));
-            dataOutputStream.writeByte(VERSION_ID_V1);
+            dataOutputStream.writeByte(versionId);
             this.dvMetas = new LinkedHashMap<>();
         }
 
@@ -119,6 +127,14 @@ public class DeletionVectorIndexFileWriter {
 
         private void write(String key, DeletionVector deletionVector) throws 
IOException {
             Preconditions.checkNotNull(dataOutputStream);
+            // Dispatch on the version this writer was configured with; V1 and V2 entries
+            // are laid out differently, so each has its own write routine.
+            if (versionId == VERSION_ID_V1) {
+                writeV1(key, deletionVector);
+            } else if (versionId == VERSION_ID_V2) {
+                writeV2(key, deletionVector);
+            } else {
+                // Fail fast: without this branch an unknown version would silently drop
+                // the deletion vector and record no metadata for it.
+                throw new IllegalStateException(
+                        "Unsupported deletion vector index file version: " + versionId);
+            }
+        }
+
+        private void writeV1(String key, DeletionVector deletionVector) throws 
IOException {
             byte[] data = deletionVector.serializeToBytes();
             int size = data.length;
             dvMetas.put(
@@ -130,6 +146,16 @@ public class DeletionVectorIndexFileWriter {
             dataOutputStream.writeInt(calculateChecksum(data));
         }
 
+        private void writeV2(String key, DeletionVector deletionVector) throws 
IOException {
+            byte[] data = deletionVector.serializeToBytes();
+            int size = data.length;
+            dvMetas.put(
+                    key,
+                    new DeletionVectorMeta(
+                            key, dataOutputStream.size(), size, 
deletionVector.getCardinality()));
+            dataOutputStream.write(data);
+        }
+
         public IndexFileMeta writtenIndexFile() {
             return new IndexFileMeta(
                     DELETION_VECTORS_INDEX,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
index ff81ac3b52..85958e5625 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFile.java
@@ -45,14 +45,26 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
 public class DeletionVectorsIndexFile extends IndexFile {
 
     public static final String DELETION_VECTORS_INDEX = "DELETION_VECTORS";
+    // Version 1 is the default write version; version 2 files can also be read and written
     public static final byte VERSION_ID_V1 = 1;
+    public static final byte VERSION_ID_V2 = 2;
 
+    private final byte writeVersionID;
     private final MemorySize targetSizePerIndexFile;
 
     public DeletionVectorsIndexFile(
             FileIO fileIO, PathFactory pathFactory, MemorySize 
targetSizePerIndexFile) {
+        this(fileIO, pathFactory, targetSizePerIndexFile, VERSION_ID_V1);
+    }
+
+    public DeletionVectorsIndexFile(
+            FileIO fileIO,
+            PathFactory pathFactory,
+            MemorySize targetSizePerIndexFile,
+            byte writeVersionID) {
         super(fileIO, pathFactory);
         this.targetSizePerIndexFile = targetSizePerIndexFile;
+        this.writeVersionID = writeVersionID;
     }
 
     /**
@@ -71,12 +83,12 @@ public class DeletionVectorsIndexFile extends IndexFile {
         Map<String, DeletionVector> deletionVectors = new HashMap<>();
         Path filePath = pathFactory.toPath(indexFileName);
         try (SeekableInputStream inputStream = 
fileIO.newInputStream(filePath)) {
-            checkVersion(inputStream);
+            int version = checkVersion(inputStream);
             DataInputStream dataInputStream = new DataInputStream(inputStream);
             for (DeletionVectorMeta deletionVectorMeta : 
deletionVectorMetas.values()) {
                 deletionVectors.put(
                         deletionVectorMeta.dataFileName(),
-                        readDeletionVector(dataInputStream, 
deletionVectorMeta.length()));
+                        readDeletionVector(dataInputStream, 
deletionVectorMeta.length(), version));
             }
         } catch (Exception e) {
             throw new RuntimeException(
@@ -105,14 +117,15 @@ public class DeletionVectorsIndexFile extends IndexFile {
 
         String indexFile = 
dataFileToDeletionFiles.values().stream().findAny().get().path();
         try (SeekableInputStream inputStream = fileIO.newInputStream(new 
Path(indexFile))) {
-            checkVersion(inputStream);
+            int version = checkVersion(inputStream);
             for (String dataFile : dataFileToDeletionFiles.keySet()) {
                 DeletionFile deletionFile = 
dataFileToDeletionFiles.get(dataFile);
                 checkArgument(deletionFile.path().equals(indexFile));
                 inputStream.seek(deletionFile.offset());
                 DataInputStream dataInputStream = new 
DataInputStream(inputStream);
                 deletionVectors.put(
-                        dataFile, readDeletionVector(dataInputStream, (int) 
deletionFile.length()));
+                        dataFile,
+                        readDeletionVector(dataInputStream, (int) 
deletionFile.length(), version));
             }
         } catch (Exception e) {
             throw new RuntimeException("Unable to read deletion vector from 
file: " + indexFile, e);
@@ -123,11 +136,11 @@ public class DeletionVectorsIndexFile extends IndexFile {
     public DeletionVector readDeletionVector(DeletionFile deletionFile) {
         String indexFile = deletionFile.path();
         try (SeekableInputStream inputStream = fileIO.newInputStream(new 
Path(indexFile))) {
-            checkVersion(inputStream);
+            int version = checkVersion(inputStream);
             checkArgument(deletionFile.path().equals(indexFile));
             inputStream.seek(deletionFile.offset());
             DataInputStream dataInputStream = new DataInputStream(inputStream);
-            return readDeletionVector(dataInputStream, (int) 
deletionFile.length());
+            return readDeletionVector(dataInputStream, (int) 
deletionFile.length(), version);
         } catch (Exception e) {
             throw new RuntimeException("Unable to read deletion vector from 
file: " + indexFile, e);
         }
@@ -149,25 +162,42 @@ public class DeletionVectorsIndexFile extends IndexFile {
         try {
             DeletionVectorIndexFileWriter writer =
                     new DeletionVectorIndexFileWriter(
-                            this.fileIO, this.pathFactory, 
this.targetSizePerIndexFile);
+                            this.fileIO,
+                            this.pathFactory,
+                            this.targetSizePerIndexFile,
+                            writeVersionID);
             return writer.write(input);
         } catch (IOException e) {
             throw new RuntimeException("Failed to write deletion vectors.", e);
         }
     }
 
-    private void checkVersion(InputStream in) throws IOException {
+    private int checkVersion(InputStream in) throws IOException {
         int version = in.read();
-        if (version != VERSION_ID_V1) {
+        if (version != VERSION_ID_V1 && version != VERSION_ID_V2) {
             throw new RuntimeException(
                     "Version not match, actual version: "
                             + version
-                            + ", expert version: "
-                            + VERSION_ID_V1);
+                            + ", expected version: "
+                            + VERSION_ID_V1
+                            + " or "
+                            + VERSION_ID_V2);
+        }
+        return version;
+    }
+
+    /**
+     * Reads one deletion vector, decoding it with the layout of the given index file version.
+     *
+     * @param inputStream stream to read the deletion vector entry from
+     * @param size length of the entry, as recorded in its metadata
+     * @param readVersion version id returned by {@code checkVersion} for this index file
+     * @return the deserialized deletion vector
+     * @throws RuntimeException if {@code readVersion} is neither V1 nor V2
+     */
+    private DeletionVector readDeletionVector(
+            DataInputStream inputStream, int size, int readVersion) {
+        if (readVersion == VERSION_ID_V1) {
+            return readV1DeletionVector(inputStream, size);
+        } else if (readVersion == VERSION_ID_V2) {
+            return readV2DeletionVector(inputStream, size);
+        } else {
+            // Report the version that was actually read, not the configured write version.
+            throw new RuntimeException("Unsupported DeletionVector version: " 
+ readVersion);
         }
     }
 
-    private DeletionVector readDeletionVector(DataInputStream inputStream, int 
size) {
+    private DeletionVector readV1DeletionVector(DataInputStream inputStream, 
int size) {
         try {
             // check size
             int actualSize = inputStream.readInt();
@@ -196,6 +226,17 @@ public class DeletionVectorsIndexFile extends IndexFile {
         }
     }
 
+    private DeletionVector readV2DeletionVector(DataInputStream inputStream, 
int size) {
+        try {
+            byte[] bytes = new byte[size];
+            inputStream.readFully(bytes);
+
+            return Bitmap64DeletionVector.deserializeFromBytes(bytes);
+        } catch (IOException e) {
+            throw new UncheckedIOException("Unable to read deletion vector", 
e);
+        }
+    }
+
     public static int calculateChecksum(byte[] bytes) {
         CRC32 crc = new CRC32();
         crc.update(bytes);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
index d2b42b7986..a742c1c309 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorTest.java
@@ -22,6 +22,7 @@ import org.junit.jupiter.api.Test;
 
 import java.util.HashSet;
 import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -62,4 +63,75 @@ public class DeletionVectorTest {
             assertThat(deserializedDeletionVector.isDeleted(i)).isFalse();
         }
     }
+
+    @Test
+    public void testBitmap64DeletionVector() {
+        HashSet<Long> toDelete = new HashSet<>();
+        for (int i = 0; i < 10000; i++) {
+            
toDelete.add(ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE * 2L));
+        }
+        HashSet<Integer> notDelete = new HashSet<>();
+        for (int i = 0; i < 10000; i++) {
+            if (!toDelete.contains((long) i)) { // Long keys: a boxed Integer never matches
+                notDelete.add(i);
+            }
+        }
+
+        DeletionVector deletionVector = new Bitmap64DeletionVector();
+        assertThat(deletionVector.isEmpty()).isTrue();
+
+        for (Long i : toDelete) {
+            assertThat(deletionVector.checkedDelete(i)).isTrue();
+            assertThat(deletionVector.checkedDelete(i)).isFalse();
+        }
+        DeletionVector deserializedDeletionVector =
+                
Bitmap64DeletionVector.deserializeFromBytes(deletionVector.serializeToBytes());
+
+        assertThat(deletionVector.isEmpty()).isFalse();
+        assertThat(deserializedDeletionVector.isEmpty()).isFalse();
+        for (Long i : toDelete) {
+            assertThat(deletionVector.isDeleted(i)).isTrue();
+            assertThat(deserializedDeletionVector.isDeleted(i)).isTrue();
+        }
+        for (Integer i : notDelete) {
+            assertThat(deletionVector.isDeleted(i)).isFalse();
+            assertThat(deserializedDeletionVector.isDeleted(i)).isFalse();
+        }
+    }
+
+    @Test
+    public void testBitmapDeletionVectorTo64() {
+        HashSet<Integer> toDelete = new HashSet<>();
+        for (int i = 0; i < 10000; i++) {
+            
toDelete.add(ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
+        }
+        HashSet<Integer> notDelete = new HashSet<>();
+        for (int i = 0; i < 10000; i++) {
+            if (!toDelete.contains(i)) {
+                notDelete.add(i);
+            }
+        }
+
+        BitmapDeletionVector deletionVector = new BitmapDeletionVector();
+        assertThat(deletionVector.isEmpty()).isTrue();
+
+        for (Integer i : toDelete) {
+            assertThat(deletionVector.checkedDelete(i)).isTrue();
+            assertThat(deletionVector.checkedDelete(i)).isFalse();
+        }
+
+        DeletionVector bitmap64DeletionVector =
+                
Bitmap64DeletionVector.fromBitmapDeletionVector(deletionVector);
+
+        assertThat(bitmap64DeletionVector.isEmpty()).isFalse();
+
+        for (Integer i : toDelete) {
+            assertThat(deletionVector.isDeleted(i)).isTrue();
+            assertThat(bitmap64DeletionVector.isDeleted(i)).isTrue();
+        }
+        for (Integer i : notDelete) {
+            assertThat(deletionVector.isDeleted(i)).isFalse();
+            assertThat(bitmap64DeletionVector.isDeleted(i)).isFalse();
+        }
+    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
index a4dd8f92c1..a023e82639 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/deletionvectors/DeletionVectorsIndexFileTest.java
@@ -26,12 +26,16 @@ import org.apache.paimon.utils.PathFactory;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -40,26 +44,26 @@ public class DeletionVectorsIndexFileTest {
 
     @TempDir java.nio.file.Path tempPath;
 
-    @Test
-    public void testReadDvIndex() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReadDvIndex(boolean isV2) {
         PathFactory pathFactory = getPathFactory();
 
         DeletionVectorsIndexFile deletionVectorsIndexFile =
-                new DeletionVectorsIndexFile(
-                        LocalFileIO.create(), pathFactory, 
MemorySize.ofBytes(Long.MAX_VALUE));
+                deletionVectorsIndexFile(pathFactory, isV2);
 
         // write
         HashMap<String, DeletionVector> deleteMap = new HashMap<>();
-        BitmapDeletionVector index1 = new BitmapDeletionVector();
+        DeletionVector index1 = createEmptyDV(isV2);
         index1.delete(1);
         deleteMap.put("file1.parquet", index1);
 
-        BitmapDeletionVector index2 = new BitmapDeletionVector();
+        DeletionVector index2 = createEmptyDV(isV2);
         index2.delete(2);
         index2.delete(3);
         deleteMap.put("file2.parquet", index2);
 
-        BitmapDeletionVector index3 = new BitmapDeletionVector();
+        DeletionVector index3 = createEmptyDV(isV2);
         index3.delete(3);
         deleteMap.put("file33.parquet", index3);
 
@@ -81,20 +85,23 @@ public class DeletionVectorsIndexFileTest {
         assertThat(deletionVectorsIndexFile.exists(fileName)).isFalse();
     }
 
-    @Test
-    public void testReadDvIndexWithCopiousDv() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReadDvIndexWithCopiousDv(boolean isV2) {
         PathFactory pathFactory = getPathFactory();
         DeletionVectorsIndexFile deletionVectorsIndexFile =
-                new DeletionVectorsIndexFile(
-                        LocalFileIO.create(), pathFactory, 
MemorySize.ofBytes(Long.MAX_VALUE));
+                deletionVectorsIndexFile(pathFactory, isV2);
 
         // write
         Random random = new Random();
         HashMap<String, DeletionVector> deleteMap = new HashMap<>();
+        HashMap<String, Integer> deleteInteger = new HashMap<>();
         for (int i = 0; i < 100000; i++) {
-            BitmapDeletionVector index = new BitmapDeletionVector();
-            index.delete(random.nextInt(1000000));
+            DeletionVector index = createEmptyDV(isV2);
+            int num = random.nextInt(1000000);
+            index.delete(num);
             deleteMap.put(String.format("file%s.parquet", i), index);
+            deleteInteger.put(String.format("file%s.parquet", i), num);
         }
 
         // read
@@ -103,21 +110,26 @@ public class DeletionVectorsIndexFileTest {
         Map<String, DeletionVector> dvs =
                 deletionVectorsIndexFile.readAllDeletionVectors(indexFiles);
         assertThat(dvs.size()).isEqualTo(100000);
+        for (String file : dvs.keySet()) {
+            int delete = deleteInteger.get(file);
+            assertThat(dvs.get(file).isDeleted(delete)).isTrue();
+            assertThat(dvs.get(file).isDeleted(delete + 1)).isFalse();
+        }
     }
 
-    @Test
-    public void testReadDvIndexWithEnormousDv() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReadDvIndexWithEnormousDv(boolean isV2) {
         PathFactory pathFactory = getPathFactory();
         DeletionVectorsIndexFile deletionVectorsIndexFile =
-                new DeletionVectorsIndexFile(
-                        LocalFileIO.create(), pathFactory, 
MemorySize.ofBytes(Long.MAX_VALUE));
+                deletionVectorsIndexFile(pathFactory, isV2);
 
         // write
         Random random = new Random();
         Map<String, DeletionVector> fileToDV = new HashMap<>();
         Map<String, Long> fileToCardinality = new HashMap<>();
         for (int i = 0; i < 5; i++) {
-            BitmapDeletionVector index = new BitmapDeletionVector();
+            DeletionVector index = createEmptyDV(isV2);
             // the size of dv index file is about 20M
             for (int j = 0; j < 10000000; j++) {
                 index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -137,19 +149,19 @@ public class DeletionVectorsIndexFileTest {
         }
     }
 
-    @Test
-    public void testWriteDVIndexWithLimitedTargetSizePerIndexFile() {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testWriteDVIndexWithLimitedTargetSizePerIndexFile(boolean 
isV2) {
         PathFactory pathFactory = getPathFactory();
         DeletionVectorsIndexFile deletionVectorsIndexFile =
-                new DeletionVectorsIndexFile(
-                        LocalFileIO.create(), pathFactory, 
MemorySize.parse("2MB"));
+                deletionVectorsIndexFile(pathFactory, MemorySize.parse("2MB"), 
isV2);
 
         // write1
         Random random = new Random();
         Map<String, DeletionVector> fileToDV = new HashMap<>();
         Map<String, Long> fileToCardinality = new HashMap<>();
         for (int i = 0; i < 5; i++) {
-            BitmapDeletionVector index = new BitmapDeletionVector();
+            DeletionVector index = createEmptyDV(isV2);
             // the size of dv index file is about 1.7M
             for (int j = 0; j < 750000; j++) {
                 index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -171,7 +183,7 @@ public class DeletionVectorsIndexFileTest {
         fileToDV.clear();
         fileToCardinality.clear();
         for (int i = 0; i < 10; i++) {
-            BitmapDeletionVector index = new BitmapDeletionVector();
+            DeletionVector index = createEmptyDV(isV2);
             // the size of dv index file is about 0.42M
             for (int j = 0; j < 100000; j++) {
                 index.delete(random.nextInt(Integer.MAX_VALUE));
@@ -189,6 +201,87 @@ public class DeletionVectorsIndexFileTest {
         }
     }
 
+    @Test
+    public void testReadV1AndV2() {
+        PathFactory pathFactory = getPathFactory();
+        DeletionVectorsIndexFile v1DeletionVectorsIndexFile =
+                deletionVectorsIndexFile(pathFactory, false);
+        DeletionVectorsIndexFile v2DeletionVectorsIndexFile =
+                deletionVectorsIndexFile(pathFactory, true);
+
+        // write v1 dv
+        Random random = new Random();
+        HashMap<String, Integer> deleteInteger = new HashMap<>();
+
+        HashMap<String, DeletionVector> deleteMap1 = new HashMap<>();
+        for (int i = 0; i < 50000; i++) {
+            DeletionVector index = createEmptyDV(false);
+            int num = random.nextInt(1000000);
+            index.delete(num);
+            deleteMap1.put(String.format("file%s.parquet", i), index);
+            deleteInteger.put(String.format("file%s.parquet", i), num);
+        }
+        List<IndexFileMeta> indexFiles1 = 
v1DeletionVectorsIndexFile.write(deleteMap1);
+        assertThat(indexFiles1.size()).isEqualTo(1);
+
+        // write v2 dv
+        HashMap<String, DeletionVector> deleteMap2 = new HashMap<>();
+        for (int i = 50000; i < 100000; i++) {
+            DeletionVector index = createEmptyDV(true);
+            int num = random.nextInt(1000000);
+            index.delete(num);
+            deleteMap2.put(String.format("file%s.parquet", i), index);
+            deleteInteger.put(String.format("file%s.parquet", i), num);
+        }
+        List<IndexFileMeta> indexFiles2 = 
v2DeletionVectorsIndexFile.write(deleteMap2);
+        assertThat(indexFiles2.size()).isEqualTo(1);
+
+        List<IndexFileMeta> totalIndexFiles =
+                Stream.concat(indexFiles1.stream(), indexFiles2.stream())
+                        .collect(Collectors.toList());
+        // read when writeVersionID is V1
+        Map<String, DeletionVector> dvs1 =
+                
v1DeletionVectorsIndexFile.readAllDeletionVectors(totalIndexFiles);
+        assertThat(dvs1.size()).isEqualTo(100000);
+        for (String file : dvs1.keySet()) {
+            int delete = deleteInteger.get(file);
+            assertThat(dvs1.get(file).isDeleted(delete)).isTrue();
+            assertThat(dvs1.get(file).isDeleted(delete + 1)).isFalse();
+        }
+
+        // read when writeVersionID is V2
+        Map<String, DeletionVector> dvs2 =
+                
v2DeletionVectorsIndexFile.readAllDeletionVectors(totalIndexFiles);
+        assertThat(dvs2.size()).isEqualTo(100000);
+    }
+
+    private DeletionVector createEmptyDV(boolean isV2) {
+        if (isV2) {
+            return new Bitmap64DeletionVector();
+        }
+        return new BitmapDeletionVector();
+    }
+
+    private DeletionVectorsIndexFile deletionVectorsIndexFile(
+            PathFactory pathFactory, boolean isV2) {
+
+        return deletionVectorsIndexFile(pathFactory, 
MemorySize.ofBytes(Long.MAX_VALUE), isV2);
+    }
+
+    private DeletionVectorsIndexFile deletionVectorsIndexFile(
+            PathFactory pathFactory, MemorySize targetSizePerIndexFile, 
boolean isV2) {
+        if (isV2) {
+            return new DeletionVectorsIndexFile(
+                    LocalFileIO.create(),
+                    pathFactory,
+                    targetSizePerIndexFile,
+                    DeletionVectorsIndexFile.VERSION_ID_V2);
+        } else {
+            return new DeletionVectorsIndexFile(
+                    LocalFileIO.create(), pathFactory, targetSizePerIndexFile);
+        }
+    }
+
     private PathFactory getPathFactory() {
         Path dir = new Path(tempPath.toUri());
         return new PathFactory() {


Reply via email to