This is an automated email from the ASF dual-hosted git repository.

suxiaogang223 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ef45c206aa3b6dc6fe5ac62b5d1694f90d3cc386
Author: Socrates <[email protected]>
AuthorDate: Mon May 18 15:52:10 2026 +0800

    Add Iceberg Parquet reader API skeleton
---
 be/src/format/parquet/parquet_reader.h   |  67 +++++
 be/src/format/reader/file_reader.h       | 141 +++++++++
 be/src/format/reader/table_reader.h      | 256 ++++++++++++++++
 be/src/format/table/iceberg_reader_v2.h  | 186 ++++++++++++
 docs/doris-iceberg-parquet-api-design.md | 483 +++++++++++++++++++++++++++++++
 5 files changed, 1133 insertions(+)

diff --git a/be/src/format/parquet/parquet_reader.h 
b/be/src/format/parquet/parquet_reader.h
new file mode 100644
index 00000000000..dfac6494cd8
--- /dev/null
+++ b/be/src/format/parquet/parquet_reader.h
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "format/reader/file_reader.h"
+
+namespace doris {
+namespace io {
+struct IOContext;
+} // namespace io
+} // namespace doris
+
+namespace doris::parquet {
+
+// ParquetReader 的 file-local scan 请求。
+// 当前没有新增 Parquet-only 字段,但保留独立类型,便于后续加入 row group/page index
+// 等 Parquet 专属选项。
+struct ParquetScanRequest : public reader::FileScanRequest {};
+
+// Parquet 文件物理读取层。
+// 该类只理解 Parquet file-local schema 和 ParquetScanRequest,不理解 Iceberg/global
+// schema,不处理 table-level cast/default/generated/partition 语义。
+class ParquetReader : public reader::FileReader {
+public:
+    virtual ~ParquetReader() = default;
+
+    Status get_schema(std::vector<reader::SchemaField>* file_schema) const 
override {
+        // 真实实现会从 Parquet footer / schema descriptor 展开 file-local schema。
+        file_schema->clear();
+        return Status::OK();
+    }
+
+    Status init(const ParquetScanRequest& request) {
+        // 真实实现会根据 projected_file_columns、local_filters 和 reader_expression_map
+        // 初始化 row group、column chunk、page reader 以及延时物化计划。
+        return reader::FileReader::init(request);
+    }
+
+    Status next(Block* file_block, size_t* rows, bool* eof) override {
+        // 真实实现会输出 file-local block。stub 默认立即 EOF。
+        return reader::FileReader::next(file_block, rows, eof);
+    }
+
+    Status init(const reader::FileScanRequest& request) override {
+        return reader::FileReader::init(request);
+    }
+};
+
+} // namespace doris::parquet
diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
new file mode 100644
index 00000000000..af03691b94e
--- /dev/null
+++ b/be/src/format/reader/file_reader.h
@@ -0,0 +1,141 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "core/data_type/data_type.h"
+#include "exprs/vexpr_fwd.h"
+#include "io/fs/file_reader_writer_fwd.h"
+
+namespace doris {
+class Block;
+class ColumnPredicate;
+
+namespace io {
+struct IOContext;
+} // namespace io
+} // namespace doris
+
+namespace doris::reader {
+
+using ColumnId = int32_t;
+
+// 文件本地 schema 字段。
+// 这是 FileReader 暴露给 table 层的 file-local schema 视图,不携带 table/global
+// schema 语义。Iceberg field id、name mapping、default/generated/partition 列都不在
+// FileReader 内部解释。
+struct SchemaField {
+    ColumnId id = -1;
+    std::string name;
+    DataTypePtr type;
+    std::vector<SchemaField> children;
+};
+
+// 已经 localize 到文件 schema 的过滤条件。
+// TableColumnMapper 负责把 table-level filter 转成这个结构;FileReader 只消费
+// file-local column id、表达式和结构化谓词。
+struct FileLocalFilter {
+    ColumnId file_column_id = -1;
+
+    // 表达式过滤。适合 cast、复杂表达式或 reader_expression_map 生成的临时列过滤。
+    // 它通常不能直接驱动 row group stats、page index、dictionary、bloom filter。
+    VExprContextSPtr conjunct;
+
+    // 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、
+    // bloom filter 等只理解单列谓词的优化。
+    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
+};
+
+// 通用文件层 scan 请求。
+// 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global
+// schema。所有 schema change、filter localization、default/generated/partition
+// 列都应在 table 层完成。
+struct FileScanRequest {
+    virtual ~FileScanRequest() = default;
+
+    std::vector<ColumnId> projected_file_columns;
+    std::vector<FileLocalFilter> local_filters;
+    std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
+};
+
+// 文件物理读取层通用接口。
+// 该接口只描述 file-local schema、file-local scan request 和 file-local block。
+// TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。
+class FileReader {
+public:
+    virtual ~FileReader() = default;
+
+    virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = 
nullptr) {
+        // 真实实现会保存文件句柄、IO 上下文并读取文件元数据。
+        _file = std::move(file);
+        _io_ctx = io_ctx;
+        _eof = false;
+        return Status::OK();
+    }
+
+    virtual Status get_schema(std::vector<SchemaField>* file_schema) const {
+        // 真实实现会展开文件格式自己的 file-local schema。
+        file_schema->clear();
+        return Status::OK();
+    }
+
+    virtual Status init(const FileScanRequest& request) {
+        // 真实实现会根据 projected columns、local filters 和 reader expressions
+        // 初始化文件格式自己的物理读取计划。
+        _request.projected_file_columns = request.projected_file_columns;
+        _request.local_filters = request.local_filters;
+        _request.reader_expression_map = request.reader_expression_map;
+        return Status::OK();
+    }
+
+    virtual Status next(Block* file_block, size_t* rows, bool* eof) {
+        // stub 默认立即 EOF。
+        (void)file_block;
+        if (rows != nullptr) {
+            *rows = 0;
+        }
+        if (eof != nullptr) {
+            *eof = true;
+        }
+        _eof = true;
+        return Status::OK();
+    }
+
+    virtual Status close() {
+        _file.reset();
+        _io_ctx = nullptr;
+        _request = FileScanRequest {};
+        _eof = true;
+        return Status::OK();
+    }
+
+protected:
+    io::FileReaderSPtr _file;
+    io::IOContext* _io_ctx = nullptr;
+    FileScanRequest _request;
+    bool _eof = true;
+};
+
+} // namespace doris::reader
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
new file mode 100644
index 00000000000..422ee3142d1
--- /dev/null
+++ b/be/src/format/reader/table_reader.h
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "core/data_type/data_type.h"
+#include "exprs/vexpr_fwd.h"
+#include "format/reader/file_reader.h"
+
+namespace doris {
+class Block;
+class ColumnPredicate;
+} // namespace doris
+
+namespace doris::reader {
+
+// table/global schema 中的列视图。
+// Iceberg 场景下,id 默认对应 Iceberg field id。该结构不描述文件中的物理列。
+struct TableColumn {
+    ColumnId id = -1;
+    std::string name;
+    DataTypePtr type;
+    std::vector<TableColumn> children;
+};
+
+// table-level filter。
+// TableColumnMapper 负责把它转换成 FileLocalFilter 或 reader_expression_map。
+struct TableFilter {
+    ColumnId table_column_id = -1;
+
+    // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
+    VExprContextSPtr conjunct;
+
+    // 结构化列谓词,适合下推到文件层做 row group stats、page index、dictionary、
+    // bloom filter 等优化。
+    std::vector<std::shared_ptr<ColumnPredicate>> predicates;
+};
+
+// 单个 table column 到 file column 的映射结果。
+// 这是 table 层和 file 层的核心边界对象。
+struct ColumnMapping {
+    ColumnId table_column_id = -1;
+    std::optional<ColumnId> file_column_id;
+    DataTypePtr file_type;
+    DataTypePtr table_type;
+
+    // 最终输出表达式。用于把 file-local value 转成 table/global value,例如 cast、
+    // default、partition、generated column 或复杂列 remap。
+    VExprContextSPtr finalize_expr;
+
+    // 读时过滤 fallback 表达式。只在 table filter 不能安全转换成 file-local predicate
+    // 时使用,服务 reader_expression_map,不等价于 finalize_expr。
+    VExprContextSPtr reader_filter_expr;
+
+    std::vector<ColumnMapping> child_mappings;
+    bool is_trivial = false;
+    bool is_constant = false;
+};
+
+enum class TableColumnMappingMode {
+    BY_FIELD_ID,
+    BY_NAME,
+};
+
+enum class TableFilterConversion {
+    COPY_DIRECTLY,
+    CAST_FILTER,
+    EVALUATE_EXPRESSION,
+    FINALIZE_ONLY,
+};
+
+struct TableColumnMapperOptions {
+    TableColumnMappingMode mode = TableColumnMappingMode::BY_FIELD_ID;
+    bool allow_missing_columns = true;
+    bool enable_reader_expression_fallback = true;
+};
+
+// table-level scan 请求。
+// 它仍然使用 table/global schema 语义,不能直接传给 FileReader。
+struct TableScanRequest {
+    std::vector<TableColumn> projected_table_columns;
+    std::vector<TableFilter> table_filters;
+};
+
+// 通用 table schema 到 file schema 映射层。
+// Iceberg 会使用 BY_FIELD_ID;普通 by-name 场景可以复用该组件,但不应把它命名成
+// Iceberg-only 组件。
+class TableColumnMapper {
+public:
+    explicit TableColumnMapper(TableColumnMapperOptions options = {}) : 
_options(std::move(options)) {}
+    virtual ~TableColumnMapper() = default;
+
+    virtual Status create_mapping(const std::vector<TableColumn>& table_schema,
+                                  const std::vector<SchemaField>& file_schema,
+                                  std::vector<ColumnMapping>* mappings) {
+        // 真实实现会做 field id/name matching、类型转换、复杂列 child mapping、缺失列
+        // default/partition/generated 表达式构造。
+        mappings->clear();
+        for (const auto& table_column : table_schema) {
+            ColumnMapping mapping;
+            mapping.table_column_id = table_column.id;
+            mapping.table_type = table_column.type;
+            if (const auto* file_field = find_file_field(table_column, 
file_schema)) {
+                mapping.file_column_id = file_field->id;
+                mapping.file_type = file_field->type;
+                mapping.is_trivial = is_same_type(mapping.table_type, 
mapping.file_type);
+            } else {
+                mapping.is_constant = true;
+            }
+            mappings->push_back(std::move(mapping));
+        }
+        _mappings = *mappings;
+        return Status::OK();
+    }
+
+    virtual Status create_scan_request(const TableScanRequest& table_request,
+                                       const std::vector<ColumnMapping>& 
mappings,
+                                       FileScanRequest* file_request) {
+        // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
+        file_request->projected_file_columns.clear();
+        file_request->local_filters.clear();
+        file_request->reader_expression_map.clear();
+        _mappings = mappings;
+        for (const auto& table_column : table_request.projected_table_columns) 
{
+            const auto* mapping = find_mapping(table_column.id);
+            if (mapping != nullptr && mapping->file_column_id.has_value()) {
+                
file_request->projected_file_columns.push_back(*mapping->file_column_id);
+            }
+        }
+        RETURN_IF_ERROR(localize_filters(table_request.table_filters, 
file_request));
+        return Status::OK();
+    }
+
+    virtual Status localize_filters(const std::vector<TableFilter>& 
table_filters,
+                                    FileScanRequest* file_request) const {
+        // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
+        // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。
+        for (const auto& filter : table_filters) {
+            const auto* mapping = find_mapping(filter.table_column_id);
+            if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+                continue;
+            }
+            FileLocalFilter local_filter;
+            local_filter.file_column_id = *mapping->file_column_id;
+            local_filter.conjunct = filter.conjunct;
+            local_filter.predicates = filter.predicates;
+            file_request->local_filters.push_back(std::move(local_filter));
+        }
+        return Status::OK();
+    }
+
+    const std::vector<ColumnMapping>& mappings() const { return _mappings; }
+
+private:
+    const SchemaField* find_file_field(
+            const TableColumn& table_column,
+            const std::vector<SchemaField>& file_schema) const {
+        for (const auto& field : file_schema) {
+            if (_options.mode == TableColumnMappingMode::BY_FIELD_ID && 
field.id == table_column.id) {
+                return &field;
+            }
+            if (_options.mode == TableColumnMappingMode::BY_NAME && field.name 
== table_column.name) {
+                return &field;
+            }
+        }
+        return nullptr;
+    }
+
+    const ColumnMapping* find_mapping(ColumnId table_column_id) const {
+        for (const auto& mapping : _mappings) {
+            if (mapping.table_column_id == table_column_id) {
+                return &mapping;
+            }
+        }
+        return nullptr;
+    }
+
+    bool is_same_type(const DataTypePtr& table_type, const DataTypePtr& 
file_type) const {
+        return table_type == file_type;
+    }
+
+private:
+    TableColumnMapperOptions _options;
+    std::vector<ColumnMapping> _mappings;
+};
+
+struct TableReadOptions {
+    size_t batch_size = 4096;
+};
+
+// table-level reader 基类。
+// 该层负责多文件编排和动态分区裁剪等通用 table-level 逻辑,对外输出 table block。
+class TableReader {
+public:
+    virtual ~TableReader() = default;
+
+    virtual Status init(const TableReadOptions& options) {
+        _options = options;
+        return Status::OK();
+    }
+
+    virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all) {
+        // 真实实现会基于 split/partition/file stats 判断动态分区裁剪结果。
+        (void)expr;
+        if (can_filter_all != nullptr) {
+            *can_filter_all = false;
+        }
+        return Status::OK();
+    }
+
+    virtual Status next_reader() {
+        // 真实实现会切换到下一个 data file / split reader。
+        return Status::OK();
+    }
+
+    virtual Status next(Block* table_block, size_t* rows, bool* eof) {
+        (void)table_block;
+        if (rows != nullptr) {
+            *rows = 0;
+        }
+        if (eof != nullptr) {
+            *eof = true;
+        }
+        return Status::OK();
+    }
+
+    virtual Status close() { return Status::OK(); }
+
+protected:
+    TableReadOptions _options;
+};
+
+} // namespace doris::reader
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
new file mode 100644
index 00000000000..70ee2bb3ff5
--- /dev/null
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "format/reader/file_reader.h"
+#include "format/reader/table_reader.h"
+
+namespace doris {
+class Block;
+} // namespace doris
+
+namespace doris::iceberg {
+
+// Iceberg data file 摘要。它描述当前要读取的物理 data file,不承载列映射逻辑。
+struct IcebergDataFile {
+    std::string path;
+    std::string format;
+    int64_t record_count = 0;
+    int64_t file_size = 0;
+    int64_t sequence_number = 0;
+    int64_t first_row_id = -1;
+};
+
+// Iceberg delete file 摘要。position/equality/deletion vector 的具体读取在
+// IcebergTableReader 实现阶段补齐。
+struct IcebergDeleteFile {
+    std::string path;
+    std::string format;
+    int64_t sequence_number = 0;
+    std::vector<reader::ColumnId> equality_field_ids;
+};
+
+// 单个 Iceberg data file 的 scan 输入。
+// 该结构只进入 IcebergTableReader,不直接传给 ParquetReader。
+struct IcebergScanTask {
+    IcebergDataFile data_file;
+    std::vector<IcebergDeleteFile> positional_deletes;
+    std::vector<IcebergDeleteFile> equality_deletes;
+    std::vector<IcebergDeleteFile> deletion_vectors;
+};
+
+struct IcebergReadOptions {
+    reader::TableReadOptions table_options;
+    bool enable_position_delete = true;
+    bool enable_equality_delete = true;
+    bool enable_deletion_vector = true;
+};
+
+// Iceberg table-level reader。
+// 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合
+// FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
+class IcebergTableReader : public reader::TableReader {
+public:
+    IcebergTableReader() = default;
+
+    explicit IcebergTableReader(std::unique_ptr<reader::FileReader> 
data_reader)
+            : _data_reader(std::move(data_reader)) {}
+
+    ~IcebergTableReader() override = default;
+
+    Status init(const IcebergReadOptions& options,
+                std::unique_ptr<reader::FileReader> data_reader) {
+        _iceberg_options = options;
+        _data_reader = std::move(data_reader);
+        return reader::TableReader::init(options.table_options);
+    }
+
+    Status bind(const std::vector<reader::TableColumn>& iceberg_schema) {
+        // 真实实现会绑定 Iceberg 当前 schema,并准备 field-id based mapping 输入。
+        _iceberg_schema = iceberg_schema;
+        return Status::OK();
+    }
+
+    Status init(const reader::TableScanRequest& request) {
+        // 保存 table-level projection/filter,后续由 TableColumnMapper 转成 
FileScanRequest。
+        _table_scan_request = request;
+        return Status::OK();
+    }
+
+    Status open_task(const IcebergScanTask& task) {
+        // 真实实现会读取 data file schema,创建 field-id mapping,应用 position deletes,
+        // 并初始化底层 ParquetReader。
+        _scan_task = task;
+        std::vector<reader::SchemaField> file_schema;
+        if (_data_reader) {
+            RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
+        }
+        reader::TableColumnMapperOptions mapper_options;
+        mapper_options.mode = reader::TableColumnMappingMode::BY_FIELD_ID;
+        _column_mapper = reader::TableColumnMapper(mapper_options);
+        RETURN_IF_ERROR(_column_mapper.create_mapping(_iceberg_schema, 
file_schema, &_mappings));
+
+        reader::FileScanRequest file_request;
+        
RETURN_IF_ERROR(_column_mapper.create_scan_request(_table_scan_request, 
_mappings,
+                                                           &file_request));
+        RETURN_IF_ERROR(apply_position_deletes(&file_request));
+        if (_data_reader) {
+            RETURN_IF_ERROR(_data_reader->init(file_request));
+        }
+        return Status::OK();
+    }
+
+    Status next(Block* table_block, size_t* rows, bool* eof) override {
+        // 真实实现会读取 file-local block,finalize 成 table block,再应用 equality delete
+        // 和 Iceberg virtual columns。stub 默认 EOF。
+        // 后续实现应在 IcebergTableReader 内部持有 file-local block;这里仅复用输出指针
+        // 作为 header-only API 占位,避免在骨架阶段引入 Block 的完整定义。
+        Block* file_block = table_block;
+        if (_data_reader) {
+            RETURN_IF_ERROR(_data_reader->next(file_block, rows, eof));
+        }
+        RETURN_IF_ERROR(finalize_chunk(file_block, table_block));
+        RETURN_IF_ERROR(apply_equality_deletes(table_block));
+        RETURN_IF_ERROR(materialize_virtual_columns(table_block, rows != 
nullptr ? *rows : 0));
+        return Status::OK();
+    }
+
+    Status finalize_chunk(Block* file_block, Block* table_block) {
+        // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated
+        // expressions,把 file-local block 写成 table block。
+        (void)file_block;
+        (void)table_block;
+        return Status::OK();
+    }
+
+    Status apply_position_deletes(reader::FileScanRequest* request) {
+        // 真实实现会把 position delete / deletion vector 转换成 file-local delete 信息。
+        (void)request;
+        return Status::OK();
+    }
+
+    Status apply_equality_deletes(Block* table_block) {
+        // 真实实现会在 table block 上应用 equality delete。
+        (void)table_block;
+        return Status::OK();
+    }
+
+    Status materialize_virtual_columns(Block* table_block, size_t rows) {
+        // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
+        (void)table_block;
+        (void)rows;
+        return Status::OK();
+    }
+
+    Status close() override {
+        if (_data_reader) {
+            RETURN_IF_ERROR(_data_reader->close());
+        }
+        _data_reader.reset();
+        return Status::OK();
+    }
+
+private:
+    IcebergReadOptions _iceberg_options;
+    IcebergScanTask _scan_task;
+    reader::TableScanRequest _table_scan_request;
+    std::vector<reader::TableColumn> _iceberg_schema;
+    std::vector<reader::ColumnMapping> _mappings;
+    reader::TableColumnMapper _column_mapper;
+    std::unique_ptr<reader::FileReader> _data_reader;
+};
+
+} // namespace doris::iceberg
diff --git a/docs/doris-iceberg-parquet-api-design.md 
b/docs/doris-iceberg-parquet-api-design.md
new file mode 100644
index 00000000000..58036667d44
--- /dev/null
+++ b/docs/doris-iceberg-parquet-api-design.md
@@ -0,0 +1,483 @@
+# Doris Iceberg + Parquet 新架构 API 设计
+
+本文档用于描述 Doris 中 Iceberg + Parquet 新架构的 API 设计。本文档作为后续从
+`master` 新开重构分支时的起点,只定义 API 形状、职责边界、依赖方向和兼容原则,
+不定义函数实现细节,不提供伪代码,不包含迁移 patch。
+
+## 架构总览
+
+目标架构包含 table 调度层、表格式语义层、schema 映射层、文件通用层和文件格式实现层:
+
+```text
+FileScanner / split producer
+    ->
+TableReader
+    ->
+IcebergTableReader
+    ->
+TableColumnMapper + FileReader
+    ->
+ParquetReader
+```
+
+核心职责如下:
+
+- `TableReader`
+  负责多文件、多 split 的上层调度,统一 scan 生命周期,对外输出 table block,
+  并承接动态分区裁剪等 table-level 通用逻辑。
+- `IcebergTableReader`
+  负责 Iceberg 表语义,包括 schema 绑定、scan task、delete file、虚拟列和 table
+  block finalize。
+- `TableColumnMapper`
+  负责 table schema 到 file schema 的映射,负责 filter localization 和 schema
+  change 映射。
+- `FileReader`
+  负责文件层通用读取接口,只理解 file-local schema 和 file-local scan request。
+- `ParquetReader`
+  作为 `FileReader` 的 Parquet 实现,负责 Parquet 文件物理读取。
+
+依赖方向必须保持单向:
+
+```text
+TableReader
+  -> IcebergTableReader
+    -> TableColumnMapper
+    -> FileReader
+      -> ParquetReader
+```
+
+低层不反向理解高层语义,尤其 `ParquetReader` 不得反向理解 Iceberg/global schema。
+
+## 核心 API 设计
+
+### TableReader
+
+`TableReader` 是最上层读取接口,作为 `IcebergTableReader` 的基类,负责多 split /
+多 file 调度,并承接 table-level 的通用裁剪逻辑,不下沉文件格式语义。
+
+实际 API 文件:
+
+```text
+be/src/format/reader/table_reader.h
+```
+
+实际命名空间:
+
+```cpp
+namespace doris::reader
+```
+
+建议职责:
+
+- 接收 split 列表或 scan task 列表;
+- 控制当前 reader 的创建、切换和关闭;
+- 管理 scan 生命周期;
+- 承接动态分区裁剪等 table-level 通用过滤逻辑;
+- 对外统一输出 table block。
+
+建议接口形状:
+
+```cpp
+namespace doris::reader {
+
+class TableReader {
+public:
+    virtual ~TableReader() = default;
+
+    virtual Status init(const TableReadOptions& options);
+    virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all);
+    virtual Status next_reader();
+    virtual Status next(Block* table_block, size_t* rows, bool* eof);
+    virtual Status close();
+};
+
+} // namespace doris::reader
+```
+
+接口约束:
+
+- `TableReader` 输出的是 table block,不输出 file-local block。
+- `TableReader` 负责多文件编排和 table-level 通用裁剪,不负责 schema mapping,不负责
+  Parquet 物理解码。
+- 动态分区裁剪这类逻辑应下放到 `TableReader`,而不是散落在具体表格式 reader 中。
+- `TableReader` 不直接依赖旧 `vparquet` 表层语义。
+
+### IcebergTableReader
+
+`IcebergTableReader` 是 Iceberg 表语义层,负责把单个 Iceberg data file 的读取组织成
+table 语义输出。
+
+实际 API 文件:
+
+```text
+be/src/format/table/iceberg_reader_v2.h
+```
+
+实际命名空间:
+
+```cpp
+namespace doris::iceberg
+```
+
+建议职责:
+
+- 绑定 Iceberg 当前 table schema;
+- 接收 `IcebergScanTask`;
+- 处理 position delete、equality delete、deletion vector;
+- 物化 `_row_id`、`_last_updated_sequence_number` 等虚拟列;
+- 将 `ParquetReader` 返回的 file-local block finalize 成 table block。
+
+建议接口形状:
+
+```cpp
+namespace doris::iceberg {
+
+class IcebergTableReader : public reader::TableReader {
+public:
+    virtual ~IcebergTableReader() = default;
+
+    Status init(const IcebergReadOptions& options,
+                std::unique_ptr<reader::FileReader> data_reader);
+    Status bind(const std::vector<reader::TableColumn>& iceberg_schema);
+    Status init(const reader::TableScanRequest& request);
+    Status open_task(const IcebergScanTask& task);
+    Status next(Block* table_block, size_t* rows, bool* eof) override;
+    Status close() override;
+};
+
+} // namespace doris::iceberg
+```
+
+接口约束:
+
+- `IcebergTableReader` 继承 `TableReader`,并通过组合使用 `FileReader`。
+- `IcebergTableReader` 不做 Parquet page/column 解码。
+- `IcebergTableReader` 负责 table-level finalize,不负责 file-local pruning 实现。
+
+### TableColumnMapper
+
+`TableColumnMapper` 是 table schema 到 file schema 的通用映射层,不是
+Iceberg-only 组件。
+
+实际 API 文件:
+
+```text
+be/src/format/reader/table_reader.h
+```
+
+实际命名空间:
+
+```cpp
+namespace doris::reader
+```
+
+建议职责:
+
+- 输入 table schema、file schema、table scan request;
+- 输出 `ColumnMapping` 和通用 `FileScanRequest`;
+- 负责 filter localization;
+- 负责 schema change 映射;
+- 负责复杂列 child mapping;
+- 负责缺失列、default、partition、generated 列的 finalize 语义描述。
+
+建议接口形状:
+
+```cpp
+namespace doris::reader {
+
+class TableColumnMapper {
+public:
+    explicit TableColumnMapper(TableColumnMapperOptions options = {});
+
+    virtual Status create_mapping(const std::vector<TableColumn>& table_schema,
+                                  const std::vector<SchemaField>& file_schema,
+                                  std::vector<ColumnMapping>* mappings);
+
+    virtual Status create_scan_request(const TableScanRequest& table_request,
+                                       const std::vector<ColumnMapping>& 
mappings,
+                                       FileScanRequest* file_request);
+};
+
+} // namespace doris::reader
+```
+
+接口约束:
+
+- `TableColumnMapper` 的输入是 table schema + file schema + table scan request。
+- `TableColumnMapper` 的输出是 `ColumnMapping` + `FileScanRequest`。
+- `TableColumnMapper` 必须是通用层,不做 Iceberg-only 命名。
+- Iceberg 场景默认按 field id 映射;按 name 映射不是本轮默认路径。
+
+### FileReader
+
+`FileReader` 是文件物理读取层的通用接口,为后续 Parquet 之外的文件格式适配预留。
+
+实际 API 文件:
+
+```text
+be/src/format/reader/file_reader.h
+```
+
+实际命名空间:
+
+```cpp
+namespace doris::reader
+```
+
+建议职责:
+
+- 打开物理文件;
+- 暴露 file-local schema;
+- 接收 `FileScanRequest`;
+- 输出 file-local block;
+- 不理解 table/global schema。
+
+建议接口形状:
+
+```cpp
+namespace doris::reader {
+
+class FileReader {
+public:
+    virtual ~FileReader() = default;
+
+    virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = 
nullptr);
+    virtual Status get_schema(std::vector<SchemaField>* file_schema) const;
+    virtual Status init(const FileScanRequest& request);
+    virtual Status next(Block* file_block, size_t* rows, bool* eof);
+    virtual Status close();
+};
+
+} // namespace doris::reader
+```
+
+接口约束:
+
+- `FileReader` 输出的是 file-local block,不输出 table/global schema block。
+- `FileReader` 不处理 Iceberg schema evolution、default/generated/partition 列。
+- `IcebergTableReader` 组合 `FileReader`,不直接绑定具体文件格式 reader。
+
+### ParquetReader
+
+`ParquetReader` 是 `FileReader` 的 Parquet 实现,只负责 Parquet file-local schema
+和 Parquet file-local scan request。
+
+实际 API 文件:
+
+```text
+be/src/format/parquet/parquet_reader.h
+```
+
+实际命名空间:
+
+```cpp
+namespace doris::parquet
+```
+
+建议职责:
+
+- 打开 Parquet 文件;
+- 解析 footer 和 file schema;
+- 接收 `ParquetScanRequest` 或通用 `FileScanRequest`;
+- 执行 file-local projection 和 file-local filter;
+- 输出 file-local block。
+
+建议接口形状:
+
+```cpp
+namespace doris::parquet {
+
+class ParquetReader : public reader::FileReader {
+public:
+    virtual ~ParquetReader() = default;
+
+    virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = 
nullptr);
+    virtual Status get_schema(std::vector<reader::SchemaField>* file_schema) 
const;
+    virtual Status init(const ParquetScanRequest& request);
+    virtual Status next(Block* file_block, size_t* rows, bool* eof);
+    virtual Status close();
+};
+
+} // namespace doris::parquet
+```
+
+接口约束:
+
+- `ParquetReader` 输出的是 file-local block,不输出 table/global schema block。
+- `ParquetReader` 不理解 Iceberg schema evolution。
+- `ParquetReader` 不负责 default/generated/partition 列。
+- 任何 table-level cast/default/generated/partition 语义都不能重新塞回
+  `ParquetReader`。
+
+## 关键类型
+
+### SchemaField
+
+`SchemaField` 表示文件层 schema 中的列定义。
+
+建议包含的信息:
+
+- file-local column id;
+- 列名;
+- 类型;
+- child fields。
+
+它服务于 `TableColumnMapper` 做 schema matching,不携带 table-level 语义。
+
+### TableColumn
+
+`TableColumn` 表示 table/global schema 中的列定义。
+
+建议包含的信息:
+
+- table column id;
+- 列名;
+- 类型;
+- child columns。
+
+Iceberg 场景下,column id 默认对应 field id。
+
+### TableFilter
+
+`TableFilter` 表示 table 层过滤条件。
+
+建议包含的信息:
+
+- `table_column_id`
+- `conjunct`
+- `predicates`
+
+职责约束:
+
+- `conjunct` 偏表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义;
+- `predicates` 偏结构化单列下推,适合驱动 row group stats、page index、dictionary、
+  bloom filter 等文件层优化。
+
+### FileLocalFilter
+
+`FileLocalFilter` 表示已经 localize 到 file-local schema 的过滤条件。
+
+建议包含的信息:
+
+- `file_column_id`
+- `conjunct`
+- `predicates`
+
+职责约束:
+
+- `conjunct` 用于 file-local 表达式过滤;
+- `predicates` 用于 file-local 结构化下推;
+- 其输入必须来自 `TableColumnMapper`,不能由具体文件 reader 自己推导 table 语义。
+
+### ColumnMapping
+
+`ColumnMapping` 是 table schema 与 file schema 之间的核心边界对象。
+
+建议包含的信息:
+
+- `table_column_id`
+- `file_column_id`
+- `file_type`
+- `table_type`
+- `finalize_expr`
+- `reader_filter_expr`
+- `child_mappings`
+
+职责约束:
+
+- `finalize_expr` 服务最终输出,把 file-local value 转成 table/global value;
+- `reader_filter_expr` 服务读时 filter fallback;
+- 二者语义不同,不能混用;
+- `child_mappings` 用于复杂列 remap、复杂列裁剪和复杂列 schema change。
+
+### TableScanRequest
+
+`TableScanRequest` 描述 table 层 scan 请求。
+
+建议包含的信息:
+
+- projected table columns;
+- table filters。
+
+它由 `IcebergTableReader` 接收,再交给 `TableColumnMapper` 生成 file-local request。
+
+### ParquetScanRequest
+
+`ParquetScanRequest` 继承 `FileScanRequest`,描述 Parquet file-local scan 请求。
+
+### FileScanRequest
+
+`FileScanRequest` 描述通用 file-local scan 请求。
+
+建议包含的信息:
+
+- projected file columns;
+- local filters;
+- reader expression map。
+
+它是 `FileReader` 的唯一 scan 输入,不包含 table/global schema 语义。
+
+### IcebergScanTask
+
+`IcebergScanTask` 表示一次 Iceberg data file 读取任务。
+
+建议包含的信息:
+
+- data file 信息;
+- position delete 文件;
+- equality delete 文件;
+- deletion vector 信息。
+
+它是 `IcebergTableReader` 的输入,不应直接传给 `ParquetReader`。
+
+## 设计原则
+
+### 边界原则
+
+- `FileReader` 不理解 global schema,不直接处理 Iceberg schema evolution。
+- `ParquetReader` 是 `FileReader` 的 Parquet 实现。
+- `TableColumnMapper` 是 schema mapping 和 filter localization 的唯一入口。
+- `IcebergTableReader` 不做 Parquet 解码,只负责 table-level finalize、delete、
+  virtual columns。
+- `TableReader` 只负责多文件编排和 table-level 通用裁剪,不下沉文件格式语义。
+- 任何 table-level cast/default/generated/partition 语义都不能重新塞回
+  `ParquetReader`。
+
+### 依赖原则
+
+- 低层不能反向依赖高层语义。
+- `FileReader` 只依赖 file-local request。
+- `IcebergTableReader` 继承 `TableReader`,复用其多文件编排和通用裁剪能力。
+- `IcebergTableReader` 通过组合使用 `FileReader`。
+- `TableColumnMapper` 可以被 Iceberg 之外的其他表格式复用。
+
+### 命名原则
+
+- 表层抽象使用 `TableReader`、`IcebergTableReader`、`TableColumnMapper`、
+  `FileReader`、`ParquetReader` 命名。
+- `TableColumnMapper` 不使用 Iceberg-only 命名。
+- file schema 类型使用 `SchemaField`,table schema 类型使用 `TableColumn`。
+
+## 兼容原则
+
+新架构重构期间,新旧代码允许并存,但必须遵守以下约束:
+
+- 旧 `vparquet` / Hive / Hudi / Paimon 路径在新架构稳定前允许保留。
+- 新架构实现不得继续向旧 `vparquet` 表层语义回灌依赖。
+- 先搭新框架 API,再逐步迁移调用点。
+- 不允许边改 API 边混入临时裸逻辑、实验性草稿或未收敛命名。
+- 兼容层可能需要存在,但本文档不定义兼容层的具体实现方案。
+
+## 验收标准
+
+该文档应满足以下目标:
+
+- 不引用错误实验代码作为既成事实;
+- 不出现实现性草稿、裸伪代码、未收敛命名混用;
+- 让另一个工程师从 `master` 新开分支时,可以直接按本文档搭 API 骨架;
+- 读完文档后,不需要再讨论以下问题:
+  - 新架构分几层;
+  - 每层负责什么;
+  - 哪层理解 global schema;
+  - 哪层做 schema change / filter localization / finalize;
+  - 哪层允许依赖旧实现,哪层不允许。


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to