This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this 
push:
     new 1676f2ef64f fix compiling (#63368)
1676f2ef64f is described below

commit 1676f2ef64fb4651a15d6e8df8c0b2e06bcb15e8
Author: Gabriel <[email protected]>
AuthorDate: Mon May 18 20:22:03 2026 +0800

    fix compiling (#63368)
---
 be/src/format/reader/file_reader.h        |   6 +-
 be/src/format/reader/table_reader.h       | 167 ++++++++++++++++++------------
 be/src/format/table/iceberg_reader_v2.cpp |  20 ++++
 be/src/format/table/iceberg_reader_v2.h   | 150 +++------------------------
 4 files changed, 141 insertions(+), 202 deletions(-)

diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
index fd5bfcf933f..e9fb0f3c963 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -122,12 +122,8 @@ public:
     // 读取下一批 file-local block。
     // file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。
     // eof 表示当前文件 reader 是否读完;多文件切换由 TableReader 负责。
-    virtual Status next(Block* file_block, size_t* rows, bool* eof) {
+    virtual Status get_block(Block* file_block, bool* eof) {
         // stub 默认立即 EOF。
-        (void)file_block;
-        if (rows != nullptr) {
-            *rows = 0;
-        }
         if (eof != nullptr) {
             *eof = true;
         }
diff --git a/be/src/format/reader/table_reader.h 
b/be/src/format/reader/table_reader.h
index 8d88ce4fe1b..99dcc507e5d 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -17,6 +17,8 @@
 
 #pragma once
 
+#include <bvar/status.h>
+
 #include <cstddef>
 #include <cstdint>
 #include <memory>
@@ -26,6 +28,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "core/block/block.h"
 #include "core/data_type/data_type.h"
 #include "exprs/vexpr_fwd.h"
 #include "format/reader/file_reader.h"
@@ -110,7 +113,8 @@ struct TableScanRequest {
 // Iceberg-only 组件。
 class TableColumnMapper {
 public:
-    explicit TableColumnMapper(TableColumnMapperOptions options = {}) : 
_options(std::move(options)) {}
+    explicit TableColumnMapper(TableColumnMapperOptions options = {})
+            : _options(std::move(options)) {}
     virtual ~TableColumnMapper() = default;
 
     // 建立 table schema 到 file schema 的列映射。
@@ -184,14 +188,15 @@ public:
     const std::vector<ColumnMapping>& mappings() const { return _mappings; }
 
 private:
-    const SchemaField* find_file_field(
-            const TableColumn& table_column,
-            const std::vector<SchemaField>& file_schema) const {
+    const SchemaField* find_file_field(const TableColumn& table_column,
+                                       const std::vector<SchemaField>& 
file_schema) const {
         for (const auto& field : file_schema) {
-            if (_options.mode == TableColumnMappingMode::BY_FIELD_ID && 
field.id == table_column.id) {
+            if (_options.mode == TableColumnMappingMode::BY_FIELD_ID &&
+                field.id == table_column.id) {
                 return &field;
             }
-            if (_options.mode == TableColumnMappingMode::BY_NAME && field.name 
== table_column.name) {
+            if (_options.mode == TableColumnMappingMode::BY_NAME &&
+                field.name == table_column.name) {
                 return &field;
             }
         }
@@ -216,8 +221,28 @@ private:
     std::vector<ColumnMapping> _mappings;
 };
 
+struct BaseDataFile {
+    virtual ~BaseDataFile() = default;
+
+    std::string path;
+    std::string format;
+    int64_t record_count = 0;
+    int64_t file_size = 0;
+};
+
+struct ScanTask {
+    virtual ~ScanTask() = default;
+
+    std::unique_ptr<BaseDataFile> data_file;
+};
+
 struct TableReadOptions {
     size_t batch_size = 4096;
+    // TODO: deleted? SCHEMA should be derived from table metadata and inited 
by TableReader it self? it shouldn't be part of read options.
+    std::vector<TableColumn> schema;
+    TableScanRequest scan_request;
+    // Each task denotes a descriptor of a single file to read, along with 
file-level metadata such as stats and delete files.
+    std::vector<std::unique_ptr<ScanTask>> scan_tasks;
 };
 
 // table-level reader 基类。
@@ -228,9 +253,12 @@ public:
     virtual ~TableReader() = default;
 
     // 初始化 table reader 的通用运行参数。
-    // 子类可以在自己的 init(params) 中调用该方法;这里不接收具体表格式 schema/task。
-    virtual Status init(const TableReadOptions& options) {
-        _options = options;
+    // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
+    virtual Status init(TableReadOptions options) {
+        _schema = std::move(_options.schema);
+        _table_scan_request = std::move(_options.scan_request);
+        _scan_tasks = std::move(_options.scan_tasks);
+        _next_task_idx = 0;
         return Status::OK();
     }
 
@@ -249,91 +277,102 @@ public:
     // 对外读取 table block 的统一入口。
     // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
     // table_block 的列必须已经是 table/global schema 语义。
-    Status next(Block* table_block, size_t* rows, bool* eof) {
-        if (rows != nullptr) {
-            *rows = 0;
-        }
-        if (eof != nullptr) {
-            *eof = false;
+    Status get_block(Block* block, bool* eos) {
+        if (eos != nullptr) {
+            *eos = false;
         }
-        while (true) {
-            if (!_has_current_reader) {
-                RETURN_IF_ERROR(next_reader());
-                if (!_has_current_reader) {
-                    if (eof != nullptr) {
-                        *eof = true;
-                    }
+        while (block->empty() && !*eos) {
+            if (!_data_reader) {
+                RETURN_IF_ERROR(create_next_reader(eos));
+                if (!_data_reader) {
+                    DCHECK(*eos);
                     return Status::OK();
                 }
             }
 
-            size_t current_rows = 0;
             bool current_eof = false;
-            RETURN_IF_ERROR(read_current(table_block, &current_rows, 
&current_eof));
-            if (rows != nullptr) {
-                *rows = current_rows;
-            }
-            if (!current_eof || current_rows > 0) {
-                return Status::OK();
+            RETURN_IF_ERROR(_data_reader->get_block(block, &current_eof));
+            RETURN_IF_ERROR(finalize_chunk(block));
+            RETURN_IF_ERROR(materialize_virtual_columns(block));
+            if (current_eof) {
+                RETURN_IF_ERROR(close_current_reader());
             }
-            RETURN_IF_ERROR(close_current_reader());
-            _has_current_reader = false;
         }
+        return Status::OK();
     }
 
     // 关闭 table reader 及当前正在读取的底层 reader。
     // 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。
     virtual Status close() {
-        RETURN_IF_ERROR(close_current_reader());
-        _has_current_reader = false;
+        if (_data_reader) {
+            RETURN_IF_ERROR(close_current_reader());
+        }
         return Status::OK();
     }
 
 protected:
     // 切换到下一个 reader 的通用流程。
-    // 该方法先关闭当前 reader,再调用 open_next_reader;子类不应重复实现这个循环。
-    Status next_reader() {
+    // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
+    Status create_next_reader(bool* eos) {
         // 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
-        // 子类只通过 open_next_reader 提供具体表格式的 task/split 打开方式。
-        RETURN_IF_ERROR(close_current_reader());
-        bool has_reader = false;
-        RETURN_IF_ERROR(open_next_reader(&has_reader));
-        _has_current_reader = has_reader;
+        DCHECK(_data_reader == nullptr);
+        // TODO: 创建_data_reader
+        // _data_reader = std::make_unique<FileReader>(...);
+        if (!_data_reader) {
+            if (eos != nullptr) {
+                *eos = true;
+            }
+            return Status::OK();
+        }
+        RETURN_IF_ERROR(open_reader());
         return Status::OK();
     }
 
-    // 打开下一个具体 reader。
-    // 子类在这里选择下一个 split/task,创建或重置底层 FileReader,并设置 has_reader。
-    // has_reader=false 表示没有更多输入,TableReader::next 会返回 eof=true。
-    virtual Status open_next_reader(bool* has_reader) {
-        // stub 默认没有下一个 reader。
-        if (has_reader != nullptr) {
-            *has_reader = false;
-        }
+    // 打开当前具体 reader。
+    // 子类在这里基于当前 split/task 初始化底层 FileReader。
+    virtual Status open_reader() {
+        std::vector<SchemaField> file_schema;
+        RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
+        TableColumnMapperOptions mapper_options;
+        mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+        _column_mapper = TableColumnMapper(mapper_options);
+        RETURN_IF_ERROR(_column_mapper.create_mapping(_schema, file_schema, 
&_mappings));
+
+        FileScanRequest file_request;
+        RETURN_IF_ERROR(
+                _column_mapper.create_scan_request(_table_scan_request, 
_mappings, &file_request));
+        RETURN_IF_ERROR(_data_reader->init(file_request));
         return Status::OK();
     }
 
-    // 从当前 reader 读取一批 table block。
-    // 子类应在这里读取 file-local block,并完成 delete、virtual column、finalize_expr
-    // 等 table-level 处理,最终写入 table_block。
-    virtual Status read_current(Block* table_block, size_t* rows, bool* eof) {
-        // stub 默认当前 reader 立即 EOF。
-        (void)table_block;
-        if (rows != nullptr) {
-            *rows = 0;
-        }
-        if (eof != nullptr) {
-            *eof = true;
-        }
+    // 关闭当前具体 reader。
+    // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
+    virtual Status close_current_reader() {
+        RETURN_IF_ERROR(_data_reader->close());
+        _data_reader.reset();
         return Status::OK();
     }
 
-    // 关闭当前具体 reader。
-    // 该 hook 会被 next_reader 和 close 调用;实现应保持幂等。
-    virtual Status close_current_reader() { return Status::OK(); }
+    // 将 file-local block 转换为 table/global schema block。
+    // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
+    // 物化以及复杂列 remap。
+    virtual Status finalize_chunk(Block* block) { return Status::OK(); }
+
+    // 物化虚拟列。
+    // 例如 _row_id、_last_updated_sequence_number 等,它们不来自文件物理列。
+    virtual Status materialize_virtual_columns(Block* table_block) {
+        // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
+        return Status::OK();
+    }
 
     TableReadOptions _options;
-    bool _has_current_reader = false;
+    std::unique_ptr<FileReader> _data_reader;
+    std::vector<std::unique_ptr<ScanTask>> _scan_tasks;
+    TableScanRequest _table_scan_request;
+    std::vector<TableColumn> _schema;
+    std::vector<ColumnMapping> _mappings;
+    TableColumnMapper _column_mapper;
+    size_t _next_task_idx = 0;
 };
 
 } // namespace doris::reader
diff --git a/be/src/format/table/iceberg_reader_v2.cpp 
b/be/src/format/table/iceberg_reader_v2.cpp
new file mode 100644
index 00000000000..220f153e93f
--- /dev/null
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/table/iceberg_reader_v2.h"
+
+namespace doris::iceberg {} // namespace doris::iceberg
diff --git a/be/src/format/table/iceberg_reader_v2.h 
b/be/src/format/table/iceberg_reader_v2.h
index 29b556f71ed..3ddadc9f9de 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -35,51 +35,26 @@ class Block;
 namespace doris::iceberg {
 
 // Iceberg data file 摘要。它描述当前要读取的物理 data file,不承载列映射逻辑。
-struct IcebergDataFile {
-    std::string path;
-    std::string format;
-    int64_t record_count = 0;
-    int64_t file_size = 0;
+struct IcebergDataFile final : public reader::BaseDataFile {
     int64_t sequence_number = 0;
     int64_t first_row_id = -1;
 };
 
 // Iceberg delete file 摘要。position/equality/deletion vector 的具体读取在
 // IcebergTableReader 实现阶段补齐。
-struct IcebergDeleteFile {
-    std::string path;
-    std::string format;
+struct IcebergDeleteFile final : public reader::BaseDataFile {
     int64_t sequence_number = 0;
     std::vector<reader::ColumnId> equality_field_ids;
 };
 
 // 单个 Iceberg data file 的 scan 输入。
 // 该结构只进入 IcebergTableReader,不直接传给 ParquetReader。
-struct IcebergScanTask {
-    IcebergDataFile data_file;
+struct IcebergScanTask final : public reader::ScanTask {
     std::vector<IcebergDeleteFile> positional_deletes;
     std::vector<IcebergDeleteFile> equality_deletes;
     std::vector<IcebergDeleteFile> deletion_vectors;
 };
 
-struct IcebergReadOptions {
-    reader::TableReadOptions table_options;
-    bool enable_position_delete = true;
-    bool enable_equality_delete = true;
-    bool enable_deletion_vector = true;
-};
-
-// IcebergTableReader 的完整初始化输入。
-// 这些字段共同决定一次 table scan 的语义,除非后续有明确的生命周期差异,否则不拆成
-// bind/init/set_tasks 多个阶段,避免调用点暴露半初始化状态。
-struct IcebergTableReadParams {
-    IcebergReadOptions options;
-    std::vector<reader::TableColumn> iceberg_schema;
-    reader::TableScanRequest scan_request;
-    std::vector<IcebergScanTask> scan_tasks;
-    std::unique_ptr<reader::FileReader> data_reader;
-};
-
 // Iceberg table-level reader。
 // 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合
 // FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
@@ -90,18 +65,12 @@ public:
     ~IcebergTableReader() override = default;
 
     // 初始化一次 Iceberg table scan。
-    // params 必须一次性提供 schema、projection/filter、scan tasks 和底层 FileReader;
-    // 这样 IcebergTableReader 不会暴露 bind/set_tasks 等半初始化阶段。
-    Status init(IcebergTableReadParams params) {
+    // options 必须一次性提供 schema、projection/filter 和 scan tasks,避免暴露
+    // bind/set_tasks 等半初始化阶段。
+    Status init(reader::TableReadOptions options) override {
         // 一次性保存 Iceberg table scan 所需输入。TableReader 负责 reader 切换流程;
         // IcebergTableReader 只提供后续要打开的 task 以及 table/file schema 映射语义。
-        _iceberg_options = params.options;
-        _iceberg_schema = std::move(params.iceberg_schema);
-        _table_scan_request = std::move(params.scan_request);
-        _scan_tasks = std::move(params.scan_tasks);
-        _data_reader = std::move(params.data_reader);
-        _next_task_idx = 0;
-        return reader::TableReader::init(_iceberg_options.table_options);
+        return reader::TableReader::init(std::move(options));
     }
 
     // 关闭当前 Iceberg scan。
@@ -114,75 +83,20 @@ public:
     }
 
 protected:
-    // 打开单个 Iceberg scan task。
-    // 该方法完成当前 data file 的 schema mapping、filter localization、position delete
-    // 注入,并初始化底层 FileReader;它由 TableReader 的 reader 切换流程调用。
-    Status open_task(const IcebergScanTask& task) {
-        // 真实实现会读取 data file schema,创建 field-id mapping,应用 position deletes,
-        // 并初始化底层 ParquetReader。
-        _scan_task = task;
-        std::vector<reader::SchemaField> file_schema;
-        if (_data_reader) {
-            RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
-        }
-        reader::TableColumnMapperOptions mapper_options;
-        mapper_options.mode = reader::TableColumnMappingMode::BY_FIELD_ID;
-        _column_mapper = reader::TableColumnMapper(mapper_options);
-        RETURN_IF_ERROR(_column_mapper.create_mapping(_iceberg_schema, 
file_schema, &_mappings));
-
-        reader::FileScanRequest file_request;
-        
RETURN_IF_ERROR(_column_mapper.create_scan_request(_table_scan_request, 
_mappings,
-                                                           &file_request));
-        RETURN_IF_ERROR(apply_position_deletes(&file_request));
-        if (_data_reader) {
-            RETURN_IF_ERROR(_data_reader->init(file_request));
-        }
-        return Status::OK();
-    }
-
-    // 打开下一个 Iceberg task。
-    // TableReader 负责循环和 EOF 处理;这里仅从 _scan_tasks 中取下一个 task 并调用
-    // open_task。
-    Status open_next_reader(bool* has_reader) override {
-        if (_next_task_idx >= _scan_tasks.size()) {
-            if (has_reader != nullptr) {
-                *has_reader = false;
-            }
-            return Status::OK();
-        }
-        RETURN_IF_ERROR(open_task(_scan_tasks[_next_task_idx++]));
-        if (has_reader != nullptr) {
-            *has_reader = true;
-        }
-        return Status::OK();
-    }
-
-    // 读取当前 Iceberg task 的下一批 table block。
-    // 这里组合底层 FileReader 输出的 file-local block,并负责 equality delete、
-    // virtual columns 和 finalize,最终输出 table/global schema block。
-    Status read_current(Block* table_block, size_t* rows, bool* eof) override {
-        // 真实实现会读取 file-local block,finalize 成 table block,再应用 equality delete
-        // 和 Iceberg virtual columns。stub 默认 EOF。
-        // 后续实现应在 IcebergTableReader 内部持有 file-local block;这里仅复用输出指针
-        // 作为 header-only API 占位,避免在骨架阶段引入 Block 的完整定义。
-        Block* file_block = table_block;
-        if (_data_reader) {
-            RETURN_IF_ERROR(_data_reader->next(file_block, rows, eof));
-        }
-        RETURN_IF_ERROR(finalize_chunk(file_block, table_block));
-        RETURN_IF_ERROR(apply_equality_deletes(table_block));
-        RETURN_IF_ERROR(materialize_virtual_columns(table_block, rows != 
nullptr ? *rows : 0));
-        return Status::OK();
-    }
-
     // 将 file-local block 转换为 table/global schema block。
     // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
     // 物化以及复杂列 remap。
-    Status finalize_chunk(Block* file_block, Block* table_block) {
+    Status finalize_chunk(Block* block) override {
         // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated
         // expressions,把 file-local block 写成 table block。
-        (void)file_block;
-        (void)table_block;
+        RETURN_IF_ERROR(apply_equality_deletes(block));
+        return Status::OK();
+    }
+
+    // 物化 Iceberg 虚拟列。
+    // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
+    Status materialize_virtual_columns(Block* table_block) override {
+        // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
         return Status::OK();
     }
 
@@ -196,40 +110,10 @@ protected:
 
     // 在 table block 上应用 equality delete。
     // equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
-    Status apply_equality_deletes(Block* table_block) {
+    Status apply_equality_deletes(Block* block) {
         // 真实实现会在 table block 上应用 equality delete。
-        (void)table_block;
-        return Status::OK();
-    }
-
-    // 物化 Iceberg 虚拟列。
-    // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
-    Status materialize_virtual_columns(Block* table_block, size_t rows) {
-        // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
-        (void)table_block;
-        (void)rows;
         return Status::OK();
     }
-
-    // 关闭当前 task 对应的底层 FileReader。
-    // 该方法由 TableReader 在切换 reader 或 close 时调用,要求可重复调用。
-    Status close_current_reader() override {
-        if (_data_reader) {
-            RETURN_IF_ERROR(_data_reader->close());
-        }
-        return Status::OK();
-    }
-
-private:
-    IcebergReadOptions _iceberg_options;
-    IcebergScanTask _scan_task;
-    std::vector<IcebergScanTask> _scan_tasks;
-    size_t _next_task_idx = 0;
-    reader::TableScanRequest _table_scan_request;
-    std::vector<reader::TableColumn> _iceberg_schema;
-    std::vector<reader::ColumnMapping> _mappings;
-    reader::TableColumnMapper _column_mapper;
-    std::unique_ptr<reader::FileReader> _data_reader;
 };
 
 } // namespace doris::iceberg


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to