This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 69b9f0dc36 [Python] Support read deletion vector pk table (#6766)
69b9f0dc36 is described below

commit 69b9f0dc363f1b9e8d67b3b32d36a947ccfe33aa
Author: umi <[email protected]>
AuthorDate: Tue Dec 9 15:08:04 2025 +0800

    [Python] Support read deletion vector pk table (#6766)
---
 .github/workflows/paimon-python-checks.yml         |   4 +-
 .../test/java/org/apache/paimon/JavaPyE2ETest.java | 213 +++++++++++++++++++--
 paimon-python/dev/cfg.ini                          |   2 +-
 paimon-python/dev/requirements.txt                 |   1 +
 paimon-python/dev/run_mixed_tests.sh               |  36 +++-
 paimon-python/pypaimon/deletionvectors/__init__.py |  27 +++
 .../apply_deletion_vector_reader.py                | 129 +++++++++++++
 .../deletionvectors/bitmap_deletion_vector.py      | 165 ++++++++++++++++
 .../pypaimon/deletionvectors/deletion_vector.py    | 142 ++++++++++++++
 .../pypaimon/index/deletion_vector_meta.py         |  40 ++++
 paimon-python/pypaimon/index/index_file_meta.py    |  48 +++++
 .../pypaimon/manifest/index_manifest_entry.py      |  62 ++++++
 .../pypaimon/manifest/index_manifest_file.py       |  95 +++++++++
 .../pypaimon/read/reader/concat_batch_reader.py    |   2 +-
 .../pypaimon/read/reader/concat_record_reader.py   |   2 +-
 .../read/reader/iface/record_batch_reader.py       |  61 +++++-
 .../pypaimon/read/reader/iface/record_iterator.py  |   5 +
 .../pypaimon/read/reader/iface/record_reader.py    |   4 +-
 .../pypaimon/read/reader/key_value_wrap_reader.py  |   7 +-
 .../pypaimon/read/scanner/full_starting_scanner.py | 145 ++++++++++++--
 paimon-python/pypaimon/read/split.py               |   4 +-
 paimon-python/pypaimon/read/split_read.py          |  59 ++++--
 paimon-python/pypaimon/table/row/generic_row.py    |   4 +
 .../pypaimon/table/source/deletion_file.py         |  49 +++++
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  69 +++++++
 25 files changed, 1317 insertions(+), 58 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml 
b/.github/workflows/paimon-python-checks.yml
index e6a10e8949..6df806a4f4 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -91,10 +91,10 @@ jobs:
           if [[ "${{ matrix.python-version }}" == "3.6.15" ]]; then
             python -m pip install --upgrade pip==21.3.1
             python --version
-            python -m pip install -q readerwriterlock==1.0.9 
'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 
pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 
dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 
2>&1 >/dev/null
+            python -m pip install -q pyroaring readerwriterlock==1.0.9 
'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 
pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 
dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 
2>&1 >/dev/null
           else
             python -m pip install --upgrade pip
-            python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 
cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 
zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 
pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 
parameterized==0.9.0 2>&1 >/dev/null
+            python -m pip install -q pyroaring readerwriterlock==1.0.9 
fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 
fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 
numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 
py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
           fi
       - name: Run lint-python.sh
         shell: bash
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index df0c8c7360..39c2011bf4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -26,15 +26,29 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.DataFormatTestUtil;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.BeforeEach;
@@ -44,9 +58,17 @@ import 
org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
+import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.apache.paimon.table.SimpleTableTestBase.getResult;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -102,12 +124,12 @@ public class JavaPyE2ETest {
         try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
                 InnerTableCommit commit = 
fileStoreTable.newCommit(commitUser)) {
 
-            write.write(createRow(1, "Apple", "Fruit", 1.5));
-            write.write(createRow(2, "Banana", "Fruit", 0.8));
-            write.write(createRow(3, "Carrot", "Vegetable", 0.6));
-            write.write(createRow(4, "Broccoli", "Vegetable", 1.2));
-            write.write(createRow(5, "Chicken", "Meat", 5.0));
-            write.write(createRow(6, "Beef", "Meat", 8.0));
+            write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
+            write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
+            write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
+            write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
+            write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
+            write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
 
             commit.commit(0, write.prepareCommit(true, 0));
         }
@@ -170,12 +192,12 @@ public class JavaPyE2ETest {
         try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
                 InnerTableCommit commit = 
fileStoreTable.newCommit(commitUser)) {
 
-            write.write(createRow(1, "Apple", "Fruit", 1.5));
-            write.write(createRow(2, "Banana", "Fruit", 0.8));
-            write.write(createRow(3, "Carrot", "Vegetable", 0.6));
-            write.write(createRow(4, "Broccoli", "Vegetable", 1.2));
-            write.write(createRow(5, "Chicken", "Meat", 5.0));
-            write.write(createRow(6, "Beef", "Meat", 8.0));
+            write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
+            write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
+            write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
+            write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
+            write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
+            write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
 
             commit.commit(0, write.prepareCommit(true, 0));
         }
@@ -198,6 +220,140 @@ public class JavaPyE2ETest {
                         "6, Beef, Meat, 8.0");
     }
 
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testPKDeletionVectorWrite() throws Exception {
+        Consumer<Options> optionsSetter =
+                options -> {
+                    // let level has many files
+                    options.set(TARGET_FILE_SIZE, new MemorySize(1));
+                    options.set(DELETION_VECTORS_ENABLED, true);
+                };
+        String tableName = "test_pk_dv";
+        Path tablePath = new Path(warehouse.toString() + "/default.db/" + 
tableName);
+        FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+        StreamTableWrite write = table.newWrite(commitUser);
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        write.withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        write.write(createRow3Cols(1, 10, 100L));
+        write.write(createRow3Cols(2, 20, 200L));
+        write.write(createRow3Cols(1, 11, 101L));
+        commit.commit(0, write.prepareCommit(true, 0));
+
+        write.write(createRow3Cols(1, 10, 1000L));
+        write.write(createRow3Cols(2, 21, 201L));
+        write.write(createRow3Cols(2, 21, 2001L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        write.write(createRow3Cols(1, 11, 1001L));
+        write.write(createRow3Cols(2, 21, 20001L));
+        write.write(createRow3Cols(2, 22, 202L));
+        write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 11, 1001L));
+        commit.commit(2, write.prepareCommit(true, 2));
+        write.write(createRow3ColsWithKind(RowKind.DELETE, 2, 20, 200L));
+        commit.commit(2, write.prepareCommit(true, 2));
+
+        // test result
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()));
+        List<String> result =
+                getResult(table.newRead(), table.newScan().plan().splits(), 
rowDataToString);
+        assertThat(result)
+                .containsExactlyInAnyOrder("+I[1, 10, 1000]", "+I[2, 21, 
20001]", "+I[2, 22, 202]");
+    }
+
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testPKDeletionVectorWriteMultiBatch() throws Exception {
+        Consumer<Options> optionsSetter =
+                options -> {
+                    // let level has many files
+                    options.set(TARGET_FILE_SIZE, new MemorySize(128 * 1024));
+                    options.set(DELETION_VECTORS_ENABLED, true);
+                };
+        String tableName = "test_pk_dv_multi_batch";
+        Path tablePath = new Path(warehouse.toString() + "/default.db/" + 
tableName);
+        FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+        StreamTableWrite write = table.newWrite(commitUser);
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        write.withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        // Write 10000 records
+        for (int i = 1; i <= 10000; i++) {
+            write.write(createRow3Cols(1, i * 10, (long) i * 100));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        // Delete the 81930th record
+        write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 81930, 819300L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()));
+        List<String> result =
+                getResult(table.newRead(), table.newScan().plan().splits(), 
rowDataToString);
+
+        // Verify the count is 9999
+        assertThat(result).hasSize(9999);
+
+        assertThat(result).doesNotContain("+I[1, 81930, 819300]");
+
+        assertThat(result).contains("+I[1, 10, 100]");
+        assertThat(result).contains("+I[1, 100000, 1000000]");
+    }
+
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws 
Exception {
+        Consumer<Options> optionsSetter =
+                options -> {
+                    options.set(DELETION_VECTORS_ENABLED, true);
+                };
+        String tableName = "test_pk_dv_raw_convertable";
+        Path tablePath = new Path(warehouse.toString() + "/default.db/" + 
tableName);
+        FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+        StreamTableWrite write = table.newWrite(commitUser);
+        IOManager ioManager = IOManager.create(tablePath.toString());
+        write.withIOManager(ioManager);
+        StreamTableCommit commit = table.newCommit(commitUser);
+
+        for (int i = 1; i <= 10000; i++) {
+            write.write(createRow3Cols(1, i * 10, (long) i * 100));
+        }
+        commit.commit(0, write.prepareCommit(false, 0));
+
+        write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 81930, 819300L));
+        commit.commit(1, write.prepareCommit(true, 1));
+
+        Function<InternalRow, String> rowDataToString =
+                row ->
+                        internalRowToString(
+                                row,
+                                DataTypes.ROW(
+                                        DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()));
+        List<String> result =
+                getResult(table.newRead(), table.newScan().plan().splits(), 
rowDataToString);
+
+        assertThat(result).hasSize(9999);
+
+        assertThat(result).doesNotContain("+I[1, 81930, 819300]");
+
+        // Verify some sample records exist
+        assertThat(result).contains("+I[1, 10, 100]");
+        assertThat(result).contains("+I[1, 100000, 1000000]");
+    }
+
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testReadPkTable() throws Exception {
@@ -228,8 +384,39 @@ public class JavaPyE2ETest {
         return new Identifier(database, tableName);
     }
 
-    private static InternalRow createRow(int id, String name, String category, 
double value) {
+    protected FileStoreTable createFileStoreTable(Consumer<Options> configure, 
Path tablePath)
+            throws Exception {
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
+                        new String[] {"pt", "a", "b"});
+        Options options = new Options();
+        options.set(CoreOptions.PATH, tablePath.toString());
+        options.set(BUCKET, 1);
+        configure.accept(options);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.singletonList("pt"),
+                                Arrays.asList("pt", "a"),
+                                options.toMap(),
+                                ""));
+        return new PrimaryKeyFileStoreTable(
+                FileIOFinder.find(tablePath), tablePath, tableSchema, 
CatalogEnvironment.empty());
+    }
+
+    private static InternalRow createRow4Cols(int id, String name, String 
category, double value) {
         return GenericRow.of(
                 id, BinaryString.fromString(name), 
BinaryString.fromString(category), value);
     }
+
+    protected GenericRow createRow3Cols(Object... values) {
+        return GenericRow.of(values[0], values[1], values[2]);
+    }
+
+    protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... 
values) {
+        return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
+    }
 }
diff --git a/paimon-python/dev/cfg.ini b/paimon-python/dev/cfg.ini
index c90c2f61db..ce776f3a73 100644
--- a/paimon-python/dev/cfg.ini
+++ b/paimon-python/dev/cfg.ini
@@ -19,7 +19,7 @@
 [flake8]
 # We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one 
exception: lines can be
 # up to 100 characters in length, not 79.
-ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504,F821
+ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504,W293,F821
 max-line-length=120
 exclude=.tox/*,dev/*,build/*,dist/*
 # autopep8 setting
diff --git a/paimon-python/dev/requirements.txt 
b/paimon-python/dev/requirements.txt
index f5612b9811..40025a308f 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -36,6 +36,7 @@ polars==1.32.0; python_version>"3.8"
 pyarrow==6.0.1; python_version < "3.8"
 pyarrow>=16,<19; python_version >= "3.8" and python_version < "3.13"
 pyarrow>=16,<19; python_version >= "3.13"
+pyroaring
 ray==2.48.0
 readerwriterlock==1.0.9
 zstandard==0.19.0; python_version<"3.9"
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index 404d7cb888..337b694e0c 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -168,13 +168,37 @@ run_java_read_test() {
         return 1
     fi
 }
+run_pk_dv_test() {
+    echo -e "${YELLOW}=== Step 5: Running Primary Key & Deletion Vector Test 
(testPKDeletionVectorWriteRead) ===${NC}"
 
+    cd "$PROJECT_ROOT"
+
+    # Run the specific Java test method
+    echo "Running Maven test for JavaPyE2ETest.testPKDeletionVectorWrite..."
+    if mvn test 
-Dtest=org.apache.paimon.JavaPyE2ETest#testPKDeletionVectorWrite -pl 
paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java test completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java test failed${NC}"
+        return 1
+    fi
+    cd "$PAIMON_PYTHON_DIR"
+    # Run the specific Python test method
+    echo "Running Python test for JavaPyReadWriteTest.test_pk_dv_read..."
+    if python -m pytest 
java_py_read_write_test.py::JavaPyReadWriteTest::test_pk_dv_read -v; then
+        echo -e "${GREEN}✓ Python test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python test failed${NC}"
+        return 1
+    fi
+}
 # Main execution
 main() {
     local java_write_result=0
     local python_read_result=0
     local python_write_result=0
     local java_read_result=0
+    local pk_dv_result=0
 
     echo -e "${YELLOW}Starting mixed language test execution...${NC}"
     echo ""
@@ -213,6 +237,10 @@ main() {
         java_read_result=1
     fi
 
+    # Run pk dv read test
+    if ! run_pk_dv_test; then
+        pk_dv_result=1
+    fi
     echo ""
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
@@ -240,12 +268,18 @@ main() {
         echo -e "${RED}✗ Java Read Test (Parquet + Lance): FAILED${NC}"
     fi
 
+     if [[ $pk_dv_result -eq 0 ]]; then
+          echo -e "${GREEN}✓ PK DV Test 
(JavaPyReadWriteTest.testPKDeletionVectorWriteRead): PASSED${NC}"
+      else
+          echo -e "${RED}✗ PK DV Test 
(JavaPyReadWriteTest.testPKDeletionVectorWriteRead): FAILED${NC}"
+      fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 
]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/deletionvectors/__init__.py 
b/paimon-python/pypaimon/deletionvectors/__init__.py
new file mode 100644
index 0000000000..92b307357e
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/__init__.py
@@ -0,0 +1,27 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
+from pypaimon.deletionvectors.bitmap_deletion_vector import 
BitmapDeletionVector
+from pypaimon.deletionvectors.apply_deletion_vector_reader import 
ApplyDeletionVectorReader, ApplyDeletionRecordIterator
+
+__all__ = [
+    'DeletionVector',
+    'BitmapDeletionVector',
+    'ApplyDeletionVectorReader',
+    'ApplyDeletionRecordIterator'
+]
diff --git 
a/paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py 
b/paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py
new file mode 100644
index 0000000000..c9a143bdc9
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py
@@ -0,0 +1,129 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from typing import Optional
+
+import pyarrow
+from pyarrow import RecordBatch
+
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.iface.record_iterator import RecordIterator
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
+
+from pyroaring import BitMap
+
+from pypaimon.read.reader.iface.record_reader import RecordReader
+
+
+class ApplyDeletionVectorReader(RecordBatchReader):
+    """
+    A RecordReader which applies DeletionVector to filter records.
+    """
+
+    def __init__(self, reader: RecordReader, deletion_vector: DeletionVector):
+        """
+        Initialize an ApplyDeletionVectorReader.
+
+        Args:
+            reader: The underlying record reader.
+            deletion_vector: The deletion vector to apply.
+        """
+        self._reader = reader
+        self._deletion_vector = deletion_vector
+
+    def reader(self) -> RecordReader:
+        return self._reader
+
+    def deletion_vector(self) -> DeletionVector:
+        return self._deletion_vector
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        self._reader: RecordBatchReader
+        arrow_batch = self._reader.read_arrow_batch()
+        if arrow_batch is None:
+            return None
+        # Remove the deleted rows from the batch
+        range_bitmap = BitMap(
+            range(self._reader.return_batch_pos() - arrow_batch.num_rows, 
self._reader.return_batch_pos()))
+        intersection_bitmap = range_bitmap - self._deletion_vector.bit_map()
+        added_row_list = [x - (self._reader.return_batch_pos() - 
arrow_batch.num_rows) for x in
+                          list(intersection_bitmap)]
+        return arrow_batch.take(pyarrow.array(added_row_list, 
type=pyarrow.int32()))
+
+    def read_batch(self) -> Optional[RecordIterator]:
+        """
+        Reads one batch with deletion vector applied.
+
+        Returns:
+            A RecordIterator with deletion filtering, or None if no more data.
+        """
+        batch = self._reader.read_batch()
+
+        if batch is None:
+            return None
+
+        return ApplyDeletionRecordIterator(batch, self._deletion_vector)
+
+    def close(self):
+        self._reader.close()
+
+
+class ApplyDeletionRecordIterator(RecordIterator):
+    """
+    A RecordIterator that wraps another RecordIterator and applies a 
DeletionVector
+    to filter out deleted records.
+    """
+
+    def __init__(self, iterator: RecordIterator, deletion_vector: 
DeletionVector):
+        """
+        Initialize an ApplyDeletionRecordIterator.
+
+        Args:
+            iterator: The underlying record iterator.
+            deletion_vector: The deletion vector to apply for filtering.
+        """
+        self._iterator = iterator
+        self._deletion_vector = deletion_vector
+
+    def iterator(self) -> RecordIterator:
+        return self._iterator
+
+    def deletion_vector(self) -> DeletionVector:
+        return self._deletion_vector
+
+    def returned_position(self) -> int:
+        return self._iterator.return_pos()
+
+    def next(self) -> Optional[object]:
+        """
+        Gets the next non-deleted record from the iterator.
+
+        This method skips over any records that are marked as deleted in the
+        deletion vector, returning only non-deleted records.
+
+        Returns:
+            The next non-deleted record, or None if no more records exist.
+        """
+        while True:
+            record = self._iterator.next()
+
+            if record is None:
+                return None
+
+            # Check if the current position is deleted
+            if not 
self._deletion_vector.is_deleted(self._iterator.return_pos()):
+                return record
diff --git a/paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py 
b/paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py
new file mode 100644
index 0000000000..0afbe642df
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py
@@ -0,0 +1,165 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
+import struct
+import zlib
+from pyroaring import BitMap
+
+
+class BitmapDeletionVector(DeletionVector):
+    """
+    A DeletionVector based on RoaringBitmap, it only supports files with row 
count
+    not exceeding 2147483647 (max value for 32-bit integer).
+    """
+
+    MAGIC_NUMBER = 1581511376
+    MAGIC_NUMBER_SIZE_BYTES = 4
+    MAX_VALUE = 2147483647
+
+    def __init__(self, bitmap: BitMap = None):
+        """
+        Initialize a BitmapDeletionVector.
+
+        Args:
+            bitmap: Optional RoaringBitmap instance. If None, creates an empty 
bitmap.
+        """
+        self._bitmap = bitmap if bitmap is not None else BitMap()
+
+    def delete(self, position: int) -> None:
+        """
+        Marks the row at the specified position as deleted.
+
+        Args:
+            position: The position of the row to be marked as deleted.
+        """
+        self._check_position(position)
+        self._bitmap.add(position)
+
+    def is_deleted(self, position: int) -> bool:
+        """
+        Checks if the row at the specified position is deleted.
+        
+        Args:
+            position: The position of the row to check.
+
+        Returns:
+            True if the row is deleted, False otherwise.
+        """
+        self._check_position(position)
+        return position in self._bitmap
+
+    def is_empty(self) -> bool:
+        return len(self._bitmap) == 0
+
+    def get_cardinality(self) -> int:
+        """
+        Returns the number of distinct integers added to the DeletionVector.
+
+        Returns:
+            The number of deleted positions.
+        """
+        return len(self._bitmap)
+
+    def merge(self, deletion_vector: DeletionVector) -> None:
+        """
+        Merge another DeletionVector to this current one.
+
+        Args:
+            deletion_vector: The other DeletionVector to merge.
+        """
+        if isinstance(deletion_vector, BitmapDeletionVector):
+            self._bitmap |= deletion_vector._bitmap
+        else:
+            raise RuntimeError("Only instance with the same class type can be 
merged.")
+
+    def serialize(self) -> bytes:
+        """
+        Serializes the deletion vector to bytes.
+
+        Returns:
+            The serialized bytes.
+        """
+        # Serialize the bitmap
+        bitmap_bytes = self._bitmap.serialize()
+
+        # Create the full data with magic number
+        magic_bytes = struct.pack('>I', self.MAGIC_NUMBER)
+        data = magic_bytes + bitmap_bytes
+
+        # Calculate size and checksum
+        size = len(data)
+        checksum = self._calculate_checksum(data)
+
+        # Pack: size (4 bytes) + data + checksum (4 bytes)
+        result = struct.pack('>I', size) + data + struct.pack('>I', checksum)
+
+        return result
+
+    @staticmethod
+    def deserialize_from_bytes(data: bytes) -> 'BitmapDeletionVector':
+        """
+        Deserializes a BitmapDeletionVector from bytes.
+
+        Args:
+            data: The serialized bytes (without magic number).
+
+        Returns:
+            A BitmapDeletionVector instance.
+        """
+        bitmap = BitMap.deserialize(data)
+        return BitmapDeletionVector(bitmap)
+
+    def bit_map(self):
+        return self._bitmap
+
+    def _check_position(self, position: int) -> None:
+        """
+        Checks if the position is valid.
+
+        Args:
+            position: The position to check.
+
+        Raises:
+            ValueError: If the position exceeds the maximum value.
+        """
+        if position > self.MAX_VALUE:
+            raise ValueError(
+                f"The file has too many rows, RoaringBitmap32 only supports 
files "
+                f"with row count not exceeding {self.MAX_VALUE}."
+            )
+
+    @staticmethod
+    def _calculate_checksum(data: bytes) -> int:
+        """
+        Calculates CRC32 checksum for the given data.
+
+        Args:
+            data: The data to calculate checksum for.
+
+        Returns:
+            The CRC32 checksum as an unsigned 32-bit integer.
+        """
+        return zlib.crc32(data) & 0xffffffff
+
+    def __eq__(self, other):
+        if not isinstance(other, BitmapDeletionVector):
+            return False
+        return self._bitmap == other._bitmap
+
+    def __hash__(self):
+        return hash(tuple(self._bitmap))
diff --git a/paimon-python/pypaimon/deletionvectors/deletion_vector.py 
b/paimon-python/pypaimon/deletionvectors/deletion_vector.py
new file mode 100644
index 0000000000..44c179653f
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/deletion_vector.py
@@ -0,0 +1,142 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from abc import ABC, abstractmethod
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.table.source.deletion_file import DeletionFile
+
+
+class DeletionVector(ABC):
+    """
+    The DeletionVector can efficiently record the positions of rows that are 
deleted in a file,
+    which can then be used to filter out deleted rows when processing the file.
+    """
+
+    @abstractmethod
+    def bit_map(self):
+        """
+        Returns the bitmap of the DeletionVector.
+        """
+        pass
+
+    @abstractmethod
+    def delete(self, position: int) -> None:
+        """
+        Marks the row at the specified position as deleted.
+
+        Args:
+            position: The position of the row to be marked as deleted.
+        """
+        pass
+
+    @abstractmethod
+    def is_deleted(self, position: int) -> bool:
+        """
+        Checks if the row at the specified position is deleted.
+
+        Args:
+            position: The position of the row to check.
+
+        Returns:
+            True if the row is deleted, False otherwise.
+        """
+        pass
+
+    @abstractmethod
+    def is_empty(self) -> bool:
+        """
+        Determines if the deletion vector is empty, indicating no deletions.
+
+        Returns:
+            True if the deletion vector is empty, False if it contains 
deletions.
+        """
+        pass
+
+    @abstractmethod
+    def get_cardinality(self) -> int:
+        """
+        Returns the number of distinct integers added to the DeletionVector.
+
+        Returns:
+            The number of deleted positions.
+        """
+        pass
+
+    @abstractmethod
+    def merge(self, deletion_vector: 'DeletionVector') -> None:
+        """
+        Merge another DeletionVector to this current one.
+
+        Args:
+            deletion_vector: The other DeletionVector to merge.
+        """
+        pass
+
+    def checked_delete(self, position: int) -> bool:
+        """
+        Marks the row at the specified position as deleted.
+
+        Args:
+            position: The position of the row to be marked as deleted.
+
+        Returns:
+            True if the added position wasn't already deleted. False otherwise.
+        """
+        if self.is_deleted(position):
+            return False
+        else:
+            self.delete(position)
+            return True
+
+    @staticmethod
+    def read(file_io: FileIO, deletion_file: DeletionFile) -> 'DeletionVector':
+        """
+        Read a DeletionVector from a file.
+        """
+        from pypaimon.deletionvectors.bitmap_deletion_vector import 
BitmapDeletionVector
+
+        with file_io.new_input_stream(deletion_file.dv_index_path) as f:
+            f.seek(deletion_file.offset)
+
+            # Read bitmap length
+            bitmap_length_bytes = f.read(4)
+            bitmap_length = int.from_bytes(bitmap_length_bytes, 
byteorder='big')
+
+            # Read magic number
+            magic_number_bytes = f.read(4)
+            magic_number = int.from_bytes(magic_number_bytes, byteorder='big')
+
+            if magic_number == BitmapDeletionVector.MAGIC_NUMBER:
+                if deletion_file.length is not None and bitmap_length != 
deletion_file.length:
+                    raise RuntimeError(
+                        f"Size not match, actual size: {bitmap_length}, 
expected size: {deletion_file.length}"
+                    )
+
+                # Magic number has been read, read remaining bytes
+                remaining_bytes = bitmap_length - 
BitmapDeletionVector.MAGIC_NUMBER_SIZE_BYTES
+                data = f.read(remaining_bytes)
+
+                # Skip CRC (4 bytes)
+                f.read(4)
+
+                return BitmapDeletionVector.deserialize_from_bytes(data)
+            else:
+                raise RuntimeError(
+                    f"Invalid magic number: {magic_number}, "
+                    f"expected: {BitmapDeletionVector.MAGIC_NUMBER}"
+                )
diff --git a/paimon-python/pypaimon/index/deletion_vector_meta.py 
b/paimon-python/pypaimon/index/deletion_vector_meta.py
new file mode 100644
index 0000000000..d5bc3d7876
--- /dev/null
+++ b/paimon-python/pypaimon/index/deletion_vector_meta.py
@@ -0,0 +1,40 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class DeletionVectorMeta:
+    """Metadata of deletion vector."""
+
+    data_file_name: str
+    offset: int
+    length: int
+    cardinality: Optional[int] = None
+
+    def __eq__(self, other):
+        if not isinstance(other, DeletionVectorMeta):
+            return False
+        return (self.data_file_name == other.data_file_name and
+                self.offset == other.offset and
+                self.length == other.length and
+                self.cardinality == other.cardinality)
+
+    def __hash__(self):
+        return hash((self.data_file_name, self.offset, self.length, 
self.cardinality))
diff --git a/paimon-python/pypaimon/index/index_file_meta.py 
b/paimon-python/pypaimon/index/index_file_meta.py
new file mode 100644
index 0000000000..a0cb7db46c
--- /dev/null
+++ b/paimon-python/pypaimon/index/index_file_meta.py
@@ -0,0 +1,48 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from dataclasses import dataclass
+from typing import Optional, Dict
+
+from pypaimon.index.deletion_vector_meta import DeletionVectorMeta
+
+
+@dataclass
+class IndexFileMeta:
+    """Metadata of index file."""
+
+    index_type: str
+    file_name: str
+    file_size: int
+    row_count: int
+    dv_ranges: Optional[Dict[str, DeletionVectorMeta]] = None
+    external_path: Optional[str] = None
+
+    def __eq__(self, other):
+        if not isinstance(other, IndexFileMeta):
+            return False
+        return (self.index_type == other.index_type and
+                self.file_name == other.file_name and
+                self.file_size == other.file_size and
+                self.row_count == other.row_count and
+                self.dv_ranges == other.dv_ranges and
+                self.external_path == other.external_path)
+
+    def __hash__(self):
+        dv_ranges_tuple = tuple(sorted(self.dv_ranges.items())) if 
self.dv_ranges else None
+        return hash((self.index_type, self.file_name, self.file_size,
+                     self.row_count, dv_ranges_tuple, self.external_path))
diff --git a/paimon-python/pypaimon/manifest/index_manifest_entry.py 
b/paimon-python/pypaimon/manifest/index_manifest_entry.py
new file mode 100644
index 0000000000..f7b5e399e0
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/index_manifest_entry.py
@@ -0,0 +1,62 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from dataclasses import dataclass
+
+from pypaimon.index.index_file_meta import IndexFileMeta
+from pypaimon.table.row.generic_row import GenericRow
+
+
+@dataclass
+class IndexManifestEntry:
+    """Manifest entry for index file."""
+
+    kind: int  # 0 for ADD, 1 for DELETE
+    partition: GenericRow
+    bucket: int
+    index_file: IndexFileMeta
+
+    def __eq__(self, other):
+        if not isinstance(other, IndexManifestEntry):
+            return False
+        return (self.kind == other.kind and
+                self.partition == other.partition and
+                self.bucket == other.bucket and
+                self.index_file == other.index_file)
+
+    def __hash__(self):
+        return hash((self.kind, tuple(self.partition.values),
+                     self.bucket, self.index_file))
+
+
+INDEX_MANIFEST_ENTRY = {
+    "type": "record",
+    "name": "IndexManifestEntry",
+    "fields": [
+        {"name": "_VERSION", "type": "int"},
+        {"name": "_KIND", "type": "byte"},
+        {"name": "_PARTITION", "type": "bytes"},
+        {"name": "_BUCKET", "type": "int"},
+        {"name": "_INDEX_TYPE", "type": "string"},
+        {"name": "_FILE_NAME", "type": "string"},
+        {"name": "_FILE_SIZE", "type": "long"},
+        {"name": "_ROW_COUNT", "type": "long"},
+        {"name": "_DELETIONS_VECTORS_RANGES", "type": {"type": "array", 
"elementType": "DeletionVectorMeta"}},
+        {"name": "_EXTERNAL_PATH", "type": ["null", "string"]},
+        {"name": "_GLOBAL_INDEX", "type": "GlobalIndexMeta"}
+    ]
+}
diff --git a/paimon-python/pypaimon/manifest/index_manifest_file.py 
b/paimon-python/pypaimon/manifest/index_manifest_file.py
new file mode 100644
index 0000000000..cd66ee5fce
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/index_manifest_file.py
@@ -0,0 +1,95 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from io import BytesIO
+from typing import List
+
+import fastavro
+
+from pypaimon.index.deletion_vector_meta import DeletionVectorMeta
+from pypaimon.index.index_file_meta import IndexFileMeta
+from pypaimon.manifest.index_manifest_entry import IndexManifestEntry
+from pypaimon.table.row.generic_row import GenericRowDeserializer
+
+
+class IndexManifestFile:
+    """Index manifest file reader for reading index manifest entries."""
+
+    DELETION_VECTORS_INDEX = "DELETION_VECTORS"
+
+    def __init__(self, table):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        manifest_path = table.table_path.rstrip('/')
+        self.manifest_path = f"{manifest_path}/manifest"
+        self.file_io = table.file_io
+        self.partition_keys_fields = self.table.partition_keys_fields
+
+    def read(self, index_manifest_name: str) -> List[IndexManifestEntry]:
+        """Read index manifest entries from the specified index manifest 
file."""
+        index_manifest_path = f"{self.manifest_path}/{index_manifest_name}"
+
+        if not self.file_io.exists(index_manifest_path):
+            return []
+
+        entries = []
+        try:
+            with self.file_io.new_input_stream(index_manifest_path) as 
input_stream:
+                avro_bytes = input_stream.read()
+
+            buffer = BytesIO(avro_bytes)
+            reader = fastavro.reader(buffer)
+
+            for record in reader:
+                dv_list = record['_DELETIONS_VECTORS_RANGES']
+                file_dv_dict = {}
+                for dv_meta_record in dv_list:
+                    dv_meta = DeletionVectorMeta(
+                        data_file_name=dv_meta_record['f0'],
+                        offset=dv_meta_record['f1'],
+                        length=dv_meta_record['f2'],
+                        cardinality=dv_meta_record.get('_CARDINALITY')
+                    )
+                    file_dv_dict[dv_meta.data_file_name] = dv_meta
+
+                # Create IndexFileMeta
+                index_file_meta = IndexFileMeta(
+                    index_type=record['_INDEX_TYPE'],
+                    file_name=record['_FILE_NAME'],
+                    file_size=record['_FILE_SIZE'],
+                    row_count=record['_ROW_COUNT'],
+                    dv_ranges=file_dv_dict,
+                    external_path=record['_EXTERNAL_PATH']
+                )
+
+                # Create IndexManifestEntry
+                entry = IndexManifestEntry(
+                    kind=record['_KIND'],
+                    partition=GenericRowDeserializer.from_bytes(
+                        record['_PARTITION'],
+                        self.partition_keys_fields
+                    ),
+                    bucket=record['_BUCKET'],
+                    index_file=index_file_meta
+                )
+                entries.append(entry)
+
+        except Exception as e:
+            raise RuntimeError(f"Failed to read index manifest file 
{index_manifest_path}: {e}") from e
+
+        return entries
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py 
b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 4b86265d77..aefff13ebd 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -29,7 +29,7 @@ from pypaimon.read.reader.iface.record_batch_reader import 
RecordBatchReader
 class ConcatBatchReader(RecordBatchReader):
 
     def __init__(self, reader_suppliers: List[Callable]):
-        self.queue = collections.deque(reader_suppliers)
+        self.queue: collections.deque[Callable] = 
collections.deque(reader_suppliers)
         self.current_reader: Optional[RecordBatchReader] = None
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
diff --git a/paimon-python/pypaimon/read/reader/concat_record_reader.py 
b/paimon-python/pypaimon/read/reader/concat_record_reader.py
index 75f6b09b09..1c71e27fff 100644
--- a/paimon-python/pypaimon/read/reader/concat_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_record_reader.py
@@ -26,7 +26,7 @@ from pypaimon.read.reader.iface.record_reader import 
RecordReader
 class ConcatRecordReader(RecordReader):
 
     def __init__(self, reader_suppliers: List[Callable]):
-        self.queue = collections.deque(reader_suppliers)
+        self.queue: collections.deque[Callable] = 
collections.deque(reader_suppliers)
         self.current_reader: Optional[RecordReader] = None
 
     def read_batch(self) -> Optional[RecordIterator]:
diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py 
b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
index b7da3fc96b..ec3a1bc424 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
@@ -41,20 +41,25 @@ class RecordBatchReader(RecordReader):
         Reads one batch. The method should return null when reaching the end 
of the input.
         """
 
-    def _read_next_df(self) -> Optional[polars.DataFrame]:
+    def return_batch_pos(self) -> int:
+        """
+        Returns the current batch position in the file.
+        """
+
+    def read_next_df(self) -> Optional[polars.DataFrame]:
         arrow_batch = self.read_arrow_batch()
         if arrow_batch is None:
             return None
         return polars.from_arrow(arrow_batch)
 
     def tuple_iterator(self) -> Optional[Iterator[tuple]]:
-        df = self._read_next_df()
+        df = self.read_next_df()
         if df is None:
             return None
         return df.iter_rows()
 
     def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
-        df = self._read_next_df()
+        df = self.read_next_df()
         if df is None:
             return None
         return InternalRowWrapperIterator(df.iter_rows(), df.width)
@@ -71,3 +76,53 @@ class 
InternalRowWrapperIterator(RecordIterator[InternalRow]):
             return None
         self._reused_row.replace(row_tuple)
         return self._reused_row
+
+
+class RowPositionReader(RecordBatchReader):
+
+    def __init__(self, data_reader: RecordBatchReader):
+        self._data_reader = data_reader
+        self._row_iterator = RowPositionRecordIterator()
+        self.batch_pos = 0
+
+    def read_arrow_batch(self) -> Optional[RecordBatch]:
+        batch = self._data_reader.read_arrow_batch()
+        if batch is None:
+            return None
+        self.batch_pos += batch.num_rows
+        return batch
+
+    def return_batch_pos(self) -> int:
+        return self.batch_pos
+
+    def tuple_iterator(self) -> Optional[RecordIterator]:
+        return 
self._row_iterator.replace_iterator(self._data_reader.tuple_iterator())
+
+    def close(self):
+        self._data_reader.close()
+
+
+class RowPositionRecordIterator(RecordIterator[tuple]):
+
+    def __init__(self):
+        self.reused_iterator: Optional[Iterator[tuple]] = None
+        self.pos = -1
+
+    def next(self) -> Optional[tuple]:
+        row_tuple = next(self.reused_iterator, None)
+        if row_tuple is None:
+            return None
+        self.pos += 1
+        return row_tuple
+
+    def return_pos(self) -> int:
+        return self.pos
+
+    def replace_iterator(self, iterator: Iterator[tuple]) -> 
Optional[RecordIterator[tuple]]:
+        self.reused_iterator = iterator
+        if self.reused_iterator is None:
+            return None
+        return self
+
+    def __next__(self):
+        return self.next()
diff --git a/paimon-python/pypaimon/read/reader/iface/record_iterator.py 
b/paimon-python/pypaimon/read/reader/iface/record_iterator.py
index 6684d8fd4c..5781b4994e 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_iterator.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_iterator.py
@@ -32,3 +32,8 @@ class RecordIterator(Generic[T], ABC):
         """
         Gets the next record from the iterator. Returns null if this iterator 
has no more elements.
         """
+
+    def return_pos(self) -> int:
+        """
+        Returns the current position of the file.
+        """
diff --git a/paimon-python/pypaimon/read/reader/iface/record_reader.py 
b/paimon-python/pypaimon/read/reader/iface/record_reader.py
index 8be28a12e1..3627161677 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_reader.py
@@ -26,13 +26,13 @@ T = TypeVar('T')
 
 class RecordReader(Generic[T], ABC):
     """
-    The reader that reads the batches of records as RecordIterator.
+    The reader that reads the batches of records.
     """
 
     @abstractmethod
     def read_batch(self) -> Optional[RecordIterator[T]]:
         """
-        Reads one batch. The method should return null when reaching the end 
of the input.
+        Reads one batch as a RecordIterator. The method should return null 
when reaching the end of the input.
         """
 
     @abstractmethod
diff --git a/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py 
b/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py
index d8de247ec8..96a48afccb 100644
--- a/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py
+++ b/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 
-from typing import Iterator, Optional
+from typing import Iterator, Optional, Union
 
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
 from pypaimon.read.reader.iface.record_iterator import RecordIterator
@@ -53,7 +53,7 @@ class KeyValueWrapIterator(RecordIterator[KeyValue]):
 
     def __init__(
             self,
-            iterator: Iterator,
+            iterator: Union[Iterator, RecordIterator],
             reused_kv: KeyValue
     ):
         self.iterator = iterator
@@ -65,3 +65,6 @@ class KeyValueWrapIterator(RecordIterator[KeyValue]):
             return None
         self.reused_kv.replace(row_tuple)
         return self.reused_kv
+
+    def return_pos(self) -> int:
+        return self.iterator.return_pos()
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py 
b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index e35249bd05..7432e56b84 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -17,10 +17,13 @@ limitations under the License.
 """
 import os
 from collections import defaultdict
-from typing import Callable, List, Optional
+from typing import Callable, List, Optional, Dict, Set
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
+from pypaimon.table.source.deletion_file import DeletionFile
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.manifest.index_manifest_file import IndexManifestFile
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -64,6 +67,7 @@ class FullStartingScanner(StartingScanner):
         self.only_read_real_buckets = True if int(
             self.table.options.get('bucket', -1)) == 
BucketMode.POSTPONE_BUCKET.value else False
         self.data_evolution = 
self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 
'true'
+        self.deletion_vectors_enabled = 
self.table.options.get('deletion-vectors.enabled', 'false').lower() == 'true'
 
         def schema_fields_func(schema_id: int):
             return self.table.schema_manager.get_schema(schema_id).fields
@@ -77,12 +81,24 @@ class FullStartingScanner(StartingScanner):
         file_entries = self.plan_files()
         if not file_entries:
             return Plan([])
+
+        # Get deletion files map if deletion vectors are enabled.
+        # {partition-bucket -> {filename -> DeletionFile}}
+        deletion_files_map: dict[tuple, dict[str, DeletionFile]] = {}
+        if self.deletion_vectors_enabled:
+            latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+            # Extract unique partition-bucket pairs from file entries
+            buckets = set()
+            for entry in file_entries:
+                buckets.add((tuple(entry.partition.values), entry.bucket))
+            deletion_files_map = self._scan_dv_index(latest_snapshot, buckets)
+
         if self.table.is_primary_key_table:
-            splits = self._create_primary_key_splits(file_entries)
+            splits = self._create_primary_key_splits(file_entries, 
deletion_files_map)
         elif self.data_evolution:
-            splits = self._create_data_evolution_splits(file_entries)
+            splits = self._create_data_evolution_splits(file_entries, 
deletion_files_map)
         else:
-            splits = self._create_append_only_splits(file_entries)
+            splits = self._create_append_only_splits(file_entries, 
deletion_files_map)
 
         splits = self._apply_push_down_limit(splits)
         return Plan(splits)
@@ -286,7 +302,8 @@ class FullStartingScanner(StartingScanner):
             return False
         if self.partition_key_predicate and not 
self.partition_key_predicate.test(entry.partition):
             return False
-
+        if self.deletion_vectors_enabled and entry.file.level == 0:  # do not 
read level 0 file
+            return False
         # Get SimpleStatsEvolution for this schema
         evolution = 
self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
 
@@ -315,7 +332,95 @@ class FullStartingScanner(StartingScanner):
                 entry.file.row_count
             )
 
-    def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
+    def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, 
Dict[str, DeletionFile]]:
+        """
+        Scan deletion vector index from snapshot.
+        Returns a map of (partition, bucket) -> {filename -> DeletionFile}
+
+        Reference: SnapshotReaderImpl.scanDvIndex() in Java
+        """
+        if not snapshot or not snapshot.index_manifest:
+            return {}
+
+        result = {}
+
+        # Read index manifest file
+        index_manifest_file = IndexManifestFile(self.table)
+        index_entries = index_manifest_file.read(snapshot.index_manifest)
+
+        # Filter by DELETION_VECTORS_INDEX type and requested buckets
+        for entry in index_entries:
+            if entry.index_file.index_type != 
IndexManifestFile.DELETION_VECTORS_INDEX:
+                continue
+
+            partition_bucket = (tuple(entry.partition.values), entry.bucket)
+            if partition_bucket not in buckets:
+                continue
+
+            # Convert to deletion files
+            deletion_files = self._to_deletion_files(entry)
+            if deletion_files:
+                result[partition_bucket] = deletion_files
+
+        return result
+
+    def _to_deletion_files(self, index_entry) -> Dict[str, DeletionFile]:
+        """
+        Convert index manifest entry to deletion files map.
+        Returns {filename -> DeletionFile}
+        """
+        deletion_files = {}
+        index_file = index_entry.index_file
+
+        # Check if dv_ranges exists
+        if not index_file.dv_ranges:
+            return deletion_files
+
+        # Build deletion file path
+        # Format: manifest/index-manifest-{uuid}
+        index_path = self.table.table_path.rstrip('/') + '/index'
+        dv_file_path = f"{index_path}/{index_file.file_name}"
+
+        # Convert each DeletionVectorMeta to DeletionFile
+        for data_file_name, dv_meta in index_file.dv_ranges.items():
+            deletion_file = DeletionFile(
+                dv_index_path=dv_file_path,
+                offset=dv_meta.offset,
+                length=dv_meta.length,
+                cardinality=dv_meta.cardinality
+            )
+            deletion_files[data_file_name] = deletion_file
+
+        return deletion_files
+
+    def _get_deletion_files_for_split(self, data_files: List[DataFileMeta],
+                                      deletion_files_map: dict,
+                                      partition: GenericRow,
+                                      bucket: int) -> 
Optional[List[DeletionFile]]:
+        """
+        Get deletion files for the given data files in a split.
+        """
+        if not deletion_files_map:
+            return None
+
+        partition_key = (tuple(partition.values), bucket)
+        file_deletion_map = deletion_files_map.get(partition_key, {})
+
+        if not file_deletion_map:
+            return None
+
+        deletion_files = []
+        for data_file in data_files:
+            deletion_file = file_deletion_map.get(data_file.file_name)
+            if deletion_file:
+                deletion_files.append(deletion_file)
+            else:
+                deletion_files.append(None)
+
+        return deletion_files if any(df is not None for df in deletion_files) 
else None
+
+    def _create_append_only_splits(
+            self, file_entries: List[ManifestEntry], deletion_files_map: dict 
= None) -> List['Split']:
         partitioned_files = defaultdict(list)
         for entry in file_entries:
             partitioned_files[(tuple(entry.partition.values), 
entry.bucket)].append(entry)
@@ -335,12 +440,13 @@ class FullStartingScanner(StartingScanner):
 
             packed_files: List[List[DataFileMeta]] = 
self._pack_for_ordered(data_files, weight_func,
                                                                             
self.target_split_size)
-            splits += self._build_split_from_pack(packed_files, file_entries, 
False)
+            splits += self._build_split_from_pack(packed_files, file_entries, 
False, deletion_files_map)
         if self.idx_of_this_subtask is not None:
             self._compute_split_start_end_row(splits, plan_start_row, 
plan_end_row)
         return splits
 
-    def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> 
List['Split']:
+    def _create_primary_key_splits(
+            self, file_entries: List[ManifestEntry], deletion_files_map: dict 
= None) -> List['Split']:
         if self.idx_of_this_subtask is not None:
             file_entries = self._primary_key_filter_by_shard(file_entries)
         partitioned_files = defaultdict(list)
@@ -368,10 +474,11 @@ class FullStartingScanner(StartingScanner):
                 [file for sub_pack in pack for file in sub_pack]
                 for pack in packed_files
             ]
-            splits += self._build_split_from_pack(flatten_packed_files, 
file_entries, True)
+            splits += self._build_split_from_pack(flatten_packed_files, 
file_entries, True, deletion_files_map)
         return splits
 
-    def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) 
-> List['Split']:
+    def _create_data_evolution_splits(
+            self, file_entries: List[ManifestEntry], deletion_files_map: dict 
= None) -> List['Split']:
         def sort_key(manifest_entry: ManifestEntry) -> tuple:
             first_row_id = manifest_entry.file.first_row_id if 
manifest_entry.file.first_row_id is not None else float(
                 '-inf')
@@ -413,7 +520,7 @@ class FullStartingScanner(StartingScanner):
                 for pack in packed_files
             ]
 
-            splits += self._build_split_from_pack(flatten_packed_files, 
sorted_entries, False)
+            splits += self._build_split_from_pack(flatten_packed_files, 
sorted_entries, False, deletion_files_map)
 
         if self.idx_of_this_subtask is not None:
             self._compute_split_start_end_row(splits, plan_start_row, 
plan_end_row)
@@ -460,7 +567,8 @@ class FullStartingScanner(StartingScanner):
 
         return split_by_row_id
 
-    def _build_split_from_pack(self, packed_files, file_entries, 
for_primary_key_split: bool) -> List['Split']:
+    def _build_split_from_pack(self, packed_files, file_entries, 
for_primary_key_split: bool,
+                               deletion_files_map: dict = None) -> 
List['Split']:
         splits = []
         for file_group in packed_files:
             raw_convertible = True
@@ -479,6 +587,16 @@ class FullStartingScanner(StartingScanner):
                 total_record_count += data_file.row_count
 
             if file_paths:
+                # Get deletion files for this split
+                data_deletion_files = None
+                if deletion_files_map:
+                    data_deletion_files = self._get_deletion_files_for_split(
+                        file_group,
+                        deletion_files_map,
+                        file_entries[0].partition,
+                        file_entries[0].bucket
+                    )
+
                 split = Split(
                     files=file_group,
                     partition=file_entries[0].partition,
@@ -486,7 +604,8 @@ class FullStartingScanner(StartingScanner):
                     _file_paths=file_paths,
                     _row_count=total_record_count,
                     _file_size=total_file_size,
-                    raw_convertible=raw_convertible
+                    raw_convertible=raw_convertible,
+                    data_deletion_files=data_deletion_files
                 )
                 splits.append(split)
         return splits
diff --git a/paimon-python/pypaimon/read/split.py 
b/paimon-python/pypaimon/read/split.py
index f1ab5f3a5b..55cb955ad6 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -17,10 +17,11 @@
 
################################################################################
 
 from dataclasses import dataclass
-from typing import List
+from typing import List, Optional
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.source.deletion_file import DeletionFile
 
 
 @dataclass
@@ -35,6 +36,7 @@ class Split:
     split_start_row: int = None
     split_end_row: int = None
     raw_convertible: bool = False
+    data_deletion_files: Optional[List[DeletionFile]] = None
 
     @property
     def row_count(self) -> int:
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index 94d832eff4..ed579a2a20 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,10 +19,12 @@
 import os
 from abc import ABC, abstractmethod
 from functools import partial
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Callable
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.common.predicate import Predicate
+from pypaimon.deletionvectors import ApplyDeletionVectorReader
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.read.interval_partition import IntervalPartition, SortedRun
 from pypaimon.read.partition_info import PartitionInfo
@@ -39,7 +41,7 @@ from pypaimon.read.reader.format_avro_reader import 
FormatAvroReader
 from pypaimon.read.reader.format_blob_reader import FormatBlobReader
 from pypaimon.read.reader.format_lance_reader import FormatLanceReader
 from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
-from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader, 
RowPositionReader
 from pypaimon.read.reader.iface.record_reader import RecordReader
 from pypaimon.read.reader.key_value_unwrap_reader import \
     KeyValueUnwrapRecordReader
@@ -70,6 +72,7 @@ class SplitRead(ABC):
         if isinstance(self, MergeFileSplitRead):
             self.read_fields = self._create_key_value_fields(read_type)
         self.schema_id_2_fields = {}
+        self.deletion_file_readers = {}
 
     def _push_down_predicate(self) -> Optional[Predicate]:
         if self.predicate is None:
@@ -86,7 +89,8 @@ class SplitRead(ABC):
     def create_reader(self) -> RecordReader:
         """Create a record reader for the given split."""
 
-    def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool, 
read_fields: List[str]):
+    def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
+                             read_fields: List[str]) -> RecordBatchReader:
         (read_file_fields, read_arrow_predicate) = 
self._get_fields_and_predicate(file.schema_id, read_fields)
 
         # Use external_path if available, otherwise use file_path
@@ -112,7 +116,7 @@ class SplitRead(ABC):
             raise ValueError(f"Unexpected file format: {file_format}")
 
         index_mapping = self.create_index_mapping()
-        partition_info = self.create_partition_info()
+        partition_info = self._create_partition_info()
         if for_merge_read:
             return DataFileBatchReader(format_reader, index_mapping, 
partition_info, self.trimmed_primary_key,
                                        self.table.table_schema.fields)
@@ -254,7 +258,7 @@ class SplitRead(ABC):
 
         return trimmed_mapping, trimmed_fields
 
-    def create_partition_info(self):
+    def _create_partition_info(self):
         if not self.table.partition_keys:
             return None
         partition_mapping = self._construct_partition_mapping()
@@ -281,17 +285,33 @@ class SplitRead(ABC):
 
         return mapping
 
+    def _genarate_deletion_file_readers(self):
+        self.deletion_file_readers = {}
+        if self.split.data_deletion_files:
+            for data_file, deletion_file in zip(self.split.files, 
self.split.data_deletion_files):
+                if deletion_file is not None:
+                    # Create a callable method to read the deletion vector
+                    self.deletion_file_readers[data_file.file_name] = lambda 
df=deletion_file: DeletionVector.read(
+                        self.table.file_io, df)
+
 
 class RawFileSplitRead(SplitRead):
+    def raw_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> RecordReader:
+        file_batch_reader = self.file_reader_supplier(file, False, 
self._get_final_read_data_fields())
+        dv = dv_factory() if dv_factory else None
+        if dv:
+            return 
ApplyDeletionVectorReader(RowPositionReader(file_batch_reader), dv)
+        else:
+            return file_batch_reader
 
     def create_reader(self) -> RecordReader:
+        self._genarate_deletion_file_readers()
         data_readers = []
         for file in self.split.files:
             supplier = partial(
-                self.file_reader_supplier,
+                self.raw_reader_supplier,
                 file=file,
-                for_merge_read=False,
-                read_fields=self._get_final_read_data_fields(),
+                dv_factory=self.deletion_file_readers.get(file.file_name, None)
             )
             data_readers.append(supplier)
 
@@ -312,26 +332,29 @@ class RawFileSplitRead(SplitRead):
 
 
 class MergeFileSplitRead(SplitRead):
-    def kv_reader_supplier(self, file):
-        reader_supplier = partial(
-            self.file_reader_supplier,
-            file=file,
-            for_merge_read=True,
-            read_fields=self._get_final_read_data_fields()
-        )
-        return KeyValueWrapReader(reader_supplier(), 
len(self.trimmed_primary_key), self.value_arity)
+    def kv_reader_supplier(self, file: DataFileMeta, dv_factory: 
Optional[Callable] = None) -> RecordReader:
+        file_batch_reader = self.file_reader_supplier(file, True, 
self._get_final_read_data_fields())
+        dv = dv_factory() if dv_factory else None
+        if dv:
+            return ApplyDeletionVectorReader(
+                KeyValueWrapReader(RowPositionReader(file_batch_reader),
+                                   len(self.trimmed_primary_key), 
self.value_arity), dv)
+        else:
+            return KeyValueWrapReader(file_batch_reader, 
len(self.trimmed_primary_key), self.value_arity)
 
-    def section_reader_supplier(self, section: List[SortedRun]):
+    def section_reader_supplier(self, section: List[SortedRun]) -> 
RecordReader:
         readers = []
         for sorter_run in section:
             data_readers = []
             for file in sorter_run.files:
-                supplier = partial(self.kv_reader_supplier, file)
+                supplier = partial(self.kv_reader_supplier, file, 
self.deletion_file_readers.get(file.file_name, None))
                 data_readers.append(supplier)
             readers.append(ConcatRecordReader(data_readers))
         return SortMergeReaderWithMinHeap(readers, self.table.table_schema)
 
     def create_reader(self) -> RecordReader:
+        # Create a dict mapping data file name to deletion file reader method
+        self._genarate_deletion_file_readers()
         section_readers = []
         sections = IntervalPartition(self.split.files).partition()
         for section in sections:
diff --git a/paimon-python/pypaimon/table/row/generic_row.py 
b/paimon-python/pypaimon/table/row/generic_row.py
index b05e475951..be5c1ec80f 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -51,6 +51,10 @@ class GenericRow(InternalRow):
     def __len__(self) -> int:
         return len(self.values)
 
+    def __str__(self):
+        field_strs = [f"{field.name}={repr(value)}" for field, value in 
zip(self.fields, self.values)]
+        return f"GenericRow(row_kind={self.row_kind.name}, {', 
'.join(field_strs)})"
+
 
 class GenericRowDeserializer:
     HEADER_SIZE_IN_BITS = 8
diff --git a/paimon-python/pypaimon/table/source/deletion_file.py 
b/paimon-python/pypaimon/table/source/deletion_file.py
new file mode 100644
index 0000000000..cc05ad6ed7
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/deletion_file.py
@@ -0,0 +1,49 @@
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing,
+#  software distributed under the License is distributed on an
+#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#  KIND, either express or implied.  See the License for the
+#  specific language governing permissions and limitations
+#  under the License.
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class DeletionFile:
+    """
+    Deletion file for data file, the first 4 bytes are length, the following 
is the bitmap content.
+
+    - The first 4 bytes are length, should equal to length().
+    - Next 4 bytes are the magic number, should be equal to 1581511376.
+    - The remaining content should be a RoaringBitmap.
+    """
+    dv_index_path: str  # The file where the vector for data file is located
+    offset: int  # The offset where the vector for data file is located in the 
dv index file
+    length: int
+    cardinality: Optional[int] = None
+
+    def __eq__(self, other):
+        if not isinstance(other, DeletionFile):
+            return False
+        return (self.dv_index_path == other.dv_index_path and
+                self.offset == other.offset and
+                self.length == other.length and
+                self.cardinality == other.cardinality)
+
+    def __hash__(self):
+        return hash((self.dv_index_path, self.offset, self.length, 
self.cardinality))
+
+    def __str__(self):
+        return (f"DeletionFile(path='{self.dv_index_path}', 
offset={self.offset}, "
+                f"length={self.length}, cardinality={self.cardinality})")
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 3213ec87d3..b88224784c 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -180,3 +180,72 @@ class JavaPyReadWriteTest(unittest.TestCase):
         # Note: Normal read filters out system fields, so we verify through 
Java read
         # which explicitly reads KeyValue objects and checks valueKind
         print(f"Format: {file_format}, Python read completed. ValueKind 
verification should be done in Java test.")
+
+    def test_pk_dv_read(self):
+        pa_schema = pa.schema([
+            pa.field('pt', pa.int32(), nullable=False),
+            pa.field('a', pa.int32(), nullable=False),
+            ('b', pa.int64())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema,
+                                            partition_keys=['pt'],
+                                            primary_keys=['pt', 'a'],
+                                            options={'bucket': '1'})
+        self.catalog.create_table('default.test_pk_dv', schema, True)
+        table = self.catalog.get_table('default.test_pk_dv')
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits).sort_by('pt')
+        expected = pa.Table.from_pydict({
+            'pt': [1, 2, 2],
+            'a': [10, 21, 22],
+            'b': [1000, 20001, 202]
+        }, schema=pa_schema)
+        self.assertEqual(expected, actual)
+
+    def test_pk_dv_read_multi_batch(self):
+        pa_schema = pa.schema([
+            pa.field('pt', pa.int32(), nullable=False),
+            pa.field('a', pa.int32(), nullable=False),
+            ('b', pa.int64())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema,
+                                            partition_keys=['pt'],
+                                            primary_keys=['pt', 'a'],
+                                            options={'bucket': '1'})
+        self.catalog.create_table('default.test_pk_dv_multi_batch', schema, 
True)
+        table = self.catalog.get_table('default.test_pk_dv_multi_batch')
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits).sort_by('pt')
+        expected = pa.Table.from_pydict({
+            'pt': [1] * 9999,
+            'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
+            'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
+        }, schema=pa_schema)
+        self.assertEqual(expected, actual)
+
+    def test_pk_dv_read_multi_batch_raw_convertable(self):
+        pa_schema = pa.schema([
+            pa.field('pt', pa.int32(), nullable=False),
+            pa.field('a', pa.int32(), nullable=False),
+            ('b', pa.int64())
+        ])
+        schema = Schema.from_pyarrow_schema(pa_schema,
+                                            partition_keys=['pt'],
+                                            primary_keys=['pt', 'a'],
+                                            options={'bucket': '1'})
+        self.catalog.create_table('default.test_pk_dv_raw_convertable', 
schema, True)
+        table = self.catalog.get_table('default.test_pk_dv_raw_convertable')
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_read.to_arrow(splits).sort_by('pt')
+        expected = pa.Table.from_pydict({
+            'pt': [1] * 9999,
+            'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
+            'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
+        }, schema=pa_schema)
+        self.assertEqual(expected, actual)

Reply via email to