This is an automated email from the ASF dual-hosted git repository. suxiaogang223 pushed a commit to branch refact_reader_branch in repository https://gitbox.apache.org/repos/asf/doris.git
commit 57178c889aa1fdaa64efc9ac9e9f54d6753ea2c2 Author: Socrates <[email protected]> AuthorDate: Mon May 18 16:16:50 2026 +0800 Refine Iceberg reader API boundaries --- be/src/format/parquet/parquet_reader.h | 10 ++++ be/src/format/reader/file_reader.h | 14 +++++ be/src/format/reader/table_reader.h | 93 ++++++++++++++++++++++++++++++-- be/src/format/table/iceberg_reader_v2.h | 89 +++++++++++++++++++++++------- docs/doris-iceberg-parquet-api-design.md | 46 ++++++++++++---- 5 files changed, 218 insertions(+), 34 deletions(-) diff --git a/be/src/format/parquet/parquet_reader.h b/be/src/format/parquet/parquet_reader.h index dfac6494cd8..65227aba04c 100644 --- a/be/src/format/parquet/parquet_reader.h +++ b/be/src/format/parquet/parquet_reader.h @@ -42,23 +42,33 @@ class ParquetReader : public reader::FileReader { public: virtual ~ParquetReader() = default; + // 解析 Parquet footer 并返回 Parquet 文件自身的 schema。 + // 这里不做 Iceberg schema evolution,也不把字段转换成 table/global schema。 Status get_schema(std::vector<reader::SchemaField>* file_schema) const override { // 真实实现会从 Parquet footer / schema descriptor 展开 file-local schema。 file_schema->clear(); return Status::OK(); } + // 初始化 Parquet 专属 scan。 + // 后续可以在 ParquetScanRequest 中扩展 row group、page index、bloom filter 等 + // Parquet-only 选项;table-level 语义仍然必须由 TableColumnMapper 提前转换。 Status init(const ParquetScanRequest& request) { // 真实实现会根据 projected_file_columns、local_filters 和 reader_expression_map // 初始化 row group、column chunk、page reader 以及延时物化计划。 return reader::FileReader::init(request); } + // 读取下一批 Parquet file-local block。 + // 返回列必须保持 file-local 语义,不能在这里补 default/generated/partition 列。 Status next(Block* file_block, size_t* rows, bool* eof) override { // 真实实现会输出 file-local block。stub 默认立即 EOF。 return reader::FileReader::next(file_block, rows, eof); } + // 通用 FileReader 初始化入口。 + // 当上层只持有 reader::FileReader 指针时会走该接口;Parquet 专属参数通过 + // ParquetScanRequest 重载表达。 Status init(const reader::FileScanRequest& request) override { return reader::FileReader::init(request); } diff --git a/be/src/format/reader/file_reader.h b/be/src/format/reader/file_reader.h index af03691b94e..fd5bfcf933f 100644 --- a/be/src/format/reader/file_reader.h +++ b/be/src/format/reader/file_reader.h @@ -87,6 +87,9 @@ class FileReader { public: virtual ~FileReader() = default; + // 打开一个物理文件并加载文件级元数据。 + // 该方法只建立 file-local reader 状态,不接收 table schema,也不做 projection/filter + // 规划;这些输入由 init(FileScanRequest) 提供。 virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = nullptr) { // 真实实现会保存文件句柄、IO 上下文并读取文件元数据。 _file = std::move(file); @@ -95,12 +98,18 @@ public: return Status::OK(); } + // 返回文件自己的 schema 视图。 + // 返回结果必须是 file-local schema:列 id、类型和 children 都按文件格式展开, + // 不在这里解释 Iceberg field id、缺失列、默认值或 generated column。 virtual Status get_schema(std::vector<SchemaField>* file_schema) const { // 真实实现会展开文件格式自己的 file-local schema。 file_schema->clear(); return Status::OK(); } + // 初始化一次 file-local scan。 + // request 由 TableColumnMapper 生成,只包含文件列投影、本地过滤条件和 reader + // expression。FileReader 可以基于它初始化 row group/page/stripe 等文件格式计划。 virtual Status init(const FileScanRequest& request) { // 真实实现会根据 projected columns、local filters 和 reader expressions // 初始化文件格式自己的物理读取计划。 @@ -110,6 +119,9 @@ public: return Status::OK(); } + // 读取下一批 file-local block。 + // file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。 + // eof 表示当前文件 reader 是否读完;多文件切换由 TableReader 负责。 virtual Status next(Block* file_block, size_t* rows, bool* eof) { // stub 默认立即 EOF。 (void)file_block; @@ -123,6 +135,8 @@ public: return Status::OK(); } + // 关闭当前物理文件 reader 并释放文件层状态。 + // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。 virtual Status close() { _file.reset(); _io_ctx = nullptr; diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h index 422ee3142d1..8d88ce4fe1b 100644 --- a/be/src/format/reader/table_reader.h +++ b/be/src/format/reader/table_reader.h @@ -113,6 +113,9 @@ public: explicit TableColumnMapper(TableColumnMapperOptions options = {}) : _options(std::move(options)) {} virtual ~TableColumnMapper() = default; + // 建立 table schema 到 file schema 的列映射。 + // 输出的 ColumnMapping 描述 table column 如何从 file column、常量列或表达式得到; + // 后续 projection、filter localization 和 table block finalize 都应复用这份映射。 virtual Status create_mapping(const std::vector<TableColumn>& table_schema, const std::vector<SchemaField>& file_schema, std::vector<ColumnMapping>* mappings) { @@ -136,6 +139,9 @@ public: return Status::OK(); } + // 把 table-level scan 请求转换成 file-local scan 请求。 + // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的 + // projected_file_columns、local_filters 和 reader_expression_map。 virtual Status create_scan_request(const TableScanRequest& table_request, const std::vector<ColumnMapping>& mappings, FileScanRequest* file_request) { @@ -154,6 +160,9 @@ public: return Status::OK(); } + // 将 table-level filter 定位到文件 schema。 + // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全 + // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback 处理。 virtual Status localize_filters(const std::vector<TableFilter>& table_filters, FileScanRequest* file_request) const { // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和 @@ -213,15 +222,21 @@ struct TableReadOptions { // table-level reader 基类。 // 该层负责多文件编排和动态分区裁剪等通用 table-level 逻辑,对外输出 table block。 +// 子类只需要实现“如何打开下一个具体 reader”和“如何读取当前 reader”的表格式语义。 class TableReader { public: virtual ~TableReader() = default; + // 初始化 table reader 的通用运行参数。 + // 子类可以在自己的 init(params) 中调用该方法;这里不接收具体表格式 schema/task。 virtual Status init(const TableReadOptions& options) { _options = options; return Status::OK(); } + // table-level 动态过滤入口。 + // 该方法用于根据 split、partition value 或文件级统计判断是否可以跳过后续 reader。 + // can_filter_all=true 表示当前 table reader 范围内的数据都可以被裁剪。 virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all) { // 真实实现会基于 split/partition/file stats 判断动态分区裁剪结果。 (void)expr; @@ -231,12 +246,78 @@ public: return Status::OK(); } - virtual Status next_reader() { - // 真实实现会切换到下一个 data file / split reader。 + // 对外读取 table block 的统一入口。 + // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。 + // table_block 的列必须已经是 table/global schema 语义。 + Status next(Block* table_block, size_t* rows, bool* eof) { + if (rows != nullptr) { + *rows = 0; + } + if (eof != nullptr) { + *eof = false; + } + while (true) { + if (!_has_current_reader) { + RETURN_IF_ERROR(next_reader()); + if (!_has_current_reader) { + if (eof != nullptr) { + *eof = true; + } + return Status::OK(); + } + } + + size_t current_rows = 0; + bool current_eof = false; + RETURN_IF_ERROR(read_current(table_block, ¤t_rows, ¤t_eof)); + if (rows != nullptr) { + *rows = current_rows; + } + if (!current_eof || current_rows > 0) { + return Status::OK(); + } + RETURN_IF_ERROR(close_current_reader()); + _has_current_reader = false; + } + } + + // 关闭 table reader 及当前正在读取的底层 reader。 + // 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。 + virtual Status close() { + RETURN_IF_ERROR(close_current_reader()); + _has_current_reader = false; + return Status::OK(); + } + +protected: + // 切换到下一个 reader 的通用流程。 + // 该方法先关闭当前 reader,再调用 open_next_reader;子类不应重复实现这个循环。 + Status next_reader() { + // 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。 + // 子类只通过 open_next_reader 提供具体表格式的 task/split 打开方式。 + RETURN_IF_ERROR(close_current_reader()); + bool has_reader = false; + RETURN_IF_ERROR(open_next_reader(&has_reader)); + _has_current_reader = has_reader; return Status::OK(); } - virtual Status next(Block* table_block, size_t* rows, bool* eof) { + // 打开下一个具体 reader。 + // 子类在这里选择下一个 split/task,创建或重置底层 FileReader,并设置 has_reader。 + // has_reader=false 表示没有更多输入,TableReader::next 会返回 eof=true。 + virtual Status open_next_reader(bool* has_reader) { + // stub 默认没有下一个 reader。 + if (has_reader != nullptr) { + *has_reader = false; + } + return Status::OK(); + } + + // 从当前 reader 读取一批 table block。 + // 子类应在这里读取 file-local block,并完成 delete、virtual column、finalize_expr + // 等 table-level 处理,最终写入 table_block。 + virtual Status read_current(Block* table_block, size_t* rows, bool* eof) { + // stub 默认当前 reader 立即 EOF。 (void)table_block; if (rows != nullptr) { *rows = 0; @@ -247,10 +328,12 @@ public: return Status::OK(); } - virtual Status close() { return Status::OK(); } + // 关闭当前具体 reader。 + // 该 hook 会被 next_reader 和 close 调用;实现应保持幂等。 + virtual Status close_current_reader() { return Status::OK(); } -protected: TableReadOptions _options; + bool _has_current_reader = false; }; } // namespace doris::reader diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h index 70ee2bb3ff5..29b556f71ed 100644 --- a/be/src/format/table/iceberg_reader_v2.h +++ b/be/src/format/table/iceberg_reader_v2.h @@ -69,6 +69,17 @@ struct IcebergReadOptions { bool enable_deletion_vector = true; }; +// IcebergTableReader 的完整初始化输入。 +// 这些字段共同决定一次 table scan 的语义,除非后续有明确的生命周期差异,否则不拆成 +// bind/init/set_tasks 多个阶段,避免调用点暴露半初始化状态。 +struct IcebergTableReadParams { + IcebergReadOptions options; + std::vector<reader::TableColumn> iceberg_schema; + reader::TableScanRequest scan_request; + std::vector<IcebergScanTask> scan_tasks; + std::unique_ptr<reader::FileReader> data_reader; +}; + // Iceberg table-level reader。 // 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合 // FileReader 完成 data file 物理读取,不继承具体文件格式 reader。 @@ -76,30 +87,36 @@ class IcebergTableReader : public reader::TableReader { public: IcebergTableReader() = default; - explicit IcebergTableReader(std::unique_ptr<reader::FileReader> data_reader) - : _data_reader(std::move(data_reader)) {} - ~IcebergTableReader() override = default; - Status init(const IcebergReadOptions& options, - std::unique_ptr<reader::FileReader> data_reader) { - _iceberg_options = options; - _data_reader = std::move(data_reader); - return reader::TableReader::init(options.table_options); - } - - Status bind(const std::vector<reader::TableColumn>& iceberg_schema) { - // 真实实现会绑定 Iceberg 当前 schema,并准备 field-id based mapping 输入。 - _iceberg_schema = iceberg_schema; - return Status::OK(); + // 初始化一次 Iceberg table scan。 + // params 必须一次性提供 schema、projection/filter、scan tasks 和底层 FileReader; + // 这样 IcebergTableReader 不会暴露 bind/set_tasks 等半初始化阶段。 + Status init(IcebergTableReadParams params) { + // 一次性保存 Iceberg table scan 所需输入。TableReader 负责 reader 切换流程; + // IcebergTableReader 只提供后续要打开的 task 以及 table/file schema 映射语义。 + _iceberg_options = params.options; + _iceberg_schema = std::move(params.iceberg_schema); + _table_scan_request = std::move(params.scan_request); + _scan_tasks = std::move(params.scan_tasks); + _data_reader = std::move(params.data_reader); + _next_task_idx = 0; + return reader::TableReader::init(_iceberg_options.table_options); } - Status init(const reader::TableScanRequest& request) { - // 保存 table-level projection/filter,后续由 TableColumnMapper 转成 FileScanRequest。 - _table_scan_request = request; + // 关闭当前 Iceberg scan。 + // 先让 TableReader 关闭当前 task reader,再释放 IcebergTableReader 持有的底层 + // FileReader。 + Status close() override { + RETURN_IF_ERROR(reader::TableReader::close()); + _data_reader.reset(); return Status::OK(); } +protected: + // 打开单个 Iceberg scan task。 + // 该方法完成当前 data file 的 schema mapping、filter localization、position delete + // 注入,并初始化底层 FileReader;它由 TableReader 的 reader 切换流程调用。 Status open_task(const IcebergScanTask& task) { // 真实实现会读取 data file schema,创建 field-id mapping,应用 position deletes, // 并初始化底层 ParquetReader。 @@ -123,7 +140,27 @@ public: return Status::OK(); } - Status next(Block* table_block, size_t* rows, bool* eof) override { + // 打开下一个 Iceberg task。 + // TableReader 负责循环和 EOF 处理;这里仅从 _scan_tasks 中取下一个 task 并调用 + // open_task。 + Status open_next_reader(bool* has_reader) override { + if (_next_task_idx >= _scan_tasks.size()) { + if (has_reader != nullptr) { + *has_reader = false; + } + return Status::OK(); + } + RETURN_IF_ERROR(open_task(_scan_tasks[_next_task_idx++])); + if (has_reader != nullptr) { + *has_reader = true; + } + return Status::OK(); + } + + // 读取当前 Iceberg task 的下一批 table block。 + // 这里组合底层 FileReader 输出的 file-local block,并负责 equality delete、 + // virtual columns 和 finalize,最终输出 table/global schema block。 + Status read_current(Block* table_block, size_t* rows, bool* eof) override { // 真实实现会读取 file-local block,finalize 成 table block,再应用 equality delete // 和 Iceberg virtual columns。stub 默认 EOF。 // 后续实现应在 IcebergTableReader 内部持有 file-local block;这里仅复用输出指针 @@ -138,6 +175,9 @@ public: return Status::OK(); } + // 将 file-local block 转换为 table/global schema block。 + // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列 + // 物化以及复杂列 remap。 Status finalize_chunk(Block* file_block, Block* table_block) { // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated // expressions,把 file-local block 写成 table block。 @@ -146,18 +186,24 @@ public: return Status::OK(); } + // 将 Iceberg position delete / deletion vector 转换成底层 reader 可消费的删除信息。 + // 这一步发生在读取 data file 前,因此会修改 FileScanRequest。 Status apply_position_deletes(reader::FileScanRequest* request) { // 真实实现会把 position delete / deletion vector 转换成 file-local delete 信息。 (void)request; return Status::OK(); } + // 在 table block 上应用 equality delete。 + // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。 Status apply_equality_deletes(Block* table_block) { // 真实实现会在 table block 上应用 equality delete。 (void)table_block; return Status::OK(); } + // 物化 Iceberg 虚拟列。 + // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。 Status materialize_virtual_columns(Block* table_block, size_t rows) { // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。 (void)table_block; @@ -165,17 +211,20 @@ public: return Status::OK(); } - Status close() override { + // 关闭当前 task 对应的底层 FileReader。 + // 该方法由 TableReader 在切换 reader 或 close 时调用,要求可重复调用。 + Status close_current_reader() override { if (_data_reader) { RETURN_IF_ERROR(_data_reader->close()); } - _data_reader.reset(); return Status::OK(); } private: IcebergReadOptions _iceberg_options; IcebergScanTask _scan_task; + std::vector<IcebergScanTask> _scan_tasks; + size_t _next_task_idx = 0; reader::TableScanRequest _table_scan_request; std::vector<reader::TableColumn> _iceberg_schema; std::vector<reader::ColumnMapping> _mappings; diff --git a/docs/doris-iceberg-parquet-api-design.md b/docs/doris-iceberg-parquet-api-design.md index 58036667d44..6518043b40d 100644 --- a/docs/doris-iceberg-parquet-api-design.md +++ b/docs/doris-iceberg-parquet-api-design.md @@ -74,6 +74,8 @@ namespace doris::reader - 管理 scan 生命周期; - 承接动态分区裁剪等 table-level 通用过滤逻辑; - 对外统一输出 table block。 +- `next` 是基类统一入口,内部负责 EOF 后切换 reader;具体表格式只提供打开和读取 + 当前 reader 的 hook。 建议接口形状: @@ -86,9 +88,14 @@ public: virtual Status init(const TableReadOptions& options); virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all); - virtual Status next_reader(); - virtual Status next(Block* table_block, size_t* rows, bool* eof); + Status next(Block* table_block, size_t* rows, bool* eof); virtual Status close(); + +protected: + Status next_reader(); + virtual Status open_next_reader(bool* has_reader); + virtual Status read_current(Block* table_block, size_t* rows, bool* eof); + virtual Status close_current_reader(); }; } // namespace doris::reader @@ -99,6 +106,7 @@ public: - `TableReader` 输出的是 table block,不输出 file-local block。 - `TableReader` 负责多文件编排和 table-level 通用裁剪,不负责 schema mapping,不负责 Parquet 物理解码。 +- `next_reader` 是 `TableReader` 自己的通用切换逻辑,不作为子类公开 override 接口。 - 动态分区裁剪这类逻辑应下放到 `TableReader`,而不是散落在具体表格式 reader 中。 - `TableReader` 不直接依赖旧 `vparquet` 表层语义。 @@ -122,7 +130,7 @@ namespace doris::iceberg 建议职责: - 绑定 Iceberg 当前 table schema; -- 接收 `IcebergScanTask`; +- 接收 `IcebergScanTask` 列表,并按 `TableReader` 的统一调度打开当前 task; - 处理 position delete、equality delete、deletion vector; - 物化 `_row_id`、`_last_updated_sequence_number` 等虚拟列; - 将 `ParquetReader` 返回的 file-local block finalize 成 table block。 @@ -136,13 +144,13 @@ class IcebergTableReader : public reader::TableReader { public: virtual ~IcebergTableReader() = default; - Status init(const IcebergReadOptions& options, - std::unique_ptr<reader::FileReader> data_reader); - Status bind(const std::vector<reader::TableColumn>& iceberg_schema); - Status init(const reader::TableScanRequest& request); - Status open_task(const IcebergScanTask& task); - Status next(Block* table_block, size_t* rows, bool* eof) override; + Status init(IcebergTableReadParams params); Status close() override; + +protected: + Status open_next_reader(bool* has_reader) override; + Status read_current(Block* table_block, size_t* rows, bool* eof) override; + Status close_current_reader() override; }; } // namespace doris::iceberg @@ -153,6 +161,11 @@ public: - `IcebergTableReader` 继承 `TableReader`,并通过组合使用 `FileReader`。 - `IcebergTableReader` 不做 Parquet page/column 解码。 - `IcebergTableReader` 负责 table-level finalize,不负责 file-local pruning 实现。 +- `IcebergTableReader` 的 schema、scan request、scan tasks 和底层 `FileReader` 应通过 + 一个初始化参数对象一次性传入;除非存在明确生命周期差异,不拆成 `bind` / + `init(TableScanRequest)` / `set_scan_tasks` 多阶段接口。 +- `IcebergTableReader` 不重新实现 reader 切换循环,只实现打开 Iceberg task、读取当前 + task 和关闭当前 reader 的 hook。 ### TableColumnMapper @@ -430,6 +443,21 @@ Iceberg 场景下,column id 默认对应 field id。 它是 `IcebergTableReader` 的输入,不应直接传给 `ParquetReader`。 +### IcebergTableReadParams + +`IcebergTableReadParams` 表示一次 Iceberg table scan 的完整初始化输入。 + +建议包含的信息: + +- Iceberg read options; +- Iceberg table schema; +- table scan request; +- Iceberg scan task 列表; +- 底层 `FileReader`。 + +它用于避免 `IcebergTableReader` 暴露多个半初始化阶段。调用方应一次性构造完整 +参数并调用 `init`。 + ## 设计原则 ### 边界原则 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
