This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 384c430bbc [python] Fix failing to read 1000cols (#6244)
384c430bbc is described below
commit 384c430bbc8adf2e0aa7327eac0e854bed308e98
Author: umi <[email protected]>
AuthorDate: Fri Sep 12 14:40:49 2025 +0800
[python] Fix failing to read 1000cols (#6244)
---
paimon-python/pypaimon/table/row/generic_row.py | 4 +-
.../pypaimon/tests/py36/ao_read_write_test.py | 76 ++++++++++++++++++++++
.../pypaimon/tests/reader_append_only_test.py | 76 ++++++++++++++++++++++
3 files changed, 153 insertions(+), 3 deletions(-)
diff --git a/paimon-python/pypaimon/table/row/generic_row.py b/paimon-python/pypaimon/table/row/generic_row.py
index 2f6d3d86ae..14f42e806c 100644
--- a/paimon-python/pypaimon/table/row/generic_row.py
+++ b/paimon-python/pypaimon/table/row/generic_row.py
@@ -54,9 +54,7 @@ class GenericRowDeserializer:
         arity = len(data_fields)
         actual_data = bytes_data
         if len(bytes_data) >= 4:
-            arity_from_bytes = struct.unpack('>i', bytes_data[:4])[0]
-            if 0 < arity_from_bytes < 1000:
-                actual_data = bytes_data[4:]
+            actual_data = bytes_data[4:]
 
         fields = []
         null_bits_size_in_bytes = cls._calculate_bit_set_width_in_bytes(arity)
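
For context: the removed guard only stripped the 4-byte big-endian arity
prefix when its value fell between 1 and 999, so serialized rows with 1000
or more columns kept the prefix glued to the payload and deserialized
garbage. The fix strips the prefix unconditionally. Below is a minimal
standalone sketch of the failure mode, assuming the layout implied by the
code above (a 4-byte '>i' arity header followed by the row bytes); the
helper name is illustrative, not from the commit:

    import struct

    def old_strip_header(bytes_data):
        # Pre-fix behavior: peek at the first 4 bytes and treat them as an
        # arity header only if the value falls in the open range (0, 1000).
        actual_data = bytes_data
        if len(bytes_data) >= 4:
            arity_from_bytes = struct.unpack('>i', bytes_data[:4])[0]
            if 0 < arity_from_bytes < 1000:
                actual_data = bytes_data[4:]
        return actual_data

    payload = b'row-bytes'
    narrow = struct.pack('>i', 10) + payload    # 10 columns: header stripped
    wide = struct.pack('>i', 1000) + payload    # 1000 columns: guard misfires
    assert old_strip_header(narrow) == payload
    assert old_strip_header(wide) != payload    # 4 stray header bytes remain
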
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 3247ea691f..0e8d97d47b 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -20,7 +20,9 @@ from datetime import datetime
 from unittest.mock import Mock
 
 import pandas as pd
+import numpy as np
 import pyarrow as pa
+
 from pypaimon.api.options import Options
 from pypaimon.catalog.catalog_context import CatalogContext
 from pypaimon import CatalogFactory
@@ -307,6 +309,80 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
         actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_over1000cols_read(self):
+        num_rows = 1
+        num_cols = 1001  # exceed the old 1000-column guard so the test reproduces the bug
+        table_name = "default.testBug"
+        # Generate dynamic schema based on column count
+        schema_fields = []
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                schema_fields.append((col_name, pa.string()))  # ID column
+            elif i == 2:
+                schema_fields.append((col_name, pa.string()))  # Name column
+            elif i == 3:
+                schema_fields.append((col_name, pa.string()))  # Category column (partition key)
+            elif i % 4 == 0:
+                schema_fields.append((col_name, pa.float64()))  # Float columns
+            elif i % 4 == 1:
+                schema_fields.append((col_name, pa.int32()))  # Int columns
+            elif i % 4 == 2:
+                schema_fields.append((col_name, pa.string()))  # String columns
+            else:
+                schema_fields.append((col_name, pa.int64()))  # Long columns
+
+        pa_schema = pa.schema(schema_fields)
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['c003'],  # Use c003 as partition key
+        )
+
+        # Create table
+        self.rest_catalog.create_table(table_name, schema, False)
+        table = self.rest_catalog.get_table(table_name)
+
+        # Generate test data
+        np.random.seed(42)  # For reproducible results
+        categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports', 'Food', 'Toys', 'Beauty', 'Health', 'Auto']
+        statuses = ['Active', 'Inactive', 'Pending', 'Completed']
+
+        # Generate data dictionary
+        test_data = {}
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 2:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 3:
+                test_data[col_name] = np.random.choice(categories, num_rows)
+            elif i % 4 == 0:
+                test_data[col_name] = np.random.uniform(1.0, 1000.0, num_rows).round(2)
+            elif i % 4 == 1:
+                test_data[col_name] = np.random.randint(1, 100, num_rows)
+            elif i % 4 == 2:
+                test_data[col_name] = np.random.choice(statuses, num_rows)
+            else:
+                test_data[col_name] = np.random.randint(1640995200, 1672531200, num_rows)
+
+        test_df = pd.DataFrame(test_data)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_pandas(test_df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_pandas(table_scan.plan().splits())
+        self.assertEqual(result.to_dict(), test_df.to_dict())
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_append_only_test.py b/paimon-python/pypaimon/tests/reader_append_only_test.py
index 5a6c291f2e..795365e48e 100644
--- a/paimon-python/pypaimon/tests/reader_append_only_test.py
+++ b/paimon-python/pypaimon/tests/reader_append_only_test.py
@@ -21,6 +21,8 @@ import tempfile
 import unittest
 
 import pyarrow as pa
+import numpy as np
+import pandas as pd
 
 from pypaimon import CatalogFactory
 from pypaimon import Schema
@@ -113,6 +115,80 @@ class AoReaderTest(unittest.TestCase):
         actual = self._read_test_table(read_builder).sort_by('user_id')
         self.assertEqual(actual, self.expected)
 
+    def test_over1000cols_read(self):
+        num_rows = 1
+        num_cols = 1001  # exceed the old 1000-column guard so the test reproduces the bug
+        table_name = "default.testBug"
+        # Generate dynamic schema based on column count
+        schema_fields = []
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                schema_fields.append((col_name, pa.string()))  # ID column
+            elif i == 2:
+                schema_fields.append((col_name, pa.string()))  # Name column
+            elif i == 3:
+                schema_fields.append((col_name, pa.string()))  # Category column (partition key)
+            elif i % 4 == 0:
+                schema_fields.append((col_name, pa.float64()))  # Float columns
+            elif i % 4 == 1:
+                schema_fields.append((col_name, pa.int32()))  # Int columns
+            elif i % 4 == 2:
+                schema_fields.append((col_name, pa.string()))  # String columns
+            else:
+                schema_fields.append((col_name, pa.int64()))  # Long columns
+
+        pa_schema = pa.schema(schema_fields)
+        schema = Schema.from_pyarrow_schema(
+            pa_schema,
+            partition_keys=['c003'],  # Use c003 as partition key
+        )
+
+        # Create table
+        self.catalog.create_table(table_name, schema, False)
+        table = self.catalog.get_table(table_name)
+
+        # Generate test data
+        np.random.seed(42)  # For reproducible results
+        categories = ['Electronics', 'Clothing', 'Books', 'Home', 'Sports', 'Food', 'Toys', 'Beauty', 'Health', 'Auto']
+        statuses = ['Active', 'Inactive', 'Pending', 'Completed']
+
+        # Generate data dictionary
+        test_data = {}
+        for i in range(1, num_cols + 1):
+            col_name = f'c{i:03d}'
+            if i == 1:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 2:
+                test_data[col_name] = [f'Product_{j}' for j in range(1, num_rows + 1)]
+            elif i == 3:
+                test_data[col_name] = np.random.choice(categories, num_rows)
+            elif i % 4 == 0:
+                test_data[col_name] = np.random.uniform(1.0, 1000.0, num_rows).round(2)
+            elif i % 4 == 1:
+                test_data[col_name] = np.random.randint(1, 100, num_rows)
+            elif i % 4 == 2:
+                test_data[col_name] = np.random.choice(statuses, num_rows)
+            else:
+                test_data[col_name] = np.random.randint(1640995200, 1672531200, num_rows)
+
+        test_df = pd.DataFrame(test_data)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+
+        table_write.write_pandas(test_df)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
+
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        result = table_read.to_pandas(table_scan.plan().splits())
+        self.assertEqual(result.to_dict(), test_df.to_dict())
+
     def testAppendOnlyReaderWithFilter(self):
         schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
         self.catalog.create_table('default.test_append_only_filter', schema, False)
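
Both new tests exercise the same batch write/read round trip; only the
catalog fixture differs (a REST catalog in py36/ao_read_write_test.py, a
filesystem catalog in reader_append_only_test.py). Condensed to its
skeleton, the pattern looks like the sketch below. The builder and reader
calls are exactly the ones used in the diff; the catalog setup, warehouse
path, and table name are illustrative assumptions, since the real tests
create their catalogs in setUp():

    import pandas as pd
    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    # Assumed setup: a filesystem catalog keyed by 'warehouse'; the tests
    # themselves build catalogs from fixtures instead.
    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-demo'})
    schema = Schema.from_pyarrow_schema(pa.schema([('c001', pa.string())]))
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    # One batch write plus commit...
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_pandas(pd.DataFrame({'c001': ['a', 'b']}))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    # ...then read every split back into pandas and compare.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    result = read_builder.new_read().to_pandas(splits)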