This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 3bbb4fe365 [Python] Fix bug comparing string and int in row_key_extractor (#6448)
3bbb4fe365 is described below
commit 3bbb4fe365cc4bf90ebed0add9fc645ac708a3b7
Author: universe-hcy <[email protected]>
AuthorDate: Wed Oct 22 16:21:00 2025 +0800
[Python] Fix bug comparing string and int in row_key_extractor (#6448)
---
.../pypaimon/tests/rest/rest_simple_test.py | 53 +++++++++++++++++++++-
paimon-python/pypaimon/write/row_key_extractor.py | 4 +-
2 files changed, 54 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index 1a53b825d4..19aec430fd 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -24,7 +24,8 @@ from pypaimon import Schema
from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException,
TableAlreadyExistException, \
DatabaseNotExistException, TableNotExistException
from pypaimon.tests.rest.rest_base_test import RESTBaseTest
-from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor,
DynamicBucketRowKeyExtractor
+from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor,
DynamicBucketRowKeyExtractor, \
+ UnawareBucketRowKeyExtractor
class RESTSimpleTest(RESTBaseTest):
@@ -103,6 +104,56 @@ class RESTSimpleTest(RESTBaseTest):
expected = self._read_test_table(read_builder).sort_by('user_id')
self.assertEqual(actual, expected)
+ def test_with_shard_ao_unaware_bucket_manual(self):
+ """Table created with option bucket='-1' (a string) must use UnawareBucketRowKeyExtractor, and with_shard must split its 6 rows into two shards of 3."""
+ schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
+ options={'bucket': '-1'})
+
self.rest_catalog.create_table('default.test_with_shard_ao_unaware_bucket_manual',
schema, False)
+ table =
self.rest_catalog.get_table('default.test_with_shard_ao_unaware_bucket_manual')
+ write_builder = table.new_batch_write_builder()
+
+ # Write into a single partition (dt='p1'); the string option '-1' must still resolve to the unaware-bucket extractor
+ table_write = write_builder.new_write()
+ self.assertIsInstance(table_write.row_key_extractor,
UnawareBucketRowKeyExtractor)
+
+ table_commit = write_builder.new_commit()
+ data = {
+ 'user_id': [1, 2, 3, 4, 5, 6],
+ 'item_id': [1001, 1002, 1003, 1004, 1005, 1006],
+ 'behavior': ['a', 'b', 'c', 'd', 'e', 'f'],
+ 'dt': ['p1', 'p1', 'p1', 'p1', 'p1', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+
+ # First shard, with_shard(0, 2): expected to return rows user_id 1-3 (result sorted by user_id)
+ plan = read_builder.new_scan().with_shard(0, 2).plan()
+ actual = table_read.to_arrow(plan.splits()).sort_by('user_id')
+ expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 3],
+ 'item_id': [1001, 1002, 1003],
+ 'behavior': ['a', 'b', 'c'],
+ 'dt': ['p1', 'p1', 'p1'],
+ }, schema=self.pa_schema)
+ self.assertEqual(actual, expected)
+
+ # Second shard, with_shard(1, 2): expected to return rows user_id 4-6 (result sorted by user_id)
+ plan = read_builder.new_scan().with_shard(1, 2).plan()
+ actual = table_read.to_arrow(plan.splits()).sort_by('user_id')
+ expected = pa.Table.from_pydict({
+ 'user_id': [4, 5, 6],
+ 'item_id': [1004, 1005, 1006],
+ 'behavior': ['d', 'e', 'f'],
+ 'dt': ['p1', 'p1', 'p1'],
+ }, schema=self.pa_schema)
+ self.assertEqual(actual, expected)
+
def test_with_shard_ao_fixed_bucket(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'],
options={'bucket': '5',
'bucket-key': 'item_id'})
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index c1b7ebf7d0..db36d50e91 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -101,7 +101,7 @@ class UnawareBucketRowKeyExtractor(RowKeyExtractor):
def __init__(self, table_schema: TableSchema):
super().__init__(table_schema)
- num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+ num_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
if num_buckets != -1:
raise ValueError(f"Unaware bucket mode requires bucket = -1, got
{num_buckets}")
@@ -118,7 +118,7 @@ class DynamicBucketRowKeyExtractor(RowKeyExtractor):
def __init__(self, table_schema: 'TableSchema'):
super().__init__(table_schema)
- num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+ num_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
if num_buckets != -1:
raise ValueError(