This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 327fe5cad0 [python] Implement BTree Reader in Python (#7160)
327fe5cad0 is described below

commit 327fe5cad0571f7934ead6494b7459da11b499c7
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Jan 31 18:06:26 2026 +0800

    [python] Implement BTree Reader in Python (#7160)
    
    ### Purpose
    
    This PR implements the BTree reader in Python. It also fixes the
    filter for data evolution reads: filters should never be pushed down to the read.
    - Only indexing of the String type has been implemented
    - Compression is not implemented, and an error will be reported if a block
    has been compressed
    
    ### Tests
    
    - Java write and Python read e2e test.
---
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |  84 +++-
 paimon-python/dev/run_mixed_tests.sh               |  43 +-
 .../pypaimon/globalindex/btree/__init__.py         |  25 ++
 .../globalindex/btree/block_aligned_type.py        |  36 ++
 .../pypaimon/globalindex/btree/block_entry.py      |  55 +++
 .../pypaimon/globalindex/btree/block_handle.py     |  41 ++
 .../pypaimon/globalindex/btree/block_reader.py     | 254 ++++++++++++
 .../globalindex/btree/btree_file_footer.py         | 124 ++++++
 .../pypaimon/globalindex/btree/btree_index_meta.py |  63 +++
 .../globalindex/btree/btree_index_reader.py        | 460 +++++++++++++++++++++
 .../pypaimon/globalindex/btree/btree_reader.py     | 164 ++++++++
 .../pypaimon/globalindex/btree/key_serializer.py   | 231 +++++++++++
 .../globalindex/btree/memory_slice_input.py        | 162 ++++++++
 .../pypaimon/globalindex/btree/sst_file_reader.py  | 194 +++++++++
 .../pypaimon/globalindex/global_index_evaluator.py |   2 +-
 .../globalindex/global_index_scan_builder.py       |  11 +
 .../pypaimon/globalindex/roaring_bitmap.py         |   6 +
 paimon-python/pypaimon/read/table_read.py          |   2 +-
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  28 ++
 19 files changed, 1981 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 99e14a010f..238f4c08e4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -30,19 +30,27 @@ import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.fs.FileIOFinder;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
+import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.options.MemorySize;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.AppendOnlyFileStoreTable;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.PrimaryKeyFileStoreTable;
 import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
 import org.apache.paimon.table.sink.InnerTableCommit;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.TableRead;
 import org.apache.paimon.types.DataType;
@@ -66,7 +74,11 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.DATA_EVOLUTION_ENABLED;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
+import static org.apache.paimon.CoreOptions.GLOBAL_INDEX_ENABLED;
+import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.ROW_TRACKING_ENABLED;
 import static org.apache.paimon.CoreOptions.TARGET_FILE_SIZE;
 import static org.apache.paimon.data.DataFormatTestUtil.internalRowToString;
 import static org.apache.paimon.table.SimpleTableTestBase.getResult;
@@ -396,6 +408,76 @@ public class JavaPyE2ETest {
         }
     }
 
+    @Test
+    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    public void testBtreeIndexWrite() throws Exception {
+        // create table
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.STRING(), 
DataTypes.STRING()},
+                        new String[] {"k", "v"});
+        Options options = new Options();
+        Path tablePath = new Path(warehouse.toString() + 
"/default.db/test_btree_index");
+        options.set(PATH, tablePath.toString());
+        options.set(ROW_TRACKING_ENABLED, true);
+        options.set(DATA_EVOLUTION_ENABLED, true);
+        options.set(GLOBAL_INDEX_ENABLED, true);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), tablePath),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                options.toMap(),
+                                ""));
+        AppendOnlyFileStoreTable table =
+                new AppendOnlyFileStoreTable(
+                        FileIOFinder.find(tablePath),
+                        tablePath,
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        // write data
+        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
+        try (BatchTableWrite write = writeBuilder.newWrite();
+                BatchTableCommit commit = writeBuilder.newCommit()) {
+            write.write(
+                    GenericRow.of(BinaryString.fromString("k1"), 
BinaryString.fromString("v1")));
+            write.write(
+                    GenericRow.of(BinaryString.fromString("k2"), 
BinaryString.fromString("v2")));
+            write.write(
+                    GenericRow.of(BinaryString.fromString("k3"), 
BinaryString.fromString("v3")));
+            commit.commit(write.prepareCommit());
+        }
+
+        // build index
+        BTreeGlobalIndexBuilder builder =
+                new 
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
+        try (BatchTableCommit commit = writeBuilder.newCommit()) {
+            commit.commit(builder.build(builder.scan(), 
IOManager.create(warehouse.toString())));
+        }
+
+        // assert index
+        List<IndexManifestEntry> indexEntries =
+                
table.indexManifestFileReader().read(table.latestSnapshot().get().indexManifest);
+        assertThat(indexEntries)
+                .singleElement()
+                .matches(entry -> entry.indexFile().rowCount() == 3);
+
+        // read index
+        PredicateBuilder predicateBuilder = new 
PredicateBuilder(table.rowType());
+        ReadBuilder readBuilder =
+                table.newReadBuilder()
+                        .withFilter(predicateBuilder.equal(0, 
BinaryString.fromString("k2")));
+        List<String> result = new ArrayList<>();
+        readBuilder
+                .newRead()
+                .createReader(readBuilder.newScan().plan())
+                .forEachRemaining(r -> result.add(r.getString(0) + ":" + 
r.getString(1)));
+        assertThat(result).containsOnly("k2:v2");
+    }
+
     // Helper method from TableTestBase
     protected Identifier identifier(String tableName) {
         return new Identifier(database, tableName);
@@ -408,7 +490,7 @@ public class JavaPyE2ETest {
                         new DataType[] {DataTypes.INT(), DataTypes.INT(), 
DataTypes.BIGINT()},
                         new String[] {"pt", "a", "b"});
         Options options = new Options();
-        options.set(CoreOptions.PATH, tablePath.toString());
+        options.set(PATH, tablePath.toString());
         options.set(BUCKET, 1);
         configure.accept(options);
         TableSchema tableSchema =
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index cd39c6e219..ed7f54e014 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -252,6 +252,32 @@ run_faiss_vector_test() {
         return 1
     fi
 }
+
+# Function to run BTree index test (Java write, Python read)
+run_btree_index_test() {
+    echo -e "${YELLOW}=== Step 6: Running BTree Index Test (Java Write, Python 
Read) ===${NC}"
+
+    cd "$PROJECT_ROOT"
+
+    echo "Running Maven test for JavaPyE2ETest.testBtreeIndexWrite..."
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testBtreeIndexWrite -pl 
paimon-core -q -Drun.e2e.tests=true; then
+        echo -e "${GREEN}✓ Java test completed successfully${NC}"
+    else
+        echo -e "${RED}✗ Java test failed${NC}"
+        return 1
+    fi
+    cd "$PAIMON_PYTHON_DIR"
+    # Run the specific Python test method
+    echo "Running Python test for 
JavaPyReadWriteTest.test_read_btree_index_table..."
+    if python -m pytest 
java_py_read_write_test.py::JavaPyReadWriteTest::test_read_btree_index_table 
-v; then
+        echo -e "${GREEN}✓ Python test completed successfully${NC}"
+        return 0
+    else
+        echo -e "${RED}✗ Python test failed${NC}"
+        return 1
+    fi
+}
+
 # Main execution
 main() {
     local java_write_result=0
@@ -260,6 +286,7 @@ main() {
     local java_read_result=0
     local pk_dv_result=0
     local faiss_vector_result=0
+    local btree_index_result=0
 
     echo -e "${YELLOW}Starting mixed language test execution...${NC}"
     echo ""
@@ -311,6 +338,14 @@ main() {
     fi
 
     echo ""
+
+    # Run BTree index test (Java write, Python read)
+    if ! run_btree_index_test; then
+        btree_index_result=1
+    fi
+
+    echo ""
+
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
@@ -349,12 +384,18 @@ main() {
         echo -e "${RED}✗ FAISS Vector Index Test (Java Write, Python Read): 
FAILED${NC}"
     fi
 
+    if [[ $btree_index_result -eq 0 ]]; then
+        echo -e "${GREEN}✓ BTree Index Test (Java Write, Python Read): 
PASSED${NC}"
+    else
+        echo -e "${RED}✗ BTree Index Test (Java Write, Python Read): 
FAILED${NC}"
+    fi
+
     echo ""
 
     # Clean up warehouse directory after all tests
     cleanup_warehouse
 
-    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$faiss_vector_result -eq 0 ]]; then
+    if [[ $java_write_result -eq 0 && $python_read_result -eq 0 && 
$python_write_result -eq 0 && $java_read_result -eq 0 && $pk_dv_result -eq 0 && 
$faiss_vector_result -eq 0 && $btree_index_result -eq 0 ]]; then
         echo -e "${GREEN}🎉 All tests passed! Java-Python interoperability 
verified.${NC}"
         return 0
     else
diff --git a/paimon-python/pypaimon/globalindex/btree/__init__.py 
b/paimon-python/pypaimon/globalindex/btree/__init__.py
new file mode 100644
index 0000000000..d8e84bcc95
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/__init__.py
@@ -0,0 +1,25 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""B-tree index implementation for global index."""
+
+from pypaimon.globalindex.btree.btree_index_reader import BTreeIndexReader
+from pypaimon.globalindex.btree.btree_index_meta import BTreeIndexMeta
+from pypaimon.globalindex.btree.key_serializer import KeySerializer
+
+__all__ = ['BTreeIndexReader', 'BTreeIndexMeta', 'KeySerializer']
diff --git a/paimon-python/pypaimon/globalindex/btree/block_aligned_type.py 
b/paimon-python/pypaimon/globalindex/btree/block_aligned_type.py
new file mode 100644
index 0000000000..c90ca7b709
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_aligned_type.py
@@ -0,0 +1,36 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Aligned type for block."""
+
+from enum import Enum
+
+
+class BlockAlignedType(Enum):
+    """Aligned type for block."""
+    
+    ALIGNED = 0
+    UNALIGNED = 1
+    
+    @classmethod
+    def from_byte(cls, b: int) -> 'BlockAlignedType':
+        """Create BlockAlignedType from byte value."""
+        for aligned_type in cls:
+            if aligned_type.value == b:
+                return aligned_type
+        raise ValueError(f"Illegal block aligned type: {b}")
diff --git a/paimon-python/pypaimon/globalindex/btree/block_entry.py 
b/paimon-python/pypaimon/globalindex/btree/block_entry.py
new file mode 100644
index 0000000000..9c399e8d7c
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_entry.py
@@ -0,0 +1,55 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Entry represents a key value."""
+
+
+class BlockEntry:
+    """Entry represents a key value."""
+    
+    def __init__(self, key: bytes, value: bytes):
+        """Initialize BlockEntry.
+        
+        Args:
+            key: The key bytes
+            value: The value bytes
+        """
+        if key is None:
+            raise ValueError("key is null")
+        if value is None:
+            raise ValueError("value is null")
+        self.key = key
+        self.value = value
+    
+    def __eq__(self, other) -> bool:
+        """Check equality with another BlockEntry."""
+        if self is other:
+            return True
+        if other is None or not isinstance(other, BlockEntry):
+            return False
+        return self.key == other.key and self.value == other.value
+    
+    def __hash__(self) -> int:
+        """Return hash of this BlockEntry."""
+        result = hash(self.key)
+        result = 31 * result + hash(self.value)
+        return result
+    
+    def __repr__(self) -> str:
+        """Return string representation."""
+        return f"BlockEntry(key={self.key!r}, value={self.value!r})"
diff --git a/paimon-python/pypaimon/globalindex/btree/block_handle.py 
b/paimon-python/pypaimon/globalindex/btree/block_handle.py
new file mode 100644
index 0000000000..a54ee2b176
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_handle.py
@@ -0,0 +1,41 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Entry represents a key value."""
+
+from dataclasses import dataclass
+
+
+@dataclass
+class BlockHandle:
+    """Handle for a data block."""
+
+    def __init__(self, offset: int, size: int):
+        """
+        Initialize the block handle.
+
+        Args:
+            offset: Offset of the block in the file
+            size: Size of the block in bytes
+        """
+        self.offset = offset
+        self.size = size
+
+    def is_null(self) -> bool:
+        """Check if this handle represents a null block."""
+        return self.offset == 0 and self.size == 0
diff --git a/paimon-python/pypaimon/globalindex/btree/block_reader.py 
b/paimon-python/pypaimon/globalindex/btree/block_reader.py
new file mode 100644
index 0000000000..f0091d0231
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/block_reader.py
@@ -0,0 +1,254 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Reader for a block."""
+
+import struct
+from typing import Callable, Optional
+
+from pypaimon.globalindex.btree.block_aligned_type import BlockAlignedType
+from pypaimon.globalindex.btree.block_entry import BlockEntry
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+
+
+class BlockReader:
+    """Reader for a block."""
+    
+    def __init__(
+        self,
+        block: bytes,
+        record_count: int,
+        comparator: Callable[[bytes, bytes], int]
+    ):
+        """Initialize BlockReader.
+        
+        Args:
+            block: The block data bytes
+            record_count: Number of records in the block
+            comparator: Optional comparator function for keys
+        """
+        self.block = block
+        self.record_count = record_count
+        self.comparator = comparator
+    
+    def block_input(self) -> MemorySliceInput:
+        """Create a MemorySliceInput for this block."""
+        return MemorySliceInput(self.block)
+    
+    def iterator(self) -> 'BlockIterator':
+        """Create a BlockIterator for this reader."""
+        return BlockIterator(self)
+    
+    def seek_to(self, record_position: int) -> int:
+        """Seek to slice position from record position.
+        
+        Args:
+            record_position: The record position to seek to
+            
+        Returns:
+            The slice position
+            
+        Raises:
+            NotImplementedError: If not implemented in subclass
+        """
+        raise NotImplementedError("seekTo must be implemented in subclass")
+    
+    @staticmethod
+    def create(
+        block: bytes,
+        comparator: Optional[Callable[[bytes, bytes], int]] = None
+    ) -> 'BlockReader':
+        """Create a BlockReader from block data.
+        
+        Args:
+            block: The block data bytes
+            comparator: Optional comparator function for keys
+            
+        Returns:
+            A BlockReader instance (AlignedBlockReader or UnalignedBlockReader)
+        """
+        # Read block trailer: the last byte is the aligned type; the previous 
4 bytes hold the record size (aligned) or the record count (unaligned)
+        aligned_type_byte = block[-1]
+        aligned_type = BlockAlignedType.from_byte(aligned_type_byte)
+        int_value = struct.unpack('<I', block[-5:-1])[0]
+        
+        if aligned_type == BlockAlignedType.ALIGNED:
+            # Aligned block: records have fixed size
+            data = block[:-5]
+            return AlignedBlockReader(data, int_value, comparator)
+        else:
+            # Unaligned block: uses index
+            index_length = int_value * 4
+            index_offset = len(block) - 5 - index_length
+            data = block[:index_offset]
+            index = block[index_offset:index_offset + index_length]
+            return UnalignedBlockReader(data, index, comparator)
+
+
+class AlignedBlockReader(BlockReader):
+    """Block reader for aligned blocks (fixed record size)."""
+    
+    def __init__(
+        self,
+        data: bytes,
+        record_size: int,
+        comparator: Optional[Callable[[bytes, bytes], int]] = None
+    ):
+        """Initialize AlignedBlockReader.
+        
+        Args:
+            data: The block data bytes
+            record_size: The fixed size of each record
+            comparator: Optional comparator function for keys
+        """
+        record_count = len(data) // record_size
+        super().__init__(data, record_count, comparator)
+        self.record_size = record_size
+    
+    def seek_to(self, record_position: int) -> int:
+        """Seek to slice position from record position.
+        
+        Args:
+            record_position: The record position to seek to
+            
+        Returns:
+            The slice position
+        """
+        return record_position * self.record_size
+
+
+class UnalignedBlockReader(BlockReader):
+    """Block reader for unaligned blocks (uses index)."""
+    
+    def __init__(
+        self,
+        data: bytes,
+        index: bytes,
+        comparator: Optional[Callable[[bytes, bytes], int]] = None
+    ):
+        """Initialize UnalignedBlockReader.
+        
+        Args:
+            data: The block data bytes
+            index: The index bytes (4 bytes per record)
+            comparator: Optional comparator function for keys
+        """
+        record_count = len(index) // 4
+        super().__init__(data, record_count, comparator)
+        self.index = index
+    
+    def seek_to(self, record_position: int) -> int:
+        """Seek to slice position from record position.
+        
+        Args:
+            record_position: The record position to seek to
+            
+        Returns:
+            The slice position
+        """
+        # Read 4-byte integer from index at record_position * 4
+        offset = record_position * 4
+        return struct.unpack('<I', self.index[offset:offset + 4])[0]
+
+
+class BlockIterator:
+    """Iterator for block entries."""
+    
+    def __init__(self, reader: BlockReader):
+        """Initialize BlockIterator.
+        
+        Args:
+            reader: The BlockReader to iterate over
+        """
+        self.reader = reader
+        self.input = reader.block_input()
+        self.polled: Optional[BlockEntry] = None
+    
+    def __iter__(self):
+        """Return self as iterator."""
+        return self
+    
+    def __next__(self) -> BlockEntry:
+        """Get next entry.
+        
+        Returns:
+            The next BlockEntry
+            
+        Raises:
+            StopIteration: If no more entries
+        """
+        if not self.has_next():
+            raise StopIteration
+        
+        if self.polled is not None:
+            result = self.polled
+            self.polled = None
+            return result
+        
+        return self.read_entry()
+    
+    def has_next(self) -> bool:
+        """Check if there are more entries."""
+        return self.polled is not None or self.input.is_readable()
+    
+    def seek_to(self, target_key: bytes) -> bool:
+        """Seek to the first key >= target_key using binary search.
+        
+        Args:
+            target_key: The target key to seek to
+            
+        Returns:
+            True if exact match found, False otherwise
+        """
+        left = 0
+        right = self.reader.record_count - 1
+        
+        while left <= right:
+            mid = left + (right - left) // 2
+            
+            self.input.set_position(self.reader.seek_to(mid))
+            mid_entry = self.read_entry()
+            compare = self.reader.comparator(mid_entry.key, target_key) if 
self.reader.comparator else -1
+            
+            if compare == 0:
+                self.polled = mid_entry
+                return True
+            elif compare > 0:
+                self.polled = mid_entry
+                right = mid - 1
+            else:
+                self.polled = None
+                left = mid + 1
+        
+        return False
+    
+    def read_entry(self) -> BlockEntry:
+        """Read a key-value entry.
+        
+        Returns:
+            A BlockEntry containing key and value
+        """
+        # Read key
+        key_length = self.input.read_var_len_int()
+        key = self.input.read_slice(key_length)
+        
+        # Read value
+        value_length = self.input.read_var_len_int()
+        value = self.input.read_slice(value_length)
+        
+        return BlockEntry(key, value)
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_file_footer.py 
b/paimon-python/pypaimon/globalindex/btree/btree_file_footer.py
new file mode 100644
index 0000000000..7afa3694fd
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_file_footer.py
@@ -0,0 +1,124 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""B-tree file footer for storing index metadata."""
+
+from dataclasses import dataclass
+from typing import Optional
+import struct
+
+from pypaimon.globalindex.btree.block_handle import BlockHandle
+
+
+@dataclass
+class BloomFilterHandle:
+    """Handle for bloom filter block."""
+    
+    offset: int
+    size: int
+    expected_entries: int
+    
+    def is_null(self) -> bool:
+        """Check if this handle represents a null bloom filter."""
+        return self.offset == 0 and self.size == 0 and self.expected_entries 
== 0
+
+
+class BTreeFileFooter:
+    """
+    The Footer for BTree file.
+    
+    This footer contains the handles to the bloom filter, index block, and 
null bitmap,
+    allowing efficient navigation of the B-tree index file.
+    """
+    
+    MAGIC_NUMBER = 198732882
+    ENCODED_LENGTH = 48
+    
+    def __init__(
+        self,
+        bloom_filter_handle: Optional[BloomFilterHandle],
+        index_block_handle: BlockHandle,
+        null_bitmap_handle: Optional[BlockHandle]
+    ):
+        """
+        Initialize the BTree file footer.
+        
+        Args:
+            bloom_filter_handle: Handle to the bloom filter block (may be None)
+            index_block_handle: Handle to the index block
+            null_bitmap_handle: Handle to the null bitmap block (may be None)
+        """
+        self.bloom_filter_handle = bloom_filter_handle
+        self.index_block_handle = index_block_handle
+        self.null_bitmap_handle = null_bitmap_handle
+    
+    @classmethod
+    def read_footer(cls, data: bytes) -> 'BTreeFileFooter':
+        """
+        Read footer from byte data.
+        
+        Args:
+            data: Byte data containing the footer
+            
+        Returns:
+            BTreeFileFooter instance
+            
+        Raises:
+            ValueError: If magic number doesn't match
+        """
+        offset = 0
+        
+        # Read bloom filter handle
+        bf_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
+        offset += 8
+        bf_size = struct.unpack('<I', data[offset:offset + 4])[0]
+        offset += 4
+        bf_expected = struct.unpack('<Q', data[offset:offset + 8])[0]
+        offset += 8
+        
+        bloom_filter_handle = None
+        if not (bf_offset == 0 and bf_size == 0 and bf_expected == 0):
+            bloom_filter_handle = BloomFilterHandle(bf_offset, bf_size, 
bf_expected)
+        
+        # Read index block handle
+        index_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
+        offset += 8
+        index_size = struct.unpack('<I', data[offset:offset + 4])[0]
+        offset += 4
+        
+        index_block_handle = BlockHandle(index_offset, index_size)
+        
+        # Read null bitmap handle
+        nb_offset = struct.unpack('<Q', data[offset:offset + 8])[0]
+        offset += 8
+        nb_size = struct.unpack('<I', data[offset:offset + 4])[0]
+        offset += 4
+        
+        null_bitmap_handle = None
+        if not (nb_offset == 0 and nb_size == 0):
+            null_bitmap_handle = BlockHandle(nb_offset, nb_size)
+        
+        # Skip padding
+        offset = cls.ENCODED_LENGTH - 4
+        
+        # Read and verify magic number
+        magic_number = struct.unpack('<I', data[offset:offset + 4])[0]
+        if magic_number != cls.MAGIC_NUMBER:
+            raise ValueError("File is not a table (bad magic number)")
+        
+        return cls(bloom_filter_handle, index_block_handle, null_bitmap_handle)
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_index_meta.py 
b/paimon-python/pypaimon/globalindex/btree/btree_index_meta.py
new file mode 100644
index 0000000000..ef64954d14
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_index_meta.py
@@ -0,0 +1,63 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""B-tree index metadata."""
+
+from dataclasses import dataclass
+from typing import Optional
+import struct
+
+
@dataclass
class BTreeIndexMeta:
    """
    Per-file metadata of a BTree index file.

    Both ``first_key`` and ``last_key`` are None when the indexed file
    contains nothing but null keys.
    """

    first_key: Optional[bytes]
    last_key: Optional[bytes]
    has_nulls: bool

    def only_nulls(self) -> bool:
        """Return True when this index holds null keys exclusively."""
        return self.first_key is None and self.last_key is None

    @classmethod
    def deserialize(cls, data: bytes) -> 'BTreeIndexMeta':
        """Decode a BTreeIndexMeta from its serialized byte layout."""

        def read_key(at: int):
            # Each key is a little-endian u32 length followed by the raw
            # bytes; a zero length encodes a null key.
            (size,) = struct.unpack_from('<I', data, at)
            at += 4
            key = data[at:at + size] if size else None
            return key, at + size

        pos = 0
        first, pos = read_key(pos)
        last, pos = read_key(pos)

        # Trailing single byte: 1 means the file contains null keys.
        nulls_flag = struct.unpack_from('<B', data, pos)[0] == 1
        return cls(first, last, nulls_flag)
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py 
b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
new file mode 100644
index 0000000000..83d6194d0d
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
@@ -0,0 +1,460 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""
+The BTreeIndexReader implementation for btree index.
+
+This reader provides efficient querying capabilities for B-tree based global 
indexes,
+supporting various predicate operations like equality, range, and null checks.
+"""
+
+import struct
+import zlib
+from typing import List, Optional
+
+from pypaimon.common.file_io import FileIO
+from pypaimon.globalindex.btree.btree_index_meta import BTreeIndexMeta
+from pypaimon.globalindex.btree.key_serializer import KeySerializer
+from pypaimon.globalindex.global_index_meta import GlobalIndexIOMeta
+from pypaimon.globalindex.global_index_reader import FieldRef, 
GlobalIndexReader
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.globalindex.roaring_bitmap import RoaringBitmap64
+from pypaimon.globalindex.btree.btree_file_footer import BTreeFileFooter
+from pypaimon.globalindex.btree.sst_file_reader import SstFileReader
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+
+
def _deserialize_row_ids(data: bytes) -> List[int]:
    """
    Decode a block value into the list of row ids it carries.

    Layout: a var-length int count followed by that many var-length longs.

    Raises:
        ValueError: If the decoded count is not positive.
    """
    reader = MemorySliceInput(data)
    count = reader.read_var_len_int()
    if count <= 0:
        raise ValueError(f"Invalid row id length: {count}")
    return [reader.read_var_len_long() for _ in range(count)]
+
+
+def _compare_bytes(a: bytes, b: bytes) -> int:
+    """
+    Compare two byte arrays.
+
+    Args:
+        a: First byte array
+        b: Second byte array
+
+    Returns:
+        -1 if a < b, 0 if a == b, 1 if a > b
+    """
+    if a < b:
+        return -1
+    elif a > b:
+        return 1
+    return 0
+
+
+def _is_in_range_bytes(
+        key_bytes: bytes,
+        from_bytes: bytes,
+        to_bytes: bytes,
+        from_inclusive: bool,
+        to_inclusive: bool
+) -> bool:
+    """
+    Check if a key (as bytes) falls within the specified range.
+
+    Args:
+        key_bytes: The key bytes to check
+        from_bytes: Lower bound bytes
+        to_bytes: Upper bound bytes
+        from_inclusive: Whether lower bound is inclusive
+        to_inclusive: Whether upper bound is inclusive
+
+    Returns:
+        True if key is in range, False otherwise
+    """
+    if not from_inclusive and _compare_bytes(key_bytes, from_bytes) == 0:
+        return False
+
+    cmp_to = _compare_bytes(key_bytes, to_bytes)
+    if cmp_to > 0:
+        return False
+    if not to_inclusive and cmp_to == 0:
+        return False
+
+    return True
+
+
class BTreeIndexReader(GlobalIndexReader):
    """
    The GlobalIndexReader implementation for btree index.

    This reader provides efficient querying capabilities for B-tree based
    global indexes, supporting various predicate operations like equality,
    range, and null checks.  Nulls are not stored in the tree itself; they
    live in a separate roaring bitmap referenced from the file footer.
    """

    FOOTER_ENCODED_LENGTH = 48

    def __init__(
        self,
        key_serializer: KeySerializer,
        file_io: FileIO,
        index_path: str,
        io_meta: GlobalIndexIOMeta
    ):
        """
        Initialize the reader.

        Args:
            key_serializer: Serializer used to encode, decode and compare keys.
            file_io: File system abstraction used to open the index file.
            index_path: Directory that contains the index file.
            io_meta: IO metadata holding the file name, file size and the
                serialized BTreeIndexMeta.
        """
        self.key_serializer = key_serializer
        self.comparator = key_serializer.create_comparator()
        self.io_meta = io_meta

        index_meta = BTreeIndexMeta.deserialize(io_meta.metadata)
        if index_meta.first_key is None:
            # Possible when this btree index file only stores nulls.
            self.min_key = None
            self.max_key = None
        else:
            self.min_key = key_serializer.deserialize(index_meta.first_key)
            self.max_key = key_serializer.deserialize(index_meta.last_key)

        self.has_nulls = index_meta.has_nulls
        self.input_stream = file_io.new_input_stream(index_path + "/" + io_meta.file_name)

        # Lazily loaded bitmap of the row ids whose key is null.
        self._null_bitmap: Optional[RoaringBitmap64] = None

        # The footer locates the index block, bloom filter and null bitmap.
        self.footer = self._read_footer()

        # SST reader navigating the index-block -> data-block layout.
        self.reader = self._create_sst_reader()

    def _read_footer(self) -> BTreeFileFooter:
        """
        Read the fixed-size footer from the tail of the index file.

        Returns:
            BTreeFileFooter containing the index, bloom filter and null
            bitmap block handles.
        """
        file_size = self.io_meta.file_size
        self.input_stream.seek(file_size - BTreeFileFooter.ENCODED_LENGTH)
        footer_data = self.input_stream.read(BTreeFileFooter.ENCODED_LENGTH)
        return BTreeFileFooter.read_footer(footer_data)

    def _create_sst_reader(self) -> SstFileReader:
        """Create the SST file reader with a comparator over serialized keys."""

        def comparator(a: bytes, b: bytes) -> int:
            return self.comparator(
                self.key_serializer.deserialize(a),
                self.key_serializer.deserialize(b),
            )

        return SstFileReader(self.input_stream, comparator, self.footer.index_block_handle)

    def _read_null_bitmap(self) -> RoaringBitmap64:
        """
        Read (and cache) the bitmap of null row ids from the index file.

        Returns:
            RoaringBitmap64 containing null row IDs; empty when no null
            bitmap block was written or its checksum does not match.
        """
        if self._null_bitmap is not None:
            return self._null_bitmap

        bitmap = RoaringBitmap64()
        handle = self.footer.null_bitmap_handle
        if handle is not None:
            self.input_stream.seek(handle.offset)
            data = self.input_stream.read(handle.size)
            if len(data) >= 4:
                # Block layout: <bitmap bytes><big-endian CRC32>.
                bitmap_bytes = data[:-4]
                expected_crc = struct.unpack('>I', data[-4:])[0]
                # NOTE(review): a CRC mismatch is silently treated as an empty
                # bitmap (preserving the original behavior); consider raising
                # instead, since a mismatch indicates corruption.
                if zlib.crc32(bitmap_bytes) & 0xFFFFFFFF == expected_crc:
                    bitmap = RoaringBitmap64.deserialize(bitmap_bytes)

        self._null_bitmap = bitmap
        return bitmap

    def _all_non_null_rows(self) -> RoaringBitmap64:
        """
        Collect every non-null row id by scanning the full key range.

        This traverses all data to avoid returning null values, which is very
        advantageous in situations where there are many null values.

        Returns:
            RoaringBitmap64 with all non-null row ids (empty when the file
            stores only nulls).
        """
        if self.min_key is None:
            return RoaringBitmap64()
        return self._range_query(self.min_key, self.max_key, True, True)

    def _range_query(
        self,
        from_key: object,
        to_key: object,
        from_inclusive: bool,
        to_inclusive: bool
    ) -> RoaringBitmap64:
        """
        Range query on the underlying SST file.

        Args:
            from_key: Lower bound key
            to_key: Upper bound key
            from_inclusive: Whether to include lower bound
            to_inclusive: Whether to include upper bound

        Returns:
            RoaringBitmap64 containing all qualified row IDs
        """
        result = RoaringBitmap64()

        serialized_from = self.key_serializer.serialize(from_key)
        serialized_to = self.key_serializer.serialize(to_key)

        # Position the iterator at the first block that may contain from_key.
        file_iter = self.reader.create_iterator()
        file_iter.seek_to(serialized_from)

        while True:
            data_iter = file_iter.read_batch()
            if data_iter is None:
                # No more data blocks.
                return result

            while data_iter.has_next():
                entry = next(data_iter)
                if entry is None:
                    break

                if _is_in_range_bytes(
                    entry.key, serialized_from, serialized_to,
                    from_inclusive, to_inclusive
                ):
                    for row_id in _deserialize_row_ids(entry.value):
                        result.add(row_id)
                elif _compare_bytes(entry.key, serialized_to) > 0:
                    # Keys are sorted, so every later entry is beyond the
                    # upper bound as well.
                    return result

    def _is_in_range(
        self,
        key: object,
        from_key: object,
        to_key: object,
        from_inclusive: bool,
        to_inclusive: bool
    ) -> bool:
        """
        Check if a deserialized key falls within the specified range using
        the key comparator.

        Args:
            key: The key to check
            from_key: Lower bound
            to_key: Upper bound
            from_inclusive: Whether lower bound is inclusive
            to_inclusive: Whether upper bound is inclusive

        Returns:
            True if key is in range, False otherwise
        """
        # Reject keys strictly below the lower bound (missing in the
        # original, which only excluded equality on an exclusive bound).
        if self.comparator(key, from_key) < 0:
            return False
        if not from_inclusive and self.comparator(key, from_key) == 0:
            return False

        cmp_to = self.comparator(key, to_key)
        if cmp_to > 0:
            return False
        if not to_inclusive and cmp_to == 0:
            return False

        return True

    def _guarded(self, supplier) -> GlobalIndexResult:
        """
        Wrap a bitmap supplier so any read failure surfaces as a
        RuntimeError chained to the original exception.
        """

        def safe_supplier():
            try:
                return supplier()
            except Exception as e:
                # `raise ... from e` keeps the causal traceback; the original
                # code passed the exception as a second RuntimeError argument,
                # which only stuffed it into the args tuple.
                raise RuntimeError("fail to read btree index file.") from e

        return GlobalIndexResult.create(safe_supplier)

    def visit_is_not_null(self, field_ref: FieldRef) -> Optional[GlobalIndexResult]:
        """Visit an is-not-null predicate. Nulls live in a separate bitmap."""
        return self._guarded(self._all_non_null_rows)

    def visit_is_null(self, field_ref: FieldRef) -> Optional[GlobalIndexResult]:
        """Visit an is-null predicate. Nulls live in a separate bitmap."""
        return self._guarded(self._read_null_bitmap)

    def visit_less_than(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a less-than predicate."""
        return self._guarded(
            lambda: self._range_query(self.min_key, literal, True, False))

    def visit_greater_or_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a greater-or-equal predicate."""
        return self._guarded(
            lambda: self._range_query(literal, self.max_key, True, True))

    def visit_not_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a not-equal predicate: all non-null rows minus the equal rows."""
        def supplier():
            all_rows = self._all_non_null_rows()
            equal_rows = self._range_query(literal, literal, True, True)
            return RoaringBitmap64.remove_all(all_rows, equal_rows)

        return self._guarded(supplier)

    def visit_less_or_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a less-or-equal predicate."""
        return self._guarded(
            lambda: self._range_query(self.min_key, literal, True, True))

    def visit_equal(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit an equality predicate."""
        # Guarded like the other visits (the original left this one
        # unwrapped, leaking raw IO errors to the caller).
        return self._guarded(
            lambda: self._range_query(literal, literal, True, True))

    def visit_greater_than(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a greater-than predicate."""
        return self._guarded(
            lambda: self._range_query(literal, self.max_key, False, True))

    def visit_in(self, field_ref: FieldRef, literals: List[object]) -> Optional[GlobalIndexResult]:
        """Visit an in predicate: union of the individual point lookups."""
        def supplier():
            result = RoaringBitmap64()
            for literal in literals:
                result = RoaringBitmap64.or_(
                    result, self._range_query(literal, literal, True, True))
            return result

        return self._guarded(supplier)

    def visit_not_in(self, field_ref: FieldRef, literals: List[object]) -> Optional[GlobalIndexResult]:
        """Visit a not-in predicate: all non-null rows minus the in-set."""
        def supplier():
            all_rows = self._all_non_null_rows()
            in_rows = self.visit_in(field_ref, literals).results()
            return RoaringBitmap64.remove_all(all_rows, in_rows)

        return self._guarded(supplier)

    def visit_starts_with(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """
        Visit a starts-with predicate.

        Falls back to all non-null rows; `startsWith` could be answered by
        the btree index itself in the future.
        """
        return self._guarded(self._all_non_null_rows)

    def visit_ends_with(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit an ends-with predicate (fallback: all non-null rows)."""
        return self._guarded(self._all_non_null_rows)

    def visit_contains(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a contains predicate (fallback: all non-null rows)."""
        return self._guarded(self._all_non_null_rows)

    def visit_like(self, field_ref: FieldRef, literal: object) -> Optional[GlobalIndexResult]:
        """Visit a like predicate (fallback: all non-null rows)."""
        return self._guarded(self._all_non_null_rows)

    def close(self) -> None:
        """Close the reader and release the underlying input stream."""
        if self.input_stream is not None:
            self.input_stream.close()
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_reader.py 
b/paimon-python/pypaimon/globalindex/btree/btree_reader.py
new file mode 100644
index 0000000000..bdd19b48d3
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/btree_reader.py
@@ -0,0 +1,164 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""B-tree file footer for storing index metadata."""
+
+from dataclasses import dataclass
+from typing import Optional
+import struct
+
+from pypaimon.globalindex.btree.block_handle import BlockHandle
+
+
@dataclass
class BloomFilterHandle:
    """Locator for the bloom filter block inside the index file."""

    offset: int  # byte offset of the bloom filter block
    size: int  # encoded size of the block in bytes
    expected_entries: int  # number of entries the filter was sized for

    def is_null(self) -> bool:
        """Return True when all fields are zero, i.e. no bloom filter exists."""
        return not (self.offset or self.size or self.expected_entries)
+
+
class BTreeFileFooter:
    """
    The Footer for BTree file.

    This footer contains the handles to the bloom filter, index block, and
    null bitmap, allowing efficient navigation of the B-tree index file.

    Layout (little-endian, ENCODED_LENGTH bytes total):
        bloom filter handle : u64 offset + u32 size + u64 expected entries
        index block handle  : u64 offset + u32 size
        null bitmap handle  : u64 offset + u32 size
        magic number        : u32 (always the last 4 bytes)
    """

    MAGIC_NUMBER = 198732882
    ENCODED_LENGTH = 48

    def __init__(
        self,
        bloom_filter_handle: "Optional[BloomFilterHandle]",
        index_block_handle: "BlockHandle",
        null_bitmap_handle: "Optional[BlockHandle]"
    ):
        """
        Initialize the BTree file footer.

        Args:
            bloom_filter_handle: Handle to the bloom filter block (may be None)
            index_block_handle: Handle to the index block
            null_bitmap_handle: Handle to the null bitmap block (may be None)
        """
        self.bloom_filter_handle = bloom_filter_handle
        self.index_block_handle = index_block_handle
        self.null_bitmap_handle = null_bitmap_handle

    @classmethod
    def read_footer(cls, data: bytes) -> 'BTreeFileFooter':
        """
        Read footer from byte data.

        Args:
            data: Byte data containing the footer (ENCODED_LENGTH bytes)

        Returns:
            BTreeFileFooter instance

        Raises:
            ValueError: If magic number doesn't match
        """
        # Bloom filter handle: all-zero fields encode "no bloom filter".
        bf_offset = struct.unpack_from('<Q', data, 0)[0]
        bf_size = struct.unpack_from('<I', data, 8)[0]
        bf_expected = struct.unpack_from('<Q', data, 12)[0]

        bloom_filter_handle = None
        if not (bf_offset == 0 and bf_size == 0 and bf_expected == 0):
            bloom_filter_handle = BloomFilterHandle(bf_offset, bf_size, bf_expected)

        # Index block handle (always present).
        index_offset = struct.unpack_from('<Q', data, 20)[0]
        index_size = struct.unpack_from('<I', data, 28)[0]
        index_block_handle = BlockHandle(index_offset, index_size)

        # Null bitmap handle: all-zero fields encode "no null bitmap".
        nb_offset = struct.unpack_from('<Q', data, 32)[0]
        nb_size = struct.unpack_from('<I', data, 40)[0]

        null_bitmap_handle = None
        if not (nb_offset == 0 and nb_size == 0):
            null_bitmap_handle = BlockHandle(nb_offset, nb_size)

        # The magic number always occupies the last 4 bytes.
        magic_number = struct.unpack_from('<I', data, cls.ENCODED_LENGTH - 4)[0]
        if magic_number != cls.MAGIC_NUMBER:
            raise ValueError("File is not a table (bad magic number)")

        return cls(bloom_filter_handle, index_block_handle, null_bitmap_handle)

    def write_footer(self) -> bytes:
        """
        Write footer to byte data.

        Uses the same little-endian layout that read_footer parses (the
        original implementation wrote big-endian values, so a footer it
        produced could not be read back).

        Returns:
            Byte data containing the footer (ENCODED_LENGTH bytes)
        """
        buffer = bytearray()

        # Bloom filter handle (zeros when absent).
        bf = self.bloom_filter_handle
        if bf is None:
            buffer.extend(struct.pack('<QIQ', 0, 0, 0))
        else:
            buffer.extend(struct.pack('<QIQ', bf.offset, bf.size, bf.expected_entries))

        # Index block handle.
        buffer.extend(struct.pack(
            '<QI', self.index_block_handle.offset, self.index_block_handle.size))

        # Null bitmap handle (zeros when absent).
        nb = self.null_bitmap_handle
        if nb is None:
            buffer.extend(struct.pack('<QI', 0, 0))
        else:
            buffer.extend(struct.pack('<QI', nb.offset, nb.size))

        # Pad BEFORE the magic number so it always sits in the last 4 bytes
        # (the original padded after writing the magic, which would have
        # displaced it had padding ever been required).
        while len(buffer) < self.ENCODED_LENGTH - 4:
            buffer.append(0)

        buffer.extend(struct.pack('<I', self.MAGIC_NUMBER))
        return bytes(buffer)
diff --git a/paimon-python/pypaimon/globalindex/btree/key_serializer.py 
b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
new file mode 100644
index 0000000000..79cb32e5e6
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
@@ -0,0 +1,231 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Key serializer for B-tree index."""
+
+from abc import ABC, abstractmethod
+from typing import Callable
+import struct
+
+
class KeySerializer(ABC):
    """
    Contract for converting btree index keys to and from bytes, and for
    ordering them.

    Implementations must keep serialize/deserialize symmetric and provide
    a comparator consistent with the key ordering.
    """

    @abstractmethod
    def serialize(self, key: object) -> bytes:
        """Encode ``key`` into its byte representation."""
        ...

    @abstractmethod
    def deserialize(self, data: bytes) -> object:
        """Decode ``data`` back into a key object."""
        ...

    @abstractmethod
    def create_comparator(self) -> Callable[[object, object], int]:
        """
        Build a three-way comparator over keys.

        Returns:
            A callable returning a negative number, zero, or a positive
            number when the first key is less than, equal to, or greater
            than the second, respectively.
        """
        ...
+
+
class StringSerializer(KeySerializer):
    """Serializer for STRING type: UTF-8 bytes, lexicographic order."""

    def serialize(self, key: object) -> bytes:
        """Encode the key as UTF-8, stringifying non-str inputs first."""
        text = key if isinstance(key, str) else str(key)
        return text.encode('utf-8')

    def deserialize(self, data: bytes) -> object:
        """Decode UTF-8 bytes back into a Python string."""
        return data.decode('utf-8')

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way comparison over the keys' string forms."""
        def compare(a: object, b: object) -> int:
            left = a if isinstance(a, str) else str(a)
            right = b if isinstance(b, str) else str(b)
            return (left > right) - (left < right)
        return compare
+
+
class LongSerializer(KeySerializer):
    """Serializer for BIGINT type: 8-byte big-endian signed integer."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a big-endian signed 64-bit integer."""
        return struct.pack('>q', int(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a big-endian signed 64-bit integer."""
        (value,) = struct.unpack('>q', data)
        return value

    def create_comparator(self) -> Callable[[object, object], int]:
        """Numeric three-way comparison after int coercion."""
        def compare(a: object, b: object) -> int:
            left, right = int(a), int(b)
            return (left > right) - (left < right)
        return compare
+
+
class IntSerializer(KeySerializer):
    """Serializer for INT type: 4-byte big-endian signed integer."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a big-endian signed 32-bit integer."""
        return struct.pack('>i', int(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a big-endian signed 32-bit integer."""
        (value,) = struct.unpack('>i', data)
        return value

    def create_comparator(self) -> Callable[[object, object], int]:
        """Numeric three-way comparison after int coercion."""
        def compare(a: object, b: object) -> int:
            left, right = int(a), int(b)
            return (left > right) - (left < right)
        return compare
+
+
class FloatSerializer(KeySerializer):
    """Serializer for FLOAT type: 4-byte big-endian IEEE-754 float."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a big-endian 32-bit float."""
        return struct.pack('>f', float(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a big-endian 32-bit float."""
        (value,) = struct.unpack('>f', data)
        return value

    def create_comparator(self) -> Callable[[object, object], int]:
        """Numeric three-way comparison after float coercion."""
        def compare(a: object, b: object) -> int:
            left, right = float(a), float(b)
            return (left > right) - (left < right)
        return compare
+
+
class DoubleSerializer(KeySerializer):
    """Serializer for DOUBLE type: 8-byte big-endian IEEE-754 double."""

    def serialize(self, key: object) -> bytes:
        """Pack the key as a big-endian 64-bit double."""
        return struct.pack('>d', float(key))

    def deserialize(self, data: bytes) -> object:
        """Unpack a big-endian 64-bit double."""
        (value,) = struct.unpack('>d', data)
        return value

    def create_comparator(self) -> Callable[[object, object], int]:
        """Numeric three-way comparison after float coercion."""
        def compare(a: object, b: object) -> int:
            left, right = float(a), float(b)
            return (left > right) - (left < right)
        return compare
+
+
class BooleanSerializer(KeySerializer):
    """Serializer for BOOLEAN type: one byte, 1 for True and 0 for False."""

    def serialize(self, key: object) -> bytes:
        """Pack the key's truthiness as a single byte."""
        return struct.pack('>B', 1 if key else 0)

    def deserialize(self, data: bytes) -> object:
        """Unpack a single byte into a bool (1 means True)."""
        return struct.unpack('>B', data)[0] == 1

    def create_comparator(self) -> Callable[[object, object], int]:
        """Three-way comparison with False ordered before True."""
        def compare(a: object, b: object) -> int:
            left, right = bool(a), bool(b)
            return (left > right) - (left < right)
        return compare
+
+
def create_serializer(data_type: str) -> "KeySerializer":
    """
    Factory method to create a KeySerializer based on data type.

    Args:
        data_type: String representation of the data type (case-insensitive)

    Returns:
        Appropriate KeySerializer instance

    Raises:
        ValueError: If the data type is not supported
    """
    data_type_lower = data_type.lower()

    # NOTE: the float/double branches must use one-element tuples; the
    # original wrote `in ('float')`, which is a *substring* test against the
    # string "float" and wrongly matched inputs like "o" or "l".
    if data_type_lower in ('string', 'varchar', 'char'):
        return StringSerializer()
    elif data_type_lower in ('bigint', 'long'):
        return LongSerializer()
    elif data_type_lower in ('int', 'integer'):
        return IntSerializer()
    elif data_type_lower in ('float',):
        return FloatSerializer()
    elif data_type_lower in ('double',):
        return DoubleSerializer()
    elif data_type_lower in ('boolean', 'bool'):
        return BooleanSerializer()
    else:
        raise ValueError(f"DataType: {data_type} is not supported by btree index now.")
diff --git a/paimon-python/pypaimon/globalindex/btree/memory_slice_input.py 
b/paimon-python/pypaimon/globalindex/btree/memory_slice_input.py
new file mode 100644
index 0000000000..3897f8c162
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/memory_slice_input.py
@@ -0,0 +1,162 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""Input for byte array."""
+
+import struct
+
+
class MemorySliceInput:
    """Sequential reader over an in-memory byte array.

    Maintains an explicit cursor so callers can save and restore positions
    while decoding fixed-width integers (big-endian) and variable-length
    base-128 integers (0x80 continuation bit, low groups first).
    """

    def __init__(self, data: bytes):
        """Wrap *data* for sequential reading, starting at offset 0."""
        self.data = data
        self._position = 0

    def position(self) -> int:
        """Return the current read offset."""
        return self._position

    def set_position(self, position: int) -> None:
        """Move the cursor to *position*.

        Raises:
            IndexError: If *position* lies outside [0, len(data)].
        """
        if not (0 <= position <= len(self.data)):
            raise IndexError(f"Position {position} out of bounds [0, {len(self.data)}]")
        self._position = position

    def is_readable(self) -> bool:
        """Return True while unread bytes remain."""
        return self.available() > 0

    def available(self) -> int:
        """Return how many bytes are left after the cursor."""
        return len(self.data) - self._position

    def read_byte(self) -> int:
        """Consume and return one byte as an int in [0, 255].

        Raises:
            IndexError: If the cursor is already at the end.
        """
        if self._position >= len(self.data):
            raise IndexError("No more bytes available")
        b = self.data[self._position]
        self._position += 1
        return b

    def read_unsigned_byte(self) -> int:
        """Consume one byte, masked to its unsigned 8-bit value."""
        return self.read_byte() & 0xFF

    def read_int(self) -> int:
        """Consume four bytes and return them as a big-endian unsigned int.

        Raises:
            IndexError: If fewer than four bytes remain.
        """
        end = self._position + 4
        if end > len(self.data):
            raise IndexError("Not enough bytes to read int")
        (value,) = struct.unpack('>I', self.data[self._position:end])
        self._position = end
        return value

    def read_var_len_int(self) -> int:
        """Decode a variable-length integer of at most 5 bytes.

        Raises:
            ValueError: If no terminating byte appears within 32 bits.
        """
        result = 0
        for shift in range(0, 32, 7):
            chunk = self.read_unsigned_byte()
            result |= (chunk & 0x7F) << shift
            if not (chunk & 0x80):
                return result
        raise ValueError("Malformed integer")

    def read_long(self) -> int:
        """Consume eight bytes and return them as a big-endian unsigned long.

        Raises:
            IndexError: If fewer than eight bytes remain.
        """
        end = self._position + 8
        if end > len(self.data):
            raise IndexError("Not enough bytes to read long")
        (value,) = struct.unpack('>Q', self.data[self._position:end])
        self._position = end
        return value

    def read_var_len_long(self) -> int:
        """Decode a variable-length long of at most 10 bytes.

        Raises:
            ValueError: If no terminating byte appears within 64 bits.
        """
        result = 0
        for shift in range(0, 64, 7):
            chunk = self.read_unsigned_byte()
            result |= (chunk & 0x7F) << shift
            if not (chunk & 0x80):
                return result
        raise ValueError("Malformed long")

    def read_slice(self, length: int) -> bytes:
        """Consume and return the next *length* bytes.

        Raises:
            IndexError: If fewer than *length* bytes remain.
        """
        end = self._position + length
        if end > len(self.data):
            raise IndexError(f"Not enough bytes to read slice of length {length}")
        value = self.data[self._position:end]
        self._position = end
        return value
diff --git a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py 
b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
new file mode 100644
index 0000000000..2ac19d07dc
--- /dev/null
+++ b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
@@ -0,0 +1,194 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""
+An SST File Reader which serves point queries and range queries.
+
+Users can call createIterator to create a file iterator and then use seek
+and read methods to do range queries.
+
+Note that this class is NOT thread-safe.
+"""
+
+import struct
+import zlib
+from typing import Optional, Callable
+from typing import BinaryIO
+
+from pypaimon.globalindex.btree.btree_file_footer import BlockHandle
+from pypaimon.globalindex.btree.block_entry import BlockEntry
+from pypaimon.globalindex.btree.block_reader import BlockReader, BlockIterator
+from pypaimon.globalindex.btree.memory_slice_input import MemorySliceInput
+
+
class SstFileIterator:
    """
    Iterator for range queries on SST file.

    Allows seeking to a position and reading batches of records. Each index
    entry's value is a var-len encoded BlockHandle (offset, then size)
    pointing at a data block.
    """

    def __init__(self, read_block: Callable[["BlockHandle"], "BlockReader"],
                 index_block_iterator: "BlockIterator"):
        # Callback that loads and decodes the data block for a handle.
        self.read_block = read_block
        # Iterator over the index block entries.
        self.index_iterator = index_block_iterator
        # Data block positioned by seek_to(); consumed by the next read_batch().
        self.sought_data_block: Optional["BlockIterator"] = None

    @staticmethod
    def _decode_block_handle(handle_bytes: bytes) -> "BlockHandle":
        """Decode a BlockHandle from its var-len (offset, size) encoding."""
        handle_input = MemorySliceInput(handle_bytes)
        return BlockHandle(
            handle_input.read_var_len_long(),
            handle_input.read_var_len_int()
        )

    def seek_to(self, key: bytes) -> None:
        """
        Seek to the position of the record whose key is exactly equal to or
        greater than the specified key.

        Args:
            key: The key to seek to
        """
        self.index_iterator.seek_to(key)

        if self.index_iterator.has_next():
            index_entry = next(self.index_iterator)
            block_handle = self._decode_block_handle(index_entry.value)

            # Position inside the data block referenced by the index entry.
            data_block_reader = self.read_block(block_handle)
            self.sought_data_block = data_block_reader.iterator()
            self.sought_data_block.seek_to(key)
        else:
            # Key lies past the last index entry: nothing to read.
            self.sought_data_block = None

    def read_batch(self) -> Optional["BlockIterator"]:
        """
        Read a batch of records from this SST File and move current record
        position to the next batch.

        Returns:
            BlockIterator for the current batch, or None if at file end
        """
        # First serve the block positioned by a preceding seek_to(), if any.
        if self.sought_data_block is not None:
            result = self.sought_data_block
            self.sought_data_block = None
            return result

        if not self.index_iterator.has_next():
            return None

        index_entry = next(self.index_iterator)
        # Fix: index entry values are var-len encoded, exactly as decoded in
        # seek_to(); the fixed-width little-endian struct parse read garbage.
        block_handle = self._decode_block_handle(index_entry.value)

        data_block_reader = self.read_block(block_handle)
        return data_block_reader.iterator()
+
+
class SstFileReader:
    """
    An SST File Reader which serves point queries and range queries.

    Users can call createIterator to create a file iterator and then use seek
    and read methods to do range queries.

    Note that this class is NOT thread-safe.
    """

    def __init__(
        self,
        input_stream: BinaryIO,
        comparator: Callable[[bytes, bytes], int],
        index_block_handle: "BlockHandle"
    ):
        """Wrap *input_stream* and eagerly load the index block it points at."""
        self.comparator = comparator
        self.input_stream = input_stream
        self.index_block = self._read_block(index_block_handle, True)

    def _read_block(self, block_handle: "BlockHandle", index: bool) -> "BlockReader":
        """Load one block, verify its trailer, and wrap it in a BlockReader.

        On disk a block is ``size`` payload bytes followed by a 5-byte
        trailer: 1 compression-type byte plus 4 bytes of little-endian CRC32.

        Raises:
            ValueError: If the block is truncated, compressed, or fails CRC.
        """
        self.input_stream.seek(block_handle.offset)
        raw = self.input_stream.read(block_handle.size + 5)
        if len(raw) < 5:
            raise ValueError("Block data too short to contain trailer")

        payload, trailer = raw[:-5], raw[-5:]
        compression_type = trailer[0]
        # Only uncompressed blocks are supported for now.
        if compression_type != 0:
            raise ValueError("Compression type not supported")

        (crc32_value,) = struct.unpack('<I', trailer[1:5])
        actual_crc32 = self.crc32c(payload, compression_type)
        if actual_crc32 != crc32_value:
            raise ValueError(f"CRC32 mismatch: expected {crc32_value}, got {actual_crc32}")

        # Index blocks compare raw key bytes; data blocks use the
        # caller-supplied comparator.
        cmp = self._slice_comparator if index else self.comparator
        return BlockReader.create(payload, cmp)

    @staticmethod
    def _slice_comparator(a: bytes, b: bytes) -> int:
        """Three-way lexicographic comparison of raw byte strings."""
        if a == b:
            return 0
        return -1 if a < b else 1

    def create_iterator(self) -> "SstFileIterator":
        """Create a fresh SstFileIterator for range scans over this file."""
        def load_data_block(handle: "BlockHandle") -> "BlockReader":
            return self._read_block(handle, False)

        return SstFileIterator(
            load_data_block,
            self.index_block.iterator())

    @staticmethod
    def crc32c(bytes_data: bytes, compression_type_id: int) -> int:
        """
        Calculate CRC32 checksum for the given bytes and compression type.

        The checksum covers the payload followed by the single compression
        type byte (low 8 bits of *compression_type_id*).

        NOTE(review): despite the name, this computes standard zlib CRC32,
        not CRC32C (Castagnoli) — presumably matching the writer; confirm.

        Args:
            bytes_data: The byte array to calculate checksum for
            compression_type_id: The persistent ID of the compression type (0-255)

        Returns:
            The CRC32 checksum value as an unsigned 32-bit integer
        """
        checksum = zlib.crc32(bytes_data)
        checksum = zlib.crc32(bytes((compression_type_id & 0xFF,)), checksum)
        return checksum & 0xFFFFFFFF

    def close(self) -> None:
        """Close the reader and release resources (currently a no-op)."""
        pass
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py 
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 6f3fb191f6..dd0eb57235 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -133,7 +133,7 @@ class GlobalIndexEvaluator:
             readers = self._readers_function(field_id)
             self._index_readers_cache[field_id] = readers
         
-        field_ref = FieldRef(predicate.index, predicate.field, 
str(field.data_type))
+        field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
         
         compound_result: Optional[GlobalIndexResult] = None
         
diff --git a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py 
b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
index 8b9a7e0f58..3d449d1288 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
@@ -180,6 +180,17 @@ class RowRangeGlobalIndexScanner:
                         options=options
                     )
                     readers.append(reader)
+                if index_type == 'btree':
+                    from pypaimon.globalindex.btree import (BTreeIndexReader)
+                    from pypaimon.globalindex.btree.key_serializer import 
(StringSerializer)
+                    for metadata in io_metas:
+                        reader = BTreeIndexReader(
+                            key_serializer=StringSerializer(),  # TODO create 
serializer from type
+                            file_io=file_io,
+                            index_path=index_path,
+                            io_meta=metadata
+                        )
+                        readers.append(reader)
             
             return readers
         
diff --git a/paimon-python/pypaimon/globalindex/roaring_bitmap.py 
b/paimon-python/pypaimon/globalindex/roaring_bitmap.py
index 86781d29eb..c91f0c149a 100644
--- a/paimon-python/pypaimon/globalindex/roaring_bitmap.py
+++ b/paimon-python/pypaimon/globalindex/roaring_bitmap.py
@@ -119,6 +119,12 @@ class RoaringBitmap64:
         result._data = a._data | b._data
         return result
 
+    @staticmethod
+    def remove_all(a: 'RoaringBitmap64', b: 'RoaringBitmap64') -> 
'RoaringBitmap64':
+        result = RoaringBitmap64()
+        result._data = a._data - b._data
+        return result
+
     def serialize(self) -> bytes:
         """Serialize the bitmap to bytes."""
         # Simple serialization format: count followed by sorted values
diff --git a/paimon-python/pypaimon/read/table_read.py 
b/paimon-python/pypaimon/read/table_read.py
index f546c4be6b..7492c5c447 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -254,7 +254,7 @@ class TableRead:
         elif self.table.options.data_evolution_enabled():
             return DataEvolutionSplitRead(
                 table=self.table,
-                predicate=self.predicate,
+                predicate=None,  # Never push predicate to split read
                 read_type=self.read_type,
                 split=split,
                 row_tracking_enabled=True
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index b56ced657c..86c2866676 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -25,6 +25,7 @@ import pyarrow as pa
 from parameterized import parameterized
 from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.schema.schema import Schema
+from pypaimon.read.read_builder import ReadBuilder
 
 if sys.version_info[:2] == (3, 6):
     from pypaimon.tests.py36.pyarrow_compat import table_sort_by
@@ -318,3 +319,30 @@ class JavaPyReadWriteTest(unittest.TestCase):
             'b': [i * 100 for i in range(1, 10001) if i * 10 != 81930]
         }, schema=pa_schema)
         self.assertEqual(expected, actual)
+
+    def test_read_btree_index_table(self):
+        table = self.catalog.get_table('default.test_btree_index')
+        read_builder: ReadBuilder = table.new_read_builder()
+
+        # read all
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_sort_by(table_read.to_arrow(splits), 'k')
+        expected = pa.Table.from_pydict({
+            'k': ["k1", "k2", "k3"],
+            'v': ["v1", "v2", "v3"]
+        })
+        self.assertEqual(expected, actual)
+
+        # read using index
+        predicate_builder = read_builder.new_predicate_builder()
+        predicate = predicate_builder.equal('k', 'k2')
+        read_builder.with_filter(predicate)
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        actual = table_sort_by(table_read.to_arrow(splits), 'k')
+        expected = pa.Table.from_pydict({
+            'k': ["k2"],
+            'v': ["v2"]
+        })
+        self.assertEqual(expected, actual)

Reply via email to