This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new 5dc54d878a2 [improvement](be) Reuse table reader file block (#63704)
5dc54d878a2 is described below

commit 5dc54d878a2ae257797fecb7400653587bdd0a54
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 09:14:47 2026 +0800

    [improvement](be) Reuse table reader file block (#63704)
---
 be/src/exec/scan/file_scanner.h                    |   2 +-
 be/src/format/new_parquet/parquet_reader.cpp       |  49 +++++--
 be/src/format/new_parquet/parquet_reader.h         |   2 +-
 be/src/format/reader/column_mapper.cpp             |  54 ++++++--
 be/src/format/reader/column_mapper.h               |   1 +
 be/src/format/reader/file_reader.h                 |   7 +-
 be/src/format/reader/table_reader.cpp              |  81 ++++++++++-
 be/src/format/reader/table_reader.h                | 154 +++++++++++++--------
 be/test/format/new_parquet/parquet_reader_test.cpp |   8 +-
 9 files changed, 264 insertions(+), 94 deletions(-)

diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index 7c3d9d08b6a..34f59cdee32 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -189,7 +189,7 @@ protected:
 
     std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
     std::unique_ptr<io::FileReaderStats> _file_reader_stats;
-    std::unique_ptr<io::IOContext> _io_ctx;
+    std::shared_ptr<io::IOContext> _io_ctx;
 
     // Whether to fill partition columns from path, default is true.
     std::unordered_map<std::string, std::tuple<std::string, const 
SlotDescriptor*>>
diff --git a/be/src/format/new_parquet/parquet_reader.cpp b/be/src/format/new_parquet/parquet_reader.cpp
index fc00484758e..7f442808523 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -208,10 +208,11 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block
     for (size_t filter_idx = 0; filter_idx < 
_request->predicate_columns.size(); ++filter_idx) {
         const int file_field_id = _request->predicate_columns[filter_idx];
         auto& column_reader = _state->current_predicate_columns[filter_idx];
-        auto column = 
file_block->get_by_position(column_reader->file_column_id())
-                              .column->assume_mutable();
-        DCHECK_EQ(file_block->get_by_position(column_reader->file_column_id())
-                          .type->get_primitive_type(),
+        auto position_it = _request->column_positions.find(file_field_id);
+        DORIS_CHECK(position_it != _request->column_positions.end());
+        const auto block_position = position_it->second;
+        auto column = 
file_block->get_by_position(block_position).column->assume_mutable();
+        
DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
                   column_reader->type()->get_primitive_type());
         int64_t column_rows = 0;
         RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows));
@@ -236,7 +237,7 @@ Status ParquetReader::_read_filter_columns(int64_t batch_rows, Block* file_block
             }
             break;
         }
-        file_block->replace_by_position(file_field_id, std::move(column));
+        file_block->replace_by_position(block_position, std::move(column));
         if (*selected_rows == 0) {
             break;
         }
@@ -313,7 +314,7 @@ Status ParquetReader::_open_next_row_group(bool* has_row_group) {
     return Status::OK();
 }
 
-// `file_block` has a complete struct derived from the file's schema.
+// `file_block` has the same layout as FileScanRequest::column_positions.
 Status ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* 
file_block,
                                                     size_t* rows) {
     if (_state->current_predicate_columns.empty() &&
@@ -331,9 +332,12 @@ Status ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
     if (need_filter_output) {
         IColumn::Filter output_filter = _selection_to_filter(selection, 
selected_rows, batch_rows);
         for (const auto file_field_id : _request->predicate_columns) {
+            auto position_it = _request->column_positions.find(file_field_id);
+            DORIS_CHECK(position_it != _request->column_positions.end());
+            const auto block_position = position_it->second;
             RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position(
-                    file_field_id, file_block->get_by_position(file_field_id)
-                                           .column->filter(output_filter, 
selected_rows)));
+                    block_position, file_block->get_by_position(block_position)
+                                            .column->filter(output_filter, 
selected_rows)));
         }
     }
 
@@ -341,9 +345,12 @@ Status ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
     for (size_t output_idx = 0; output_idx < 
_state->current_non_predicate_columns.size();
          ++output_idx) {
         auto& column_reader = 
_state->current_non_predicate_columns[output_idx];
-        auto col = 
file_block->get_columns()[column_reader->file_column_id()]->assume_mutable();
-        DCHECK_EQ(file_block->get_by_position(column_reader->file_column_id())
-                          .type->get_primitive_type(),
+        auto position_it =
+                
_request->column_positions.find(_request->non_predicate_columns[output_idx]);
+        DORIS_CHECK(position_it != _request->column_positions.end());
+        const auto block_position = position_it->second;
+        auto col = file_block->get_columns()[block_position]->assume_mutable();
+        
DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
                   column_reader->type()->get_primitive_type());
         if (need_filter_output) {
             [[maybe_unused]] auto old_size = col->size();
@@ -368,7 +375,7 @@ Status ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
     return Status::OK();
 }
 
-ParquetReader::ParquetReader(std::unique_ptr<io::FileSystemProperties>& 
system_properties,
+ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& 
system_properties,
                              std::unique_ptr<io::FileDescription>& 
file_description,
                              std::shared_ptr<io::IOContext> io_ctx, 
RuntimeProfile* profile)
         : FileReader(system_properties, file_description, io_ctx, profile) {}
@@ -424,7 +431,25 @@ Status ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
     }
     RETURN_IF_ERROR(reader::FileReader::open(request));
 
+    // `_request->column_positions.empty()` means all columns are needed by 
table reader
+    if (_request->column_positions.empty()) {
+        for (const auto file_column_id : _request->predicate_columns) {
+            _request->column_positions.emplace(file_column_id, file_column_id);
+        }
+        for (const auto file_column_id : _request->non_predicate_columns) {
+            _request->column_positions.emplace(file_column_id, file_column_id);
+        }
+    }
+
     const int num_fields = static_cast<int>(_state->file_schema.size());
+    for (const auto file_column_id : _request->predicate_columns) {
+        DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+        DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
+    }
+    for (const auto file_column_id : _request->non_predicate_columns) {
+        DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+        DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
+    }
     for (const auto& local_filter : _request->local_filters) {
         if (local_filter.file_column_id < 0 || local_filter.file_column_id >= 
num_fields) {
             return Status::InvalidArgument("Invalid parquet filter top-level 
field id {}",
diff --git a/be/src/format/new_parquet/parquet_reader.h b/be/src/format/new_parquet/parquet_reader.h
index 426960a4dfd..6920f8c4d78 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -45,7 +45,7 @@ struct ParquetScanRequest : public reader::FileScanRequest {};
 // schema,不处理 table-level cast/default/generated/partition 语义。
 class ParquetReader : public reader::FileReader {
 public:
-    ParquetReader(std::unique_ptr<io::FileSystemProperties>& system_properties,
+    ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
                   std::unique_ptr<io::FileDescription>& file_description,
                   std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* 
profile);
     ~ParquetReader() override;
diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp
index 0eed9d3e566..b2453dbbfaf 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -17,6 +17,7 @@
 
 #include "format/reader/column_mapper.h"
 
+#include <cstddef>
 #include <vector>
 
 #include "common/status.h"
@@ -30,6 +31,31 @@ namespace doris::reader {
 static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
 static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = 
"_last_updated_sequence_number";
 
+static void add_scan_column(FileScanRequest* file_request, ColumnId 
file_column_id,
+                            std::vector<ColumnId>* scan_columns) {
+    if (file_request->column_positions.count(file_column_id) == 0) {
+        file_request->column_positions.emplace(file_column_id,
+                                               
file_request->column_positions.size());
+        scan_columns->push_back(file_column_id);
+    }
+}
+
+static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
+    DORIS_CHECK(mapping->file_column_id.has_value());
+    if (mapping->is_trivial) {
+        mapping->projection = 
VExprContext::create_shared(TableSlotRef::create_shared(
+                cast_set<int>(block_position), cast_set<int>(block_position), 
-1,
+                mapping->file_type, mapping->file_column_name));
+        return;
+    }
+
+    auto expr = Cast::create_shared(mapping->table_type);
+    expr->add_child(TableSlotRef::create_shared(cast_set<int>(block_position),
+                                                cast_set<int>(block_position), 
-1,
+                                                mapping->file_type, 
mapping->file_column_name));
+    mapping->projection = VExprContext::create_shared(expr);
+}
+
 Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& 
projected_columns,
                                          const std::map<std::string, Field>& 
partition_values,
                                          const std::vector<SchemaField>& 
file_schema) {
@@ -40,21 +66,9 @@ Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& project
         mapping.table_type = table_column.type;
         if (const auto* file_field = _find_file_field(table_column, 
file_schema)) {
             mapping.file_column_id = file_field->id;
+            mapping.file_column_name = file_field->name;
             mapping.file_type = file_field->type;
             mapping.is_trivial = _is_same_type(mapping.table_type, 
mapping.file_type);
-            if (!mapping.is_trivial) {
-                // 1. Data type mismatch (caused by schema evolution) and 
casting is needed.
-                auto expr = Cast::create_shared(mapping.table_type);
-                
expr->add_child(TableSlotRef::create_shared(mapping.file_column_id.value(),
-                                                            
mapping.file_column_id.value(), -1,
-                                                            mapping.file_type, 
file_field->name));
-                mapping.projection = VExprContext::create_shared(expr);
-            } else {
-                // 2. Data type matches, trivial mapping.
-                mapping.projection = 
VExprContext::create_shared(TableSlotRef::create_shared(
-                        mapping.file_column_id.value(), 
mapping.file_column_id.value(), -1,
-                        mapping.file_type, file_field->name));
-            }
         } else if (table_column.is_partition_key && 
partition_values.count(table_column.name) > 0) {
             // 3. Partition column, use partition value as a constant mapping. 
Note that partition column may also have default expression, but partition 
value should take precedence if it exists.
             mapping.default_expr = 
VExprContext::create_shared(TableLiteral::create_shared(
@@ -91,17 +105,27 @@ Status TableColumnMapper::create_scan_request(const std::map<int32_t, TableFilte
     // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
     file_request->predicate_columns.clear();
     file_request->non_predicate_columns.clear();
+    file_request->column_positions.clear();
     file_request->local_filters.clear();
     file_request->reader_expression_map.clear();
     for (const auto& table_column : projected_columns) {
         const auto* mapping = _find_mapping(table_column.id);
         if (mapping != nullptr && mapping->file_column_id.has_value()) {
             if (table_filters.count(table_column.id) == 0) {
-                
file_request->non_predicate_columns.push_back(*mapping->file_column_id);
+                add_scan_column(file_request, *mapping->file_column_id,
+                                &file_request->non_predicate_columns);
             }
         }
     }
     RETURN_IF_ERROR(localize_filters(table_filters, file_request));
+    for (auto& mapping : _mappings) {
+        if (!mapping.file_column_id.has_value()) {
+            continue;
+        }
+        auto position_it = 
file_request->column_positions.find(*mapping.file_column_id);
+        DORIS_CHECK(position_it != file_request->column_positions.end());
+        rebuild_projection(&mapping, position_it->second);
+    }
     return Status::OK();
 }
 
@@ -124,7 +148,7 @@ Status TableColumnMapper::localize_filters(const std::map<int32_t, TableFilter>&
             local_filter.predicates = it.second.predicates;
             file_request->local_filters.push_back(std::move(local_filter));
         }
-        file_request->predicate_columns.push_back(*mapping->file_column_id);
+        add_scan_column(file_request, *mapping->file_column_id, 
&file_request->predicate_columns);
     }
     return Status::OK();
 }
diff --git a/be/src/format/reader/column_mapper.h b/be/src/format/reader/column_mapper.h
index d0d8076798b..4360b23e7de 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -51,6 +51,7 @@ enum TableVirtualColumnType {
 struct ColumnMapping {
     int32_t table_column_id = -1;
     std::optional<int32_t> file_column_id;
+    std::string file_column_name;
     DataTypePtr file_type;
     DataTypePtr table_type;
 
diff --git a/be/src/format/reader/file_reader.h b/be/src/format/reader/file_reader.h
index 7fbdb9b576c..0b0527535e5 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -95,6 +95,7 @@ struct FileScanRequest {
 
     std::vector<ColumnId> predicate_columns;
     std::vector<ColumnId> non_predicate_columns;
+    std::map<ColumnId, size_t> column_positions;
     std::vector<FileLocalFilter> local_filters;
     // fallback path if filters cannot be localized to file-local predicates. 
The expression can reference projected_file_columns and partition columns.
     std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
@@ -136,10 +137,10 @@ public:
         int64_t bloom_filter_read_time = 0;
     };
 
-    FileReader(std::unique_ptr<io::FileSystemProperties>& system_properties,
+    FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
                std::unique_ptr<io::FileDescription>& file_description,
                std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile)
-            : _system_properties(std::move(system_properties)),
+            : _system_properties(system_properties),
               _file_description(std::move(file_description)),
               _io_ctx(io_ctx),
               _profile(profile) {}
@@ -196,7 +197,7 @@ protected:
     std::unique_ptr<FileScanRequest> _request;
     bool _eof = true;
     ReaderStatistics _reader_statistics;
-    std::unique_ptr<io::FileSystemProperties> _system_properties;
+    std::shared_ptr<io::FileSystemProperties> _system_properties;
     std::unique_ptr<io::FileDescription> _file_description;
     std::shared_ptr<io::IOContext> _io_ctx;
     RuntimeProfile* _profile = nullptr;
diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp
index b89641c0bd2..13f093228e6 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -17,16 +17,95 @@
 
 #include "format/reader/table_reader.h"
 
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+
 #include <vector>
 
 #include "common/status.h"
+#include "format/new_parquet/parquet_reader.h"
 #include "format/reader/column_mapper.h"
 #include "format/table/deletion_vector_reader.h"
+#include "io/io_common.h"
 
 namespace doris::reader {
 
+std::shared_ptr<io::FileSystemProperties> create_system_properties(
+        const TFileScanRangeParams* scan_params) {
+    auto system_properties = std::make_shared<io::FileSystemProperties>();
+    if (scan_params == nullptr || !scan_params->__isset.file_type) {
+        system_properties->system_type = TFileType::FILE_LOCAL;
+        return system_properties;
+    }
+    system_properties->system_type = scan_params->file_type;
+    system_properties->properties = scan_params->properties;
+    system_properties->hdfs_params = scan_params->hdfs_params;
+    if (scan_params->__isset.broker_addresses) {
+        
system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(),
+                                                   
scan_params->broker_addresses.end());
+    }
+    return system_properties;
+}
+
+Status TableReader::init(TableReadOptions options) {
+    _scan_params = options.scan_params;
+    _format = options.format;
+    _io_ctx = options.io_ctx;
+    _runtime_state = options.runtime_state;
+    _scanner_profile = options.scanner_profile;
+    _projected_columns = std::move(options.projected_columns);
+    _system_properties = create_system_properties(_scan_params);
+    _profile = std::move(options.profile);
+    TableColumnMapperOptions mapper_options;
+    mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+    _data_reader.column_mapper = TableColumnMapper(mapper_options);
+    // TODO:
+    // _table_filters = build_table_filters_from_conjuncts(options.conjuncts);
+    return Status::OK();
+}
+
+Status TableReader::create_next_reader(bool* eos) {
+    DCHECK(_data_reader.reader == nullptr);
+    if (_current_task == nullptr) {
+        *eos = true;
+        return Status::OK();
+    }
+
+    switch (_format) {
+    case FileFormat::PARQUET: {
+        _data_reader.reader = std::make_unique<parquet::ParquetReader>(
+                _system_properties, _current_task->data_file, _io_ctx, 
_scanner_profile);
+        break;
+    }
+    case FileFormat::ORC:
+    case FileFormat::CSV:
+        return Status::NotSupported("TableReader does not support file format 
{}",
+                                    static_cast<int>(_format));
+    }
+
+    RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state));
+    RETURN_IF_ERROR(open_reader());
+    *eos = false;
+    return Status::OK();
+}
+
+std::unique_ptr<io::FileDescription> create_file_description(const 
TFileRangeDesc& range) {
+    auto file_description = std::make_unique<io::FileDescription>();
+    file_description->path = range.path;
+    file_description->file_size = range.__isset.file_size ? range.file_size : 
-1;
+    if (range.__isset.fs_name) {
+        file_description->fs_name = range.fs_name;
+    }
+    if (range.__isset.file_cache_admission) {
+        file_description->file_cache_admission = range.file_cache_admission;
+    }
+    return file_description;
+}
+
 Status TableReader::prepare_split(const SplitReadOptions& options) {
     _partition_values = std::move(options.partition_values);
+    _current_task = std::make_unique<ScanTask>();
+    _current_task->data_file = create_file_description(options.current_range);
     return _parse_delete_predicates(options);
 }
 
@@ -39,7 +118,7 @@ Status TableReader::_parse_delete_predicates(const SplitReadOptions& options) {
             auto* delete_rows = new DeleteRows;
 
             DeletionVectorReader dv_reader(_runtime_state, _scanner_profile, 
*_scan_params, desc,
-                                           _io_ctx);
+                                           _io_ctx.get());
             create_status = dv_reader.open();
             if (!create_status.ok()) [[unlikely]] {
                 return nullptr;
diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h
index 7572383b8ad..53791747faf 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -19,10 +19,8 @@
 
 #include <bvar/status.h>
 
-#include <cstddef>
-#include <cstdint>
+#include <map>
 #include <memory>
-#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
@@ -34,8 +32,8 @@
 #include "exprs/vexpr_fwd.h"
 #include "format/reader/column_mapper.h"
 #include "format/reader/expr/delete_predicate.h"
-#include "format/reader/expr/literal.h"
 #include "format/reader/file_reader.h"
+#include "runtime/descriptors.h"
 
 namespace doris {
 class Block;
@@ -91,7 +89,7 @@ struct BaseDataFile {
 struct ScanTask {
     virtual ~ScanTask() = default;
 
-    std::unique_ptr<BaseDataFile> data_file;
+    std::unique_ptr<io::FileDescription> data_file;
 };
 
 struct ReadProfile {
@@ -106,11 +104,9 @@ struct TableReadOptions {
     const VExprContext conjuncts;
     const FileFormat format;
     TFileScanRangeParams* scan_params;
-    io::IOContext* io_ctx;
+    std::shared_ptr<io::IOContext> io_ctx;
     RuntimeState* runtime_state;
     RuntimeProfile* scanner_profile;
-    // Each task denotes a descriptor of a single file to read, along with 
file-level metadata such as stats and delete files.
-    std::vector<std::unique_ptr<ScanTask>> scan_tasks;
 
     std::unique_ptr<ReadProfile> profile;
 };
@@ -130,22 +126,7 @@ public:
 
     // 初始化 table reader 的通用运行参数。
     // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
-    virtual Status init(TableReadOptions options) {
-        _scan_params = options.scan_params;
-        _format = options.format;
-        _io_ctx = options.io_ctx;
-        _runtime_state = options.runtime_state;
-        _scanner_profile = options.scanner_profile;
-        _scan_tasks = std::move(_options.scan_tasks);
-        _next_task_idx = 0;
-        _profile = std::move(options.profile);
-        TableColumnMapperOptions mapper_options;
-        mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
-        _data_reader.column_mapper = TableColumnMapper(mapper_options);
-        // TODO:
-        // _table_filters = 
build_table_filters_from_conjuncts(options.conjuncts);
-        return Status::OK();
-    }
+    virtual Status init(TableReadOptions options);
 
     // 读取当前 split/partition 之前初始化。
     virtual Status prepare_split(const SplitReadOptions& options);
@@ -166,7 +147,13 @@ public:
     // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
     // table_block 的列必须已经是 table/global schema 语义。
     Status get_block(Block* block, bool* eos) {
-        while (block->empty() && !*eos) {
+        DORIS_CHECK(block->columns() == _projected_columns.size());
+        block->clear_column_data(_projected_columns.size());
+
+        while (true) {
+            if (*eos) {
+                return Status::OK();
+            }
             if (!_data_reader.reader) {
                 RETURN_IF_ERROR(create_next_reader(eos));
                 if (!_data_reader.reader) {
@@ -176,23 +163,25 @@ public:
             }
 
             bool current_eof = false;
-            Block current_block;
-            for (const auto& field : _data_reader.block_schema) {
-                // TODO: reuse column's memory
-                current_block.insert({field.type->create_column(), field.type, 
field.name});
-            }
+            _data_reader.block_template.clear_column_data();
             size_t current_rows = 0;
-            RETURN_IF_ERROR(
-                    _data_reader.reader->get_block(&current_block, 
&current_rows, &current_eof));
-            if (current_rows == 0 && !current_eof) {
+            
RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template,
+                                                           &current_rows, 
&current_eof));
+            if (current_rows == 0) {
+                if (current_eof) {
+                    RETURN_IF_ERROR(close_current_reader());
+                }
                 continue;
             }
+            DCHECK_EQ(_data_reader.block_template.columns(), 
_data_reader.scan_schema.size());
 
+            DORIS_CHECK(block->columns() == 
_data_reader.column_mapper.mappings().size());
             size_t idx = 0;
             for (const auto& mapping : _data_reader.column_mapper.mappings()) {
-                int res_id;
-                RETURN_IF_ERROR(mapping.projection->execute(&current_block, 
&res_id));
-                block->replace_by_position(idx, 
current_block.get_columns()[res_id]);
+                ColumnPtr column;
+                RETURN_IF_ERROR(_materialize_mapping_column(
+                        mapping, &_data_reader.block_template, current_rows, 
&column));
+                block->replace_by_position(idx, std::move(column));
                 idx++;
             }
             RETURN_IF_ERROR(finalize_chunk(block));
@@ -200,8 +189,8 @@ public:
             if (current_eof) {
                 RETURN_IF_ERROR(close_current_reader());
             }
+            return Status::OK();
         }
-        return Status::OK();
     }
 
     // 关闭 table reader 及当前正在读取的底层 reader。
@@ -219,20 +208,7 @@ protected:
     }
     // 切换到下一个 reader 的通用流程。
     // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
-    Status create_next_reader(bool* eos) {
-        // 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
-        DCHECK(_data_reader.reader == nullptr);
-        // TODO: 创建_data_reader
-        // _data_reader = std::make_unique<FileReader>(...);
-        if (!_data_reader.reader) {
-            if (eos != nullptr) {
-                *eos = true;
-            }
-            return Status::OK();
-        }
-        RETURN_IF_ERROR(open_reader());
-        return Status::OK();
-    }
+    Status create_next_reader(bool* eos);
 
     // 打开当前具体 reader。
     // 子类在这里基于当前 split/task 初始化底层 FileReader。
@@ -240,13 +216,29 @@ protected:
         std::vector<SchemaField> file_schema;
         RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
         _data_reader.block_schema = file_schema;
-        
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_options.projected_columns,
+        
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
                                                                   
_partition_values, file_schema));
+        DORIS_CHECK(_data_reader.column_mapper.mappings().size() == 
_projected_columns.size());
 
         auto file_request = std::make_unique<FileScanRequest>();
         RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
-                _table_filters, _options.projected_columns, 
file_request.get()));
+                _table_filters, _projected_columns, file_request.get()));
+        _data_reader.scan_schema.clear();
+        _data_reader.block_template.clear();
+        _data_reader.scan_schema.resize(file_request->column_positions.size());
+        for (const auto& [file_column_id, block_position] : 
file_request->column_positions) {
+            DORIS_CHECK(block_position < _data_reader.scan_schema.size());
+            const auto* field = _find_schema_field(_data_reader.block_schema, 
file_column_id);
+            DORIS_CHECK(field != nullptr);
+            _data_reader.scan_schema[block_position] = *field;
+        }
+        _data_reader.block_template.reserve(_data_reader.scan_schema.size());
+        for (const auto& field : _data_reader.scan_schema) {
+            _data_reader.block_template.insert(
+                    {field.type->create_column(), field.type, field.name});
+        }
         RETURN_IF_ERROR(_data_reader.reader->open(file_request));
+        RETURN_IF_ERROR(_open_mapping_exprs());
         return Status::OK();
     }
 
@@ -257,6 +249,9 @@ protected:
         _data_reader.reader.reset();
         _data_reader.column_mapper.clear();
         _data_reader.block_schema.clear();
+        _data_reader.scan_schema.clear();
+        _data_reader.block_template.clear();
+        _current_task.reset();
         return Status::OK();
     }
 
@@ -272,28 +267,73 @@ protected:
         return Status::OK();
     }
 
+    Status _materialize_mapping_column(const ColumnMapping& mapping, Block* 
current_block,
+                                       size_t current_rows, ColumnPtr* column) 
{
+        if (mapping.projection != nullptr) {
+            int res_id;
+            RETURN_IF_ERROR(mapping.projection->execute(current_block, 
&res_id));
+            *column = current_block->get_columns()[res_id];
+            return Status::OK();
+        }
+        if (mapping.default_expr != nullptr) {
+            int res_id;
+            RETURN_IF_ERROR(mapping.default_expr->execute(current_block, 
&res_id));
+            *column = current_block->get_columns()[res_id];
+            return Status::OK();
+        }
+        *column = 
mapping.table_type->create_column_const_with_default_value(current_rows);
+        return Status::OK();
+    }
+
+    Status _open_mapping_exprs() {
+        RowDescriptor row_desc;
+        for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+            if (mapping.projection != nullptr) {
+                RETURN_IF_ERROR(mapping.projection->prepare(_runtime_state, 
row_desc));
+                RETURN_IF_ERROR(mapping.projection->open(_runtime_state));
+            }
+            if (mapping.default_expr != nullptr) {
+                RETURN_IF_ERROR(mapping.default_expr->prepare(_runtime_state, 
row_desc));
+                RETURN_IF_ERROR(mapping.default_expr->open(_runtime_state));
+            }
+        }
+        return Status::OK();
+    }
+
     struct DataReader {
         std::unique_ptr<FileReader> reader;
         TableColumnMapper column_mapper;
         std::vector<SchemaField> block_schema;
+        std::vector<SchemaField> scan_schema;
+        Block block_template;
     };
     DataReader _data_reader;
-    TableReadOptions _options;
-    std::vector<std::unique_ptr<ScanTask>> _scan_tasks;
+    std::vector<TableColumn> _projected_columns;
+    std::unique_ptr<ScanTask> _current_task;
+    std::shared_ptr<io::FileSystemProperties> _system_properties;
     // partition key -> value
     std::map<std::string, Field> _partition_values;
-    size_t _next_task_idx = 0;
     std::map<int32_t, TableFilter> _table_filters;
     std::unique_ptr<ReadProfile> _profile;
     // Parsed from DELETION_VECTOR in Iceberg and Paimon
     DeleteRows* _delete_rows;
     TFileScanRangeParams* _scan_params;
-    io::IOContext* _io_ctx;
+    std::shared_ptr<io::IOContext> _io_ctx;
     RuntimeState* _runtime_state;
     RuntimeProfile* _scanner_profile;
     FileFormat _format;
 
 private:
+    static const SchemaField* _find_schema_field(const 
std::vector<SchemaField>& schema,
+                                                 ColumnId column_id) {
+        for (const auto& field : schema) {
+            if (field.id == column_id) {
+                return &field;
+            }
+        }
+        return nullptr;
+    }
+
     Status _parse_delete_predicates(const SplitReadOptions& options);
 };
 
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp b/be/test/format/new_parquet/parquet_reader_test.cpp
index 2086cd8cfb7..e805243ca22 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -98,10 +98,10 @@ Block build_file_block(const std::vector<reader::SchemaField>& schema) {
 
 class TestFileReader final : public reader::FileReader {
 public:
-    TestFileReader(std::unique_ptr<io::FileSystemProperties>& 
system_properties,
+    TestFileReader(std::shared_ptr<io::FileSystemProperties>& 
system_properties,
                    std::unique_ptr<io::FileDescription>& file_description,
                    std::shared_ptr<io::IOContext> io_ctx)
-            : reader::FileReader(system_properties, file_description, 
std::move(io_ctx), nullptr) {}
+            : reader::FileReader(system_properties, file_description, io_ctx, 
nullptr) {}
 
     Status get_schema(std::vector<reader::SchemaField>* file_schema) const 
override {
         file_schema->clear();
@@ -119,7 +119,7 @@ public:
 };
 
 TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState) {
-    auto system_properties = std::make_unique<io::FileSystemProperties>();
+    auto system_properties = std::make_shared<io::FileSystemProperties>();
     system_properties->system_type = TFileType::FILE_LOCAL;
     auto file_description = std::make_unique<io::FileDescription>();
     auto io_ctx = std::make_shared<io::IOContext>();
@@ -149,7 +149,7 @@ protected:
     void TearDown() override { std::filesystem::remove_all(_test_dir); }
 
     std::unique_ptr<parquet::ParquetReader> create_reader() const {
-        auto system_properties = std::make_unique<io::FileSystemProperties>();
+        auto system_properties = std::make_shared<io::FileSystemProperties>();
         system_properties->system_type = TFileType::FILE_LOCAL;
         auto file_description = std::make_unique<io::FileDescription>();
         file_description->path = _file_path;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to