This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.3
in repository https://gitbox.apache.org/repos/asf/paimon.git
commit bfbefe8ec0921a5a5bbd8df57f1ab1d8bc5322ef
Author: universe-hcy <[email protected]>
AuthorDate: Wed Oct 22 16:21:00 2025 +0800

    [Python] Fix bug comparing string and int in row_key_extractor (#6448)
---
 .../pypaimon/tests/rest/rest_simple_test.py       | 53 +++++++++++++++++++++-
 paimon-python/pypaimon/write/row_key_extractor.py |  4 +-
 2 files changed, 54 insertions(+), 3 deletions(-)

diff --git a/paimon-python/pypaimon/tests/rest/rest_simple_test.py b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
index 1a53b825d4..19aec430fd 100644
--- a/paimon-python/pypaimon/tests/rest/rest_simple_test.py
+++ b/paimon-python/pypaimon/tests/rest/rest_simple_test.py
@@ -24,7 +24,8 @@ from pypaimon import Schema
 from pypaimon.catalog.catalog_exception import DatabaseAlreadyExistException, TableAlreadyExistException, \
     DatabaseNotExistException, TableNotExistException
 from pypaimon.tests.rest.rest_base_test import RESTBaseTest
-from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor, DynamicBucketRowKeyExtractor
+from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor, DynamicBucketRowKeyExtractor, \
+    UnawareBucketRowKeyExtractor
 
 
 class RESTSimpleTest(RESTBaseTest):
@@ -103,6 +104,56 @@ class RESTSimpleTest(RESTBaseTest):
         expected = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, expected)
 
+    def test_with_shard_ao_unaware_bucket_manual(self):
+        """Test shard_ao_unaware_bucket with setting bucket -1 manually"""
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'],
+                                            options={'bucket': '-1'})
+        self.rest_catalog.create_table('default.test_with_shard_ao_unaware_bucket_manual', schema, False)
+        table = self.rest_catalog.get_table('default.test_with_shard_ao_unaware_bucket_manual')
+        write_builder = table.new_batch_write_builder()
+
+        # Write data with single partition
+        table_write = write_builder.new_write()
+        self.assertIsInstance(table_write.row_key_extractor, UnawareBucketRowKeyExtractor)
+
+        table_commit = write_builder.new_commit()
+        data = {
+            'user_id': [1, 2, 3, 4, 5, 6],
+            'item_id': [1001, 1002, 1003, 1004, 1005, 1006],
+            'behavior': ['a', 'b', 'c', 'd', 'e', 'f'],
+            'dt': ['p1', 'p1', 'p1', 'p1', 'p1', 'p1'],
+        }
+        pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+
+        # Test first shard (0, 2) - should get first 3 rows
+        plan = read_builder.new_scan().with_shard(0, 2).plan()
+        actual = table_read.to_arrow(plan.splits()).sort_by('user_id')
+        expected = pa.Table.from_pydict({
+            'user_id': [1, 2, 3],
+            'item_id': [1001, 1002, 1003],
+            'behavior': ['a', 'b', 'c'],
+            'dt': ['p1', 'p1', 'p1'],
+        }, schema=self.pa_schema)
+        self.assertEqual(actual, expected)
+
+        # Test second shard (1, 2) - should get last 3 rows
+        plan = read_builder.new_scan().with_shard(1, 2).plan()
+        actual = table_read.to_arrow(plan.splits()).sort_by('user_id')
+        expected = pa.Table.from_pydict({
+            'user_id': [4, 5, 6],
+            'item_id': [1004, 1005, 1006],
+            'behavior': ['d', 'e', 'f'],
+            'dt': ['p1', 'p1', 'p1'],
+        }, schema=self.pa_schema)
+        self.assertEqual(actual, expected)
+
     def test_with_shard_ao_fixed_bucket(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'],
                                             options={'bucket': '5', 'bucket-key': 'item_id'})
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index c1b7ebf7d0..db36d50e91 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -101,7 +101,7 @@ class UnawareBucketRowKeyExtractor(RowKeyExtractor):
 
     def __init__(self, table_schema: TableSchema):
         super().__init__(table_schema)
-        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+        num_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
 
         if num_buckets != -1:
             raise ValueError(f"Unaware bucket mode requires bucket = -1, got {num_buckets}")
@@ -118,7 +118,7 @@ class DynamicBucketRowKeyExtractor(RowKeyExtractor):
 
     def __init__(self, table_schema: 'TableSchema'):
         super().__init__(table_schema)
-        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -1)
+        num_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
 
         if num_buckets != -1:
             raise ValueError(
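
The core of the change, as a minimal standalone sketch (assuming, as the new test's
options={'bucket': '-1'} and the added int() cast suggest, that option values arrive as
strings; the options dict below is a hypothetical stand-in for table_schema.options):

    # Before the fix: the option value is a string, so the comparison is str vs int.
    options = {'bucket': '-1'}              # hypothetical stand-in for table_schema.options
    num_buckets = options.get('bucket', -1)
    print(num_buckets != -1)                # True: '-1' != -1, so a spurious ValueError was raised

    # After the fix: cast to int before comparing.
    num_buckets = int(options.get('bucket', -1))
    print(num_buckets != -1)                # False: bucket = -1 is accepted as intended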
