This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 69b9f0dc36 [Python] Support read deletion vector pk table (#6766)
69b9f0dc36 is described below
commit 69b9f0dc363f1b9e8d67b3b32d36a947ccfe33aa
Author: umi <[email protected]>
AuthorDate: Tue Dec 9 15:08:04 2025 +0800
[Python] Support read deletion vector pk table (#6766)
---
.github/workflows/paimon-python-checks.yml | 4 +-
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 213 +++++++++++++++++++--
paimon-python/dev/cfg.ini | 2 +-
paimon-python/dev/requirements.txt | 1 +
paimon-python/dev/run_mixed_tests.sh | 36 +++-
paimon-python/pypaimon/deletionvectors/__init__.py | 27 +++
.../apply_deletion_vector_reader.py | 129 +++++++++++++
.../deletionvectors/bitmap_deletion_vector.py | 165 ++++++++++++++++
.../pypaimon/deletionvectors/deletion_vector.py | 142 ++++++++++++++
.../pypaimon/index/deletion_vector_meta.py | 40 ++++
paimon-python/pypaimon/index/index_file_meta.py | 48 +++++
.../pypaimon/manifest/index_manifest_entry.py | 62 ++++++
.../pypaimon/manifest/index_manifest_file.py | 95 +++++++++
.../pypaimon/read/reader/concat_batch_reader.py | 2 +-
.../pypaimon/read/reader/concat_record_reader.py | 2 +-
.../read/reader/iface/record_batch_reader.py | 61 +++++-
.../pypaimon/read/reader/iface/record_iterator.py | 5 +
.../pypaimon/read/reader/iface/record_reader.py | 4 +-
.../pypaimon/read/reader/key_value_wrap_reader.py | 7 +-
.../pypaimon/read/scanner/full_starting_scanner.py | 145 ++++++++++++--
paimon-python/pypaimon/read/split.py | 4 +-
paimon-python/pypaimon/read/split_read.py | 59 ++++--
paimon-python/pypaimon/table/row/generic_row.py | 4 +
.../pypaimon/table/source/deletion_file.py | 49 +++++
.../pypaimon/tests/e2e/java_py_read_write_test.py | 69 +++++++
25 files changed, 1317 insertions(+), 58 deletions(-)
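Note for readers trying the feature: a minimal, illustrative sketch of reading such a deletion-vector primary key table from pypaimon is shown below. The warehouse path, table name, and the CatalogFactory/ReadBuilder-style calls are assumptions based on the existing pypaimon API and the e2e test added in this commit; they are not part of the patch itself.

    from pypaimon import CatalogFactory

    # Assumed: the table was written by JavaPyE2ETest.testPKDeletionVectorWrite
    # into a local warehouse; the path and identifier here are illustrative only.
    catalog = CatalogFactory.create({"warehouse": "/tmp/paimon_warehouse"})
    table = catalog.get_table("default.test_pk_dv")

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    # Rows recorded in the deletion vectors attached to each split are filtered out.
    result = read_builder.new_read().to_arrow(splits)
    print(result.num_rows)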
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index e6a10e8949..6df806a4f4 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -91,10 +91,10 @@ jobs:
if [[ "${{ matrix.python-version }}" == "3.6.15" ]]; then
python -m pip install --upgrade pip==21.3.1
python --version
- python -m pip install -q readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
+ python -m pip install -q pyroaring readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests parameterized==0.8.1 2>&1 >/dev/null
else
python -m pip install --upgrade pip
- python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
+ python -m pip install -q pyroaring readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 pylance==0.39.0 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests parameterized==0.9.0 2>&1 >/dev/null
fi
- name: Run lint-python.sh
shell: bash
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index df0c8c7360..39c2011bf4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -26,15 +26,29 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.DataFormatTestUtil;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.SchemaUtils;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
@@ -44,9 +58,17 @@ import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
+import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.table.SimpleTableTestBase.getResult;
import static org.assertj.core.api.Assertions.assertThat;
@@ -102,12 +124,12 @@ public class JavaPyE2ETest {
try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
- write.write(createRow(1, "Apple", "Fruit", 1.5));
- write.write(createRow(2, "Banana", "Fruit", 0.8));
- write.write(createRow(3, "Carrot", "Vegetable", 0.6));
- write.write(createRow(4, "Broccoli", "Vegetable", 1.2));
- write.write(createRow(5, "Chicken", "Meat", 5.0));
- write.write(createRow(6, "Beef", "Meat", 8.0));
+ write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
+ write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
+ write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
+ write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
+ write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
+ write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
commit.commit(0, write.prepareCommit(true, 0));
}
@@ -170,12 +192,12 @@ public class JavaPyE2ETest {
try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
- write.write(createRow(1, "Apple", "Fruit", 1.5));
- write.write(createRow(2, "Banana", "Fruit", 0.8));
- write.write(createRow(3, "Carrot", "Vegetable", 0.6));
- write.write(createRow(4, "Broccoli", "Vegetable", 1.2));
- write.write(createRow(5, "Chicken", "Meat", 5.0));
- write.write(createRow(6, "Beef", "Meat", 8.0));
+ write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
+ write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
+ write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
+ write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
+ write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
+ write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
commit.commit(0, write.prepareCommit(true, 0));
}
@@ -198,6 +220,140 @@ public class JavaPyE2ETest {
"6, Beef, Meat, 8.0");
}
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testPKDeletionVectorWrite() throws Exception {
+ Consumer<Options> optionsSetter =
+ options -> {
+ // make each level contain many files
+ options.set(TARGET_FILE_SIZE, new MemorySize(1));
+ options.set(DELETION_VECTORS_ENABLED, true);
+ };
+ String tableName = "test_pk_dv";
+ Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
+ FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+ StreamTableWrite write = table.newWrite(commitUser);
+ IOManager ioManager = IOManager.create(tablePath.toString());
+ write.withIOManager(ioManager);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ write.write(createRow3Cols(1, 10, 100L));
+ write.write(createRow3Cols(2, 20, 200L));
+ write.write(createRow3Cols(1, 11, 101L));
+ commit.commit(0, write.prepareCommit(true, 0));
+
+ write.write(createRow3Cols(1, 10, 1000L));
+ write.write(createRow3Cols(2, 21, 201L));
+ write.write(createRow3Cols(2, 21, 2001L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ write.write(createRow3Cols(1, 11, 1001L));
+ write.write(createRow3Cols(2, 21, 20001L));
+ write.write(createRow3Cols(2, 22, 202L));
+ write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 11, 1001L));
+ commit.commit(2, write.prepareCommit(true, 2));
+ write.write(createRow3ColsWithKind(RowKind.DELETE, 2, 20, 200L));
+ commit.commit(2, write.prepareCommit(true, 2));
+
+ // test result
+ Function<InternalRow, String> rowDataToString =
+ row ->
+ internalRowToString(
+ row,
+ DataTypes.ROW(
+ DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+ List<String> result =
+ getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+ assertThat(result)
+ .containsExactlyInAnyOrder("+I[1, 10, 1000]", "+I[2, 21, 20001]", "+I[2, 22, 202]");
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testPKDeletionVectorWriteMultiBatch() throws Exception {
+ Consumer<Options> optionsSetter =
+ options -> {
+ // make each level contain many files
+ options.set(TARGET_FILE_SIZE, new MemorySize(128 * 1024));
+ options.set(DELETION_VECTORS_ENABLED, true);
+ };
+ String tableName = "test_pk_dv_multi_batch";
+ Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
+ FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+ StreamTableWrite write = table.newWrite(commitUser);
+ IOManager ioManager = IOManager.create(tablePath.toString());
+ write.withIOManager(ioManager);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ // Write 10000 records
+ for (int i = 1; i <= 10000; i++) {
+ write.write(createRow3Cols(1, i * 10, (long) i * 100));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ // Delete the record with key a = 81930 (the 8193rd row written)
+ write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 81930, 819300L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Function<InternalRow, String> rowDataToString =
+ row ->
+ internalRowToString(
+ row,
+ DataTypes.ROW(
+ DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+ List<String> result =
+ getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+
+ // Verify the count is 9999
+ assertThat(result).hasSize(9999);
+
+ assertThat(result).doesNotContain("+I[1, 81930, 819300]");
+
+ assertThat(result).contains("+I[1, 10, 100]");
+ assertThat(result).contains("+I[1, 100000, 1000000]");
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testPKDeletionVectorWriteMultiBatchRawConvertable() throws Exception {
+ Consumer<Options> optionsSetter =
+ options -> {
+ options.set(DELETION_VECTORS_ENABLED, true);
+ };
+ String tableName = "test_pk_dv_raw_convertable";
+ Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
+ FileStoreTable table = createFileStoreTable(optionsSetter, tablePath);
+ StreamTableWrite write = table.newWrite(commitUser);
+ IOManager ioManager = IOManager.create(tablePath.toString());
+ write.withIOManager(ioManager);
+ StreamTableCommit commit = table.newCommit(commitUser);
+
+ for (int i = 1; i <= 10000; i++) {
+ write.write(createRow3Cols(1, i * 10, (long) i * 100));
+ }
+ commit.commit(0, write.prepareCommit(false, 0));
+
+ write.write(createRow3ColsWithKind(RowKind.DELETE, 1, 81930, 819300L));
+ commit.commit(1, write.prepareCommit(true, 1));
+
+ Function<InternalRow, String> rowDataToString =
+ row ->
+ internalRowToString(
+ row,
+ DataTypes.ROW(
+ DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()));
+ List<String> result =
+ getResult(table.newRead(), table.newScan().plan().splits(), rowDataToString);
+
+ assertThat(result).hasSize(9999);
+
+ assertThat(result).doesNotContain("+I[1, 81930, 819300]");
+
+ // Verify some sample records exist
+ assertThat(result).contains("+I[1, 10, 100]");
+ assertThat(result).contains("+I[1, 100000, 1000000]");
+ }
+
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testReadPkTable() throws Exception {
@@ -228,8 +384,39 @@ public class JavaPyE2ETest {
return new Identifier(database, tableName);
}
- private static InternalRow createRow(int id, String name, String category, double value) {
+ protected FileStoreTable createFileStoreTable(Consumer<Options> configure, Path tablePath)
+ throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"pt", "a", "b"});
+ Options options = new Options();
+ options.set(CoreOptions.PATH, tablePath.toString());
+ options.set(BUCKET, 1);
+ configure.accept(options);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ rowType.getFields(),
+ Collections.singletonList("pt"),
+ Arrays.asList("pt", "a"),
+ options.toMap(),
+ ""));
+ return new PrimaryKeyFileStoreTable(
+ FileIOFinder.find(tablePath), tablePath, tableSchema, CatalogEnvironment.empty());
+ }
+
+ private static InternalRow createRow4Cols(int id, String name, String category, double value) {
return GenericRow.of(
id, BinaryString.fromString(name),
BinaryString.fromString(category), value);
}
+
+ protected GenericRow createRow3Cols(Object... values) {
+ return GenericRow.of(values[0], values[1], values[2]);
+ }
+
+ protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
+ return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
+ }
}
diff --git a/paimon-python/dev/cfg.ini b/paimon-python/dev/cfg.ini
index c90c2f61db..ce776f3a73 100644
--- a/paimon-python/dev/cfg.ini
+++ b/paimon-python/dev/cfg.ini
@@ -19,7 +19,7 @@
[flake8]
# We follow PEP 8 (https://www.python.org/dev/peps/pep-0008/) with one exception: lines can be
# up to 100 characters in length, not 79.
-ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504,F821
+ignore=E226,E241,E305,E402,E722,E731,E741,W503,W504,W293,F821
max-line-length=120
exclude=.tox/*,dev/*,build/*,dist/*
# autopep8 setting
diff --git a/paimon-python/dev/requirements.txt b/paimon-python/dev/requirements.txt
index f5612b9811..40025a308f 100644
--- a/paimon-python/dev/requirements.txt
+++ b/paimon-python/dev/requirements.txt
@@ -36,6 +36,7 @@ polars==1.32.0; python_version>"3.8"
pyarrow==6.0.1; python_version < "3.8"
pyarrow>=16,<19; python_version >= "3.8" and python_version < "3.13"
pyarrow>=16,<19; python_version >= "3.13"
+pyroaring
ray==2.48.0
readerwriterlock==1.0.9
zstandard==0.19.0; python_version<"3.9"
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index 404d7cb888..337b694e0c 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -168,13 +168,37 @@ run_java_read_test() {
return 1
fi
}
+run_pk_dv_test() {
+ echo -e "${YELLOW}=== Step 5: Running Primary Key & Deletion Vector Test
(testPKDeletionVectorWriteRead) ===${NC}"
+ cd "$PROJECT_ROOT"
+
+ # Run the specific Java test method
+ echo "Running Maven test for JavaPyE2ETest.testPKDeletionVectorWrite..."
+ if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testPKDeletionVectorWrite -pl paimon-core -q -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java test failed${NC}"
+ return 1
+ fi
+ cd "$PAIMON_PYTHON_DIR"
+ # Run the specific Python test method
+ echo "Running Python test for JavaPyReadWriteTest.test_pk_dv_read..."
+ if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_pk_dv_read -v; then
+ echo -e "${GREEN}✓ Python test completed successfully${NC}"
+ return 0
+ else
+ echo -e "${RED}✗ Python test failed${NC}"
+ return 1
+ fi
+}
# Main execution
main() {
local java_write_result=0
local python_read_result=0
local python_write_result=0
local java_read_result=0
+ local pk_dv_result=0
echo -e "${YELLOW}Starting mixed language test execution...${NC}"
echo ""
@@ -213,6 +237,10 @@ main() {
java_read_result=1
fi
+ # Run pk dv read test
+ if ! run_pk_dv_test; then
+ pk_dv_result=1
+ fi
echo ""
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
@@ -240,12 +268,18 @@ main() {
echo -e "${RED}✗ Java Read Test (Parquet + Lance): FAILED${NC}"
fi
+ if [[ $pk_dv_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ PK DV Test
(JavaPyReadWriteTest.testPKDeletionVectorWriteRead): PASSED${NC}"
+ else
+ echo -e "${RED}✗ PK DV Test
(JavaPyReadWriteTest.testPKDeletionVectorWriteRead): FAILED${NC}"
+ fi
+
echo ""
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 ]]; then
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && $python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/deletionvectors/__init__.py b/paimon-python/pypaimon/deletionvectors/__init__.py
new file mode 100644
index 0000000000..92b307357e
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/__init__.py
@@ -0,0 +1,27 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
+from pypaimon.deletionvectors.bitmap_deletion_vector import BitmapDeletionVector
+from pypaimon.deletionvectors.apply_deletion_vector_reader import ApplyDeletionVectorReader, ApplyDeletionRecordIterator
+
+__all__ = [
+ 'DeletionVector',
+ 'BitmapDeletionVector',
+ 'ApplyDeletionVectorReader',
+ 'ApplyDeletionRecordIterator'
+]
diff --git a/paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py b/paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py
new file mode 100644
index 0000000000..c9a143bdc9
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/apply_deletion_vector_reader.py
@@ -0,0 +1,129 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from typing import Optional
+
+import pyarrow
+from pyarrow import RecordBatch
+
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.iface.record_iterator import RecordIterator
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
+
+from pyroaring import BitMap
+
+from pypaimon.read.reader.iface.record_reader import RecordReader
+
+
+class ApplyDeletionVectorReader(RecordBatchReader):
+ """
+ A RecordReader which applies DeletionVector to filter records.
+ """
+
+ def __init__(self, reader: RecordReader, deletion_vector: DeletionVector):
+ """
+ Initialize an ApplyDeletionVectorReader.
+
+ Args:
+ reader: The underlying record reader.
+ deletion_vector: The deletion vector to apply.
+ """
+ self._reader = reader
+ self._deletion_vector = deletion_vector
+
+ def reader(self) -> RecordReader:
+ return self._reader
+
+ def deletion_vector(self) -> DeletionVector:
+ return self._deletion_vector
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ self._reader: RecordBatchReader
+ arrow_batch = self._reader.read_arrow_batch()
+ if arrow_batch is None:
+ return None
+ # Remove the deleted rows from the batch
+ range_bitmap = BitMap(
+ range(self._reader.return_batch_pos() - arrow_batch.num_rows, self._reader.return_batch_pos()))
+ intersection_bitmap = range_bitmap - self._deletion_vector.bit_map()
+ added_row_list = [x - (self._reader.return_batch_pos() - arrow_batch.num_rows) for x in
+ list(intersection_bitmap)]
+ return arrow_batch.take(pyarrow.array(added_row_list, type=pyarrow.int32()))
+
+ def read_batch(self) -> Optional[RecordIterator]:
+ """
+ Reads one batch with deletion vector applied.
+
+ Returns:
+ A RecordIterator with deletion filtering, or None if no more data.
+ """
+ batch = self._reader.read_batch()
+
+ if batch is None:
+ return None
+
+ return ApplyDeletionRecordIterator(batch, self._deletion_vector)
+
+ def close(self):
+ self._reader.close()
+
+
+class ApplyDeletionRecordIterator(RecordIterator):
+ """
+ A RecordIterator that wraps another RecordIterator and applies a DeletionVector
+ to filter out deleted records.
+ """
+
+ def __init__(self, iterator: RecordIterator, deletion_vector: DeletionVector):
+ """
+ Initialize an ApplyDeletionRecordIterator.
+
+ Args:
+ iterator: The underlying record iterator.
+ deletion_vector: The deletion vector to apply for filtering.
+ """
+ self._iterator = iterator
+ self._deletion_vector = deletion_vector
+
+ def iterator(self) -> RecordIterator:
+ return self._iterator
+
+ def deletion_vector(self) -> DeletionVector:
+ return self._deletion_vector
+
+ def returned_position(self) -> int:
+ return self._iterator.return_pos()
+
+ def next(self) -> Optional[object]:
+ """
+ Gets the next non-deleted record from the iterator.
+
+ This method skips over any records that are marked as deleted in the
+ deletion vector, returning only non-deleted records.
+
+ Returns:
+ The next non-deleted record, or None if no more records exist.
+ """
+ while True:
+ record = self._iterator.next()
+
+ if record is None:
+ return None
+
+ # Check if the current position is deleted
+ if not self._deletion_vector.is_deleted(self._iterator.return_pos()):
+ return record
diff --git a/paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py b/paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py
new file mode 100644
index 0000000000..0afbe642df
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/bitmap_deletion_vector.py
@@ -0,0 +1,165 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
+import struct
+import zlib
+from pyroaring import BitMap
+
+
+class BitmapDeletionVector(DeletionVector):
+ """
+ A DeletionVector based on RoaringBitmap, it only supports files with row count
+ not exceeding 2147483647 (max value for 32-bit integer).
+ """
+
+ MAGIC_NUMBER = 1581511376
+ MAGIC_NUMBER_SIZE_BYTES = 4
+ MAX_VALUE = 2147483647
+
+ def __init__(self, bitmap: BitMap = None):
+ """
+ Initialize a BitmapDeletionVector.
+
+ Args:
+ bitmap: Optional RoaringBitmap instance. If None, creates an empty bitmap.
+ """
+ self._bitmap = bitmap if bitmap is not None else BitMap()
+
+ def delete(self, position: int) -> None:
+ """
+ Marks the row at the specified position as deleted.
+
+ Args:
+ position: The position of the row to be marked as deleted.
+ """
+ self._check_position(position)
+ self._bitmap.add(position)
+
+ def is_deleted(self, position: int) -> bool:
+ """
+ Checks if the row at the specified position is deleted.
+
+ Args:
+ position: The position of the row to check.
+
+ Returns:
+ True if the row is deleted, False otherwise.
+ """
+ self._check_position(position)
+ return position in self._bitmap
+
+ def is_empty(self) -> bool:
+ return len(self._bitmap) == 0
+
+ def get_cardinality(self) -> int:
+ """
+ Returns the number of distinct integers added to the DeletionVector.
+
+ Returns:
+ The number of deleted positions.
+ """
+ return len(self._bitmap)
+
+ def merge(self, deletion_vector: DeletionVector) -> None:
+ """
+ Merge another DeletionVector to this current one.
+
+ Args:
+ deletion_vector: The other DeletionVector to merge.
+ """
+ if isinstance(deletion_vector, BitmapDeletionVector):
+ self._bitmap |= deletion_vector._bitmap
+ else:
+ raise RuntimeError("Only instance with the same class type can be
merged.")
+
+ def serialize(self) -> bytes:
+ """
+ Serializes the deletion vector to bytes.
+
+ Returns:
+ The serialized bytes.
+ """
+ # Serialize the bitmap
+ bitmap_bytes = self._bitmap.serialize()
+
+ # Create the full data with magic number
+ magic_bytes = struct.pack('>I', self.MAGIC_NUMBER)
+ data = magic_bytes + bitmap_bytes
+
+ # Calculate size and checksum
+ size = len(data)
+ checksum = self._calculate_checksum(data)
+
+ # Pack: size (4 bytes) + data + checksum (4 bytes)
+ result = struct.pack('>I', size) + data + struct.pack('>I', checksum)
+
+ return result
+
+ @staticmethod
+ def deserialize_from_bytes(data: bytes) -> 'BitmapDeletionVector':
+ """
+ Deserializes a BitmapDeletionVector from bytes.
+
+ Args:
+ data: The serialized bytes (without magic number).
+
+ Returns:
+ A BitmapDeletionVector instance.
+ """
+ bitmap = BitMap.deserialize(data)
+ return BitmapDeletionVector(bitmap)
+
+ def bit_map(self):
+ return self._bitmap
+
+ def _check_position(self, position: int) -> None:
+ """
+ Checks if the position is valid.
+
+ Args:
+ position: The position to check.
+
+ Raises:
+ ValueError: If the position exceeds the maximum value.
+ """
+ if position > self.MAX_VALUE:
+ raise ValueError(
+ f"The file has too many rows, RoaringBitmap32 only supports
files "
+ f"with row count not exceeding {self.MAX_VALUE}."
+ )
+
+ @staticmethod
+ def _calculate_checksum(data: bytes) -> int:
+ """
+ Calculates CRC32 checksum for the given data.
+
+ Args:
+ data: The data to calculate checksum for.
+
+ Returns:
+ The CRC32 checksum as an unsigned 32-bit integer.
+ """
+ return zlib.crc32(data) & 0xffffffff
+
+ def __eq__(self, other):
+ if not isinstance(other, BitmapDeletionVector):
+ return False
+ return self._bitmap == other._bitmap
+
+ def __hash__(self):
+ return hash(tuple(self._bitmap))
diff --git a/paimon-python/pypaimon/deletionvectors/deletion_vector.py b/paimon-python/pypaimon/deletionvectors/deletion_vector.py
new file mode 100644
index 0000000000..44c179653f
--- /dev/null
+++ b/paimon-python/pypaimon/deletionvectors/deletion_vector.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from abc import ABC, abstractmethod
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.table.source.deletion_file import DeletionFile
+
+
+class DeletionVector(ABC):
+ """
+ The DeletionVector can efficiently record the positions of rows that are deleted in a file,
+ which can then be used to filter out deleted rows when processing the file.
+ """
+
+ @abstractmethod
+ def bit_map(self):
+ """
+ Returns the bitmap of the DeletionVector.
+ """
+ pass
+
+ @abstractmethod
+ def delete(self, position: int) -> None:
+ """
+ Marks the row at the specified position as deleted.
+
+ Args:
+ position: The position of the row to be marked as deleted.
+ """
+ pass
+
+ @abstractmethod
+ def is_deleted(self, position: int) -> bool:
+ """
+ Checks if the row at the specified position is deleted.
+
+ Args:
+ position: The position of the row to check.
+
+ Returns:
+ True if the row is deleted, False otherwise.
+ """
+ pass
+
+ @abstractmethod
+ def is_empty(self) -> bool:
+ """
+ Determines if the deletion vector is empty, indicating no deletions.
+
+ Returns:
+ True if the deletion vector is empty, False if it contains deletions.
+ """
+ pass
+
+ @abstractmethod
+ def get_cardinality(self) -> int:
+ """
+ Returns the number of distinct integers added to the DeletionVector.
+
+ Returns:
+ The number of deleted positions.
+ """
+ pass
+
+ @abstractmethod
+ def merge(self, deletion_vector: 'DeletionVector') -> None:
+ """
+ Merge another DeletionVector to this current one.
+
+ Args:
+ deletion_vector: The other DeletionVector to merge.
+ """
+ pass
+
+ def checked_delete(self, position: int) -> bool:
+ """
+ Marks the row at the specified position as deleted.
+
+ Args:
+ position: The position of the row to be marked as deleted.
+
+ Returns:
+ True if the added position wasn't already deleted. False otherwise.
+ """
+ if self.is_deleted(position):
+ return False
+ else:
+ self.delete(position)
+ return True
+
+ @staticmethod
+ def read(file_io: FileIO, deletion_file: DeletionFile) -> 'DeletionVector':
+ """
+ Read a DeletionVector from a file.
+ """
+ from pypaimon.deletionvectors.bitmap_deletion_vector import BitmapDeletionVector
+
+ with file_io.new_input_stream(deletion_file.dv_index_path) as f:
+ f.seek(deletion_file.offset)
+
+ # Read bitmap length
+ bitmap_length_bytes = f.read(4)
+ bitmap_length = int.from_bytes(bitmap_length_bytes, byteorder='big')
+
+ # Read magic number
+ magic_number_bytes = f.read(4)
+ magic_number = int.from_bytes(magic_number_bytes, byteorder='big')
+
+ if magic_number == BitmapDeletionVector.MAGIC_NUMBER:
+ if deletion_file.length is not None and bitmap_length != deletion_file.length:
+ raise RuntimeError(
+ f"Size not match, actual size: {bitmap_length}, expected size: {deletion_file.length}"
+ )
+
+ # Magic number has been read, read remaining bytes
+ remaining_bytes = bitmap_length - BitmapDeletionVector.MAGIC_NUMBER_SIZE_BYTES
+ data = f.read(remaining_bytes)
+
+ # Skip CRC (4 bytes)
+ f.read(4)
+
+ return BitmapDeletionVector.deserialize_from_bytes(data)
+ else:
+ raise RuntimeError(
+ f"Invalid magic number: {magic_number}, "
+ f"expected: {BitmapDeletionVector.MAGIC_NUMBER}"
+ )
diff --git a/paimon-python/pypaimon/index/deletion_vector_meta.py b/paimon-python/pypaimon/index/deletion_vector_meta.py
new file mode 100644
index 0000000000..d5bc3d7876
--- /dev/null
+++ b/paimon-python/pypaimon/index/deletion_vector_meta.py
@@ -0,0 +1,40 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class DeletionVectorMeta:
+ """Metadata of deletion vector."""
+
+ data_file_name: str
+ offset: int
+ length: int
+ cardinality: Optional[int] = None
+
+ def __eq__(self, other):
+ if not isinstance(other, DeletionVectorMeta):
+ return False
+ return (self.data_file_name == other.data_file_name and
+ self.offset == other.offset and
+ self.length == other.length and
+ self.cardinality == other.cardinality)
+
+ def __hash__(self):
+ return hash((self.data_file_name, self.offset, self.length, self.cardinality))
diff --git a/paimon-python/pypaimon/index/index_file_meta.py b/paimon-python/pypaimon/index/index_file_meta.py
new file mode 100644
index 0000000000..a0cb7db46c
--- /dev/null
+++ b/paimon-python/pypaimon/index/index_file_meta.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass
+from typing import Optional, Dict
+
+from pypaimon.index.deletion_vector_meta import DeletionVectorMeta
+
+
+@dataclass
+class IndexFileMeta:
+ """Metadata of index file."""
+
+ index_type: str
+ file_name: str
+ file_size: int
+ row_count: int
+ dv_ranges: Optional[Dict[str, DeletionVectorMeta]] = None
+ external_path: Optional[str] = None
+
+ def __eq__(self, other):
+ if not isinstance(other, IndexFileMeta):
+ return False
+ return (self.index_type == other.index_type and
+ self.file_name == other.file_name and
+ self.file_size == other.file_size and
+ self.row_count == other.row_count and
+ self.dv_ranges == other.dv_ranges and
+ self.external_path == other.external_path)
+
+ def __hash__(self):
+ dv_ranges_tuple = tuple(sorted(self.dv_ranges.items())) if self.dv_ranges else None
+ return hash((self.index_type, self.file_name, self.file_size,
+ self.row_count, dv_ranges_tuple, self.external_path))
diff --git a/paimon-python/pypaimon/manifest/index_manifest_entry.py b/paimon-python/pypaimon/manifest/index_manifest_entry.py
new file mode 100644
index 0000000000..f7b5e399e0
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/index_manifest_entry.py
@@ -0,0 +1,62 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass
+
+from pypaimon.index.index_file_meta import IndexFileMeta
+from pypaimon.table.row.generic_row import GenericRow
+
+
+@dataclass
+class IndexManifestEntry:
+ """Manifest entry for index file."""
+
+ kind: int # 0 for ADD, 1 for DELETE
+ partition: GenericRow
+ bucket: int
+ index_file: IndexFileMeta
+
+ def __eq__(self, other):
+ if not isinstance(other, IndexManifestEntry):
+ return False
+ return (self.kind == other.kind and
+ self.partition == other.partition and
+ self.bucket == other.bucket and
+ self.index_file == other.index_file)
+
+ def __hash__(self):
+ return hash((self.kind, tuple(self.partition.values),
+ self.bucket, self.index_file))
+
+
+INDEX_MANIFEST_ENTRY = {
+ "type": "record",
+ "name": "IndexManifestEntry",
+ "fields": [
+ {"name": "_VERSION", "type": "int"},
+ {"name": "_KIND", "type": "byte"},
+ {"name": "_PARTITION", "type": "bytes"},
+ {"name": "_BUCKET", "type": "int"},
+ {"name": "_INDEX_TYPE", "type": "string"},
+ {"name": "_FILE_NAME", "type": "string"},
+ {"name": "_FILE_SIZE", "type": "long"},
+ {"name": "_ROW_COUNT", "type": "long"},
+ {"name": "_DELETIONS_VECTORS_RANGES", "type": {"type": "array",
"elementType": "DeletionVectorMeta"}},
+ {"name": "_EXTERNAL_PATH", "type": ["null", "string"]},
+ {"name": "_GLOBAL_INDEX", "type": "GlobalIndexMeta"}
+ ]
+}
diff --git a/paimon-python/pypaimon/manifest/index_manifest_file.py b/paimon-python/pypaimon/manifest/index_manifest_file.py
new file mode 100644
index 0000000000..cd66ee5fce
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/index_manifest_file.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from io import BytesIO
+from typing import List
+
+import fastavro
+
+from pypaimon.index.deletion_vector_meta import DeletionVectorMeta
+from pypaimon.index.index_file_meta import IndexFileMeta
+from pypaimon.manifest.index_manifest_entry import IndexManifestEntry
+from pypaimon.table.row.generic_row import GenericRowDeserializer
+
+
+class IndexManifestFile:
+ """Index manifest file reader for reading index manifest entries."""
+
+ DELETION_VECTORS_INDEX = "DELETION_VECTORS"
+
+ def __init__(self, table):
+ from pypaimon.table.file_store_table import FileStoreTable
+
+ self.table: FileStoreTable = table
+ manifest_path = table.table_path.rstrip('/')
+ self.manifest_path = f"{manifest_path}/manifest"
+ self.file_io = table.file_io
+ self.partition_keys_fields = self.table.partition_keys_fields
+
+ def read(self, index_manifest_name: str) -> List[IndexManifestEntry]:
+ """Read index manifest entries from the specified index manifest
file."""
+ index_manifest_path = f"{self.manifest_path}/{index_manifest_name}"
+
+ if not self.file_io.exists(index_manifest_path):
+ return []
+
+ entries = []
+ try:
+ with self.file_io.new_input_stream(index_manifest_path) as input_stream:
+ avro_bytes = input_stream.read()
+
+ buffer = BytesIO(avro_bytes)
+ reader = fastavro.reader(buffer)
+
+ for record in reader:
+ dv_list = record['_DELETIONS_VECTORS_RANGES']
+ file_dv_dict = {}
+ for dv_meta_record in dv_list:
+ dv_meta = DeletionVectorMeta(
+ data_file_name=dv_meta_record['f0'],
+ offset=dv_meta_record['f1'],
+ length=dv_meta_record['f2'],
+ cardinality=dv_meta_record.get('_CARDINALITY')
+ )
+ file_dv_dict[dv_meta.data_file_name] = dv_meta
+
+ # Create IndexFileMeta
+ index_file_meta = IndexFileMeta(
+ index_type=record['_INDEX_TYPE'],
+ file_name=record['_FILE_NAME'],
+ file_size=record['_FILE_SIZE'],
+ row_count=record['_ROW_COUNT'],
+ dv_ranges=file_dv_dict,
+ external_path=record['_EXTERNAL_PATH']
+ )
+
+ # Create IndexManifestEntry
+ entry = IndexManifestEntry(
+ kind=record['_KIND'],
+ partition=GenericRowDeserializer.from_bytes(
+ record['_PARTITION'],
+ self.partition_keys_fields
+ ),
+ bucket=record['_BUCKET'],
+ index_file=index_file_meta
+ )
+ entries.append(entry)
+
+ except Exception as e:
+ raise RuntimeError(f"Failed to read index manifest file
{index_manifest_path}: {e}") from e
+
+ return entries
diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index 4b86265d77..aefff13ebd 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -29,7 +29,7 @@ from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
class ConcatBatchReader(RecordBatchReader):
def __init__(self, reader_suppliers: List[Callable]):
- self.queue = collections.deque(reader_suppliers)
+ self.queue: collections.deque[Callable] = collections.deque(reader_suppliers)
self.current_reader: Optional[RecordBatchReader] = None
def read_arrow_batch(self) -> Optional[RecordBatch]:
diff --git a/paimon-python/pypaimon/read/reader/concat_record_reader.py b/paimon-python/pypaimon/read/reader/concat_record_reader.py
index 75f6b09b09..1c71e27fff 100644
--- a/paimon-python/pypaimon/read/reader/concat_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_record_reader.py
@@ -26,7 +26,7 @@ from pypaimon.read.reader.iface.record_reader import RecordReader
class ConcatRecordReader(RecordReader):
def __init__(self, reader_suppliers: List[Callable]):
- self.queue = collections.deque(reader_suppliers)
+ self.queue: collections.deque[Callable] = collections.deque(reader_suppliers)
self.current_reader: Optional[RecordReader] = None
def read_batch(self) -> Optional[RecordIterator]:
diff --git a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
index b7da3fc96b..ec3a1bc424 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_batch_reader.py
@@ -41,20 +41,25 @@ class RecordBatchReader(RecordReader):
Reads one batch. The method should return null when reaching the end of the input.
"""
- def _read_next_df(self) -> Optional[polars.DataFrame]:
+ def return_batch_pos(self) -> int:
+ """
+ Returns the current batch position in the file.
+ """
+
+ def read_next_df(self) -> Optional[polars.DataFrame]:
arrow_batch = self.read_arrow_batch()
if arrow_batch is None:
return None
return polars.from_arrow(arrow_batch)
def tuple_iterator(self) -> Optional[Iterator[tuple]]:
- df = self._read_next_df()
+ df = self.read_next_df()
if df is None:
return None
return df.iter_rows()
def read_batch(self) -> Optional[RecordIterator[InternalRow]]:
- df = self._read_next_df()
+ df = self.read_next_df()
if df is None:
return None
return InternalRowWrapperIterator(df.iter_rows(), df.width)
@@ -71,3 +76,53 @@ class InternalRowWrapperIterator(RecordIterator[InternalRow]):
return None
self._reused_row.replace(row_tuple)
return self._reused_row
+
+
+class RowPositionReader(RecordBatchReader):
+
+ def __init__(self, data_reader: RecordBatchReader):
+ self._data_reader = data_reader
+ self._row_iterator = RowPositionRecordIterator()
+ self.batch_pos = 0
+
+ def read_arrow_batch(self) -> Optional[RecordBatch]:
+ batch = self._data_reader.read_arrow_batch()
+ if batch is None:
+ return None
+ self.batch_pos += batch.num_rows
+ return batch
+
+ def return_batch_pos(self) -> int:
+ return self.batch_pos
+
+ def tuple_iterator(self) -> Optional[RecordIterator]:
+ return self._row_iterator.replace_iterator(self._data_reader.tuple_iterator())
+
+ def close(self):
+ self._data_reader.close()
+
+
+class RowPositionRecordIterator(RecordIterator[tuple]):
+
+ def __init__(self):
+ self.reused_iterator: Optional[Iterator[tuple]] = None
+ self.pos = -1
+
+ def next(self) -> Optional[tuple]:
+ row_tuple = next(self.reused_iterator, None)
+ if row_tuple is None:
+ return None
+ self.pos += 1
+ return row_tuple
+
+ def return_pos(self) -> int:
+ return self.pos
+
+ def replace_iterator(self, iterator: Iterator[tuple]) -> Optional[RecordIterator[tuple]]:
+ self.reused_iterator = iterator
+ if self.reused_iterator is None:
+ return None
+ return self
+
+ def __next__(self):
+ return self.next()
diff --git a/paimon-python/pypaimon/read/reader/iface/record_iterator.py b/paimon-python/pypaimon/read/reader/iface/record_iterator.py
index 6684d8fd4c..5781b4994e 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_iterator.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_iterator.py
@@ -32,3 +32,8 @@ class RecordIterator(Generic[T], ABC):
"""
Gets the next record from the iterator. Returns null if this iterator has no more elements.
"""
+
+ def return_pos(self) -> int:
+ """
+ Returns the current position of the file.
+ """
diff --git a/paimon-python/pypaimon/read/reader/iface/record_reader.py b/paimon-python/pypaimon/read/reader/iface/record_reader.py
index 8be28a12e1..3627161677 100644
--- a/paimon-python/pypaimon/read/reader/iface/record_reader.py
+++ b/paimon-python/pypaimon/read/reader/iface/record_reader.py
@@ -26,13 +26,13 @@ T = TypeVar('T')
class RecordReader(Generic[T], ABC):
"""
- The reader that reads the batches of records as RecordIterator.
+ The reader that reads the batches of records.
"""
@abstractmethod
def read_batch(self) -> Optional[RecordIterator[T]]:
"""
- Reads one batch. The method should return null when reaching the end of the input.
+ Reads one batch as a RecordIterator. The method should return null when reaching the end of the input.
"""
@abstractmethod
diff --git a/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py b/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py
index d8de247ec8..96a48afccb 100644
--- a/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py
+++ b/paimon-python/pypaimon/read/reader/key_value_wrap_reader.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-from typing import Iterator, Optional
+from typing import Iterator, Optional, Union
from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
from pypaimon.read.reader.iface.record_iterator import RecordIterator
@@ -53,7 +53,7 @@ class KeyValueWrapIterator(RecordIterator[KeyValue]):
def __init__(
self,
- iterator: Iterator,
+ iterator: Union[Iterator, RecordIterator],
reused_kv: KeyValue
):
self.iterator = iterator
@@ -65,3 +65,6 @@ class KeyValueWrapIterator(RecordIterator[KeyValue]):
return None
self.reused_kv.replace(row_tuple)
return self.reused_kv
+
+ def return_pos(self) -> int:
+ return self.iterator.return_pos()
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index e35249bd05..7432e56b84 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -17,10 +17,13 @@ limitations under the License.
"""
import os
from collections import defaultdict
-from typing import Callable, List, Optional
+from typing import Callable, List, Optional, Dict, Set
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
+from pypaimon.table.source.deletion_file import DeletionFile
+from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.manifest.index_manifest_file import IndexManifestFile
from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.manifest_list_manager import ManifestListManager
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -64,6 +67,7 @@ class FullStartingScanner(StartingScanner):
self.only_read_real_buckets = True if int(
self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
self.data_evolution = self.table.options.get(CoreOptions.DATA_EVOLUTION_ENABLED, 'false').lower() == 'true'
+ self.deletion_vectors_enabled = self.table.options.get('deletion-vectors.enabled', 'false').lower() == 'true'
def schema_fields_func(schema_id: int):
return self.table.schema_manager.get_schema(schema_id).fields
@@ -77,12 +81,24 @@ class FullStartingScanner(StartingScanner):
file_entries = self.plan_files()
if not file_entries:
return Plan([])
+
+ # Get deletion files map if deletion vectors are enabled.
+ # {partition-bucket -> {filename -> DeletionFile}}
+ deletion_files_map: dict[tuple, dict[str, DeletionFile]] = {}
+ if self.deletion_vectors_enabled:
+ latest_snapshot = self.snapshot_manager.get_latest_snapshot()
+ # Extract unique partition-bucket pairs from file entries
+ buckets = set()
+ for entry in file_entries:
+ buckets.add((tuple(entry.partition.values), entry.bucket))
+ deletion_files_map = self._scan_dv_index(latest_snapshot, buckets)
+
if self.table.is_primary_key_table:
- splits = self._create_primary_key_splits(file_entries)
+ splits = self._create_primary_key_splits(file_entries, deletion_files_map)
elif self.data_evolution:
- splits = self._create_data_evolution_splits(file_entries)
+ splits = self._create_data_evolution_splits(file_entries, deletion_files_map)
else:
- splits = self._create_append_only_splits(file_entries)
+ splits = self._create_append_only_splits(file_entries, deletion_files_map)
splits = self._apply_push_down_limit(splits)
return Plan(splits)
@@ -286,7 +302,8 @@ class FullStartingScanner(StartingScanner):
return False
if self.partition_key_predicate and not self.partition_key_predicate.test(entry.partition):
return False
-
+ if self.deletion_vectors_enabled and entry.file.level == 0: # do not read level 0 file
+ return False
# Get SimpleStatsEvolution for this schema
evolution = self.simple_stats_evolutions.get_or_create(entry.file.schema_id)
@@ -315,7 +332,95 @@ class FullStartingScanner(StartingScanner):
entry.file.row_count
)
- def _create_append_only_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+ def _scan_dv_index(self, snapshot, buckets: Set[tuple]) -> Dict[tuple, Dict[str, DeletionFile]]:
+ """
+ Scan deletion vector index from snapshot.
+ Returns a map of (partition, bucket) -> {filename -> DeletionFile}
+
+ Reference: SnapshotReaderImpl.scanDvIndex() in Java
+ """
+ if not snapshot or not snapshot.index_manifest:
+ return {}
+
+ result = {}
+
+ # Read index manifest file
+ index_manifest_file = IndexManifestFile(self.table)
+ index_entries = index_manifest_file.read(snapshot.index_manifest)
+
+ # Filter by DELETION_VECTORS_INDEX type and requested buckets
+ for entry in index_entries:
+ if entry.index_file.index_type != IndexManifestFile.DELETION_VECTORS_INDEX:
+ continue
+
+ partition_bucket = (tuple(entry.partition.values), entry.bucket)
+ if partition_bucket not in buckets:
+ continue
+
+ # Convert to deletion files
+ deletion_files = self._to_deletion_files(entry)
+ if deletion_files:
+ result[partition_bucket] = deletion_files
+
+ return result
+
+ def _to_deletion_files(self, index_entry) -> Dict[str, DeletionFile]:
+ """
+ Convert index manifest entry to deletion files map.
+ Returns {filename -> DeletionFile}
+ """
+ deletion_files = {}
+ index_file = index_entry.index_file
+
+ # Check if dv_ranges exists
+ if not index_file.dv_ranges:
+ return deletion_files
+
+ # Build deletion file path
+ # Format: {table_path}/index/{index_file_name}
+ index_path = self.table.table_path.rstrip('/') + '/index'
+ dv_file_path = f"{index_path}/{index_file.file_name}"
+
+ # Convert each DeletionVectorMeta to DeletionFile
+ for data_file_name, dv_meta in index_file.dv_ranges.items():
+ deletion_file = DeletionFile(
+ dv_index_path=dv_file_path,
+ offset=dv_meta.offset,
+ length=dv_meta.length,
+ cardinality=dv_meta.cardinality
+ )
+ deletion_files[data_file_name] = deletion_file
+
+ return deletion_files
+
+ def _get_deletion_files_for_split(self, data_files: List[DataFileMeta],
+ deletion_files_map: dict,
+ partition: GenericRow,
+ bucket: int) -> Optional[List[DeletionFile]]:
+ """
+ Get deletion files for the given data files in a split.
+ """
+ if not deletion_files_map:
+ return None
+
+ partition_key = (tuple(partition.values), bucket)
+ file_deletion_map = deletion_files_map.get(partition_key, {})
+
+ if not file_deletion_map:
+ return None
+
+ deletion_files = []
+ for data_file in data_files:
+ deletion_file = file_deletion_map.get(data_file.file_name)
+ if deletion_file:
+ deletion_files.append(deletion_file)
+ else:
+ deletion_files.append(None)
+
+ return deletion_files if any(df is not None for df in deletion_files) else None
+
+ def _create_append_only_splits(
+ self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
partitioned_files = defaultdict(list)
for entry in file_entries:
partitioned_files[(tuple(entry.partition.values), entry.bucket)].append(entry)
@@ -335,12 +440,13 @@ class FullStartingScanner(StartingScanner):
packed_files: List[List[DataFileMeta]] = self._pack_for_ordered(data_files, weight_func,
self.target_split_size)
- splits += self._build_split_from_pack(packed_files, file_entries, False)
+ splits += self._build_split_from_pack(packed_files, file_entries, False, deletion_files_map)
if self.idx_of_this_subtask is not None:
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
return splits
- def _create_primary_key_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+ def _create_primary_key_splits(
+ self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
if self.idx_of_this_subtask is not None:
file_entries = self._primary_key_filter_by_shard(file_entries)
partitioned_files = defaultdict(list)
@@ -368,10 +474,11 @@ class FullStartingScanner(StartingScanner):
[file for sub_pack in pack for file in sub_pack]
for pack in packed_files
]
- splits += self._build_split_from_pack(flatten_packed_files, file_entries, True)
+ splits += self._build_split_from_pack(flatten_packed_files, file_entries, True, deletion_files_map)
return splits
- def _create_data_evolution_splits(self, file_entries: List[ManifestEntry]) -> List['Split']:
+ def _create_data_evolution_splits(
+ self, file_entries: List[ManifestEntry], deletion_files_map: dict = None) -> List['Split']:
def sort_key(manifest_entry: ManifestEntry) -> tuple:
first_row_id = manifest_entry.file.first_row_id if manifest_entry.file.first_row_id is not None else float(
'-inf')
@@ -413,7 +520,7 @@ class FullStartingScanner(StartingScanner):
for pack in packed_files
]
- splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False)
+ splits += self._build_split_from_pack(flatten_packed_files, sorted_entries, False, deletion_files_map)
if self.idx_of_this_subtask is not None:
self._compute_split_start_end_row(splits, plan_start_row, plan_end_row)
@@ -460,7 +567,8 @@ class FullStartingScanner(StartingScanner):
return split_by_row_id
- def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool) -> List['Split']:
+ def _build_split_from_pack(self, packed_files, file_entries, for_primary_key_split: bool,
+ deletion_files_map: dict = None) -> List['Split']:
splits = []
for file_group in packed_files:
raw_convertible = True
@@ -479,6 +587,16 @@ class FullStartingScanner(StartingScanner):
total_record_count += data_file.row_count
if file_paths:
+ # Get deletion files for this split
+ data_deletion_files = None
+ if deletion_files_map:
+ data_deletion_files = self._get_deletion_files_for_split(
+ file_group,
+ deletion_files_map,
+ file_entries[0].partition,
+ file_entries[0].bucket
+ )
+
split = Split(
files=file_group,
partition=file_entries[0].partition,
@@ -486,7 +604,8 @@ class FullStartingScanner(StartingScanner):
_file_paths=file_paths,
_row_count=total_record_count,
_file_size=total_file_size,
- raw_convertible=raw_convertible
+ raw_convertible=raw_convertible,
+ data_deletion_files=data_deletion_files
)
splits.append(split)
return splits
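
In the scanner changes above, deletion_files_map is keyed by the tuple of
(partition values, bucket) and maps each data file name to its DeletionFile,
while _get_deletion_files_for_split returns a list aligned index-for-index
with the split's files (None for files without deletions). A minimal sketch
of the expected shape, with hypothetical file names and values that are not
taken from this patch:

    from pypaimon.table.source.deletion_file import DeletionFile

    # Hypothetical contents; keyed by (partition values tuple, bucket).
    deletion_files_map = {
        ((1,), 0): {
            "data-f2.parquet": DeletionFile(
                dv_index_path="index/dv-0", offset=4, length=36, cardinality=1),
        },
    }
    # For a split over [data-f1.parquet, data-f2.parquet, data-f3.parquet]
    # in partition (1,), bucket 0, the aligned result would be:
    #     [None, DeletionFile(dv_index_path="index/dv-0", ...), None]
    # If no file in the split has deletions, None is returned instead of a list.
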
diff --git a/paimon-python/pypaimon/read/split.py
b/paimon-python/pypaimon/read/split.py
index f1ab5f3a5b..55cb955ad6 100644
--- a/paimon-python/pypaimon/read/split.py
+++ b/paimon-python/pypaimon/read/split.py
@@ -17,10 +17,11 @@
################################################################################
from dataclasses import dataclass
-from typing import List
+from typing import List, Optional
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.table.row.generic_row import GenericRow
+from pypaimon.table.source.deletion_file import DeletionFile
@dataclass
@@ -35,6 +36,7 @@ class Split:
split_start_row: int = None
split_end_row: int = None
raw_convertible: bool = False
+ data_deletion_files: Optional[List[DeletionFile]] = None
@property
def row_count(self) -> int:
diff --git a/paimon-python/pypaimon/read/split_read.py
b/paimon-python/pypaimon/read/split_read.py
index 94d832eff4..ed579a2a20 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -19,10 +19,12 @@
import os
from abc import ABC, abstractmethod
from functools import partial
-from typing import List, Optional, Tuple
+from typing import List, Optional, Tuple, Callable
from pypaimon.common.core_options import CoreOptions
from pypaimon.common.predicate import Predicate
+from pypaimon.deletionvectors import ApplyDeletionVectorReader
+from pypaimon.deletionvectors.deletion_vector import DeletionVector
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.read.interval_partition import IntervalPartition, SortedRun
from pypaimon.read.partition_info import PartitionInfo
@@ -39,7 +41,7 @@ from pypaimon.read.reader.format_avro_reader import
FormatAvroReader
from pypaimon.read.reader.format_blob_reader import FormatBlobReader
from pypaimon.read.reader.format_lance_reader import FormatLanceReader
from pypaimon.read.reader.format_pyarrow_reader import FormatPyArrowReader
-from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader,
RowPositionReader
from pypaimon.read.reader.iface.record_reader import RecordReader
from pypaimon.read.reader.key_value_unwrap_reader import \
KeyValueUnwrapRecordReader
@@ -70,6 +72,7 @@ class SplitRead(ABC):
if isinstance(self, MergeFileSplitRead):
self.read_fields = self._create_key_value_fields(read_type)
self.schema_id_2_fields = {}
+ self.deletion_file_readers = {}
def _push_down_predicate(self) -> Optional[Predicate]:
if self.predicate is None:
@@ -86,7 +89,8 @@ class SplitRead(ABC):
def create_reader(self) -> RecordReader:
"""Create a record reader for the given split."""
- def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
read_fields: List[str]):
+ def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
+ read_fields: List[str]) -> RecordBatchReader:
(read_file_fields, read_arrow_predicate) =
self._get_fields_and_predicate(file.schema_id, read_fields)
# Use external_path if available, otherwise use file_path
@@ -112,7 +116,7 @@ class SplitRead(ABC):
raise ValueError(f"Unexpected file format: {file_format}")
index_mapping = self.create_index_mapping()
- partition_info = self.create_partition_info()
+ partition_info = self._create_partition_info()
if for_merge_read:
return DataFileBatchReader(format_reader, index_mapping,
partition_info, self.trimmed_primary_key,
self.table.table_schema.fields)
@@ -254,7 +258,7 @@ class SplitRead(ABC):
return trimmed_mapping, trimmed_fields
- def create_partition_info(self):
+ def _create_partition_info(self):
if not self.table.partition_keys:
return None
partition_mapping = self._construct_partition_mapping()
@@ -281,17 +285,33 @@ class SplitRead(ABC):
return mapping
+ def _genarate_deletion_file_readers(self):
+ self.deletion_file_readers = {}
+ if self.split.data_deletion_files:
+ for data_file, deletion_file in zip(self.split.files,
self.split.data_deletion_files):
+ if deletion_file is not None:
+ # Create a callable method to read the deletion vector
+ self.deletion_file_readers[data_file.file_name] = lambda
df=deletion_file: DeletionVector.read(
+ self.table.file_io, df)
+
class RawFileSplitRead(SplitRead):
+ def raw_reader_supplier(self, file: DataFileMeta, dv_factory:
Optional[Callable] = None) -> RecordReader:
+ file_batch_reader = self.file_reader_supplier(file, False,
self._get_final_read_data_fields())
+ dv = dv_factory() if dv_factory else None
+ if dv:
+ return
ApplyDeletionVectorReader(RowPositionReader(file_batch_reader), dv)
+ else:
+ return file_batch_reader
def create_reader(self) -> RecordReader:
+ self._genarate_deletion_file_readers()
data_readers = []
for file in self.split.files:
supplier = partial(
- self.file_reader_supplier,
+ self.raw_reader_supplier,
file=file,
- for_merge_read=False,
- read_fields=self._get_final_read_data_fields(),
+ dv_factory=self.deletion_file_readers.get(file.file_name, None)
)
data_readers.append(supplier)
@@ -312,26 +332,29 @@ class RawFileSplitRead(SplitRead):
class MergeFileSplitRead(SplitRead):
- def kv_reader_supplier(self, file):
- reader_supplier = partial(
- self.file_reader_supplier,
- file=file,
- for_merge_read=True,
- read_fields=self._get_final_read_data_fields()
- )
- return KeyValueWrapReader(reader_supplier(),
len(self.trimmed_primary_key), self.value_arity)
+ def kv_reader_supplier(self, file: DataFileMeta, dv_factory:
Optional[Callable] = None) -> RecordReader:
+ file_batch_reader = self.file_reader_supplier(file, True,
self._get_final_read_data_fields())
+ dv = dv_factory() if dv_factory else None
+ if dv:
+ return ApplyDeletionVectorReader(
+ KeyValueWrapReader(RowPositionReader(file_batch_reader),
+ len(self.trimmed_primary_key),
self.value_arity), dv)
+ else:
+ return KeyValueWrapReader(file_batch_reader,
len(self.trimmed_primary_key), self.value_arity)
- def section_reader_supplier(self, section: List[SortedRun]):
+ def section_reader_supplier(self, section: List[SortedRun]) ->
RecordReader:
readers = []
for sorter_run in section:
data_readers = []
for file in sorter_run.files:
- supplier = partial(self.kv_reader_supplier, file)
+ supplier = partial(self.kv_reader_supplier, file,
self.deletion_file_readers.get(file.file_name, None))
data_readers.append(supplier)
readers.append(ConcatRecordReader(data_readers))
return SortMergeReaderWithMinHeap(readers, self.table.table_schema)
def create_reader(self) -> RecordReader:
+ # Create a dict mapping data file name to deletion file reader method
+ self._genarate_deletion_file_readers()
section_readers = []
sections = IntervalPartition(self.split.files).partition()
for section in sections:
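
One detail worth noting in _genarate_deletion_file_readers above: binding
df=deletion_file as a default argument is what makes each stored lambda read
its own deletion vector lazily; a bare closure over the loop variable would
late-bind, and every reader would end up with the last file. A small
standalone illustration of that Python behaviour (names are illustrative only):

    # Without a default argument, every lambda shares the loop's final value.
    naive = [lambda: name for name in ["a", "b", "c"]]
    print([f() for f in naive])   # ['c', 'c', 'c']

    # Binding the loop variable as a default freezes it per lambda, which is
    # the same pattern used for the per-file deletion vector readers above.
    bound = [lambda n=name: n for name in ["a", "b", "c"]]
    print([f() for f in bound])   # ['a', 'b', 'c']
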
diff --git a/paimon-python/pypaimon/table/row/generic_row.py
b/paimon-python/pypaimon/table/row/generic_row.py
index b05e475951..be5c1ec80f 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -51,6 +51,10 @@ class GenericRow(InternalRow):
def __len__(self) -> int:
return len(self.values)
+ def __str__(self):
+ field_strs = [f"{field.name}={repr(value)}" for field, value in
zip(self.fields, self.values)]
+ return f"GenericRow(row_kind={self.row_kind.name}, {',
'.join(field_strs)})"
+
class GenericRowDeserializer:
HEADER_SIZE_IN_BITS = 8
diff --git a/paimon-python/pypaimon/table/source/deletion_file.py
b/paimon-python/pypaimon/table/source/deletion_file.py
new file mode 100644
index 0000000000..cc05ad6ed7
--- /dev/null
+++ b/paimon-python/pypaimon/table/source/deletion_file.py
@@ -0,0 +1,49 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from dataclasses import dataclass
+from typing import Optional
+
+
+@dataclass
+class DeletionFile:
+ """
+    Deletion file for a data file; it points to a deletion vector stored
+    in a dv index file with the following layout:
+
+    - The first 4 bytes are the length and should equal length().
+    - The next 4 bytes are the magic number and should equal 1581511376.
+    - The remaining content is a serialized RoaringBitmap.
+ """
+    dv_index_path: str  # The dv index file containing the vector for the data file
+    offset: int  # The offset of the vector for the data file within the dv index file
+    length: int  # The length of the vector content within the dv index file
+    cardinality: Optional[int] = None  # The number of rows deleted from the data file, if known
+
+ def __eq__(self, other):
+ if not isinstance(other, DeletionFile):
+ return False
+ return (self.dv_index_path == other.dv_index_path and
+ self.offset == other.offset and
+ self.length == other.length and
+ self.cardinality == other.cardinality)
+
+ def __hash__(self):
+ return hash((self.dv_index_path, self.offset, self.length,
self.cardinality))
+
+ def __str__(self):
+ return (f"DeletionFile(path='{self.dv_index_path}',
offset={self.offset}, "
+ f"length={self.length}, cardinality={self.cardinality})")
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 3213ec87d3..b88224784c 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -180,3 +180,72 @@ class JavaPyReadWriteTest(unittest.TestCase):
# Note: Normal read filters out system fields, so we verify through
Java read
# which explicitly reads KeyValue objects and checks valueKind
print(f"Format: {file_format}, Python read completed. ValueKind
verification should be done in Java test.")
+
+ def test_pk_dv_read(self):
+ pa_schema = pa.schema([
+ pa.field('pt', pa.int32(), nullable=False),
+ pa.field('a', pa.int32(), nullable=False),
+ ('b', pa.int64())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema,
+ partition_keys=['pt'],
+ primary_keys=['pt', 'a'],
+ options={'bucket': '1'})
+ self.catalog.create_table('default.test_pk_dv', schema, True)
+ table = self.catalog.get_table('default.test_pk_dv')
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits).sort_by('pt')
+ expected = pa.Table.from_pydict({
+ 'pt': [1, 2, 2],
+ 'a': [10, 21, 22],
+ 'b': [1000, 20001, 202]
+ }, schema=pa_schema)
+ self.assertEqual(expected, actual)
+
+ def test_pk_dv_read_multi_batch(self):
+ pa_schema = pa.schema([
+ pa.field('pt', pa.int32(), nullable=False),
+ pa.field('a', pa.int32(), nullable=False),
+ ('b', pa.int64())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema,
+ partition_keys=['pt'],
+ primary_keys=['pt', 'a'],
+ options={'bucket': '1'})
+ self.catalog.create_table('default.test_pk_dv_multi_batch', schema,
True)
+ table = self.catalog.get_table('default.test_pk_dv_multi_batch')
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits).sort_by('pt')
+ expected = pa.Table.from_pydict({
+ 'pt': [1] * 9999,
+ 'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
+ 'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
+ }, schema=pa_schema)
+ self.assertEqual(expected, actual)
+
+ def test_pk_dv_read_multi_batch_raw_convertable(self):
+ pa_schema = pa.schema([
+ pa.field('pt', pa.int32(), nullable=False),
+ pa.field('a', pa.int32(), nullable=False),
+ ('b', pa.int64())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema,
+ partition_keys=['pt'],
+ primary_keys=['pt', 'a'],
+ options={'bucket': '1'})
+ self.catalog.create_table('default.test_pk_dv_raw_convertable',
schema, True)
+ table = self.catalog.get_table('default.test_pk_dv_raw_convertable')
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits).sort_by('pt')
+ expected = pa.Table.from_pydict({
+ 'pt': [1] * 9999,
+ 'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
+ 'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
+ }, schema=pa_schema)
+ self.assertEqual(expected, actual)
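
A note on the two multi-batch tests above: the only deleted key is a = 81930,
i.e. i = 8193, so assuming rows were written in ascending order of i and read
in 8192-row Arrow batches (both assumptions, not stated in this patch), the
deleted row is the first row of the second batch, which is why deletions must
be applied by absolute row position rather than per-batch offsets:

    BATCH_SIZE = 8192            # assumed read batch size, for illustration only
    deleted_i = 81930 // 10      # i = 8193
    row_position = deleted_i - 1          # 0-based position if written in order
    print(row_position // BATCH_SIZE)     # 1 -> second batch
    print(row_position % BATCH_SIZE)      # 0 -> first row of that batch
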