klboke commented on issue #6157:
URL: https://github.com/apache/paimon/issues/6157#issuecomment-3240689271
@JingsongLi The current implementation approach is roughly as follows:
```python
# ========= Row group planning (adapted for Paimon data files) =========
import logging
from typing import List, Optional, Tuple

import pyarrow.dataset as pds
import pyarrow.parquet as pq

logger = logging.getLogger(__name__)


def _plan_paimon_row_groups_global(
    data_files: List[str],
    fs,
    columns: List[str],
    slice_id: int,
    slice_count: int,
    validate_schema: bool = True,
    table_name: str = "unknown",
) -> List[Tuple[str, int, Optional[Tuple[int, int]]]]:
"""
全局分片策略(适配 Paimon 数据文件):
1. 将所有 Paimon 数据文件排序后看作一个连续的数据空间
2. 每个 worker 在该全局空间中获得一个连续范围
3. 只读取与该范围重叠的文件部分,并进行精确的行级过滤
Args:
data_files: Paimon 数据文件路径列表
fs: PyArrow 文件系统对象
columns: 需要读取的列名列表
slice_id: 当前 worker ID (0 <= slice_id < slice_count)
slice_count: 总 worker 数量
validate_schema: 是否验证列存在性
Returns:
分配给当前 worker 的 (file_path, row_group_id, row_range) 列表
"""
    tasks: List[Tuple[str, int, Optional[Tuple[int, int]]]] = []

    # Step 1: collect metadata for all Paimon data files, in sorted order
    file_info = []  # (file_path, file_rows, metadata)
    for path in sorted(data_files):  # sort by path for deterministic planning
        try:
            # Use a PyArrow dataset for compatibility with Parquet files
            # produced by Paimon
            ds = pds.dataset(path, format="parquet", filesystem=fs)
            for frag in ds.get_fragments():
                try:
                    pf = pq.ParquetFile(frag.path, filesystem=fs)
                except Exception as e:
                    logger.warning("Skipping unreadable file %s: %s",
                                   frag.path, e)
                    continue
                if validate_schema:
                    schema = pf.schema_arrow
                    missing = [c for c in columns
                               if schema.get_field_index(c) == -1]
                    if missing:
                        raise ValueError(
                            f"File {frag.path} missing columns: {missing}")
                metadata = pf.metadata
                file_rows = sum(metadata.row_group(rg_idx).num_rows
                                for rg_idx in range(metadata.num_row_groups))
                if file_rows > 0:
                    file_info.append((frag.path, file_rows, metadata))
        except Exception as e:
            logger.warning("Error processing Paimon data file %s: %s", path, e)
            continue
    if not file_info:
        logger.warning("No valid Paimon data files found")
        return []

    # Step 2: compute the global data space and each file's offset within it
    total_rows = sum(rows for _, rows, _ in file_info)
    file_offsets = []  # (file_path, global_start, global_end, metadata)
    current_offset = 0
    for file_path, file_rows, metadata in file_info:
        file_offsets.append(
            (file_path, current_offset, current_offset + file_rows, metadata))
        current_offset += file_rows
    # Step 3: compute the current worker's global range
    rows_per_worker = total_rows // slice_count
    global_start = slice_id * rows_per_worker
    if slice_id == slice_count - 1:
        global_end = total_rows  # the last worker absorbs the remainder
    else:
        global_end = global_start + rows_per_worker
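    # Worked example (illustrative): total_rows = 10, slice_count = 3 gives
    # rows_per_worker = 3, so workers 0/1/2 own [0:3), [3:6) and [6:10).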
    # Detailed progress is logged during the read phase; here we only log
    # that planning is complete
    logger.info(
        "Worker %d/%d planned table '%s' global range [%d:%d) of %d total rows",
        slice_id, slice_count, table_name, global_start, global_end, total_rows
    )
    # Step 4: find files overlapping this range and collect their row groups
    total_assigned_rgs = 0
    for file_path, file_global_start, file_global_end, metadata in file_offsets:
        # Check whether the file overlaps the worker's range
        if file_global_end > global_start and file_global_start < global_end:
            # Translate the overlap into file-relative row positions
            overlap_start = max(global_start, file_global_start)
            overlap_end = min(global_end, file_global_end)
            file_relative_start = overlap_start - file_global_start
            file_relative_end = overlap_end - file_global_start
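            # Illustrative: a worker range global[150:300) against a file
            # spanning global[100:200) overlaps at [150:200), i.e.
            # file-relative rows [50:100).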
            # logger.info(
            #     "File %s overlaps: global[%d:%d) -> file[%d:%d)",
            #     file_path, overlap_start, overlap_end,
            #     file_relative_start, file_relative_end
            # )
            # Find the row groups covering that range within this file and
            # compute exact per-group row ranges
            current_offset = 0
            for rg_idx in range(metadata.num_row_groups):
                rg = metadata.row_group(rg_idx)
                rg_num_rows = rg.num_rows
                rg_start = current_offset
                rg_end = current_offset + rg_num_rows
                # Check whether this row group overlaps the in-file target range
                if (rg_end > file_relative_start
                        and rg_start < file_relative_end):
                    # Compute the exact row range to read within this row group
                    rg_read_start = max(0, file_relative_start - rg_start)
                    rg_read_end = min(rg_num_rows,
                                      file_relative_end - rg_start)
                    if rg_read_start < rg_read_end:
                        # When the whole row group is needed, set row_range to
                        # None (lets the reader skip slicing)
                        if rg_read_start == 0 and rg_read_end == rg_num_rows:
                            row_range = None
                        else:
                            row_range = (rg_read_start, rg_read_end)
                        tasks.append((file_path, rg_idx, row_range))
                        total_assigned_rgs += 1
                        logger.debug(
                            "Worker %d: file %s RG%d [%s] -> reading rows [%d:%d)",
                            slice_id, file_path, rg_idx,
                            "full" if row_range is None
                            else f"{row_range[0]}:{row_range[1]}",
                            rg_read_start, rg_read_end
                        )
                current_offset = rg_end
    return tasks


def _calculate_row_range(total_rows: int, slice_id: int, slice_count: int,
                         file_path: str = None):
"""
计算当前 worker 应该处理的行范围
"""
if slice_count <= 1:
return 0, total_rows # 单 worker 模式,处理所有行
    rows_per_worker = total_rows // slice_count
    start_row = slice_id * rows_per_worker
    if slice_id == slice_count - 1:
        # The last worker processes all remaining rows
        end_row = total_rows
    else:
        end_row = start_row + rows_per_worker
    if file_path:
        logger.info(
            "Worker %d/%d processing rows [%d, %d) of %d total rows in file %s",
            slice_id, slice_count, start_row, end_row, total_rows, file_path
        )
    # If this worker was assigned no rows, return None
    if start_row >= end_row:
        if file_path:
            logger.info("Worker %d has no rows to process in file %s",
                        slice_id, file_path)
        return None
    return start_row, end_row
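

# Example (illustrative): _calculate_row_range(10, 2, 3) -> (6, 10), since the
# last worker takes the remainder; _calculate_row_range(1, 1, 3) -> None,
# because worker 1 gets an empty range when rows_per_worker is 0.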
```
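
For context, here is a minimal consumption sketch (not part of the patch) showing how a worker might materialize the planned `(file_path, row_group_id, row_range)` tasks with PyArrow; `_read_assigned_tasks` is a hypothetical helper name:

```python
import pyarrow as pa
import pyarrow.parquet as pq


def _read_assigned_tasks(tasks, fs, columns):
    """Read each planned (file_path, row_group_id, row_range) task."""
    tables = []
    for file_path, rg_idx, row_range in tasks:
        pf = pq.ParquetFile(file_path, filesystem=fs)
        table = pf.read_row_group(rg_idx, columns=columns)
        if row_range is not None:
            start, end = row_range
            # Table.slice(offset, length): keep only the planned rows
            table = table.slice(start, end - start)
        tables.append(table)
    return pa.concat_tables(tables) if tables else None
```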