klboke opened a new issue, #6182: URL: https://github.com/apache/paimon/issues/6182
### Search before asking

- [x] I searched in the [issues](https://github.com/apache/paimon/issues) and found nothing similar.

### Paimon version

1.3

### Compute Engine

spark

### Minimal reproduce step

```python
def test_create_wide_table(self):
    """Create a wide table with 200 data columns (f0-f199) and a partition column dt."""
    logging.basicConfig(level=logging.INFO)
    options = {
        'metastore': 'rest',
        'uri': "https://cn-shanghai-vpc.dlf.aliyuncs.com",
        'warehouse': 'lakehouse_rnd',
        'dlf.region': 'cn-shanghai',
        "token.provider": "dlf",
        'dlf.access-key-id': os.getenv("DLF_ACCESS_KEY_ID"),
        'dlf.access-key-secret': os.getenv("DLF_ACCESS_KEY_SECRET"),
        'data.token.enabled': 'true'
    }
    catalog = CatalogFactory.create(options)

    # Build the table schema: 200 data columns + 1 partition column
    pa_fields = []
    # Create 200 data columns f0 to f199
    for i in range(200):
        pa_fields.append(pa.field(f"f{i}", pa.string(), metadata={"description": f"Column f{i}"}))
    # Add the partition column dt
    pa_fields.append(pa.field("dt", pa.string(), metadata={"description": "Partition column dt"}))

    # Create the PyArrow schema
    pa_schema = pa.schema(pa_fields)

    # Convert to a Paimon Schema and set the partition key
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"])

    # Create the table
    table_identifier = Identifier.create("adn", "wide_table_200cols")
    try:
        # Drop the table first if it already exists
        try:
            catalog.get_table(table_identifier)
            catalog.drop_table(table_identifier)
            print(f"Dropped existing table {table_identifier}")
        except Exception:
            # Table does not exist, continue with creation
            pass

        # Create the new table
        table = catalog.create_table(
            identifier=table_identifier,
            schema=schema,
            ignore_if_exists=False
        )
        print(f"Successfully created table {table_identifier} with {len(pa_fields) - 1} data columns and 1 partition column")
        print(f"Table schema: {len([f for f in pa_fields if f.name != 'dt'])} data columns (f0-f199) + dt partition")
        return table
    except Exception as e:
        print(f"Error creating table: {e}")
        raise e

def test_write_wide_table(self):
    """Write 2 million rows into the 200-column wide table."""
    import random

    logging.basicConfig(level=logging.INFO)
    options = {
        'metastore': 'rest',
        'uri': "https://cn-shanghai-vpc.dlf.aliyuncs.com",
        'warehouse': 'lakehouse_rnd',
        'dlf.region': 'cn-shanghai',
        "token.provider": "dlf",
        'dlf.access-key-id': os.getenv("DLF_ACCESS_KEY_ID"),
        'dlf.access-key-secret': os.getenv("DLF_ACCESS_KEY_SECRET"),
        'data.token.enabled': 'true'
    }
    catalog = CatalogFactory.create(options)

    table_identifier = Identifier.create("adn", "wide_table_200cols")
    table = catalog.get_table(table_identifier)

    total_rows = 2000000   # 2 million rows in total
    batch_size = 100000    # 100,000 rows per commit batch
    commit_batches = total_rows // batch_size

    for commit_batch in range(commit_batches):
        start_idx = commit_batch * batch_size
        end_idx = start_idx + batch_size
        print(f"Processing commit batch {commit_batch + 1}/{commit_batches} (rows {start_idx:,} - {end_idx:,})...")

        # Generate data for the current batch - all 200 columns
        data = {}
        # Columns f0-f199
        for i in range(200):
            if i == 0:
                data[f"f{i}"] = [f'value_{j}' for j in range(start_idx, end_idx)]
            elif i == 1:
                data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E']) for _ in range(batch_size)]
            elif i == 2:
                data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _ in range(batch_size)]
            elif i == 3:
                data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx, end_idx)]
            else:
                # Random string data for the remaining columns
                data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}' for _ in range(batch_size)]

        # Partition column data
        data['dt'] = ['2025-09-01' for _ in range(batch_size)]

        # Convert the dict into a PyArrow RecordBatch
        arrow_batch = pa.RecordBatch.from_pydict(data)

        # Create fresh write and commit objects for every commit batch
        write_builder = table.new_batch_write_builder()
        table_write = write_builder.new_write()
        table_commit = write_builder.new_commit()

        try:
            # Write the current batch
            table_write.write_arrow_batch(arrow_batch)
            print("Batch data written, committing...")

            # Commit the current batch
            commit_messages = table_write.prepare_commit()
            table_commit.commit(commit_messages)
            print(f"Commit batch {commit_batch + 1} succeeded! {end_idx:,} rows written so far")
        finally:
            # Make sure resources are released
            table_write.close()
            table_commit.close()

    print(f"All data written! {total_rows:,} rows written to the 200-column wide table across {commit_batches} commits")
```

### What doesn't meet your expectations?

When 2 million records are written with pypaimon, the write succeeds and the DLF console also shows 2 million records. However, when the data is read back with pypaimon, several hundred thousand records are missing (a read-back sketch is included at the end of this report).

### Anything else?

_No response_

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
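A minimal read-back check that surfaces the mismatch could look like the sketch below. It is written in the same style as the reproduce methods above (same imports, catalog options, and table identifier) and assumes the pypaimon read-builder API (`new_read_builder()`, `new_scan().plan().splits()`, `new_read().to_arrow(splits)`); it is illustrative, not the exact code the reporter ran.

```python
def test_read_wide_table(self):
    """Count the rows pypaimon reads back from the 200-column wide table."""
    # Same catalog options as in the write test above.
    options = {
        'metastore': 'rest',
        'uri': "https://cn-shanghai-vpc.dlf.aliyuncs.com",
        'warehouse': 'lakehouse_rnd',
        'dlf.region': 'cn-shanghai',
        "token.provider": "dlf",
        'dlf.access-key-id': os.getenv("DLF_ACCESS_KEY_ID"),
        'dlf.access-key-secret': os.getenv("DLF_ACCESS_KEY_SECRET"),
        'data.token.enabled': 'true'
    }
    catalog = CatalogFactory.create(options)
    table = catalog.get_table(Identifier.create("adn", "wide_table_200cols"))

    # Plan the scan and read every split back into one Arrow table.
    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    result = read_builder.new_read().to_arrow(splits)

    # 2,000,000 rows were committed; the count observed here is several
    # hundred thousand rows lower.
    print(f"Rows read back: {result.num_rows:,}")
```

Reading all planned splits into a single Arrow table keeps the count independent of any per-split or per-batch iteration logic on the caller's side.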
