This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 327fe5cad0 [python] Implement BTree Reader in Python (#7160)
327fe5cad0 is described below
commit 327fe5cad0571f7934ead6494b7459da11b499c7
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Jan 31 18:06:26 2026 +0800
[python] Implement BTree Reader in Python (#7160)
### Purpose
This PR implements a BTree reader in Python. It also fixes the
filter handling for data evolution reads: filters should never be pushed down to the reader.
- Only indexing of the String type has been implemented
- Compression is not implemented; an error will be reported if a block
has been compressed
### Tests
- Java write and Python read e2e test.
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 84 +++-
paimon-python/dev/run_mixed_tests.sh | 43 +-
.../pypaimon/globalindex/btree/__init__.py | 25 ++
.../globalindex/btree/block_aligned_type.py | 36 ++
.../pypaimon/globalindex/btree/block_entry.py | 55 +++
.../pypaimon/globalindex/btree/block_handle.py | 41 ++
.../pypaimon/globalindex/btree/block_reader.py | 254 ++++++++++++
.../globalindex/btree/btree_file_footer.py | 124 ++++++
.../pypaimon/globalindex/btree/btree_index_meta.py | 63 +++
.../globalindex/btree/btree_index_reader.py | 460 +++++++++++++++++++++
.../pypaimon/globalindex/btree/btree_reader.py | 164 ++++++++
.../pypaimon/globalindex/btree/key_serializer.py | 231 +++++++++++
.../globalindex/btree/memory_slice_input.py | 162 ++++++++
.../pypaimon/globalindex/btree/sst_file_reader.py | 194 +++++++++
.../pypaimon/globalindex/global_index_evaluator.py | 2 +-
.../globalindex/global_index_scan_builder.py | 11 +
.../pypaimon/globalindex/roaring_bitmap.py | 6 +
paimon-python/pypaimon/read/table_read.py | 2 +-
.../pypaimon/tests/e2e/java_py_read_write_test.py | 28 ++
19 files changed, 1981 insertions(+), 4 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 99e14a010f..238f4c08e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -30,19 +30,27 @@ import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
+import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PrimaryKeyFileStoreTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.InnerTableCommit;
import org.apache.paimon.table.sink.StreamTableCommit;
import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.TableRead;
import org.apache.paimon.types.DataType;
@@ -66,7 +74,11 @@ import java.util.function.Consumer;
import java.util.function.Function;
import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
import static org.apache.paimon.table.SimpleTableTestBase.getResult;
@@ -396,6 +408,76 @@ public class JavaPyE2ETest {
}
}
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testBtreeIndexWrite() throws Exception {
+ // create table
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.STRING(),
DataTypes.STRING()},
+ new String[] {"k", "v"});
+ Options options = new Options();
+ Path tablePath = new Path(warehouse.toString() +
"/default.db/test_btree_index");
+ options.set(PATH, tablePath.toString());
+ options.set(ROW_TRACKING_ENABLED, true);
+ options.set(DATA_EVOLUTION_ENABLED, true);
+ options.set(GLOBAL_INDEX_ENABLED, true);
+ TableSchema tableSchema =
+ SchemaUtils.forceCommit(
+ new SchemaManager(LocalFileIO.create(), tablePath),
+ new Schema(
+ rowType.getFields(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ options.toMap(),
+ ""));
+ AppendOnlyFileStoreTable table =
+ new AppendOnlyFileStoreTable(
+ FileIOFinder.find(tablePath),
+ tablePath,
+ tableSchema,
+ CatalogEnvironment.empty());
+
+ // write data
+ BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+ try (BatchTableWrite write = writeBuilder.newWrite();
+ BatchTableCommit commit = writeBuilder.newCommit()) {
+ write.write(
+ GenericRow.of(BinaryString.fromString("k1"),
BinaryString.fromString("v1")));
+ write.write(
+ GenericRow.of(BinaryString.fromString("k2"),
BinaryString.fromString("v2")));
+ write.write(
+ GenericRow.of(BinaryString.fromString("k3"),
BinaryString.fromString("v3")));
+ commit.commit(write.prepareCommit());
+ }
+
+ // build index
+ BTreeGlobalIndexBuilder builder =
+ new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+ try (BatchTableCommit commit = writeBuilder.newCommit()) {
+ commit.commit(builder.build(builder.scan(),
IOManager.create(warehouse.toString())));
+ }
+
+ // assert index
+ List<IndexManifestEntry> indexEntries =
+
table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest);
+ assertThat(indexEntries)
+ .singleElement()
+ .matches(entry -> entry.indexFile().rowCount() == 3);
+
+ // read index
+ PredicateBuilder predicateBuilder = new
PredicateBuilder(table.rowType());
+ ReadBuilder readBuilder =
+ table.newReadBuilder()
+ .withFilter(predicateBuilder.equal(0,
BinaryString.fromString("k2")));
+ List<String> result = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(readBuilder.newScan().plan())
+ .forEachRemaining(r -> result.add(r.getString(0) + ":" +
r.getString(1)));
+ assertThat(result).containsOnly("k2:v2");
+ }
+
// Helper method from TableTestBase
protected Identifier identifier(String tableName) {
return new Identifier(database, tableName);
@@ -408,7 +490,7 @@ public class JavaPyE2ETest {
new DataType[] {DataTypes.INT(), DataTypes.INT(),
DataTypes.BIGINT()},
new String[] {"pt", "a", "b"});
Options options = new Options();
- options.set(CoreOptions.PATH, tablePath.toString());
+ options.set(PATH, tablePath.toString());
options.set(BUCKET, 1);
configure.accept(options);
TableSchema tableSchema =
diff --git a/paimon-python/dev/run_mixed_tests.sh
b/paimon-python/dev/run_mixed_tests.sh
index cd39c6e219..ed7f54e014 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -252,6 +252,32 @@ run_faiss_vector_test() {
return 1
fi
}
+
+# Function to run BTree index test (Java write, Python read)
+run_btree_index_test() {
+ echo -e "${YELLOW}=== Step 6: Running BTree Index Test (Java Write, Python
Read) ===${NC}"
+
+ cd "$PROJECT_ROOT"
+
+ echo "Running Maven test for JavaPyE2ETest.testBtreeIndexWrite..."
+ if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testBtreeIndexWrite -pl
paimon-core -q -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java test failed${NC}"
+ return 1
+ fi
+ cd "$PAIMON_PYTHON_DIR"
+ # Run the specific Python test method
+ echo "Running Python test for
JavaPyReadWriteTest.test_read_btree_index_table..."
+ if python -m pytest
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_btree_index_table
-v; then
+ echo -e "${GREEN}✓ Python test completed successfully${NC}"
+ return 0
+ else
+ echo -e "${RED}✗ Python test failed${NC}"
+ return 1
+ fi
+}
+
# Main execution
main() {
local java_write_result=0
@@ -260,6 +286,7 @@ main() {
local java_read_result=0
local pk_dv_result=0
local faiss_vector_result=0
+ local btree_index_result=0
echo -e "${YELLOW}Starting mixed language test execution...${NC}"
echo ""
@@ -311,6 +338,14 @@ main() {
fi
echo ""
+
+ # Run BTree index test (Java write, Python read)
+ if ! run_btree_index_test; then
+ btree_index_result=1
+ fi
+
+ echo ""
+
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
@@ -349,12 +384,18 @@ main() {
echo -e "${RED}✗ FAISS Vector Index Test (Java Write, Python Read):
FAILED${NC}"
fi
+ if [[ $btree_index_result -eq 0 ]]; then
+ echo -e "${GREEN}✓ BTree Index Test (Java Write, Python Read):
PASSED${NC}"
+ else
+ echo -e "${RED}✗ BTree Index Test (Java Write, Python Read):
FAILED${NC}"
+ fi
+
echo ""
# Clean up warehouse directory after all tests
cleanup_warehouse
- if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$faiss_vector_result -eq 0 ]]; then
+ if [[ $java_write_result -eq 0 && $python_read_result -eq 0 &&
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 &&
$faiss_vector_result -eq 0 && $btree_index_result -eq 0 ]]; then
echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability
verified.${NC}"
return 0
else
diff --git a/paimon-python/pypaimon/globalindex/btree/__init__.py
b/paimon-python/pypaimon/globalindex/btree/__init__.py
new file mode 100644
index 0000000000..d8e84bcc95
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/__init__.py
@@ -0,0 +1,25 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""B-tree index implementation for global index."""
+
+from pypaimon.globalindex.btree.btree_index_reader import BTreeIndexReader
+from pypaimon.globalindex.btree.btree_index_meta import BTreeIndexMeta
+from pypaimon.globalindex.btree.key_serializer import KeySerializer
+
+__all__ = ['BTreeIndexReader', 'BTreeIndexMeta', 'KeySerializer']
diff --git a/paimon-python/pypaimon/globalindex/btree/block_aligned_type.py
b/paimon-python/pypaimon/globalindex/btree/block_aligned_type.py
new file mode 100644
index 0000000000..c90ca7b709
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_aligned_type.py
@@ -0,0 +1,36 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Aligned type for block."""
+
+from enum import Enum
+
+
+class BlockAlignedType(Enum):
+ """Aligned type for block."""
+
+ ALIGNED = 0
+ UNALIGNED = 1
+
+ @classmethod
+ def from_byte(cls, b: int) -> 'BlockAlignedType':
+ """Create BlockAlignedType from byte value."""
+ for aligned_type in cls:
+ if aligned_type.value == b:
+ return aligned_type
+ raise ValueError(f"Illegal block aligned type: {b}")
diff --git a/paimon-python/pypaimon/globalindex/btree/block_entry.py
b/paimon-python/pypaimon/globalindex/btree/block_entry.py
new file mode 100644
index 0000000000..9c399e8d7c
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_entry.py
@@ -0,0 +1,55 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Entry represents a key value."""
+
+
+class BlockEntry:
+ """Entry represents a key value."""
+
+ def __init__(self, key: bytes, value: bytes):
+ """Initialize BlockEntry.
+
+ Args:
+ key: The key bytes
+ value: The value bytes
+ """
+ if key is None:
+ raise ValueError("key is null")
+ if value is None:
+ raise ValueError("value is null")
+ self.key = key
+ self.value = value
+
+ def __eq__(self, other) -> bool:
+ """Check equality with another BlockEntry."""
+ if self is other:
+ return True
+ if other is None or not isinstance(other, BlockEntry):
+ return False
+ return self.key == other.key and self.value == other.value
+
+ def __hash__(self) -> int:
+ """Return hash of this BlockEntry."""
+ result = hash(self.key)
+ result = 31 * result + hash(self.value)
+ return result
+
+ def __repr__(self) -> str:
+ """Return string representation."""
+ return f"BlockEntry(key={self.key!r}, value={self.value!r})"
diff --git a/paimon-python/pypaimon/globalindex/btree/block_handle.py
b/paimon-python/pypaimon/globalindex/btree/block_handle.py
new file mode 100644
index 0000000000..a54ee2b176
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_handle.py
@@ -0,0 +1,41 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Entry represents a key value."""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class BlockHandle:
+ """Handle for a data block."""
+
+ def __init__(self, offset: int, size: int):
+ """
+ Initialize the block handle.
+
+ Args:
+ offset: Offset of the block in the file
+ size: Size of the block in bytes
+ """
+ self.offset = offset
+ self.size = size
+
+ def is_null(self) -> bool:
+ """Check if this handle represents a null block."""
+ return self.offset == 0 and self.size == 0
diff --git a/paimon-python/pypaimon/globalindex/btree/block_reader.py
b/paimon-python/pypaimon/globalindex/btree/block_reader.py
new file mode 100644
index 0000000000..f0091d0231
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_reader.py
@@ -0,0 +1,254 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Reader for a block."""
+
+import struct
+from typing import Callable, Optional
+
+from pypaimon.globalindex.btree.block_aligned_type import BlockAlignedType
+from pypaimon.globalindex.btree.block_entry import BlockEntry
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+
+
+class BlockReader:
+ """Reader for a block."""
+
+ def __init__(
+ self,
+ block: bytes,
+ record_count: int,
+ comparator: Callable[[bytes, bytes], int]
+ ):
+ """Initialize BlockReader.
+
+ Args:
+ block: The block data bytes
+ record_count: Number of records in the block
+ comparator: Optional comparator function for keys
+ """
+ self.block = block
+ self.record_count = record_count
+ self.comparator = comparator
+
+ def block_input(self) -> MemorySliceInput:
+ """Create a MemorySliceInput for this block."""
+ return MemorySliceInput(self.block)
+
+ def iterator(self) -> 'BlockIterator':
+ """Create a BlockIterator for this reader."""
+ return BlockIterator(self)
+
+ def seek_to(self, record_position: int) -> int:
+ """Seek to slice position from record position.
+
+ Args:
+ record_position: The record position to seek to
+
+ Returns:
+ The slice position
+
+ Raises:
+ NotImplementedError: If not implemented in subclass
+ """
+ raise NotImplementedError("seekTo must be implemented in subclass")
+
+ @staticmethod
+ def create(
+ block: bytes,
+ comparator: Optional[Callable[[bytes, bytes], int]] = None
+ ) -> 'BlockReader':
+ """Create a BlockReader from block data.
+
+ Args:
+ block: The block data bytes
+ comparator: Optional comparator function for keys
+
+ Returns:
+ A BlockReader instance (AlignedBlockReader or UnalignedBlockReader)
+ """
+ # Read block trailer: last byte is aligned type, previous 4 bytes is
record size or index length
+ aligned_type_byte = block[-1]
+ aligned_type = BlockAlignedType.from_byte(aligned_type_byte)
+ int_value = struct.unpack('<I', block[-5:-1])[0]
+
+ if aligned_type == BlockAlignedType.ALIGNED:
+ # Aligned block: records have fixed size
+ data = block[:-5]
+ return AlignedBlockReader(data, int_value, comparator)
+ else:
+ # Unaligned block: uses index
+ index_length = int_value * 4
+ index_offset = len(block) - 5 - index_length
+ data = block[:index_offset]
+ index = block[index_offset:index_offset + index_length]
+ return UnalignedBlockReader(data, index, comparator)
+
+
+class AlignedBlockReader(BlockReader):
+ """Block reader for aligned blocks (fixed record size)."""
+
+ def __init__(
+ self,
+ data: bytes,
+ record_size: int,
+ comparator: Optional[Callable[[bytes, bytes], int]] = None
+ ):
+ """Initialize AlignedBlockReader.
+
+ Args:
+ data: The block data bytes
+ record_size: The fixed size of each record
+ comparator: Optional comparator function for keys
+ """
+ record_count = len(data) // record_size
+ super().__init__(data, record_count, comparator)
+ self.record_size = record_size
+
+ def seek_to(self, record_position: int) -> int:
+ """Seek to slice position from record position.
+
+ Args:
+ record_position: The record position to seek to
+
+ Returns:
+ The slice position
+ """
+ return record_position * self.record_size
+
+
+class UnalignedBlockReader(BlockReader):
+ """Block reader for unaligned blocks (uses index)."""
+
+ def __init__(
+ self,
+ data: bytes,
+ index: bytes,
+ comparator: Optional[Callable[[bytes, bytes], int]] = None
+ ):
+ """Initialize UnalignedBlockReader.
+
+ Args:
+ data: The block data bytes
+ index: The index bytes (4 bytes per record)
+ comparator: Optional comparator function for keys
+ """
+ record_count = len(index) // 4
+ super().__init__(data, record_count, comparator)
+ self.index = index
+
+ def seek_to(self, record_position: int) -> int:
+ """Seek to slice position from record position.
+
+ Args:
+ record_position: The record position to seek to
+
+ Returns:
+ The slice position
+ """
+ # Read 4-byte integer from index at record_position * 4
+ offset = record_position * 4
+ return struct.unpack('<I', self.index[offset:offset + 4])[0]
+
+
+class BlockIterator:
+ """Iterator for block entries."""
+
+ def __init__(self, reader: BlockReader):
+ """Initialize BlockIterator.
+
+ Args:
+ reader: The BlockReader to iterate over
+ """
+ self.reader = reader
+ self.input = reader.block_input()
+ self.polled: Optional[BlockEntry] = None
+
+ def __iter__(self):
+ """Return self as iterator."""
+ return self
+
+ def __next__(self) -> BlockEntry:
+ """Get next entry.
+
+ Returns:
+ The next BlockEntry
+
+ Raises:
+ StopIteration: If no more entries
+ """
+ if not self.has_next():
+ raise StopIteration
+
+ if self.polled is not None:
+ result = self.polled
+ self.polled = None
+ return result
+
+ return self.read_entry()
+
+ def has_next(self) -> bool:
+ """Check if there are more entries."""
+ return self.polled is not None or self.input.is_readable()
+
+ def seek_to(self, target_key: bytes) -> bool:
+ """Seek to the first key >= target_key using binary search.
+
+ Args:
+ target_key: The target key to seek to
+
+ Returns:
+ True if exact match found, False otherwise
+ """
+ left = 0
+ right = self.reader.record_count - 1
+
+ while left <= right:
+ mid = left + (right - left) // 2
+
+ self.input.set_position(self.reader.seek_to(mid))
+ mid_entry = self.read_entry()
+ compare = self.reader.comparator(mid_entry.key, target_key) if
self.reader.comparator else -1
+
+ if compare == 0:
+ self.polled = mid_entry
+ return True
+ elif compare > 0:
+ self.polled = mid_entry
+ right = mid - 1
+ else:
+ self.polled = None
+ left = mid + 1
+
+ return False
+
+ def read_entry(self) -> BlockEntry:
+ """Read a key-value entry.
+
+ Returns:
+ A BlockEntry containing key and value
+ """
+ # Read key
+ key_length = self.input.read_var_len_int()
+ key = self.input.read_slice(key_length)
+
+ # Read value
+ value_length = self.input.read_var_len_int()
+ value = self.input.read_slice(value_length)
+
+ return BlockEntry(key, value)
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_file_footer.py
b/paimon-python/pypaimon/globalindex/btree/btree_file_footer.py
new file mode 100644
index 0000000000..7afa3694fd
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_file_footer.py
@@ -0,0 +1,124 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""B-tree file footer for storing index metadata."""
+
+from dataclasses import dataclass
+from typing import Optional
+import struct
+
+from pypaimon.globalindex.btree.block_handle import BlockHandle
+
+
+@dataclass
+class BloomFilterHandle:
+ """Handle for bloom filter block."""
+
+ offset: int
+ size: int
+ expected_entries: int
+
+ def is_null(self) -> bool:
+ """Check if this handle represents a null bloom filter."""
+ return self.offset == 0 and self.size == 0 and self.expected_entries
== 0
+
+
+class BTreeFileFooter:
+ """
+ The Footer for BTree file.
+
+ This footer contains the handles to the bloom filter, index block, and
null bitmap,
+ allowing efficient navigation of the B-tree index file.
+ """
+
+ MAGIC_NUMBER = 198732882
+ ENCODED_LENGTH = 48
+
+ def __init__(
+ self,
+ bloom_filter_handle: Optional[BloomFilterHandle],
+ index_block_handle: BlockHandle,
+ null_bitmap_handle: Optional[BlockHandle]
+ ):
+ """
+ Initialize the BTree file footer.
+
+ Args:
+ bloom_filter_handle: Handle to the bloom filter block (maybe None)
+ index_block_handle: Handle to the index block
+ null_bitmap_handle: Handle to the null bitmap block (maybe None)
+ """
+ self.bloom_filter_handle = bloom_filter_handle
+ self.index_block_handle = index_block_handle
+ self.null_bitmap_handle = null_bitmap_handle
+
+ @classmethod
+ def read_footer(cls, data: bytes) -> 'BTreeFileFooter':
+ """
+ Read footer from byte data.
+
+ Args:
+ data: Byte data containing the footer
+
+ Returns:
+ BTreeFileFooter instance
+
+ Raises:
+ ValueError: If magic number doesn't match
+ """
+ offset = 0
+
+ # Read bloom filter handle
+ bf_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
+ offset += 8
+ bf_size = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+ bf_expected = struct.unpack('<Q', data[offset:offset + 8])[0]
+ offset += 8
+
+ bloom_filter_handle = None
+ if not (bf_offset == 0 and bf_size == 0 and bf_expected == 0):
+ bloom_filter_handle = BloomFilterHandle(bf_offset, bf_size,
bf_expected)
+
+ # Read index block handle
+ index_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
+ offset += 8
+ index_size = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+
+ index_block_handle = BlockHandle(index_offset, index_size)
+
+ # Read null bitmap handle
+ nb_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
+ offset += 8
+ nb_size = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+
+ null_bitmap_handle = None
+ if not (nb_offset == 0 and nb_size == 0):
+ null_bitmap_handle = BlockHandle(nb_offset, nb_size)
+
+ # Skip padding
+ offset = cls.ENCODED_LENGTH - 4
+
+ # Read and verify magic number
+ magic_number = struct.unpack('<I', data[offset:offset + 4])[0]
+ if magic_number != cls.MAGIC_NUMBER:
+ raise ValueError("File is not a table (bad magic number)")
+
+ return cls(bloom_filter_handle, index_block_handle, null_bitmap_handle)
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_index_meta.py
b/paimon-python/pypaimon/globalindex/btree/btree_index_meta.py
new file mode 100644
index 0000000000..ef64954d14
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_index_meta.py
@@ -0,0 +1,63 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""B-tree index metadata."""
+
+from dataclasses import dataclass
+from typing import Optional
+import struct
+
+
+@dataclass
+class BTreeIndexMeta:
+ """
+ Index Meta of each BTree index file.
+
+ The first key and last key of this meta could be null if the
+ entire btree index file only contains nulls.
+ """
+
+ first_key: Optional[bytes]
+ last_key: Optional[bytes]
+ has_nulls: bool
+
+ def only_nulls(self) -> bool:
+ """Check if this index only contains nulls."""
+ return self.first_key is None and self.last_key is None
+
+ @classmethod
+ def deserialize(cls, data: bytes) -> 'BTreeIndexMeta':
+ """Deserialize metadata from byte array."""
+ offset = 0
+
+ # Read first key
+ first_key_length = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+ first_key = None if first_key_length == 0 else data[offset:offset +
first_key_length]
+ offset += first_key_length
+
+ # Read last key
+ last_key_length = struct.unpack('<I', data[offset:offset + 4])[0]
+ offset += 4
+ last_key = None if last_key_length == 0 else data[offset:offset +
last_key_length]
+ offset += last_key_length
+
+ # Read has_nulls flag
+ has_nulls = struct.unpack('<B', data[offset:offset + 1])[0] == 1
+
+ return cls(first_key, last_key, has_nulls)
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
new file mode 100644
index 0000000000..83d6194d0d
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
@@ -0,0 +1,460 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+The BTreeIndexReader implementation for btree index.
+
+This reader provides efficient querying capabilities for B-tree based global
indexes,
+supporting various predicate operations like equality, range, and null checks.
+"""
+
+import struct
+import zlib
+from typing import List, Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.globalindex.btree.btree_index_meta import BTreeIndexMeta
+from pypaimon.globalindex.btree.key_serializer import KeySerializer
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_reader import FieldRef,
GlobalIndexReader
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.roaring_bitmap import RoaringBitmap64
+from pypaimon.globalindex.btree.btree_file_footer import BTreeFileFooter
+from pypaimon.globalindex.btree.sst_file_reader import SstFileReader
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+
+
+def _deserialize_row_ids(data: bytes) -> List[int]:
+ data_input = MemorySliceInput(data)
+ length = data_input.read_var_len_int()
+
+ if length <= 0:
+ raise ValueError(f"Invalid row id length: {length}")
+
+ row_ids = []
+ for _ in range(length):
+ row_ids.append(data_input.read_var_len_long())
+
+ return row_ids
+
+
+def _compare_bytes(a: bytes, b: bytes) -> int:
+ """
+ Compare two byte arrays.
+
+ Args:
+ a: First byte array
+ b: Second byte array
+
+ Returns:
+ -1 if a < b, 0 if a == b, 1 if a > b
+ """
+ if a < b:
+ return -1
+ elif a > b:
+ return 1
+ return 0
+
+
+def _is_in_range_bytes(
+ key_bytes: bytes,
+ from_bytes: bytes,
+ to_bytes: bytes,
+ from_inclusive: bool,
+ to_inclusive: bool
+) -> bool:
+ """
+ Check if a key (as bytes) falls within the specified range.
+
+ Args:
+ key_bytes: The key bytes to check
+ from_bytes: Lower bound bytes
+ to_bytes: Upper bound bytes
+ from_inclusive: Whether lower bound is inclusive
+ to_inclusive: Whether upper bound is inclusive
+
+ Returns:
+ True if key is in range, False otherwise
+ """
+ if not from_inclusive and _compare_bytes(key_bytes, from_bytes) == 0:
+ return False
+
+ cmp_to = _compare_bytes(key_bytes, to_bytes)
+ if cmp_to > 0:
+ return False
+ if not to_inclusive and cmp_to == 0:
+ return False
+
+ return True
+
+
class BTreeIndexReader(GlobalIndexReader):
    """
    The GlobalIndexReader implementation for btree index.

    Supports equality, range and null-check predicates against a B-tree
    based global index file. Every visitor returns a lazily-evaluated
    GlobalIndexResult: the index file is only scanned when the result is
    materialized. All visitors funnel through ``_guarded`` so I/O
    failures surface uniformly as RuntimeError with the original
    exception chained as the cause.
    """

    FOOTER_ENCODED_LENGTH = 48

    def __init__(
        self,
        key_serializer: KeySerializer,
        file_io: FileIO,
        index_path: str,
        io_meta: GlobalIndexIOMeta
    ):
        """
        Args:
            key_serializer: Serializer/comparator for the indexed key type
            file_io: File system abstraction used to open the index file
            index_path: Directory containing the index file
            io_meta: Index file metadata (name, size, serialized meta)
        """
        self.key_serializer = key_serializer
        self.comparator = key_serializer.create_comparator()
        self.io_meta = io_meta

        # Deserialize index metadata
        index_meta = BTreeIndexMeta.deserialize(io_meta.metadata)

        if index_meta.first_key is not None:
            self.min_key = key_serializer.deserialize(index_meta.first_key)
            self.max_key = key_serializer.deserialize(index_meta.last_key)
        else:
            # This is possible if this btree index file only stores nulls
            self.min_key = None
            self.max_key = None

        self.has_nulls = index_meta.has_nulls
        self.input_stream = file_io.new_input_stream(index_path + "/" + io_meta.file_name)

        # Lazy-loaded null bitmap
        self._null_bitmap: Optional[RoaringBitmap64] = None

        # Read footer to get index and bloom filter handles
        self.footer = self._read_footer()

        # Initialize SST file reader (simplified version)
        self.reader = self._create_sst_reader()

    def _read_footer(self) -> BTreeFileFooter:
        """
        Read the file footer to get metadata handles.

        Returns:
            BTreeFileFooter containing index_block_handle and bloom_filter_handle
        """
        file_size = self.io_meta.file_size
        # The footer occupies the fixed-size tail of the file.
        self.input_stream.seek(file_size - BTreeFileFooter.ENCODED_LENGTH)
        footer_data = self.input_stream.read(BTreeFileFooter.ENCODED_LENGTH)
        return BTreeFileFooter.read_footer(footer_data)

    def _create_sst_reader(self) -> SstFileReader:
        """Create the SST reader, comparing keys in their deserialized form."""
        def comparator(a: bytes, b: bytes) -> int:
            o1 = self.key_serializer.deserialize(a)
            o2 = self.key_serializer.deserialize(b)
            return self.comparator(o1, o2)

        return SstFileReader(self.input_stream, comparator, self.footer.index_block_handle)

    def _read_null_bitmap(self) -> RoaringBitmap64:
        """
        Read (and cache) the null bitmap from the index file.

        Returns:
            RoaringBitmap64 containing null row IDs
        """
        if self._null_bitmap is not None:
            return self._null_bitmap

        bitmap = RoaringBitmap64()

        # Read from the null bitmap block handle if available
        if self.footer.null_bitmap_handle is not None:
            self.input_stream.seek(self.footer.null_bitmap_handle.offset)
            data = self.input_stream.read(self.footer.null_bitmap_handle.size)

            if len(data) >= 4:
                # Block layout: bitmap bytes followed by a big-endian CRC32.
                bitmap_length = len(data) - 4
                bitmap_bytes = data[:bitmap_length]
                crc32_value = struct.unpack('>I', data[bitmap_length:bitmap_length + 4])[0]

                # NOTE(review): a CRC mismatch silently yields an empty
                # bitmap rather than raising — confirm this is intended.
                actual_crc32 = zlib.crc32(bitmap_bytes) & 0xFFFFFFFF
                if actual_crc32 == crc32_value:
                    bitmap = RoaringBitmap64.deserialize(bitmap_bytes)

        self._null_bitmap = bitmap
        return bitmap

    def _all_non_null_rows(self) -> RoaringBitmap64:
        """
        Get all non-null row IDs.

        This traverses all data to avoid returning null values, which is very
        advantageous in situations where there are many null values.

        Returns:
            RoaringBitmap64 containing all non-null row IDs
        """
        if self.min_key is None:
            # File contains only nulls, so there are no non-null rows.
            return RoaringBitmap64()

        return self._range_query(self.min_key, self.max_key, True, True)

    def _range_query(
        self,
        from_key: object,
        to_key: object,
        from_inclusive: bool,
        to_inclusive: bool
    ) -> RoaringBitmap64:
        """
        Range query on the underlying SST file.

        Args:
            from_key: Lower bound key
            to_key: Upper bound key
            from_inclusive: Whether to include lower bound
            to_inclusive: Whether to include upper bound

        Returns:
            RoaringBitmap64 containing all qualified row IDs
        """
        result = RoaringBitmap64()

        serialized_from = self.key_serializer.serialize(from_key)
        serialized_to = self.key_serializer.serialize(to_key)

        # Seek the block iterator to the first candidate key.
        file_iter = self.reader.create_iterator()
        file_iter.seek_to(serialized_from)

        # Iterate through data blocks
        while True:
            data_iter = file_iter.read_batch()
            if data_iter is None:
                break

            # Process entries in current block
            while data_iter.has_next():
                entry = data_iter.__next__()
                if entry is None:
                    break

                key_bytes = entry.key
                value_bytes = entry.value

                # Check if key is within range using byte comparison
                if _is_in_range_bytes(key_bytes, serialized_from, serialized_to, from_inclusive, to_inclusive):
                    row_ids = _deserialize_row_ids(value_bytes)
                    for row_id in row_ids:
                        result.add(row_id)
                elif _compare_bytes(key_bytes, serialized_to) > 0:
                    # Key is beyond the range, stop processing
                    return result

        return result

    def _is_in_range(
        self,
        key: object,
        from_key: object,
        to_key: object,
        from_inclusive: bool,
        to_inclusive: bool
    ) -> bool:
        """
        Check if a key falls within the specified range.

        NOTE(review): like ``_is_in_range_bytes``, a key strictly below
        ``from_key`` is not rejected here.

        Args:
            key: The key to check
            from_key: Lower bound
            to_key: Upper bound
            from_inclusive: Whether lower bound is inclusive
            to_inclusive: Whether upper bound is inclusive

        Returns:
            True if key is in range, False otherwise
        """
        if not from_inclusive and self.comparator(key, from_key) == 0:
            return False

        cmp_to = self.comparator(key, to_key)
        if cmp_to > 0:
            return False
        if not to_inclusive and cmp_to == 0:
            return False

        return True

    def _guarded(self, compute):
        """
        Wrap a zero-arg bitmap supplier in a lazy GlobalIndexResult.

        Centralizes the error handling that previously was copy-pasted
        into every visitor (and accidentally omitted from visit_equal):
        any failure while reading the index file is re-raised as a
        RuntimeError with the original exception chained as its cause.
        """
        def supplier():
            try:
                return compute()
            except Exception as e:
                raise RuntimeError("fail to read btree index file.") from e

        return GlobalIndexResult.create(supplier)

    def visit_is_not_null(self, field_ref: FieldRef) -> Optional[GlobalIndexResult]:
        """Is-not-null predicate. Nulls are stored separately in null bitmap."""
        return self._guarded(self._all_non_null_rows)

    def visit_is_null(self, field_ref: FieldRef) -> Optional[GlobalIndexResult]:
        """Is-null predicate, served directly from the null bitmap."""
        return self._guarded(self._read_null_bitmap)

    def visit_less_than(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a less-than predicate."""
        return self._guarded(lambda: self._range_query(self.min_key, literal, True, False))

    def visit_greater_or_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a greater-or-equal predicate."""
        return self._guarded(lambda: self._range_query(literal, self.max_key, True, True))

    def visit_not_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a not-equal predicate: all non-null rows minus the equal rows."""
        def compute():
            result = self._all_non_null_rows()
            equal_result = self._range_query(literal, literal, True, True)
            return RoaringBitmap64.remove_all(result, equal_result)

        return self._guarded(compute)

    def visit_less_or_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a less-or-equal predicate."""
        return self._guarded(lambda: self._range_query(self.min_key, literal, True, True))

    def visit_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit an equality predicate (error-wrapped like the other visitors)."""
        return self._guarded(lambda: self._range_query(literal, literal, True, True))

    def visit_greater_than(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a greater-than predicate."""
        return self._guarded(lambda: self._range_query(literal, self.max_key, False, True))

    def visit_in(self, field_ref: FieldRef, literals: List[object]) -> Optional[GlobalIndexResult]:
        """Visit an in predicate: union of point queries for each literal."""
        def compute():
            result = RoaringBitmap64()
            for literal in literals:
                range_result = self._range_query(literal, literal, True, True)
                result = RoaringBitmap64.or_(result, range_result)
            return result

        return self._guarded(compute)

    def visit_not_in(self, field_ref: FieldRef, literals: List[object]) -> Optional[GlobalIndexResult]:
        """Visit a not-in predicate: all non-null rows minus the in-set rows."""
        def compute():
            result = self._all_non_null_rows()
            in_result = self.visit_in(field_ref, literals).results()
            return RoaringBitmap64.remove_all(result, in_result)

        return self._guarded(compute)

    def visit_starts_with(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """
        Visit a starts-with predicate.

        Falls back to all non-null rows (a safe superset).
        Note: `startsWith` can also be covered by btree index in the future.
        """
        return self._guarded(self._all_non_null_rows)

    def visit_ends_with(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit an ends-with predicate (safe superset: all non-null rows)."""
        return self._guarded(self._all_non_null_rows)

    def visit_contains(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a contains predicate (safe superset: all non-null rows)."""
        return self._guarded(self._all_non_null_rows)

    def visit_like(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a like predicate (safe superset: all non-null rows)."""
        return self._guarded(self._all_non_null_rows)

    def close(self) -> None:
        """Close the reader and release resources."""
        if self.input_stream is not None:
            self.input_stream.close()
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_reader.py
b/paimon-python/pypaimon/globalindex/btree/btree_reader.py
new file mode 100644
index 0000000000..bdd19b48d3
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_reader.py
@@ -0,0 +1,164 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""B-tree file footer for storing index metadata."""
+
+from dataclasses import dataclass
+from typing import Optional
+import struct
+
+from pypaimon.globalindex.btree.block_handle import BlockHandle
+
+
@dataclass
class BloomFilterHandle:
    """Location and sizing information for the bloom filter block."""

    offset: int            # byte offset of the bloom filter block in the file
    size: int              # block size in bytes
    expected_entries: int  # number of entries the filter was sized for

    def is_null(self) -> bool:
        """Return True when every field is zero, i.e. no bloom filter exists."""
        return not (self.offset or self.size or self.expected_entries)
+
+
class BTreeFileFooter:
    """
    The Footer for BTree file.

    Fixed-size (48 byte) trailer, all integers little-endian:

      bytes  0-19: bloom filter handle (offset u64, size u32, expected u64)
      bytes 20-31: index block handle  (offset u64, size u32)
      bytes 32-43: null bitmap handle  (offset u64, size u32)
      bytes 44-47: magic number (u32)

    An all-zero handle means the corresponding block is absent.
    """

    MAGIC_NUMBER = 198732882
    ENCODED_LENGTH = 48

    def __init__(
        self,
        bloom_filter_handle: Optional[BloomFilterHandle],
        index_block_handle: BlockHandle,
        null_bitmap_handle: Optional[BlockHandle]
    ):
        """
        Initialize the BTree file footer.

        Args:
            bloom_filter_handle: Handle to the bloom filter block (maybe None)
            index_block_handle: Handle to the index block
            null_bitmap_handle: Handle to the null bitmap block (maybe None)
        """
        self.bloom_filter_handle = bloom_filter_handle
        self.index_block_handle = index_block_handle
        self.null_bitmap_handle = null_bitmap_handle

    @classmethod
    def read_footer(cls, data: bytes) -> 'BTreeFileFooter':
        """
        Read footer from byte data.

        Args:
            data: Byte data containing the footer (ENCODED_LENGTH bytes)

        Returns:
            BTreeFileFooter instance

        Raises:
            ValueError: If magic number doesn't match
        """
        offset = 0

        # Read bloom filter handle
        bf_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
        offset += 8
        bf_size = struct.unpack('<I', data[offset:offset + 4])[0]
        offset += 4
        bf_expected = struct.unpack('<Q', data[offset:offset + 8])[0]
        offset += 8

        # All-zero means no bloom filter was written.
        bloom_filter_handle = None
        if not (bf_offset == 0 and bf_size == 0 and bf_expected == 0):
            bloom_filter_handle = BloomFilterHandle(bf_offset, bf_size, bf_expected)

        # Read index block handle
        index_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
        offset += 8
        index_size = struct.unpack('<I', data[offset:offset + 4])[0]
        offset += 4

        index_block_handle = BlockHandle(index_offset, index_size)

        # Read null bitmap handle
        nb_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
        offset += 8
        nb_size = struct.unpack('<I', data[offset:offset + 4])[0]
        offset += 4

        # All-zero means no null bitmap was written.
        null_bitmap_handle = None
        if not (nb_offset == 0 and nb_size == 0):
            null_bitmap_handle = BlockHandle(nb_offset, nb_size)

        # The magic number always sits in the last 4 bytes.
        offset = cls.ENCODED_LENGTH - 4

        # Read and verify magic number
        magic_number = struct.unpack('<I', data[offset:offset + 4])[0]
        if magic_number != cls.MAGIC_NUMBER:
            raise ValueError("File is not a table (bad magic number)")

        return cls(bloom_filter_handle, index_block_handle, null_bitmap_handle)

    def write_footer(self) -> bytes:
        """
        Write footer to byte data.

        Fixed: encodes little-endian to match ``read_footer`` (the
        previous implementation wrote big-endian, so a written footer
        could never be read back), and pads *before* the magic number so
        the magic always lands at ENCODED_LENGTH - 4 where the reader
        expects it.

        Returns:
            Byte data containing the footer (exactly ENCODED_LENGTH bytes)
        """
        buffer = bytearray()

        # Write bloom filter handle (all zeros when absent)
        if self.bloom_filter_handle is None:
            buffer.extend(struct.pack('<QIQ', 0, 0, 0))
        else:
            buffer.extend(struct.pack(
                '<QIQ',
                self.bloom_filter_handle.offset,
                self.bloom_filter_handle.size,
                self.bloom_filter_handle.expected_entries))

        # Write index block handle
        buffer.extend(struct.pack('<QI', self.index_block_handle.offset, self.index_block_handle.size))

        # Write null bitmap handle (all zeros when absent)
        if self.null_bitmap_handle is None:
            buffer.extend(struct.pack('<QI', 0, 0))
        else:
            buffer.extend(struct.pack('<QI', self.null_bitmap_handle.offset, self.null_bitmap_handle.size))

        # Pad up to the magic-number slot, then write the magic last.
        buffer.extend(b'\x00' * (self.ENCODED_LENGTH - 4 - len(buffer)))
        buffer.extend(struct.pack('<I', self.MAGIC_NUMBER))

        return bytes(buffer)
diff --git a/paimon-python/pypaimon/globalindex/btree/key_serializer.py
b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
new file mode 100644
index 0000000000..79cb32e5e6
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
@@ -0,0 +1,231 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Key serializer for B-tree index."""
+
+from abc import ABC, abstractmethod
+from typing import Callable
+import struct
+
+
class KeySerializer(ABC):
    """
    Contract for converting B-tree index keys to and from bytes.

    Implementations additionally supply the comparator used to order keys.
    """

    @abstractmethod
    def serialize(self, key: object) -> bytes:
        """Encode a key into its byte representation."""

    @abstractmethod
    def deserialize(self, data: bytes) -> object:
        """Decode a key from its byte representation."""

    @abstractmethod
    def create_comparator(self) -> Callable[[object, object], int]:
        """
        Create a three-way comparator for keys.

        Returns:
            A function of two keys returning a negative number, zero, or
            a positive number when the first key is less than, equal to,
            or greater than the second.
        """
+
+
class StringSerializer(KeySerializer):
    """Serializer for STRING type keys (UTF-8 encoded)."""

    def serialize(self, key: object) -> bytes:
        """UTF-8 encode the key, stringifying non-str inputs first."""
        text = key if isinstance(key, str) else str(key)
        return text.encode('utf-8')

    def deserialize(self, data: bytes) -> object:
        """Decode UTF-8 bytes back into a string."""
        return data.decode('utf-8')

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way lexicographic comparison on the string forms."""
        def compare(a: object, b: object) -> int:
            left = a if isinstance(a, str) else str(a)
            right = b if isinstance(b, str) else str(b)
            return (left > right) - (left < right)
        return compare
+
+
class LongSerializer(KeySerializer):
    """Serializer for BIGINT type keys (8-byte big-endian signed)."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a signed 64-bit big-endian integer."""
        return struct.pack('>q', int(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a signed 64-bit big-endian integer."""
        return struct.unpack('>q', data)[0]

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way numeric comparison on the integer forms."""
        def compare(a: object, b: object) -> int:
            left, right = int(a), int(b)
            return (left > right) - (left < right)
        return compare
+
+
class IntSerializer(KeySerializer):
    """Serializer for INT type keys (4-byte big-endian signed)."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a signed 32-bit big-endian integer."""
        return struct.pack('>i', int(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a signed 32-bit big-endian integer."""
        return struct.unpack('>i', data)[0]

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way numeric comparison on the integer forms."""
        def compare(a: object, b: object) -> int:
            left, right = int(a), int(b)
            return (left > right) - (left < right)
        return compare
+
+
class FloatSerializer(KeySerializer):
    """Serializer for FLOAT type keys (4-byte big-endian IEEE 754)."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a 32-bit big-endian float."""
        return struct.pack('>f', float(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a 32-bit big-endian float."""
        return struct.unpack('>f', data)[0]

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way numeric comparison on the float forms."""
        def compare(a: object, b: object) -> int:
            left, right = float(a), float(b)
            return (left > right) - (left < right)
        return compare
+
+
class DoubleSerializer(KeySerializer):
    """Serializer for DOUBLE type keys (8-byte big-endian IEEE 754)."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a 64-bit big-endian float."""
        return struct.pack('>d', float(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a 64-bit big-endian float."""
        return struct.unpack('>d', data)[0]

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way numeric comparison on the float forms."""
        def compare(a: object, b: object) -> int:
            left, right = float(a), float(b)
            return (left > right) - (left < right)
        return compare
+
+
class BooleanSerializer(KeySerializer):
    """Serializer for BOOLEAN type keys (one byte: 1 for true, 0 for false)."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a single byte, 1 for truthy and 0 for falsy."""
        return struct.pack('>B', int(bool(key)))

    def deserialize(self, data: bytes) -> object:
        """Unpack a single byte into a bool (1 means True)."""
        return struct.unpack('>B', data)[0] == 1

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way comparison where False orders before True."""
        def compare(a: object, b: object) -> int:
            left, right = bool(a), bool(b)
            return (left > right) - (left < right)
        return compare
+
+
def create_serializer(data_type: str) -> KeySerializer:
    """
    Factory method to create a KeySerializer based on data type.

    Args:
        data_type: String representation of the data type (case-insensitive)

    Returns:
        Appropriate KeySerializer instance

    Raises:
        ValueError: If the data type is not supported
    """
    data_type_lower = data_type.lower()

    # Bug fix: the single-alias cases must be one-element tuples.
    # The previous `in ('float')` form tested substring membership in the
    # string 'float', so inputs like 'l' or 'oa' wrongly matched.
    if data_type_lower in ('string', 'varchar', 'char'):
        return StringSerializer()
    elif data_type_lower in ('bigint', 'long'):
        return LongSerializer()
    elif data_type_lower in ('int', 'integer'):
        return IntSerializer()
    elif data_type_lower in ('float',):
        return FloatSerializer()
    elif data_type_lower in ('double',):
        return DoubleSerializer()
    elif data_type_lower in ('boolean', 'bool'):
        return BooleanSerializer()
    else:
        raise ValueError(f"DataType: {data_type} is not supported by btree index now.")
diff --git a/paimon-python/pypaimon/globalindex/btree/memory_slice_input.py
b/paimon-python/pypaimon/globalindex/btree/memory_slice_input.py
new file mode 100644
index 0000000000..3897f8c162
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/memory_slice_input.py
@@ -0,0 +1,162 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""Input for byte array."""
+
+import struct
+
+
class MemorySliceInput:
    """Sequential reader over an in-memory byte array."""

    def __init__(self, data: bytes):
        """Initialize MemorySliceInput.

        Args:
            data: The byte array to read from
        """
        self.data = data
        self._position = 0

    def position(self) -> int:
        """Return the current read offset."""
        return self._position

    def set_position(self, position: int) -> None:
        """Move the read offset.

        Args:
            position: The new offset (may equal len(data), i.e. end of input)

        Raises:
            IndexError: If position is out of bounds
        """
        if not (0 <= position <= len(self.data)):
            raise IndexError(f"Position {position} out of bounds [0, {len(self.data)}]")
        self._position = position

    def is_readable(self) -> bool:
        """Return True while unread bytes remain."""
        return self.available() > 0

    def available(self) -> int:
        """Return the number of bytes left to read."""
        return len(self.data) - self._position

    def read_byte(self) -> int:
        """Consume and return one byte as an int (0-255).

        Raises:
            IndexError: If no more bytes are available
        """
        if self._position >= len(self.data):
            raise IndexError("No more bytes available")
        value = self.data[self._position]
        self._position += 1
        return value

    def read_unsigned_byte(self) -> int:
        """Consume one byte as an unsigned value (0-255)."""
        return self.read_byte() & 0xFF

    def _read_fixed(self, fmt: str, width: int, what: str) -> int:
        # Shared decoder for the fixed-width big-endian reads.
        if self._position + width > len(self.data):
            raise IndexError(f"Not enough bytes to read {what}")
        (value,) = struct.unpack_from(fmt, self.data, self._position)
        self._position += width
        return value

    def read_int(self) -> int:
        """Consume a 4-byte big-endian unsigned integer."""
        return self._read_fixed('>I', 4, 'int')

    def read_long(self) -> int:
        """Consume an 8-byte big-endian unsigned long."""
        return self._read_fixed('>Q', 8, 'long')

    def _read_varint(self, max_bits: int, error: str) -> int:
        # LEB128-style decoding: 7 payload bits per byte, MSB is the
        # continuation flag; overflow past max_bits is malformed input.
        result = 0
        shift = 0
        while shift < max_bits:
            group = self.read_unsigned_byte()
            result |= (group & 0x7F) << shift
            if not (group & 0x80):
                return result
            shift += 7
        raise ValueError(error)

    def read_var_len_int(self) -> int:
        """Consume a variable-length integer.

        Raises:
            ValueError: If the varint is malformed (too many bytes)
        """
        return self._read_varint(32, "Malformed integer")

    def read_var_len_long(self) -> int:
        """Consume a variable-length long.

        Raises:
            ValueError: If the varint is malformed (too many bytes)
        """
        return self._read_varint(64, "Malformed long")

    def read_slice(self, length: int) -> bytes:
        """Consume and return the next ``length`` bytes.

        Raises:
            IndexError: If not enough bytes are available
        """
        end = self._position + length
        if end > len(self.data):
            raise IndexError(f"Not enough bytes to read slice of length {length}")
        chunk = self.data[self._position:end]
        self._position = end
        return chunk
diff --git a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
new file mode 100644
index 0000000000..2ac19d07dc
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
@@ -0,0 +1,194 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+An SST File Reader which serves point queries and range queries.
+
+Users can call create_iterator to create a file iterator and then use seek
+and read methods to do range queries.
+
+Note that this class is NOT thread-safe.
+"""
+
+import struct
+import zlib
+from typing import Optional, Callable
+from typing import BinaryIO
+
+from pypaimon.globalindex.btree.btree_file_footer import BlockHandle
+from pypaimon.globalindex.btree.block_entry import BlockEntry
+from pypaimon.globalindex.btree.block_reader import BlockReader, BlockIterator
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+
+
class SstFileIterator:
    """
    Iterator for range queries on SST file.

    Allows seeking to a position and reading batches of records.
    """

    def __init__(self, read_block: "Callable[[BlockHandle], BlockReader]",
                 index_block_iterator: "BlockIterator"):
        self.read_block = read_block
        self.index_iterator = index_block_iterator
        # Data block positioned by seek_to(); consumed by the next read_batch().
        self.sought_data_block: "Optional[BlockIterator]" = None

    def _next_data_block(self) -> "BlockIterator":
        """Open the data block referenced by the next index entry.

        The index entry value is a BlockHandle serialized as two
        variable-length integers: offset (long) then size (int).

        Returns:
            A fresh iterator over the referenced data block
        """
        index_entry: BlockEntry = next(self.index_iterator)
        handle_input = MemorySliceInput(index_entry.value)
        block_handle = BlockHandle(
            handle_input.read_var_len_long(),
            handle_input.read_var_len_int()
        )
        return self.read_block(block_handle).iterator()

    def seek_to(self, key: bytes) -> None:
        """
        Seek to the position of the record whose key is exactly equal to or
        greater than the specified key.

        Args:
            key: The key to seek to
        """
        self.index_iterator.seek_to(key)

        if self.index_iterator.has_next():
            data_block = self._next_data_block()
            data_block.seek_to(key)
            self.sought_data_block = data_block
        else:
            self.sought_data_block = None

    def read_batch(self) -> "Optional[BlockIterator]":
        """
        Read a batch of records from this SST File and move current record
        position to the next batch.

        Returns:
            BlockIterator for the current batch, or None if at file end
        """
        # A pending block produced by seek_to() is served first.
        if self.sought_data_block is not None:
            result = self.sought_data_block
            self.sought_data_block = None
            return result

        if not self.index_iterator.has_next():
            return None

        # BUGFIX: the block handle must be decoded with the same
        # variable-length encoding used in seek_to(); the previous
        # fixed-width little-endian struct parsing did not match the
        # handle serialization and would misread (or over-read) the value.
        return self._next_data_block()
+
+
class SstFileReader:
    """
    An SST File Reader which serves point queries and range queries.

    Users can call create_iterator to create a file iterator and then use
    seek and read methods to do range queries.

    Note that this class is NOT thread-safe.
    """

    # Every block is followed by a 5-byte trailer:
    # 1 byte compression type + 4 bytes little-endian CRC32.
    _TRAILER_SIZE = 5

    def __init__(
        self,
        input_stream: "BinaryIO",
        comparator: "Callable[[bytes, bytes], int]",
        index_block_handle: "BlockHandle"
    ):
        self.comparator = comparator
        self.input_stream = input_stream
        # The index block maps each data block's last key to its handle.
        self.index_block = self._read_block(index_block_handle, True)

    def _read_block(self, block_handle: "BlockHandle", index: bool) -> "BlockReader":
        """Read one block from the stream and verify its trailer.

        Args:
            block_handle: Offset and size of the block (size excludes trailer)
            index: True to compare raw key slices (index block), False to
                use the user comparator (data block)

        Returns:
            A BlockReader over the verified block bytes

        Raises:
            ValueError: If the block is truncated, compressed (unsupported),
                or fails the CRC32 check
        """
        self.input_stream.seek(block_handle.offset)
        expected = block_handle.size + self._TRAILER_SIZE
        block_data = self.input_stream.read(expected)
        # Detect truncation explicitly instead of falling through to a
        # confusing CRC mismatch (or an out-of-range trailer index).
        if len(block_data) < expected:
            raise ValueError(
                f"Truncated block: expected {expected} bytes, got {len(block_data)}")

        trailer_offset = len(block_data) - self._TRAILER_SIZE
        compression_type = block_data[trailer_offset]
        if compression_type != 0:
            raise ValueError("Compression type not supported")

        crc32_value = struct.unpack(
            '<I', block_data[trailer_offset + 1:trailer_offset + 5])[0]

        # Block payload excludes the trailer.
        block_bytes = block_data[:trailer_offset]

        actual_crc32 = self.crc32c(block_bytes, compression_type)
        if actual_crc32 != crc32_value:
            raise ValueError(
                f"CRC32 mismatch: expected {crc32_value}, got {actual_crc32}")

        return BlockReader.create(
            block_bytes, self._slice_comparator if index else self.comparator)

    @staticmethod
    def _slice_comparator(a: bytes, b: bytes) -> int:
        """Lexicographic byte comparison: -1, 0 or 1."""
        if a < b:
            return -1
        elif a > b:
            return 1
        return 0

    def create_iterator(self) -> "SstFileIterator":
        """Create a range-query iterator over this file."""
        def read_block(block: "BlockHandle") -> "BlockReader":
            return self._read_block(block, False)

        return SstFileIterator(
            read_block,
            self.index_block.iterator())

    @staticmethod
    def crc32c(bytes_data: bytes, compression_type_id: int) -> int:
        """
        Calculate CRC32 checksum for the given bytes and compression type.

        Equivalent to CRC32 over ``bytes_data`` followed by the single
        compression-type byte.

        Args:
            bytes_data: The byte array to calculate checksum for
            compression_type_id: The persistent ID of the compression type (0-255)

        Returns:
            The CRC32 checksum as an unsigned 32-bit integer
        """
        crc_value = zlib.crc32(bytes_data)
        # Fold the compression type byte into the running checksum.
        crc_value = zlib.crc32(bytes([compression_type_id & 0xFF]), crc_value)
        return crc_value & 0xFFFFFFFF

    def close(self) -> None:
        """Close the reader and release resources."""
        # No resources to release in this implementation
        pass
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 6f3fb191f6..dd0eb57235 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -133,7 +133,7 @@ class GlobalIndexEvaluator:
readers = self._readers_function(field_id)
self._index_readers_cache[field_id] = readers
- field_ref = FieldRef(predicate.index, predicate.field,
str(field.data_type))
+ field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
compound_result: Optional[GlobalIndexResult] = None
diff --git a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
index 8b9a7e0f58..3d449d1288 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
@@ -180,6 +180,17 @@ class RowRangeGlobalIndexScanner:
options=options
)
readers.append(reader)
+ if index_type == 'btree':
+ from pypaimon.globalindex.btree import (BTreeIndexReader)
+ from pypaimon.globalindex.btree.key_serializer import
(StringSerializer)
+ for metadata in io_metas:
+ reader = BTreeIndexReader(
+ key_serializer=StringSerializer(), # TODO create
serializer from type
+ file_io=file_io,
+ index_path=index_path,
+ io_meta=metadata
+ )
+ readers.append(reader)
return readers
diff --git a/paimon-python/pypaimon/globalindex/roaring_bitmap.py
b/paimon-python/pypaimon/globalindex/roaring_bitmap.py
index 86781d29eb..c91f0c149a 100644
--- a/paimon-python/pypaimon/globalindex/roaring_bitmap.py
+++ b/paimon-python/pypaimon/globalindex/roaring_bitmap.py
@@ -119,6 +119,12 @@ class RoaringBitmap64:
result._data = a._data | b._data
return result
+ @staticmethod
+ def remove_all(a: 'RoaringBitmap64', b: 'RoaringBitmap64') ->
'RoaringBitmap64':
+ result = RoaringBitmap64()
+ result._data = a._data - b._data
+ return result
+
def serialize(self) -> bytes:
"""Serialize the bitmap to bytes."""
# Simple serialization format: count followed by sorted values
diff --git a/paimon-python/pypaimon/read/table_read.py
b/paimon-python/pypaimon/read/table_read.py
index f546c4be6b..7492c5c447 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -254,7 +254,7 @@ class TableRead:
elif self.table.options.data_evolution_enabled():
return DataEvolutionSplitRead(
table=self.table,
- predicate=self.predicate,
+ predicate=None, # Never push predicate to split read
read_type=self.read_type,
split=split,
row_tracking_enabled=True
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index b56ced657c..86c2866676 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
from parameterized import parameterized
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.schema.schema import Schema
+from pypaimon.read.read_builder import ReadBuilder
if sys.version_info[:2] == (3, 6):
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
@@ -318,3 +319,30 @@ class JavaPyReadWriteTest(unittest.TestCase):
'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
}, schema=pa_schema)
self.assertEqual(expected, actual)
+
+ def test_read_btree_index_table(self):
+ table = self.catalog.get_table('default.test_btree_index')
+ read_builder: ReadBuilder = table.new_read_builder()
+
+ # read all
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_sort_by(table_read.to_arrow(splits), 'k')
+ expected = pa.Table.from_pydict({
+ 'k': ["k1", "k2", "k3"],
+ 'v': ["v1", "v2", "v3"]
+ })
+ self.assertEqual(expected, actual)
+
+ # read using index
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.equal('k', 'k2')
+ read_builder.with_filter(predicate)
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_sort_by(table_read.to_arrow(splits), 'k')
+ expected = pa.Table.from_pydict({
+ 'k': ["k2"],
+ 'v': ["v2"]
+ })
+ self.assertEqual(expected, actual)