This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new e5d17b881ba refactor table reader (#63397)
e5d17b881ba is described below

commit e5d17b881ba4049948a9370db285eaf3f280ed38
Author: Gabriel <[email protected]>
AuthorDate: Tue May 19 16:32:59 2026 +0800

    refactor table reader (#63397)
---
 be/src/exec/scan/file_scanner.cpp          |   1 -
 be/src/exec/scan/file_scanner.h            |   4 +-
 be/src/exprs/vliteral.cpp                  |   6 -
 be/src/exprs/vliteral.h                    |   8 +-
 be/src/exprs/vslot_ref.h                   |   4 +
 be/src/format/reader/column_mapper.cpp     | 137 ++++++++++++++++
 be/src/format/reader/column_mapper.h       | 124 ++++++++++++++
 be/src/format/reader/expr/literal.h        |  35 ++++
 be/src/format/reader/expr/slot_ref.h       |  39 +++++
 be/src/format/reader/file_reader.h         |  27 ++-
 be/src/format/reader/table_reader.h        | 255 ++++++++---------------------
 be/src/format/table/iceberg_reader_mixin.h |   3 -
 be/src/format/table/iceberg_reader_v2.h    |  20 ---
 13 files changed, 440 insertions(+), 223 deletions(-)

diff --git a/be/src/exec/scan/file_scanner.cpp 
b/be/src/exec/scan/file_scanner.cpp
index 5f1d248c1e1..0ba7266456e 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -1791,7 +1791,6 @@ Status FileScanner::_init_expr_ctxes() {
         if (is_file_slot) {
             _is_file_slot.emplace(slot_id);
             _file_slot_descs.emplace_back(it->second);
-            _file_col_names.push_back(it->second->col_name());
         }
 
         _column_descs.push_back(col_desc);
diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index cd4066ec987..7c3d9d08b6a 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -133,8 +133,6 @@ protected:
     bool _cur_reader_eof = false;
     // File source slot descriptors
     std::vector<SlotDescriptor*> _file_slot_descs;
-    // col names from _file_slot_descs
-    std::vector<std::string> _file_col_names;
     // Unified column descriptors for init_reader (includes file, partition, 
missing, synthesized cols)
     std::vector<ColumnDescriptor> _column_descs;
 
@@ -147,6 +145,7 @@ protected:
     // dest slot name to index in _dest_vexpr_ctx;
     std::unordered_map<std::string, int> _dest_slot_name_to_idx;
     // col name to default value expr
+    // TODO: only used by json reader. Could we delete this?
     std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx;
     // the map values of dest slot id to src slot desc
     // if there is not key of dest slot id in 
dest_sid_to_src_sid_without_trans, it will be set to nullptr
@@ -193,7 +192,6 @@ protected:
     std::unique_ptr<io::IOContext> _io_ctx;
 
     // Whether to fill partition columns from path, default is true.
-    bool _fill_partition_from_path = true;
     std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>
             _partition_col_descs;
     std::unordered_map<std::string, bool> _partition_value_is_null;
diff --git a/be/src/exprs/vliteral.cpp b/be/src/exprs/vliteral.cpp
index 551839f699e..9b93d709727 100644
--- a/be/src/exprs/vliteral.cpp
+++ b/be/src/exprs/vliteral.cpp
@@ -37,12 +37,6 @@ namespace doris {
 
 class VExprContext;
 
-void VLiteral::init(const TExprNode& node) {
-    Field field;
-    field = _data_type->get_field(node);
-    _column_ptr = _data_type->create_column_const(1, field);
-}
-
 Status VLiteral::prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) {
     RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
     return Status::OK();
diff --git a/be/src/exprs/vliteral.h b/be/src/exprs/vliteral.h
index b1b8e89157d..e5a4c7a5f3d 100644
--- a/be/src/exprs/vliteral.h
+++ b/be/src/exprs/vliteral.h
@@ -39,7 +39,9 @@ public:
     VLiteral(const TExprNode& node, bool should_init = true)
             : VExpr(node), _expr_name(_data_type->get_name()) {
         if (should_init) {
-            init(node);
+            Field field;
+            field = _data_type->get_field(node);
+            _column_ptr = _data_type->create_column_const(1, field);
         }
     }
 
@@ -69,11 +71,9 @@ public:
     uint64_t get_digest(uint64_t seed) const override;
 
 protected:
+    VLiteral(const DataTypePtr& type) : VExpr(type, false) {}
     ColumnPtr _column_ptr;
     std::string _expr_name;
-
-private:
-    void init(const TExprNode& node);
 };
 
 } // namespace doris
diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 21b5735753b..3ac9f641c19 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -73,6 +73,10 @@ public:
 
     double execute_cost() const override { return 0.0; }
 
+protected:
+    VSlotRef(int slot_id, int column_id, int column_uniq_id)
+            : _slot_id(slot_id), _column_id(column_id), 
_column_uniq_id(column_uniq_id) {}
+
 private:
     int _slot_id;
     int _column_id;
diff --git a/be/src/format/reader/column_mapper.cpp 
b/be/src/format/reader/column_mapper.cpp
new file mode 100644
index 00000000000..7006365b054
--- /dev/null
+++ b/be/src/format/reader/column_mapper.cpp
@@ -0,0 +1,137 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/column_mapper.h"
+
+#include <vector>
+
+#include "common/status.h"
+#include "expr/slot_ref.h"
+#include "format/reader/file_reader.h"
+#include "format/reader/table_reader.h"
+
+namespace doris::reader {
+
+Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& 
projected_columns,
+                                         std::vector<SchemaField> block_schema,
+                                         const std::map<std::string, Field>& 
partition_values,
+                                         const std::vector<SchemaField>& 
file_schema) {
+    // 真实实现会做 field id/name matching、类型转换、复杂列 child mapping、缺失列
+    // default/partition/generated 表达式构造。
+    _mappings.clear();
+    block_schema.clear();
+    for (const auto& table_column : projected_columns) {
+        ColumnMapping mapping;
+        mapping.table_column_id = table_column.id;
+        mapping.table_type = table_column.type;
+        if (const auto* file_field = _find_file_field(table_column, 
file_schema)) {
+            mapping.file_column_id = file_field->id;
+            mapping.file_type = file_field->type;
+            mapping.is_trivial = _is_same_type(mapping.table_type, 
mapping.file_type);
+            if (!mapping.is_trivial) {
+                // TODO:
+                return Status::NotSupported(
+                        "column mapping with type conversion is not supported 
yet: table column "
+                        "'{}' (id={}, type={}) vs file column (id={}, 
type={})",
+                        table_column.name, mapping.table_column_id, 
mapping.table_type->get_name(),
+                        mapping.file_column_id.value(), 
mapping.file_type->get_name());
+            } else {
+                mapping.projection = 
VExprContext::create_shared(TableSlotRef::create_shared(
+                        *mapping.file_column_id, block_schema.size(), -1, 
mapping.table_type));
+            }
+            block_schema.push_back(SchemaField {
+                    mapping.file_column_id.value(), table_column.name, 
mapping.table_type, {}});
+        } else if (table_column.default_expr != nullptr) {
+            mapping.is_constant = true;
+            mapping.default_expr = table_column.default_expr;
+        } else if (table_column.is_partition_key && 
partition_values.count(table_column.name) > 0) {
+            mapping.default_expr = 
VExprContext::create_shared(TableLiteral::create_shared(
+                    mapping.table_type, 
partition_values.at(table_column.name)));
+        } else {
+            if (table_column.is_partition_key) {
+                return Status::InvalidArgument(
+                        "Table column '%s' (id=%d) does not have a matching 
partition value",
+                        table_column.name);
+            }
+            if (!_options.allow_missing_columns) {
+                return Status::InvalidArgument(
+                        "Table column '%s' (id=%d) does not have a matching 
file column",
+                        table_column.name, table_column.id);
+            }
+        }
+        _mappings.push_back(std::move(mapping));
+    }
+    return Status::OK();
+}
+
+Status TableColumnMapper::create_scan_request(const std::map<int32_t, 
TableFilter>& table_filters,
+                                              const std::vector<TableColumn>& 
projected_columns,
+                                              FileScanRequest* file_request) {
+    // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
+    file_request->predicate_columns.clear();
+    file_request->non_predicate_columns.clear();
+    file_request->local_filters.clear();
+    file_request->reader_expression_map.clear();
+    for (const auto& table_column : projected_columns) {
+        const auto* mapping = _find_mapping(table_column.id);
+        if (mapping != nullptr && mapping->file_column_id.has_value() &&
+            table_filters.count(table_column.id) == 0) {
+            
file_request->non_predicate_columns.push_back(*mapping->file_column_id);
+        }
+    }
+    RETURN_IF_ERROR(localize_filters(table_filters, file_request));
+    return Status::OK();
+}
+
+Status TableColumnMapper::localize_filters(const std::map<int32_t, 
TableFilter>& table_filters,
+                                           FileScanRequest* file_request) 
const {
+    // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
+    // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。
+    for (const auto& it : table_filters) {
+        const auto* mapping = _find_mapping(it.first);
+        if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+            continue;
+        }
+        if (!it.second.can_be_localized()) {
+            // TODO: Rewrite table filter to reader_expression_map
+            // 
file_request->reader_expression_map.emplace_back(mapping->table_column_id, 
it.second.conjunct);
+        } else {
+            FileLocalFilter local_filter;
+            local_filter.file_column_id = *mapping->file_column_id;
+            local_filter.conjunct = it.second.conjunct;
+            local_filter.predicates = it.second.predicates;
+            file_request->local_filters.push_back(std::move(local_filter));
+        }
+        file_request->predicate_columns.push_back(*mapping->file_column_id);
+    }
+    return Status::OK();
+}
+
+const SchemaField* TableColumnMapper::_find_file_field(
+        const TableColumn& table_column, const std::vector<SchemaField>& 
file_schema) const {
+    for (const auto& field : file_schema) {
+        if (_options.mode == TableColumnMappingMode::BY_FIELD_ID && field.id 
== table_column.id) {
+            return &field;
+        }
+        if (field.name == table_column.name) {
+            return &field;
+        }
+    }
+    return nullptr;
+}
+
+} // namespace doris::reader
diff --git a/be/src/format/reader/column_mapper.h 
b/be/src/format/reader/column_mapper.h
new file mode 100644
index 00000000000..4c6b510ff0e
--- /dev/null
+++ b/be/src/format/reader/column_mapper.h
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "core/data_type/data_type.h"
+#include "exprs/vexpr_fwd.h"
+#include "format/reader/expr/literal.h"
+namespace doris::reader {
+
+struct TableColumn;
+struct TableFilter;
+struct SchemaField;
+struct FileScanRequest;
+
+enum class TableColumnMappingMode {
+    BY_FIELD_ID,
+    BY_NAME,
+};
+
+// 单个 table column 到 file column 的映射结果。
+// 这是 table 层和 file 层的核心边界对象。
+struct ColumnMapping {
+    int32_t table_column_id = -1;
+    std::optional<int32_t> file_column_id;
+    DataTypePtr file_type;
+    DataTypePtr table_type;
+
+    // 最终输出表达式。用于把 file-local value 转成 table/global value,例如 cast、
+    // default、partition、generated column 或复杂列 remap。
+    VExprContextSPtr projection;
+
+    // 读时过滤 fallback 表达式。只在 table filter 不能安全转换成 file-local predicate
+    // 时使用,服务 reader_expression_map,不等价于 finalize_expr。
+    VExprContextSPtr reader_filter_expr;
+
+    std::vector<ColumnMapping> child_mappings;
+    bool is_trivial = false;
+    bool is_constant = false;
+    VExprContextSPtr default_expr;
+};
+
+struct TableColumnMapperOptions {
+    TableColumnMappingMode mode = TableColumnMappingMode::BY_FIELD_ID;
+    bool allow_missing_columns = true;
+    bool enable_reader_expression_fallback = true;
+};
+
+// 通用 table schema 到 file schema 映射层。
+// Iceberg 会使用 BY_FIELD_ID;普通 by-name 场景可以复用该组件,但不应把它命名成
+// Iceberg-only 组件。
+class TableColumnMapper {
+public:
+    explicit TableColumnMapper(TableColumnMapperOptions options = {})
+            : _options(std::move(options)) {}
+    virtual ~TableColumnMapper() = default;
+
+    // 建立 table schema 到 file schema 的列映射。
+    // 输出的 ColumnMapping 描述 table column 如何从 file column、常量列或表达式得到;
+    // 后续 projection、filter localization 和 table block finalize 都应复用这份映射。
+    virtual Status create_mapping(const std::vector<TableColumn>& 
projected_columns,
+                                  std::vector<SchemaField> block_schema,
+                                  const std::map<std::string, Field>& 
partition_values,
+                                  const std::vector<SchemaField>& file_schema);
+
+    // 把 table-level scan 请求转换成 file-local scan 请求。
+    // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
+    // projected_file_columns、local_filters 和 reader_expression_map。
+    virtual Status create_scan_request(const std::map<int32_t, TableFilter>& 
table_filters,
+                                       const std::vector<TableColumn>& 
projected_columns,
+                                       FileScanRequest* file_request);
+
+    // 将 table-level filter 定位到文件 schema。
+    // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全
+    // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback 
处理。
+    virtual Status localize_filters(const std::map<int32_t, TableFilter>& 
table_filters,
+                                    FileScanRequest* file_request) const;
+    void clear() { _mappings.clear(); }
+    const std::vector<ColumnMapping>& mappings() const { return _mappings; }
+
+private:
+    const SchemaField* _find_file_field(const TableColumn& table_column,
+                                        const std::vector<SchemaField>& 
file_schema) const;
+
+    const ColumnMapping* _find_mapping(ColumnId table_column_id) const {
+        for (const auto& mapping : _mappings) {
+            if (mapping.table_column_id == table_column_id) {
+                return &mapping;
+            }
+        }
+        return nullptr;
+    }
+
+    bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& 
file_type) const {
+        return table_type == file_type;
+    }
+
+    TableColumnMapperOptions _options;
+    std::vector<ColumnMapping> _mappings;
+};
+
+} // namespace doris::reader
diff --git a/be/src/format/reader/expr/literal.h 
b/be/src/format/reader/expr/literal.h
new file mode 100644
index 00000000000..9c4202994ee
--- /dev/null
+++ b/be/src/format/reader/expr/literal.h
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "core/data_type/data_type.h"
+#include "exprs/vliteral.h"
+
+namespace doris {
+
+class TableLiteral : public VLiteral {
+    ENABLE_FACTORY_CREATOR(TableLiteral);
+
+public:
+    TableLiteral(const DataTypePtr& type, const Field& field) : VLiteral(type) 
{
+        _data_type = type;
+        _column_ptr = _data_type->create_column_const(1, field);
+    }
+};
+
+} // namespace doris
diff --git a/be/src/format/reader/expr/slot_ref.h 
b/be/src/format/reader/expr/slot_ref.h
new file mode 100644
index 00000000000..6b5d027602e
--- /dev/null
+++ b/be/src/format/reader/expr/slot_ref.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "core/data_type/data_type.h"
+#include "exprs/vslot_ref.h"
+
+namespace doris {
+
+class TableSlotRef : public VSlotRef {
+    ENABLE_FACTORY_CREATOR(TableSlotRef);
+
+public:
+    TableSlotRef(int slot_id, int column_id, int column_uniq_id, const 
DataTypePtr& type)
+            : VSlotRef(slot_id, column_id, column_uniq_id) {
+        _data_type = type;
+    }
+
+    Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override {
+        return Status::OK();
+    }
+};
+
+} // namespace doris
diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
index e9fb0f3c963..edebdcaff42 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -47,7 +47,7 @@ using ColumnId = int32_t;
 // schema 语义。Iceberg field id、name mapping、default/generated/partition 列都不在
 // FileReader 内部解释。
 struct SchemaField {
-    ColumnId id = -1;
+    int32_t id = -1;
     std::string name;
     DataTypePtr type;
     std::vector<SchemaField> children;
@@ -65,9 +65,16 @@ struct FileLocalFilter {
 
     // 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、
     // bloom filter 等只理解单列谓词的优化。
+    // TODO: conjunct 支持表达所有 filter 语义之后删除。
     std::vector<std::shared_ptr<ColumnPredicate>> predicates;
 };
 
+enum class FileFormat {
+    PARQUET,
+    ORC,
+    CSV,
+};
+
 // 通用文件层 scan 请求。
 // 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global
 // schema。所有 schema change、filter localization、default/generated/partition
@@ -75,9 +82,23 @@ struct FileLocalFilter {
 struct FileScanRequest {
     virtual ~FileScanRequest() = default;
 
-    std::vector<ColumnId> projected_file_columns;
+    std::vector<ColumnId> predicate_columns;
+    std::vector<ColumnId> non_predicate_columns;
     std::vector<FileLocalFilter> local_filters;
+    // fallback path if filters cannot be localized to file-local predicates. 
The expression can reference projected_file_columns and partition columns.
     std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
+    // partition key -> value
+    std::map<std::string, Field> partition_values;
+
+    // projected_columns' id is file-local column id, and they are all from 
file schema.
+    // For example,
+    // file schema: [0: id (int), 1: name (string), 2: age (int)]
+    // predicate: age > 30
+    // table-level projection: [name, id]
+    // predicate_columns: [2]
+    // non_predicate_columns: [1, 0]
+    // projected_columns are columns in blocks returned to table reader: [1, 
0] means only name and id are projected,
+    std::vector<ColumnId> projected_columns;
 };
 
 // 文件物理读取层通用接口。
@@ -113,7 +134,7 @@ public:
     virtual Status init(const FileScanRequest& request) {
         // 真实实现会根据 projected columns、local filters 和 reader expressions
         // 初始化文件格式自己的物理读取计划。
-        _request.projected_file_columns = request.projected_file_columns;
+        // _request.projected_file_columns = request.projected_file_columns;
         _request.local_filters = request.local_filters;
         _request.reader_expression_map = request.reader_expression_map;
         return Status::OK();
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index 99dcc507e5d..4d8fe0620c8 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -30,7 +30,10 @@
 #include "common/status.h"
 #include "core/block/block.h"
 #include "core/data_type/data_type.h"
+#include "exprs/vexpr_context.h"
 #include "exprs/vexpr_fwd.h"
+#include "format/reader/column_mapper.h"
+#include "format/reader/expr/literal.h"
 #include "format/reader/file_reader.h"
 
 namespace doris {
@@ -47,45 +50,22 @@ struct TableColumn {
     std::string name;
     DataTypePtr type;
     std::vector<TableColumn> children;
+    VExprContextSPtr default_expr;
+    bool is_partition_key = false;
 };
 
 // table-level filter。
 // TableColumnMapper 负责把它转换成 FileLocalFilter 或 reader_expression_map。
 struct TableFilter {
-    ColumnId table_column_id = -1;
-
     // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
     VExprContextSPtr conjunct;
 
     // 结构化列谓词,适合下推到文件层做 row group stats、page index、dictionary、
     // bloom filter 等优化。
+    // TODO: conjunct 支持表达所有 filter 语义之后删除。
     std::vector<std::shared_ptr<ColumnPredicate>> predicates;
-};
-
-// 单个 table column 到 file column 的映射结果。
-// 这是 table 层和 file 层的核心边界对象。
-struct ColumnMapping {
-    ColumnId table_column_id = -1;
-    std::optional<ColumnId> file_column_id;
-    DataTypePtr file_type;
-    DataTypePtr table_type;
-
-    // 最终输出表达式。用于把 file-local value 转成 table/global value,例如 cast、
-    // default、partition、generated column 或复杂列 remap。
-    VExprContextSPtr finalize_expr;
-
-    // 读时过滤 fallback 表达式。只在 table filter 不能安全转换成 file-local predicate
-    // 时使用,服务 reader_expression_map,不等价于 finalize_expr。
-    VExprContextSPtr reader_filter_expr;
-
-    std::vector<ColumnMapping> child_mappings;
-    bool is_trivial = false;
-    bool is_constant = false;
-};
 
-enum class TableColumnMappingMode {
-    BY_FIELD_ID,
-    BY_NAME,
+    bool can_be_localized() const { return true; }
 };
 
 enum class TableFilterConversion {
@@ -95,132 +75,6 @@ enum class TableFilterConversion {
     FINALIZE_ONLY,
 };
 
-struct TableColumnMapperOptions {
-    TableColumnMappingMode mode = TableColumnMappingMode::BY_FIELD_ID;
-    bool allow_missing_columns = true;
-    bool enable_reader_expression_fallback = true;
-};
-
-// table-level scan 请求。
-// 它仍然使用 table/global schema 语义,不能直接传给 FileReader。
-struct TableScanRequest {
-    std::vector<TableColumn> projected_table_columns;
-    std::vector<TableFilter> table_filters;
-};
-
-// 通用 table schema 到 file schema 映射层。
-// Iceberg 会使用 BY_FIELD_ID;普通 by-name 场景可以复用该组件,但不应把它命名成
-// Iceberg-only 组件。
-class TableColumnMapper {
-public:
-    explicit TableColumnMapper(TableColumnMapperOptions options = {})
-            : _options(std::move(options)) {}
-    virtual ~TableColumnMapper() = default;
-
-    // 建立 table schema 到 file schema 的列映射。
-    // 输出的 ColumnMapping 描述 table column 如何从 file column、常量列或表达式得到;
-    // 后续 projection、filter localization 和 table block finalize 都应复用这份映射。
-    virtual Status create_mapping(const std::vector<TableColumn>& table_schema,
-                                  const std::vector<SchemaField>& file_schema,
-                                  std::vector<ColumnMapping>* mappings) {
-        // 真实实现会做 field id/name matching、类型转换、复杂列 child mapping、缺失列
-        // default/partition/generated 表达式构造。
-        mappings->clear();
-        for (const auto& table_column : table_schema) {
-            ColumnMapping mapping;
-            mapping.table_column_id = table_column.id;
-            mapping.table_type = table_column.type;
-            if (const auto* file_field = find_file_field(table_column, 
file_schema)) {
-                mapping.file_column_id = file_field->id;
-                mapping.file_type = file_field->type;
-                mapping.is_trivial = is_same_type(mapping.table_type, 
mapping.file_type);
-            } else {
-                mapping.is_constant = true;
-            }
-            mappings->push_back(std::move(mapping));
-        }
-        _mappings = *mappings;
-        return Status::OK();
-    }
-
-    // 把 table-level scan 请求转换成 file-local scan 请求。
-    // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
-    // projected_file_columns、local_filters 和 reader_expression_map。
-    virtual Status create_scan_request(const TableScanRequest& table_request,
-                                       const std::vector<ColumnMapping>& 
mappings,
-                                       FileScanRequest* file_request) {
-        // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
-        file_request->projected_file_columns.clear();
-        file_request->local_filters.clear();
-        file_request->reader_expression_map.clear();
-        _mappings = mappings;
-        for (const auto& table_column : table_request.projected_table_columns) 
{
-            const auto* mapping = find_mapping(table_column.id);
-            if (mapping != nullptr && mapping->file_column_id.has_value()) {
-                
file_request->projected_file_columns.push_back(*mapping->file_column_id);
-            }
-        }
-        RETURN_IF_ERROR(localize_filters(table_request.table_filters, 
file_request));
-        return Status::OK();
-    }
-
-    // 将 table-level filter 定位到文件 schema。
-    // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全
-    // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback 
处理。
-    virtual Status localize_filters(const std::vector<TableFilter>& 
table_filters,
-                                    FileScanRequest* file_request) const {
-        // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
-        // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。
-        for (const auto& filter : table_filters) {
-            const auto* mapping = find_mapping(filter.table_column_id);
-            if (mapping == nullptr || !mapping->file_column_id.has_value()) {
-                continue;
-            }
-            FileLocalFilter local_filter;
-            local_filter.file_column_id = *mapping->file_column_id;
-            local_filter.conjunct = filter.conjunct;
-            local_filter.predicates = filter.predicates;
-            file_request->local_filters.push_back(std::move(local_filter));
-        }
-        return Status::OK();
-    }
-
-    const std::vector<ColumnMapping>& mappings() const { return _mappings; }
-
-private:
-    const SchemaField* find_file_field(const TableColumn& table_column,
-                                       const std::vector<SchemaField>& 
file_schema) const {
-        for (const auto& field : file_schema) {
-            if (_options.mode == TableColumnMappingMode::BY_FIELD_ID &&
-                field.id == table_column.id) {
-                return &field;
-            }
-            if (_options.mode == TableColumnMappingMode::BY_NAME &&
-                field.name == table_column.name) {
-                return &field;
-            }
-        }
-        return nullptr;
-    }
-
-    const ColumnMapping* find_mapping(ColumnId table_column_id) const {
-        for (const auto& mapping : _mappings) {
-            if (mapping.table_column_id == table_column_id) {
-                return &mapping;
-            }
-        }
-        return nullptr;
-    }
-
-    bool is_same_type(const DataTypePtr& table_type, const DataTypePtr& 
file_type) const {
-        return table_type == file_type;
-    }
-
-private:
-    TableColumnMapperOptions _options;
-    std::vector<ColumnMapping> _mappings;
-};
-
 struct BaseDataFile {
     virtual ~BaseDataFile() = default;
 
@@ -236,13 +90,21 @@ struct ScanTask {
     std::unique_ptr<BaseDataFile> data_file;
 };
 
+struct ReadProfile {};
+
 struct TableReadOptions {
-    size_t batch_size = 4096;
-    // TODO: deleted? SCHEMA should be derived from table metadata and inited 
by TableReader it self? it shouldn't be part of read options.
-    std::vector<TableColumn> schema;
-    TableScanRequest scan_request;
+    const std::vector<TableColumn> projected_columns;
+    // All conjuncts from scan operator
+    const VExprContext conjuncts;
+    const FileFormat format;
     // Each task denotes a descriptor of a single file to read, along with 
file-level metadata such as stats and delete files.
     std::vector<std::unique_ptr<ScanTask>> scan_tasks;
+
+    std::unique_ptr<ReadProfile> profile;
+};
+
+struct SplitReadOptions {
+    std::map<std::string, Field> partition_values;
 };
 
 // table-level reader 基类。
@@ -255,10 +117,20 @@ public:
     // 初始化 table reader 的通用运行参数。
     // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
     virtual Status init(TableReadOptions options) {
-        _schema = std::move(_options.schema);
-        _table_scan_request = std::move(_options.scan_request);
         _scan_tasks = std::move(_options.scan_tasks);
         _next_task_idx = 0;
+        _profile = std::move(options.profile);
+        TableColumnMapperOptions mapper_options;
+        mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+        _data_reader.column_mapper = TableColumnMapper(mapper_options);
+        // TODO:
+        // _table_filters = 
build_table_filters_from_conjuncts(options.conjuncts);
+        return Status::OK();
+    }
+
+    // 读取当前 split/partition 之前初始化。
+    virtual Status prepare_split(SplitReadOptions options) {
+        _partition_values = std::move(options.partition_values);
         return Status::OK();
     }
 
@@ -278,20 +150,30 @@ public:
     // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
     // table_block 的列必须已经是 table/global schema 语义。
     Status get_block(Block* block, bool* eos) {
-        if (eos != nullptr) {
-            *eos = false;
-        }
         while (block->empty() && !*eos) {
-            if (!_data_reader) {
+            if (!_data_reader.reader) {
                 RETURN_IF_ERROR(create_next_reader(eos));
-                if (!_data_reader) {
+                if (!_data_reader.reader) {
                     DCHECK(*eos);
                     return Status::OK();
                 }
             }
 
             bool current_eof = false;
-            RETURN_IF_ERROR(_data_reader->get_block(block, &current_eof));
+            Block current_block;
+            for (const auto& field : _data_reader.block_schema) {
+                // TODO: reuse column's memory
+                current_block.insert({field.type->create_column(), field.type, 
field.name});
+            }
+            RETURN_IF_ERROR(_data_reader.reader->get_block(&current_block, 
&current_eof));
+
+            size_t idx = 0;
+            for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+                int res_id;
+                RETURN_IF_ERROR(mapping.projection->execute(&current_block, 
&res_id));
+                block->replace_by_position(idx, 
current_block.get_columns()[res_id]);
+                idx++;
+            }
             RETURN_IF_ERROR(finalize_chunk(block));
             RETURN_IF_ERROR(materialize_virtual_columns(block));
             if (current_eof) {
@@ -304,7 +186,7 @@ public:
     // 关闭 table reader 及当前正在读取的底层 reader。
     // 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。
     virtual Status close() {
-        if (_data_reader) {
+        if (_data_reader.reader) {
             RETURN_IF_ERROR(close_current_reader());
         }
         return Status::OK();
@@ -315,10 +197,10 @@ protected:
     // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
     Status create_next_reader(bool* eos) {
         // 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
-        DCHECK(_data_reader == nullptr);
+        DCHECK(_data_reader.reader == nullptr);
         // TODO: 创建_data_reader
         // _data_reader = std::make_unique<FileReader>(...);
-        if (!_data_reader) {
+        if (!_data_reader.reader) {
             if (eos != nullptr) {
                 *eos = true;
             }
@@ -332,24 +214,25 @@ protected:
     // 子类在这里基于当前 split/task 初始化底层 FileReader。
     virtual Status open_reader() {
         std::vector<SchemaField> file_schema;
-        RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
-        TableColumnMapperOptions mapper_options;
-        mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
-        _column_mapper = TableColumnMapper(mapper_options);
-        RETURN_IF_ERROR(_column_mapper.create_mapping(_schema, file_schema, 
&_mappings));
+        RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
+        
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_options.projected_columns,
+                                                                  
_data_reader.block_schema,
+                                                                  
_partition_values, file_schema));
 
         FileScanRequest file_request;
-        RETURN_IF_ERROR(
-                _column_mapper.create_scan_request(_table_scan_request, 
_mappings, &file_request));
-        RETURN_IF_ERROR(_data_reader->init(file_request));
+        RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
+                _table_filters, _options.projected_columns, &file_request));
+        RETURN_IF_ERROR(_data_reader.reader->init(file_request));
         return Status::OK();
     }
 
     // 关闭当前具体 reader。
     // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
     virtual Status close_current_reader() {
-        RETURN_IF_ERROR(_data_reader->close());
-        _data_reader.reset();
+        RETURN_IF_ERROR(_data_reader.reader->close());
+        _data_reader.reader.reset();
+        _data_reader.column_mapper.clear();
+        _data_reader.block_schema.clear();
         return Status::OK();
     }
 
@@ -365,14 +248,20 @@ protected:
         return Status::OK();
     }
 
+    struct DataReader {
+        std::unique_ptr<FileReader> reader;
+        TableColumnMapper column_mapper;
+        // Schema of blocks from file reader.
+        std::vector<SchemaField> block_schema;
+    };
+    DataReader _data_reader;
     TableReadOptions _options;
-    std::unique_ptr<FileReader> _data_reader;
     std::vector<std::unique_ptr<ScanTask>> _scan_tasks;
-    TableScanRequest _table_scan_request;
-    std::vector<TableColumn> _schema;
-    std::vector<ColumnMapping> _mappings;
-    TableColumnMapper _column_mapper;
+    // partition key -> value
+    std::map<std::string, Field> _partition_values;
     size_t _next_task_idx = 0;
+    std::map<int32_t, TableFilter> _table_filters;
+    std::unique_ptr<ReadProfile> _profile;
 };
 
 } // namespace doris::reader
diff --git a/be/src/format/table/iceberg_reader_mixin.h 
b/be/src/format/table/iceberg_reader_mixin.h
index 42c80c9b7d4..c9c84639b8f 100644
--- a/be/src/format/table/iceberg_reader_mixin.h
+++ b/be/src/format/table/iceberg_reader_mixin.h
@@ -341,9 +341,6 @@ protected:
     // id -> block column name
     std::unordered_map<int, std::string> _id_to_block_column_name;
 
-    // File column names used during init
-    std::vector<std::string> _file_col_names;
-
     std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
             _create_topn_row_id_column_iterator;
 
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
index 3ddadc9f9de..fc957eda124 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -60,28 +60,8 @@ struct IcebergScanTask final : public reader::ScanTask {
 // FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
 class IcebergTableReader : public reader::TableReader {
 public:
-    IcebergTableReader() = default;
-
     ~IcebergTableReader() override = default;
 
-    // 初始化一次 Iceberg table scan。
-    // options 必须一次性提供 schema、projection/filter 和 scan tasks,避免暴露
-    // bind/set_tasks 等半初始化阶段。
-    Status init(reader::TableReadOptions options) override {
-        // 一次性保存 Iceberg table scan 所需输入。TableReader 负责 reader 切换流程;
-        // IcebergTableReader 只提供后续要打开的 task 以及 table/file schema 映射语义。
-        return reader::TableReader::init(std::move(options));
-    }
-
-    // 关闭当前 Iceberg scan。
-    // 先让 TableReader 关闭当前 task reader,再释放 IcebergTableReader 持有的底层
-    // FileReader。
-    Status close() override {
-        RETURN_IF_ERROR(reader::TableReader::close());
-        _data_reader.reset();
-        return Status::OK();
-    }
-
 protected:
     // 将 file-local block 转换为 table/global schema block。
     // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to