This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 6948f854bc [python] Python BTree index reader supports INT and BIGINT (#7163)
6948f854bc is described below

commit 6948f854bcdd7a2eb1c4376836f99aa5fd80ac6b
Author: Jingsong Lee <[email protected]>
AuthorDate: Sun Feb 1 15:03:44 2026 +0800

    [python] Python BTree index reader supports INT and BIGINT (#7163)
---
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |  48 ++++++---
 .../pypaimon/globalindex/btree/block_reader.py     |   2 +-
 .../globalindex/btree/btree_index_reader.py        |  81 +++------------
 .../pypaimon/globalindex/btree/key_serializer.py   | 114 +++------------------
 .../pypaimon/globalindex/btree/sst_file_reader.py  |  16 +--
 .../pypaimon/globalindex/global_index_evaluator.py |  28 +++--
 .../globalindex/global_index_scan_builder.py       |  13 +--
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  26 +++--
 8 files changed, 102 insertions(+), 226 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 238f4c08e4..c397cef388 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -411,13 +411,37 @@ public class JavaPyE2ETest {
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testBtreeIndexWrite() throws Exception {
+        testBtreeIndexWriteString();
+        testBtreeIndexWriteInt();
+        testBtreeIndexWriteBigInt();
+    }
+
+    private void testBtreeIndexWriteString() throws Exception {
+        testBtreeIndexWriteGeneric(
+                DataTypes.STRING(),
+                "test_btree_index_string",
+                BinaryString.fromString("k1"),
+                BinaryString.fromString("k2"),
+                BinaryString.fromString("k3"));
+    }
+
+    private void testBtreeIndexWriteInt() throws Exception {
+        testBtreeIndexWriteGeneric(DataTypes.INT(), "test_btree_index_int", 100, 200, 300);
+    }
+
+    private void testBtreeIndexWriteBigInt() throws Exception {
+        testBtreeIndexWriteGeneric(
+                DataTypes.BIGINT(), "test_btree_index_bigint", 1000L, 2000L, 3000L);
+    }
+
+    private <T> void testBtreeIndexWriteGeneric(
+            DataType keyType, String tableName, Object key1, Object key2, Object key3)
+            throws Exception {
         // create table
         RowType rowType =
-                RowType.of(
-                        new DataType[] {DataTypes.STRING(), DataTypes.STRING()},
-                        new String[] {"k", "v"});
+                RowType.of(new DataType[] {keyType, DataTypes.STRING()}, new String[] {"k", "v"});
         Options options = new Options();
-        Path tablePath = new Path(warehouse.toString() + "/default.db/test_btree_index");
+        Path tablePath = new Path(warehouse.toString() + "/default.db/" + tableName);
         options.set(PATH, tablePath.toString());
         options.set(ROW_TRACKING_ENABLED, true);
         options.set(DATA_EVOLUTION_ENABLED, true);
@@ -442,12 +466,9 @@ public class JavaPyE2ETest {
         BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
         try (BatchTableWrite write = writeBuilder.newWrite();
                 BatchTableCommit commit = writeBuilder.newCommit()) {
-            write.write(
-                    GenericRow.of(BinaryString.fromString("k1"), BinaryString.fromString("v1")));
-            write.write(
-                    GenericRow.of(BinaryString.fromString("k2"), BinaryString.fromString("v2")));
-            write.write(
-                    GenericRow.of(BinaryString.fromString("k3"), BinaryString.fromString("v3")));
+            write.write(GenericRow.of(key1, BinaryString.fromString("v1")));
+            write.write(GenericRow.of(key2, BinaryString.fromString("v2")));
+            write.write(GenericRow.of(key3, BinaryString.fromString("v3")));
             commit.commit(write.prepareCommit());
         }
 
@@ -468,14 +489,13 @@ public class JavaPyE2ETest {
         // read index
         PredicateBuilder predicateBuilder = new PredicateBuilder(table.rowType());
         ReadBuilder readBuilder =
-                table.newReadBuilder()
-                        .withFilter(predicateBuilder.equal(0, BinaryString.fromString("k2")));
+                table.newReadBuilder().withFilter(predicateBuilder.equal(0, key2));
         List<String> result = new ArrayList<>();
         readBuilder
                 .newRead()
                 .createReader(readBuilder.newScan().plan())
-                .forEachRemaining(r -> result.add(r.getString(0) + ":" + r.getString(1)));
-        assertThat(result).containsOnly("k2:v2");
+                .forEachRemaining(r -> result.add(r.getString(1).toString()));
+        assertThat(result).containsOnly("v2");
     }
 
     // Helper method from TableTestBase
diff --git a/paimon-python/pypaimon/globalindex/btree/block_reader.py b/paimon-python/pypaimon/globalindex/btree/block_reader.py
index f0091d0231..cff6c99794 100644
--- a/paimon-python/pypaimon/globalindex/btree/block_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/block_reader.py
@@ -223,7 +223,7 @@ class BlockIterator:
             
             self.input.set_position(self.reader.seek_to(mid))
             mid_entry = self.read_entry()
-            compare = self.reader.comparator(mid_entry.key, target_key) if self.reader.comparator else -1
+            compare = self.reader.comparator(mid_entry.key, target_key)
             
             if compare == 0:
                 self.polled = mid_entry
diff --git a/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
index 83d6194d0d..f5172b167a 100644
--- a/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/btree_index_reader.py
@@ -53,56 +53,6 @@ def _deserialize_row_ids(data: bytes) -> List[int]:
     return row_ids
 
 
-def _compare_bytes(a: bytes, b: bytes) -> int:
-    """
-    Compare two byte arrays.
-
-    Args:
-        a: First byte array
-        b: Second byte array
-
-    Returns:
-        -1 if a < b, 0 if a == b, 1 if a > b
-    """
-    if a < b:
-        return -1
-    elif a > b:
-        return 1
-    return 0
-
-
-def _is_in_range_bytes(
-        key_bytes: bytes,
-        from_bytes: bytes,
-        to_bytes: bytes,
-        from_inclusive: bool,
-        to_inclusive: bool
-) -> bool:
-    """
-    Check if a key (as bytes) falls within the specified range.
-
-    Args:
-        key_bytes: The key bytes to check
-        from_bytes: Lower bound bytes
-        to_bytes: Upper bound bytes
-        from_inclusive: Whether lower bound is inclusive
-        to_inclusive: Whether upper bound is inclusive
-
-    Returns:
-        True if key is in range, False otherwise
-    """
-    if not from_inclusive and _compare_bytes(key_bytes, from_bytes) == 0:
-        return False
-
-    cmp_to = _compare_bytes(key_bytes, to_bytes)
-    if cmp_to > 0:
-        return False
-    if not to_inclusive and cmp_to == 0:
-        return False
-
-    return True
-
-
 class BTreeIndexReader(GlobalIndexReader):
     """
     The GlobalIndexReader implementation for btree index.
@@ -225,25 +175,21 @@ class BTreeIndexReader(GlobalIndexReader):
     ) -> RoaringBitmap64:
         """
         Range query on underlying SST File.
-        
+
         Args:
             from_key: Lower bound key
             to_key: Upper bound key
             from_inclusive: Whether to include lower bound
             to_inclusive: Whether to include upper bound
-            
+
         Returns:
             RoaringBitmap64 containing all qualified row IDs
         """
         result = RoaringBitmap64()
-        
-        # Use SST reader to efficiently scan the data blocks
-        serialized_from = self.key_serializer.serialize(from_key)
-        serialized_to = self.key_serializer.serialize(to_key)
 
         # Create iterator and seek to start key
         file_iter = self.reader.create_iterator()
-        file_iter.seek_to(serialized_from)
+        file_iter.seek_to(self.key_serializer.serialize(from_key))
 
         # Iterate through data blocks
         while True:
@@ -260,15 +206,22 @@ class BTreeIndexReader(GlobalIndexReader):
                 key_bytes = entry.key
                 value_bytes = entry.value
 
-                # Check if key is within range using byte comparison
-                if _is_in_range_bytes(key_bytes, serialized_from, serialized_to, from_inclusive, to_inclusive):
-                    row_ids = _deserialize_row_ids(value_bytes)
-                    for row_id in row_ids:
-                        result.add(row_id)
-                elif _compare_bytes(key_bytes, serialized_to) > 0:
-                    # Key is beyond the range, stop processing
+                key = self.key_serializer.deserialize(key_bytes)
+
+                # Skip if key equals from_key and from_inclusive is False
+                if not from_inclusive and self.comparator(key, from_key) == 0:
+                    continue
+
+                # Check if key is beyond the range
+                difference = self.comparator(key, to_key)
+                if difference > 0 or (not to_inclusive and difference == 0):
                     return result
 
+                # Add all row IDs for this key
+                row_ids = _deserialize_row_ids(value_bytes)
+                for row_id in row_ids:
+                    result.add(row_id)
+
         return result
 
     def _is_in_range(
diff --git a/paimon-python/pypaimon/globalindex/btree/key_serializer.py b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
index 79cb32e5e6..f303b2634e 100644
--- a/paimon-python/pypaimon/globalindex/btree/key_serializer.py
+++ b/paimon-python/pypaimon/globalindex/btree/key_serializer.py
@@ -22,6 +22,9 @@ from abc import ABC, abstractmethod
 from typing import Callable
 import struct
 
+from pypaimon.schema.data_types import DataType
+from pypaimon.schema.data_types import AtomicType
+
 
 class KeySerializer(ABC):
     """
@@ -85,11 +88,11 @@ class LongSerializer(KeySerializer):
 
     def serialize(self, key: object) -> bytes:
         """Serialize a long key to bytes."""
-        return struct.pack('>q', int(key))
+        return struct.pack('<q', int(key))
 
     def deserialize(self, data: bytes) -> object:
         """Deserialize bytes to a long key."""
-        return struct.unpack('>q', data)[0]
+        return struct.unpack('<q', data)[0]
 
     def create_comparator(self) -> Callable[[object, object], int]:
         """Create a comparator for long keys."""
@@ -109,11 +112,11 @@ class IntSerializer(KeySerializer):
 
     def serialize(self, key: object) -> bytes:
         """Serialize an int key to bytes."""
-        return struct.pack('>i', int(key))
+        return struct.pack('<i', int(key))
 
     def deserialize(self, data: bytes) -> object:
         """Deserialize bytes to an int key."""
-        return struct.unpack('>i', data)[0]
+        return struct.unpack('<i', data)[0]
 
     def create_comparator(self) -> Callable[[object, object], int]:
         """Create a comparator for int keys."""
@@ -128,104 +131,15 @@ class IntSerializer(KeySerializer):
         return compare
 
 
-class FloatSerializer(KeySerializer):
-    """Serializer for FLOAT type."""
-
-    def serialize(self, key: object) -> bytes:
-        """Serialize a float key to bytes."""
-        return struct.pack('>f', float(key))
-
-    def deserialize(self, data: bytes) -> object:
-        """Deserialize bytes to a float key."""
-        return struct.unpack('>f', data)[0]
-
-    def create_comparator(self) -> Callable[[object, object], int]:
-        """Create a comparator for float keys."""
-        def compare(a: object, b: object) -> int:
-            float_a = float(a)
-            float_b = float(b)
-            if float_a < float_b:
-                return -1
-            elif float_a > float_b:
-                return 1
-            return 0
-        return compare
-
-
-class DoubleSerializer(KeySerializer):
-    """Serializer for DOUBLE type."""
-
-    def serialize(self, key: object) -> bytes:
-        """Serialize a double key to bytes."""
-        return struct.pack('>d', float(key))
-
-    def deserialize(self, data: bytes) -> object:
-        """Deserialize bytes to a double key."""
-        return struct.unpack('>d', data)[0]
-
-    def create_comparator(self) -> Callable[[object, object], int]:
-        """Create a comparator for double keys."""
-        def compare(a: object, b: object) -> int:
-            double_a = float(a)
-            double_b = float(b)
-            if double_a < double_b:
-                return -1
-            elif double_a > double_b:
-                return 1
-            return 0
-        return compare
-
-
-class BooleanSerializer(KeySerializer):
-    """Serializer for BOOLEAN type."""
-
-    def serialize(self, key: object) -> bytes:
-        """Serialize a boolean key to bytes."""
-        return struct.pack('>B', 1 if key else 0)
-
-    def deserialize(self, data: bytes) -> object:
-        """Deserialize bytes to a boolean key."""
-        return struct.unpack('>B', data)[0] == 1
-
-    def create_comparator(self) -> Callable[[object, object], int]:
-        """Create a comparator for boolean keys."""
-        def compare(a: object, b: object) -> int:
-            bool_a = bool(a)
-            bool_b = bool(b)
-            if bool_a < bool_b:
-                return -1
-            elif bool_a > bool_b:
-                return 1
-            return 0
-        return compare
-
-
-def create_serializer(data_type: str) -> KeySerializer:
-    """
-    Factory method to create a KeySerializer based on data type.
-    
-    Args:
-        data_type: String representation of the data type
-        
-    Returns:
-        Appropriate KeySerializer instance
-        
-    Raises:
-        ValueError: If the data type is not supported
-    """
-    data_type_lower = data_type.lower()
-    
-    if data_type_lower in ('string', 'varchar', 'char'):
+def create_serializer(data_type: DataType) -> KeySerializer:
+    if not isinstance(data_type, AtomicType):
+        raise ValueError(f"Key serializer only support AtomicType yet, meet {data_type.__class__}")
+    type_name = data_type.type.upper()
+    if type_name in ('CHAR', 'VARCHAR', 'STRING'):
         return StringSerializer()
-    elif data_type_lower in ('bigint', 'long'):
+    elif type_name == 'BIGINT':
         return LongSerializer()
-    elif data_type_lower in ('int', 'integer'):
+    elif type_name == 'INT':
         return IntSerializer()
-    elif data_type_lower in ('float'):
-        return FloatSerializer()
-    elif data_type_lower in ('double'):
-        return DoubleSerializer()
-    elif data_type_lower in ('boolean', 'bool'):
-        return BooleanSerializer()
     else:
         raise ValueError(f"DataType: {data_type} is not supported by btree index now.")
diff --git a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
index 2ac19d07dc..8b346deecc 100644
--- a/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
+++ b/paimon-python/pypaimon/globalindex/btree/sst_file_reader.py
@@ -124,9 +124,9 @@ class SstFileReader:
     ):
         self.comparator = comparator
         self.input_stream = input_stream
-        self.index_block = self._read_block(index_block_handle, True)
+        self.index_block = self._read_block(index_block_handle)
 
-    def _read_block(self, block_handle: BlockHandle, index: bool) -> BlockReader:
+    def _read_block(self, block_handle: BlockHandle) -> BlockReader:
         self.input_stream.seek(block_handle.offset)
         # Read block data + 5 bytes trailer (1 byte compression type + 4 bytes CRC32)
         block_data = self.input_stream.read(block_handle.size + 5)
@@ -149,19 +149,11 @@ class SstFileReader:
         if actual_crc32 != crc32_value:
             raise ValueError(f"CRC32 mismatch: expected {crc32_value}, got {actual_crc32}")
 
-        return BlockReader.create(block_bytes, self._slice_comparator if index else self.comparator)
+        return BlockReader.create(block_bytes, self.comparator)
 
-    @staticmethod
-    def _slice_comparator(a: bytes, b: bytes) -> int:
-        if a < b:
-            return -1
-        elif a > b:
-            return 1
-        return 0
-    
     def create_iterator(self) -> SstFileIterator:
         def read_block(block: BlockHandle) -> BlockReader:
-            return self._read_block(block, False)
+            return self._read_block(block)
 
         return SstFileIterator(
             read_block,
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index dd0eb57235..46bb5cd67d 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -18,15 +18,13 @@
 
 """Global index evaluator for filtering data using global indexes."""
 
-from typing import Callable, Collection, Dict, List, Optional, TYPE_CHECKING
+from typing import Callable, Collection, Dict, List, Optional
 
 from pypaimon.globalindex.global_index_reader import GlobalIndexReader, FieldRef
 from pypaimon.globalindex.global_index_result import GlobalIndexResult
-
-if TYPE_CHECKING:
-    from pypaimon.common.predicate import Predicate
-    from pypaimon.globalindex.vector_search import VectorSearch
-    from pypaimon.schema.data_types import DataField
+from pypaimon.common.predicate import Predicate
+from pypaimon.globalindex.vector_search import VectorSearch
+from pypaimon.schema.data_types import DataField
 
 
 class GlobalIndexEvaluator:
@@ -36,8 +34,8 @@ class GlobalIndexEvaluator:
 
     def __init__(
         self,
-        fields: List['DataField'],
-        readers_function: Callable[[int], Collection[GlobalIndexReader]]
+        fields: List[DataField],
+        readers_function: Callable[[DataField], Collection[GlobalIndexReader]]
     ):
         self._fields = fields
         self._field_by_name = {f.name: f for f in fields}
@@ -46,8 +44,8 @@ class GlobalIndexEvaluator:
 
     def evaluate(
         self,
-        predicate: Optional['Predicate'],
-        vector_search: Optional['VectorSearch']
+        predicate: Optional[Predicate],
+        vector_search: Optional[VectorSearch]
     ) -> Optional[GlobalIndexResult]:
         compound_result: Optional[GlobalIndexResult] = None
         
@@ -64,7 +62,7 @@ class GlobalIndexEvaluator:
             field_id = field.id
             readers = self._index_readers_cache.get(field_id)
             if readers is None:
-                readers = self._readers_function(field_id)
+                readers = self._readers_function(field)
                 self._index_readers_cache[field_id] = readers
             
             # If we have a compound result from predicates, use it to filter vector search
@@ -87,7 +85,7 @@ class GlobalIndexEvaluator:
         
         return compound_result
 
-    def _visit_predicate(self, predicate: 'Predicate') -> Optional[GlobalIndexResult]:
+    def _visit_predicate(self, predicate: Predicate) -> Optional[GlobalIndexResult]:
         """Visit a predicate and return the index result."""
         if predicate.method == 'and':
             compound_result: Optional[GlobalIndexResult] = None
@@ -121,7 +119,7 @@ class GlobalIndexEvaluator:
             # Leaf predicate
             return self._visit_leaf_predicate(predicate)
 
-    def _visit_leaf_predicate(self, predicate: 'Predicate') -> Optional[GlobalIndexResult]:
+    def _visit_leaf_predicate(self, predicate: Predicate) -> Optional[GlobalIndexResult]:
         """Visit a leaf predicate and return the index result."""
         field = self._field_by_name.get(predicate.field)
         if field is None:
@@ -130,7 +128,7 @@ class GlobalIndexEvaluator:
         field_id = field.id
         readers = self._index_readers_cache.get(field_id)
         if readers is None:
-            readers = self._readers_function(field_id)
+            readers = self._readers_function(field)
             self._index_readers_cache[field_id] = readers
         
         field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
@@ -155,7 +153,7 @@ class GlobalIndexEvaluator:
     def _visit_function(
         self,
         reader: GlobalIndexReader,
-        predicate: 'Predicate',
+        predicate: Predicate,
         field_ref: FieldRef
     ) -> Optional[GlobalIndexResult]:
         """Visit a predicate function with the given reader."""
diff --git a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
index 3d449d1288..e14df8426b 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scan_builder.py
@@ -25,6 +25,7 @@ from concurrent.futures import ThreadPoolExecutor, as_completed
 from pypaimon.globalindex import GlobalIndexIOMeta, GlobalIndexReader, GlobalIndexEvaluator
 from pypaimon.globalindex.range import Range
 from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.schema.data_types import DataField
 
 
 class GlobalIndexScanBuilder(ABC):
@@ -160,12 +161,12 @@ class RowRangeGlobalIndexScanner:
             )
             index_metas[field_id][index_type].append(io_meta)
         
-        def readers_function(field_id: int) -> Collection[GlobalIndexReader]:
+        def readers_function(field: DataField) -> Collection[GlobalIndexReader]:
             readers = []
-            if field_id not in index_metas:
+            if field.id not in index_metas:
                 return readers
             
-            for index_type, io_metas in index_metas[field_id].items():
+            for index_type, io_metas in index_metas[field.id].items():
                 if index_type == 'faiss-vector-ann':
                     # Lazy import to avoid requiring faiss when not used
                     from pypaimon.globalindex.faiss import (
@@ -181,11 +182,11 @@ class RowRangeGlobalIndexScanner:
                     )
                     readers.append(reader)
                 if index_type == 'btree':
-                    from pypaimon.globalindex.btree import (BTreeIndexReader)
-                    from pypaimon.globalindex.btree.key_serializer import (StringSerializer)
+                    from pypaimon.globalindex.btree import BTreeIndexReader
+                    from pypaimon.globalindex.btree.key_serializer import create_serializer
                     for metadata in io_metas:
                         reader = BTreeIndexReader(
-                            key_serializer=StringSerializer(),  # TODO create serializer from type
+                            key_serializer=create_serializer(field.type),
                             file_io=file_io,
                             index_path=index_path,
                             io_meta=metadata
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 86c2866676..b4cdca1dfa 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -321,28 +321,26 @@ class JavaPyReadWriteTest(unittest.TestCase):
         self.assertEqual(expected, actual)
 
     def test_read_btree_index_table(self):
-        table = self.catalog.get_table('default.test_btree_index')
-        read_builder: ReadBuilder = table.new_read_builder()
+        self._test_read_btree_index_generic("test_btree_index_string", "k2", pa.string())
+        self._test_read_btree_index_generic("test_btree_index_int", 200, pa.int32())
+        self._test_read_btree_index_generic("test_btree_index_bigint", 2000, pa.int64())
 
-        # read all
-        table_read = read_builder.new_read()
-        splits = read_builder.new_scan().plan().splits()
-        actual = table_sort_by(table_read.to_arrow(splits), 'k')
-        expected = pa.Table.from_pydict({
-            'k': ["k1", "k2", "k3"],
-            'v': ["v1", "v2", "v3"]
-        })
-        self.assertEqual(expected, actual)
+    def _test_read_btree_index_generic(self, table_name: str, k, k_type):
+        table = self.catalog.get_table('default.' + table_name)
+        read_builder: ReadBuilder = table.new_read_builder()
 
         # read using index
         predicate_builder = read_builder.new_predicate_builder()
-        predicate = predicate_builder.equal('k', 'k2')
+        predicate = predicate_builder.equal('k', k)
         read_builder.with_filter(predicate)
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
         actual = table_sort_by(table_read.to_arrow(splits), 'k')
         expected = pa.Table.from_pydict({
-            'k': ["k2"],
+            'k': [k],
             'v': ["v2"]
-        })
+        }, schema=pa.schema([
+            ("k", k_type),
+            ("v", pa.string())
+        ]))
         self.assertEqual(expected, actual)

Reply via email to