This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 57178c889aa1fdaa64efc9ac9e9f54d6753ea2c2
Author: Socrates <[email protected]>
AuthorDate: Mon May 18 16:16:50 2026 +0800

    Refine Iceberg reader API boundaries
---
 be/src/format/parquet/parquet_reader.h   | 10 ++++
 be/src/format/reader/file_reader.h       | 14 +++++
 be/src/format/reader/table_reader.h      | 93 ++++++++++++++++++++++++++++++--
 be/src/format/table/iceberg_reader_v2.h  | 89 +++++++++++++++++++++++-------
 docs/doris-iceberg-parquet-api-design.md | 46 ++++++++++++----
 5 files changed, 218 insertions(+), 34 deletions(-)

diff --git a/be/src/format/parquet/parquet_reader.h 
b/be/src/format/parquet/parquet_reader.h
index dfac6494cd8..65227aba04c 100644
--- a/be/src/format/parquet/parquet_reader.h
+++ b/be/src/format/parquet/parquet_reader.h
@@ -42,23 +42,33 @@ class ParquetReader : public reader::FileReader {
 public:
     virtual ~ParquetReader() = default;
 
+    // 解析 Parquet footer 并返回 Parquet 文件自身的 schema。
+    // 这里不做 Iceberg schema evolution,也不把字段转换成 table/global schema。
     Status get_schema(std::vector<reader::SchemaField>* file_schema) const 
override {
         // 真实实现会从 Parquet footer / schema descriptor 展开 file-local schema。
         file_schema->clear();
         return Status::OK();
     }
 
+    // 初始化 Parquet 专属 scan。
+    // 后续可以在 ParquetScanRequest 中扩展 row group、page index、bloom filter 等
+    // Parquet-only 选项;table-level 语义仍然必须由 TableColumnMapper 提前转换。
     Status init(const ParquetScanRequest& request) {
         // 真实实现会根据 projected_file_columns、local_filters 和 reader_expression_map
         // 初始化 row group、column chunk、page reader 以及延时物化计划。
         return reader::FileReader::init(request);
     }
 
+    // 读取下一批 Parquet file-local block。
+    // 返回列必须保持 file-local 语义,不能在这里补 default/generated/partition 列。
     Status next(Block* file_block, size_t* rows, bool* eof) override {
         // 真实实现会输出 file-local block。stub 默认立即 EOF。
         return reader::FileReader::next(file_block, rows, eof);
     }
 
+    // 通用 FileReader 初始化入口。
+    // 当上层只持有 reader::FileReader 指针时会走该接口;Parquet 专属参数通过
+    // ParquetScanRequest 重载表达。
     Status init(const reader::FileScanRequest& request) override {
         return reader::FileReader::init(request);
     }
diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
index af03691b94e..fd5bfcf933f 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -87,6 +87,9 @@ class FileReader {
 public:
     virtual ~FileReader() = default;
 
+    // 打开一个物理文件并加载文件级元数据。
+    // 该方法只建立 file-local reader 状态,不接收 table schema,也不做 projection/filter
+    // 规划;这些输入由 init(FileScanRequest) 提供。
     virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = 
nullptr) {
         // 真实实现会保存文件句柄、IO 上下文并读取文件元数据。
         _file = std::move(file);
@@ -95,12 +98,18 @@ public:
         return Status::OK();
     }
 
+    // 返回文件自己的 schema 视图。
+    // 返回结果必须是 file-local schema:列 id、类型和 children 都按文件格式展开,
+    // 不在这里解释 Iceberg field id、缺失列、默认值或 generated column。
     virtual Status get_schema(std::vector<SchemaField>* file_schema) const {
         // 真实实现会展开文件格式自己的 file-local schema。
         file_schema->clear();
         return Status::OK();
     }
 
+    // 初始化一次 file-local scan。
+    // request 由 TableColumnMapper 生成,只包含文件列投影、本地过滤条件和 reader
+    // expression。FileReader 可以基于它初始化 row group/page/stripe 等文件格式计划。
     virtual Status init(const FileScanRequest& request) {
         // 真实实现会根据 projected columns、local filters 和 reader expressions
         // 初始化文件格式自己的物理读取计划。
@@ -110,6 +119,9 @@ public:
         return Status::OK();
     }
 
+    // 读取下一批 file-local block。
+    // file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。
+    // eof 表示当前文件 reader 是否读完;多文件切换由 TableReader 负责。
     virtual Status next(Block* file_block, size_t* rows, bool* eof) {
         // stub 默认立即 EOF。
         (void)file_block;
@@ -123,6 +135,8 @@ public:
         return Status::OK();
     }
 
+    // 关闭当前物理文件 reader 并释放文件层状态。
+    // 该方法不处理 table-level delete/finalize 状态,后者由 TableReader 子类管理。
     virtual Status close() {
         _file.reset();
         _io_ctx = nullptr;
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index 422ee3142d1..8d88ce4fe1b 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -113,6 +113,9 @@ public:
     explicit TableColumnMapper(TableColumnMapperOptions options = {}) : 
_options(std::move(options)) {}
     virtual ~TableColumnMapper() = default;
 
+    // 建立 table schema 到 file schema 的列映射。
+    // 输出的 ColumnMapping 描述 table column 如何从 file column、常量列或表达式得到;
+    // 后续 projection、filter localization 和 table block finalize 都应复用这份映射。
     virtual Status create_mapping(const std::vector<TableColumn>& table_schema,
                                   const std::vector<SchemaField>& file_schema,
                                   std::vector<ColumnMapping>* mappings) {
@@ -136,6 +139,9 @@ public:
         return Status::OK();
     }
 
+    // 把 table-level scan 请求转换成 file-local scan 请求。
+    // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
+    // projected_file_columns、local_filters 和 reader_expression_map。
     virtual Status create_scan_request(const TableScanRequest& table_request,
                                        const std::vector<ColumnMapping>& 
mappings,
                                        FileScanRequest* file_request) {
@@ -154,6 +160,9 @@ public:
         return Status::OK();
     }
 
+    // 将 table-level filter 定位到文件 schema。
+    // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全
+    // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback 
处理。
     virtual Status localize_filters(const std::vector<TableFilter>& 
table_filters,
                                     FileScanRequest* file_request) const {
         // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
@@ -213,15 +222,21 @@ struct TableReadOptions {
 
 // table-level reader 基类。
 // 该层负责多文件编排和动态分区裁剪等通用 table-level 逻辑,对外输出 table block。
+// 子类只需要实现“如何打开下一个具体 reader”和“如何读取当前 reader”的表格式语义。
 class TableReader {
 public:
     virtual ~TableReader() = default;
 
+    // 初始化 table reader 的通用运行参数。
+    // 子类可以在自己的 init(params) 中调用该方法;这里不接收具体表格式 schema/task。
     virtual Status init(const TableReadOptions& options) {
         _options = options;
         return Status::OK();
     }
 
+    // table-level 动态过滤入口。
+    // 该方法用于根据 split、partition value 或文件级统计判断是否可以跳过后续 reader。
+    // can_filter_all=true 表示当前 table reader 范围内的数据都可以被裁剪。
     virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all) {
         // 真实实现会基于 split/partition/file stats 判断动态分区裁剪结果。
         (void)expr;
@@ -231,12 +246,78 @@ public:
         return Status::OK();
     }
 
-    virtual Status next_reader() {
-        // 真实实现会切换到下一个 data file / split reader。
+    // 对外读取 table block 的统一入口。
+    // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
+    // table_block 的列必须已经是 table/global schema 语义。
+    Status next(Block* table_block, size_t* rows, bool* eof) {
+        if (rows != nullptr) {
+            *rows = 0;
+        }
+        if (eof != nullptr) {
+            *eof = false;
+        }
+        while (true) {
+            if (!_has_current_reader) {
+                RETURN_IF_ERROR(next_reader());
+                if (!_has_current_reader) {
+                    if (eof != nullptr) {
+                        *eof = true;
+                    }
+                    return Status::OK();
+                }
+            }
+
+            size_t current_rows = 0;
+            bool current_eof = false;
+            RETURN_IF_ERROR(read_current(table_block, &current_rows, 
&current_eof));
+            if (rows != nullptr) {
+                *rows = current_rows;
+            }
+            if (!current_eof || current_rows > 0) {
+                return Status::OK();
+            }
+            RETURN_IF_ERROR(close_current_reader());
+            _has_current_reader = false;
+        }
+    }
+
+    // 关闭 table reader 及当前正在读取的底层 reader。
+    // 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。
+    virtual Status close() {
+        RETURN_IF_ERROR(close_current_reader());
+        _has_current_reader = false;
+        return Status::OK();
+    }
+
+protected:
+    // 切换到下一个 reader 的通用流程。
+    // 该方法先关闭当前 reader,再调用 open_next_reader;子类不应重复实现这个循环。
+    Status next_reader() {
+        // 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
+        // 子类只通过 open_next_reader 提供具体表格式的 task/split 打开方式。
+        RETURN_IF_ERROR(close_current_reader());
+        bool has_reader = false;
+        RETURN_IF_ERROR(open_next_reader(&has_reader));
+        _has_current_reader = has_reader;
         return Status::OK();
     }
 
-    virtual Status next(Block* table_block, size_t* rows, bool* eof) {
+    // 打开下一个具体 reader。
+    // 子类在这里选择下一个 split/task,创建或重置底层 FileReader,并设置 has_reader。
+    // has_reader=false 表示没有更多输入,TableReader::next 会返回 eof=true。
+    virtual Status open_next_reader(bool* has_reader) {
+        // stub 默认没有下一个 reader。
+        if (has_reader != nullptr) {
+            *has_reader = false;
+        }
+        return Status::OK();
+    }
+
+    // 从当前 reader 读取一批 table block。
+    // 子类应在这里读取 file-local block,并完成 delete、virtual column、finalize_expr
+    // 等 table-level 处理,最终写入 table_block。
+    virtual Status read_current(Block* table_block, size_t* rows, bool* eof) {
+        // stub 默认当前 reader 立即 EOF。
         (void)table_block;
         if (rows != nullptr) {
             *rows = 0;
@@ -247,10 +328,12 @@ public:
         return Status::OK();
     }
 
-    virtual Status close() { return Status::OK(); }
+    // 关闭当前具体 reader。
+    // 该 hook 会被 next_reader 和 close 调用;实现应保持幂等。
+    virtual Status close_current_reader() { return Status::OK(); }
 
-protected:
     TableReadOptions _options;
+    bool _has_current_reader = false;
 };
 
 } // namespace doris::reader
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
index 70ee2bb3ff5..29b556f71ed 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -69,6 +69,17 @@ struct IcebergReadOptions {
     bool enable_deletion_vector = true;
 };
 
+// IcebergTableReader 的完整初始化输入。
+// 这些字段共同决定一次 table scan 的语义,除非后续有明确的生命周期差异,否则不拆成
+// bind/init/set_tasks 多个阶段,避免调用点暴露半初始化状态。
+struct IcebergTableReadParams {
+    IcebergReadOptions options;
+    std::vector<reader::TableColumn> iceberg_schema;
+    reader::TableScanRequest scan_request;
+    std::vector<IcebergScanTask> scan_tasks;
+    std::unique_ptr<reader::FileReader> data_reader;
+};
+
 // Iceberg table-level reader。
 // 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合
 // FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
@@ -76,30 +87,36 @@ class IcebergTableReader : public reader::TableReader {
 public:
     IcebergTableReader() = default;
 
-    explicit IcebergTableReader(std::unique_ptr<reader::FileReader> 
data_reader)
-            : _data_reader(std::move(data_reader)) {}
-
     ~IcebergTableReader() override = default;
 
-    Status init(const IcebergReadOptions& options,
-                std::unique_ptr<reader::FileReader> data_reader) {
-        _iceberg_options = options;
-        _data_reader = std::move(data_reader);
-        return reader::TableReader::init(options.table_options);
-    }
-
-    Status bind(const std::vector<reader::TableColumn>& iceberg_schema) {
-        // 真实实现会绑定 Iceberg 当前 schema,并准备 field-id based mapping 输入。
-        _iceberg_schema = iceberg_schema;
-        return Status::OK();
+    // 初始化一次 Iceberg table scan。
+    // params 必须一次性提供 schema、projection/filter、scan tasks 和底层 FileReader;
+    // 这样 IcebergTableReader 不会暴露 bind/set_tasks 等半初始化阶段。
+    Status init(IcebergTableReadParams params) {
+        // 一次性保存 Iceberg table scan 所需输入。TableReader 负责 reader 切换流程;
+        // IcebergTableReader 只提供后续要打开的 task 以及 table/file schema 映射语义。
+        _iceberg_options = params.options;
+        _iceberg_schema = std::move(params.iceberg_schema);
+        _table_scan_request = std::move(params.scan_request);
+        _scan_tasks = std::move(params.scan_tasks);
+        _data_reader = std::move(params.data_reader);
+        _next_task_idx = 0;
+        return reader::TableReader::init(_iceberg_options.table_options);
     }
 
-    Status init(const reader::TableScanRequest& request) {
-        // 保存 table-level projection/filter,后续由 TableColumnMapper 转成 
FileScanRequest。
-        _table_scan_request = request;
+    // 关闭当前 Iceberg scan。
+    // 先让 TableReader 关闭当前 task reader,再释放 IcebergTableReader 持有的底层
+    // FileReader。
+    Status close() override {
+        RETURN_IF_ERROR(reader::TableReader::close());
+        _data_reader.reset();
         return Status::OK();
     }
 
+protected:
+    // 打开单个 Iceberg scan task。
+    // 该方法完成当前 data file 的 schema mapping、filter localization、position delete
+    // 注入,并初始化底层 FileReader;它由 TableReader 的 reader 切换流程调用。
     Status open_task(const IcebergScanTask& task) {
         // 真实实现会读取 data file schema,创建 field-id mapping,应用 position deletes,
         // 并初始化底层 ParquetReader。
@@ -123,7 +140,27 @@ public:
         return Status::OK();
     }
 
-    Status next(Block* table_block, size_t* rows, bool* eof) override {
+    // 打开下一个 Iceberg task。
+    // TableReader 负责循环和 EOF 处理;这里仅从 _scan_tasks 中取下一个 task 并调用
+    // open_task。
+    Status open_next_reader(bool* has_reader) override {
+        if (_next_task_idx >= _scan_tasks.size()) {
+            if (has_reader != nullptr) {
+                *has_reader = false;
+            }
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(open_task(_scan_tasks[_next_task_idx++]));
+        if (has_reader != nullptr) {
+            *has_reader = true;
+        }
+        return Status::OK();
+    }
+
+    // 读取当前 Iceberg task 的下一批 table block。
+    // 这里组合底层 FileReader 输出的 file-local block,并负责 equality delete、
+    // virtual columns 和 finalize,最终输出 table/global schema block。
+    Status read_current(Block* table_block, size_t* rows, bool* eof) override {
         // 真实实现会读取 file-local block,finalize 成 table block,再应用 equality delete
         // 和 Iceberg virtual columns。stub 默认 EOF。
         // 后续实现应在 IcebergTableReader 内部持有 file-local block;这里仅复用输出指针
@@ -138,6 +175,9 @@ public:
         return Status::OK();
     }
 
+    // 将 file-local block 转换为 table/global schema block。
+    // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
+    // 物化以及复杂列 remap。
     Status finalize_chunk(Block* file_block, Block* table_block) {
         // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated
         // expressions,把 file-local block 写成 table block。
@@ -146,18 +186,24 @@ public:
         return Status::OK();
     }
 
+    // 将 Iceberg position delete / deletion vector 转换成底层 reader 可消费的删除信息。
+    // 这一步发生在读取 data file 前,因此会修改 FileScanRequest。
     Status apply_position_deletes(reader::FileScanRequest* request) {
         // 真实实现会把 position delete / deletion vector 转换成 file-local delete 信息。
         (void)request;
         return Status::OK();
     }
 
+    // 在 table block 上应用 equality delete。
+    // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
     Status apply_equality_deletes(Block* table_block) {
         // 真实实现会在 table block 上应用 equality delete。
         (void)table_block;
         return Status::OK();
     }
 
+    // 物化 Iceberg 虚拟列。
+    // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
     Status materialize_virtual_columns(Block* table_block, size_t rows) {
         // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
         (void)table_block;
@@ -165,17 +211,20 @@ public:
         return Status::OK();
     }
 
-    Status close() override {
+    // 关闭当前 task 对应的底层 FileReader。
+    // 该方法由 TableReader 在切换 reader 或 close 时调用,要求可重复调用。
+    Status close_current_reader() override {
         if (_data_reader) {
             RETURN_IF_ERROR(_data_reader->close());
         }
-        _data_reader.reset();
         return Status::OK();
     }
 
 private:
     IcebergReadOptions _iceberg_options;
     IcebergScanTask _scan_task;
+    std::vector<IcebergScanTask> _scan_tasks;
+    size_t _next_task_idx = 0;
     reader::TableScanRequest _table_scan_request;
     std::vector<reader::TableColumn> _iceberg_schema;
     std::vector<reader::ColumnMapping> _mappings;
diff --git a/docs/doris-iceberg-parquet-api-design.md 
b/docs/doris-iceberg-parquet-api-design.md
index 58036667d44..6518043b40d 100644
--- a/docs/doris-iceberg-parquet-api-design.md
+++ b/docs/doris-iceberg-parquet-api-design.md
@@ -74,6 +74,8 @@ namespace doris::reader
 - 管理 scan 生命周期;
 - 承接动态分区裁剪等 table-level 通用过滤逻辑;
 - 对外统一输出 table block。
+- `next` 是基类统一入口,内部负责 EOF 后切换 reader;具体表格式只提供打开和读取
+  当前 reader 的 hook。
 
 建议接口形状:
 
@@ -86,9 +88,14 @@ public:
 
     virtual Status init(const TableReadOptions& options);
     virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all);
-    virtual Status next_reader();
-    virtual Status next(Block* table_block, size_t* rows, bool* eof);
+    Status next(Block* table_block, size_t* rows, bool* eof);
     virtual Status close();
+
+protected:
+    Status next_reader();
+    virtual Status open_next_reader(bool* has_reader);
+    virtual Status read_current(Block* table_block, size_t* rows, bool* eof);
+    virtual Status close_current_reader();
 };
 
 } // namespace doris::reader
@@ -99,6 +106,7 @@ public:
 - `TableReader` 输出的是 table block,不输出 file-local block。
 - `TableReader` 负责多文件编排和 table-level 通用裁剪,不负责 schema mapping,不负责
   Parquet 物理解码。
+- `next_reader` 是 `TableReader` 自己的通用切换逻辑,不作为子类公开 override 接口。
 - 动态分区裁剪这类逻辑应下放到 `TableReader`,而不是散落在具体表格式 reader 中。
 - `TableReader` 不直接依赖旧 `vparquet` 表层语义。
 
@@ -122,7 +130,7 @@ namespace doris::iceberg
 建议职责:
 
 - 绑定 Iceberg 当前 table schema;
-- 接收 `IcebergScanTask`;
+- 接收 `IcebergScanTask` 列表,并按 `TableReader` 的统一调度打开当前 task;
 - 处理 position delete、equality delete、deletion vector;
 - 物化 `_row_id`、`_last_updated_sequence_number` 等虚拟列;
 - 将 `ParquetReader` 返回的 file-local block finalize 成 table block。
@@ -136,13 +144,13 @@ class IcebergTableReader : public reader::TableReader {
 public:
     virtual ~IcebergTableReader() = default;
 
-    Status init(const IcebergReadOptions& options,
-                std::unique_ptr<reader::FileReader> data_reader);
-    Status bind(const std::vector<reader::TableColumn>& iceberg_schema);
-    Status init(const reader::TableScanRequest& request);
-    Status open_task(const IcebergScanTask& task);
-    Status next(Block* table_block, size_t* rows, bool* eof) override;
+    Status init(IcebergTableReadParams params);
     Status close() override;
+
+protected:
+    Status open_next_reader(bool* has_reader) override;
+    Status read_current(Block* table_block, size_t* rows, bool* eof) override;
+    Status close_current_reader() override;
 };
 
 } // namespace doris::iceberg
@@ -153,6 +161,11 @@ public:
 - `IcebergTableReader` 继承 `TableReader`,并通过组合使用 `FileReader`。
 - `IcebergTableReader` 不做 Parquet page/column 解码。
 - `IcebergTableReader` 负责 table-level finalize,不负责 file-local pruning 实现。
+- `IcebergTableReader` 的 schema、scan request、scan tasks 和底层 `FileReader` 应通过
+  一个初始化参数对象一次性传入;除非存在明确生命周期差异,不拆成 `bind` /
+  `init(TableScanRequest)` / `set_scan_tasks` 多阶段接口。
+- `IcebergTableReader` 不重新实现 reader 切换循环,只实现打开 Iceberg task、读取当前
+  task 和关闭当前 reader 的 hook。
 
 ### TableColumnMapper
 
@@ -430,6 +443,21 @@ Iceberg 场景下,column id 默认对应 field id。
 
 它是 `IcebergTableReader` 的输入,不应直接传给 `ParquetReader`。
 
+### IcebergTableReadParams
+
+`IcebergTableReadParams` 表示一次 Iceberg table scan 的完整初始化输入。
+
+建议包含的信息:
+
+- Iceberg read options;
+- Iceberg table schema;
+- table scan request;
+- Iceberg scan task 列表;
+- 底层 `FileReader`。
+
+它用于避免 `IcebergTableReader` 暴露多个半初始化阶段。调用方应一次性构造完整
+参数并调用 `init`。
+
 ## 设计原则
 
 ### 边界原则


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to