This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 51bfa9a10b [python] Fix bin packing bug when writing large data (#6189)
51bfa9a10b is described below
commit 51bfa9a10b224fa52b245e65632a79cd65d10dbe
Author: umi <[email protected]>
AuthorDate: Wed Sep 3 15:13:24 2025 +0800
[python] Fix bin packing bug when writing large data (#6189)
---
paimon-python/pypaimon/read/table_scan.py | 2 +-
.../pypaimon/tests/rest_table_read_write_test.py | 122 +++++++++++++++++++++
2 files changed, 123 insertions(+), 1 deletion(-)
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 1c2c4f33dc..2a927c0055 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -249,7 +249,7 @@ class TableScan:
for item in items:
weight = weight_func(item)
if bin_weight + weight > target_weight and len(bin_items) > 0:
- packed.append(bin_items)
+ packed.append(list(bin_items))
bin_items.clear()
bin_weight = 0
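
For context, a minimal standalone sketch of the list-aliasing behavior the one-line change above addresses (illustrative only, not the actual TableScan code): appending bin_items by reference and then calling clear() also empties every bin already stored in packed, so packing breaks as soon as a large write produces more than one bin.

    # Buggy pattern: packed holds a reference to the same list object,
    # so the subsequent clear() also empties the bin that was just "packed".
    bin_items = ["file-1", "file-2"]
    packed = []
    packed.append(bin_items)        # stores a reference, not a copy
    bin_items.clear()
    print(packed)                   # [[]]

    # Fixed pattern (what the patch does): snapshot the bin before clearing it.
    bin_items = ["file-1", "file-2"]
    packed = []
    packed.append(list(bin_items))  # stores a shallow copy
    bin_items.clear()
    print(packed)                   # [['file-1', 'file-2']]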
diff --git a/paimon-python/pypaimon/tests/rest_table_read_write_test.py b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
index fd1cd65b07..0608c4ef58 100644
--- a/paimon-python/pypaimon/tests/rest_table_read_write_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_read_write_test.py
@@ -16,9 +16,16 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
+import logging
+
import pandas as pd
import pyarrow as pa
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
@@ -273,3 +280,118 @@ class RESTTableReadWriteTest(RESTCatalogBaseTest):
actual = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
expect = pd.DataFrame(self.raw_data)
pd.testing.assert_frame_equal(actual.reset_index(drop=True), expect.reset_index(drop=True))
+
+    def testWriteWideTableLargeData(self):
+        logging.basicConfig(level=logging.INFO)
+        catalog = CatalogFactory.create(self.options)
+
+        # Build table structure: 200 data columns + 1 partition column
+        # Create PyArrow schema
+        pa_fields = []
+
+        # Create 200 data columns f0 to f199
+        for i in range(200):
+            pa_fields.append(pa.field(f"f{i}", pa.string(), metadata={"description": f"Column f{i}"}))
+
+        # Add partition column dt
+        pa_fields.append(pa.field("dt", pa.string(), metadata={"description": "Partition column dt"}))
+
+        # Create PyArrow schema
+        pa_schema = pa.schema(pa_fields)
+
+        # Convert to Paimon Schema and specify partition key
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"])
+
+        # Create table
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        try:
+            # If table already exists, drop it first
+            try:
+                catalog.get_table(table_identifier)
+                catalog.drop_table(table_identifier)
+                print(f"Dropped existing table {table_identifier}")
+            except Exception:
+                # Table does not exist, continue creating
+                pass
+
+            # Create new table
+            catalog.create_table(
+                identifier=table_identifier,
+                schema=schema,
+                ignore_if_exists=False
+            )
+
+            print(
+                f"Successfully created table {table_identifier} with {len(pa_fields) - 1} "
+                f"data columns and 1 partition column")
+            print(
+                f"Table schema: {len([f for f in pa_fields if f.name != 'dt'])} data columns (f0-f199) + dt partition")
+
+        except Exception as e:
+            print(f"Error creating table: {e}")
+            raise e
+        import random
+
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        table = catalog.get_table(table_identifier)
+
+        total_rows = 500000  # rows of data
+        batch_size = 100000  # 100,000 rows per batch
+        commit_batches = total_rows // batch_size
+
+        for commit_batch in range(commit_batches):
+            start_idx = commit_batch * batch_size
+            end_idx = start_idx + batch_size
+
+            print(f"Processing batch {commit_batch + 1}/{commit_batches} ({start_idx:,} - {end_idx:,})...")
+            # Generate data for current batch - generate data for all 200 columns
+            data = {}
+            # Generate data for f0-f199
+            for i in range(200):
+                if i == 0:
+                    data[f"f{i}"] = [f'value_{j}' for j in range(start_idx, end_idx)]
+                elif i == 1:
+                    data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E']) for _ in range(batch_size)]
+                elif i == 2:
+                    data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _ in range(batch_size)]
+                elif i == 3:
+                    data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx, end_idx)]
+                else:
+                    # Generate random string data for other columns
+                    data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}' for _ in range(batch_size)]
+
+            # Add partition column data
+            data['dt'] = ['2025-09-01' for _ in range(batch_size)]
+            # Convert dictionary to PyArrow RecordBatch
+            arrow_batch = pa.RecordBatch.from_pydict(data)
+            # Create new write and commit objects for each commit batch
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            try:
+                # Write current batch data
+                table_write.write_arrow_batch(arrow_batch)
+                print("Batch data write completed, committing...")
+                # Commit current batch
+                commit_messages = table_write.prepare_commit()
+                table_commit.commit(commit_messages)
+                print(f"Batch {commit_batch + 1} committed successfully! Written {end_idx:,} rows of data")
+
+            finally:
+                # Ensure resource cleanup
+                table_write.close()
+                table_commit.close()
+
+        print(
+            f"All data writing completed! "
+            f"Total written {total_rows:,} rows of data to 200-column wide table in {commit_batches} commits")
+        rest_catalog = RESTCatalog(CatalogContext.create_from_options(Options(self.options)))
+        table = rest_catalog.get_table('default.wide_table_200cols')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        read_builder = (table.new_read_builder()
+                        .with_projection(['f0', 'f1'])
+                        .with_filter(predicate=predicate_builder.equal("dt", "2025-09-01")))
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        self.assertEqual(table_read.to_arrow(splits).num_rows, total_rows)