This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 5dc54d878a2 [improvement](be) Reuse table reader file block (#63704)
5dc54d878a2 is described below
commit 5dc54d878a2ae257797fecb7400653587bdd0a54
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 09:14:47 2026 +0800
[improvement](be) Reuse table reader file block (#63704)
---
be/src/exec/scan/file_scanner.h | 2 +-
be/src/format/new_parquet/parquet_reader.cpp | 49 +++++--
be/src/format/new_parquet/parquet_reader.h | 2 +-
be/src/format/reader/column_mapper.cpp | 54 ++++++--
be/src/format/reader/column_mapper.h | 1 +
be/src/format/reader/file_reader.h | 7 +-
be/src/format/reader/table_reader.cpp | 81 ++++++++++-
be/src/format/reader/table_reader.h | 154 +++++++++++++--------
be/test/format/new_parquet/parquet_reader_test.cpp | 8 +-
9 files changed, 264 insertions(+), 94 deletions(-)
diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index 7c3d9d08b6a..34f59cdee32 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -189,7 +189,7 @@ protected:
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::FileReaderStats> _file_reader_stats;
- std::unique_ptr<io::IOContext> _io_ctx;
+ std::shared_ptr<io::IOContext> _io_ctx;
// Whether to fill partition columns from path, default is true.
std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
diff --git a/be/src/format/new_parquet/parquet_reader.cpp
b/be/src/format/new_parquet/parquet_reader.cpp
index fc00484758e..7f442808523 100644
--- a/be/src/format/new_parquet/parquet_reader.cpp
+++ b/be/src/format/new_parquet/parquet_reader.cpp
@@ -208,10 +208,11 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
for (size_t filter_idx = 0; filter_idx <
_request->predicate_columns.size(); ++filter_idx) {
const int file_field_id = _request->predicate_columns[filter_idx];
auto& column_reader = _state->current_predicate_columns[filter_idx];
- auto column =
file_block->get_by_position(column_reader->file_column_id())
- .column->assume_mutable();
- DCHECK_EQ(file_block->get_by_position(column_reader->file_column_id())
- .type->get_primitive_type(),
+ auto position_it = _request->column_positions.find(file_field_id);
+ DORIS_CHECK(position_it != _request->column_positions.end());
+ const auto block_position = position_it->second;
+ auto column =
file_block->get_by_position(block_position).column->assume_mutable();
+
DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
column_reader->type()->get_primitive_type());
int64_t column_rows = 0;
RETURN_IF_ERROR(column_reader->read(batch_rows, column, &column_rows));
@@ -236,7 +237,7 @@ Status ParquetReader::_read_filter_columns(int64_t
batch_rows, Block* file_block
}
break;
}
- file_block->replace_by_position(file_field_id, std::move(column));
+ file_block->replace_by_position(block_position, std::move(column));
if (*selected_rows == 0) {
break;
}
@@ -313,7 +314,7 @@ Status ParquetReader::_open_next_row_group(bool*
has_row_group) {
return Status::OK();
}
-// `file_block` has a complete struct derived from the file's schema.
+// `file_block` has the same layout as FileScanRequest::column_positions.
Status ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block*
file_block,
size_t* rows) {
if (_state->current_predicate_columns.empty() &&
@@ -331,9 +332,12 @@ Status
ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
if (need_filter_output) {
IColumn::Filter output_filter = _selection_to_filter(selection,
selected_rows, batch_rows);
for (const auto file_field_id : _request->predicate_columns) {
+ auto position_it = _request->column_positions.find(file_field_id);
+ DORIS_CHECK(position_it != _request->column_positions.end());
+ const auto block_position = position_it->second;
RETURN_IF_CATCH_EXCEPTION(file_block->replace_by_position(
- file_field_id, file_block->get_by_position(file_field_id)
- .column->filter(output_filter,
selected_rows)));
+ block_position, file_block->get_by_position(block_position)
+ .column->filter(output_filter,
selected_rows)));
}
}
@@ -341,9 +345,12 @@ Status
ParquetReader::_read_current_row_group_batch(int64_t batch_rows, Block* f
for (size_t output_idx = 0; output_idx <
_state->current_non_predicate_columns.size();
++output_idx) {
auto& column_reader =
_state->current_non_predicate_columns[output_idx];
- auto col =
file_block->get_columns()[column_reader->file_column_id()]->assume_mutable();
- DCHECK_EQ(file_block->get_by_position(column_reader->file_column_id())
- .type->get_primitive_type(),
+ auto position_it =
+
_request->column_positions.find(_request->non_predicate_columns[output_idx]);
+ DORIS_CHECK(position_it != _request->column_positions.end());
+ const auto block_position = position_it->second;
+ auto col = file_block->get_columns()[block_position]->assume_mutable();
+
DCHECK_EQ(file_block->get_by_position(block_position).type->get_primitive_type(),
column_reader->type()->get_primitive_type());
if (need_filter_output) {
[[maybe_unused]] auto old_size = col->size();
@@ -368,7 +375,7 @@ Status ParquetReader::_read_current_row_group_batch(int64_t
batch_rows, Block* f
return Status::OK();
}
-ParquetReader::ParquetReader(std::unique_ptr<io::FileSystemProperties>&
system_properties,
+ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
std::unique_ptr<io::FileDescription>&
file_description,
std::shared_ptr<io::IOContext> io_ctx,
RuntimeProfile* profile)
: FileReader(system_properties, file_description, io_ctx, profile) {}
@@ -424,7 +431,25 @@ Status
ParquetReader::open(std::unique_ptr<reader::FileScanRequest>& request) {
}
RETURN_IF_ERROR(reader::FileReader::open(request));
+ // `_request->column_positions.empty()` means all columns are needed by table reader
+ if (_request->column_positions.empty()) {
+ for (const auto file_column_id : _request->predicate_columns) {
+ _request->column_positions.emplace(file_column_id, file_column_id);
+ }
+ for (const auto file_column_id : _request->non_predicate_columns) {
+ _request->column_positions.emplace(file_column_id, file_column_id);
+ }
+ }
+
const int num_fields = static_cast<int>(_state->file_schema.size());
+ for (const auto file_column_id : _request->predicate_columns) {
+ DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+ DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
+ }
+ for (const auto file_column_id : _request->non_predicate_columns) {
+ DORIS_CHECK(_request->column_positions.count(file_column_id) > 0);
+ DORIS_CHECK(file_column_id >= 0 && file_column_id < num_fields);
+ }
for (const auto& local_filter : _request->local_filters) {
if (local_filter.file_column_id < 0 || local_filter.file_column_id >=
num_fields) {
return Status::InvalidArgument("Invalid parquet filter top-level
field id {}",
diff --git a/be/src/format/new_parquet/parquet_reader.h
b/be/src/format/new_parquet/parquet_reader.h
index 426960a4dfd..6920f8c4d78 100644
--- a/be/src/format/new_parquet/parquet_reader.h
+++ b/be/src/format/new_parquet/parquet_reader.h
@@ -45,7 +45,7 @@ struct ParquetScanRequest : public reader::FileScanRequest {};
// schema,不处理 table-level cast/default/generated/partition 语义。
class ParquetReader : public reader::FileReader {
public:
- ParquetReader(std::unique_ptr<io::FileSystemProperties>& system_properties,
+ ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
std::unique_ptr<io::FileDescription>& file_description,
std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile*
profile);
~ParquetReader() override;
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index 0eed9d3e566..b2453dbbfaf 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -17,6 +17,7 @@
#include "format/reader/column_mapper.h"
+#include <cstddef>
#include <vector>
#include "common/status.h"
@@ -30,6 +31,31 @@ namespace doris::reader {
static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
"_last_updated_sequence_number";
+static void add_scan_column(FileScanRequest* file_request, ColumnId
file_column_id,
+ std::vector<ColumnId>* scan_columns) {
+ if (file_request->column_positions.count(file_column_id) == 0) {
+ file_request->column_positions.emplace(file_column_id,
+
file_request->column_positions.size());
+ scan_columns->push_back(file_column_id);
+ }
+}
+
+static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
+ DORIS_CHECK(mapping->file_column_id.has_value());
+ if (mapping->is_trivial) {
+ mapping->projection =
VExprContext::create_shared(TableSlotRef::create_shared(
+ cast_set<int>(block_position), cast_set<int>(block_position),
-1,
+ mapping->file_type, mapping->file_column_name));
+ return;
+ }
+
+ auto expr = Cast::create_shared(mapping->table_type);
+ expr->add_child(TableSlotRef::create_shared(cast_set<int>(block_position),
+ cast_set<int>(block_position),
-1,
+ mapping->file_type,
mapping->file_column_name));
+ mapping->projection = VExprContext::create_shared(expr);
+}
+
Status TableColumnMapper::create_mapping(const std::vector<TableColumn>&
projected_columns,
const std::map<std::string, Field>&
partition_values,
const std::vector<SchemaField>&
file_schema) {
@@ -40,21 +66,9 @@ Status TableColumnMapper::create_mapping(const
std::vector<TableColumn>& project
mapping.table_type = table_column.type;
if (const auto* file_field = _find_file_field(table_column,
file_schema)) {
mapping.file_column_id = file_field->id;
+ mapping.file_column_name = file_field->name;
mapping.file_type = file_field->type;
mapping.is_trivial = _is_same_type(mapping.table_type,
mapping.file_type);
- if (!mapping.is_trivial) {
- // 1. Data type mismatch (caused by schema evolution) and
casting is needed.
- auto expr = Cast::create_shared(mapping.table_type);
-
expr->add_child(TableSlotRef::create_shared(mapping.file_column_id.value(),
-
mapping.file_column_id.value(), -1,
- mapping.file_type,
file_field->name));
- mapping.projection = VExprContext::create_shared(expr);
- } else {
- // 2. Data type matches, trivial mapping.
- mapping.projection =
VExprContext::create_shared(TableSlotRef::create_shared(
- mapping.file_column_id.value(),
mapping.file_column_id.value(), -1,
- mapping.file_type, file_field->name));
- }
} else if (table_column.is_partition_key &&
partition_values.count(table_column.name) > 0) {
// 3. Partition column, use partition value as a constant mapping. Note that partition column may also have default expression, but partition value should take precedence if it exists.
mapping.default_expr =
VExprContext::create_shared(TableLiteral::create_shared(
@@ -91,17 +105,27 @@ Status TableColumnMapper::create_scan_request(const
std::map<int32_t, TableFilte
// 真实实现会把 table projection/filter 转换成 file-local projection/filter。
file_request->predicate_columns.clear();
file_request->non_predicate_columns.clear();
+ file_request->column_positions.clear();
file_request->local_filters.clear();
file_request->reader_expression_map.clear();
for (const auto& table_column : projected_columns) {
const auto* mapping = _find_mapping(table_column.id);
if (mapping != nullptr && mapping->file_column_id.has_value()) {
if (table_filters.count(table_column.id) == 0) {
-
file_request->non_predicate_columns.push_back(*mapping->file_column_id);
+ add_scan_column(file_request, *mapping->file_column_id,
+ &file_request->non_predicate_columns);
}
}
}
RETURN_IF_ERROR(localize_filters(table_filters, file_request));
+ for (auto& mapping : _mappings) {
+ if (!mapping.file_column_id.has_value()) {
+ continue;
+ }
+ auto position_it =
file_request->column_positions.find(*mapping.file_column_id);
+ DORIS_CHECK(position_it != file_request->column_positions.end());
+ rebuild_projection(&mapping, position_it->second);
+ }
return Status::OK();
}
@@ -124,7 +148,7 @@ Status TableColumnMapper::localize_filters(const
std::map<int32_t, TableFilter>&
local_filter.predicates = it.second.predicates;
file_request->local_filters.push_back(std::move(local_filter));
}
- file_request->predicate_columns.push_back(*mapping->file_column_id);
+ add_scan_column(file_request, *mapping->file_column_id,
&file_request->predicate_columns);
}
return Status::OK();
}
diff --git a/be/src/format/reader/column_mapper.h
b/be/src/format/reader/column_mapper.h
index d0d8076798b..4360b23e7de 100644
--- a/be/src/format/reader/column_mapper.h
+++ b/be/src/format/reader/column_mapper.h
@@ -51,6 +51,7 @@ enum TableVirtualColumnType {
struct ColumnMapping {
int32_t table_column_id = -1;
std::optional<int32_t> file_column_id;
+ std::string file_column_name;
DataTypePtr file_type;
DataTypePtr table_type;
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index 7fbdb9b576c..0b0527535e5 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -95,6 +95,7 @@ struct FileScanRequest {
std::vector<ColumnId> predicate_columns;
std::vector<ColumnId> non_predicate_columns;
+ std::map<ColumnId, size_t> column_positions;
std::vector<FileLocalFilter> local_filters;
// fallback path if filters cannot be localized to file-local predicates. The expression can reference projected_file_columns and partition columns.
std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
@@ -136,10 +137,10 @@ public:
int64_t bloom_filter_read_time = 0;
};
- FileReader(std::unique_ptr<io::FileSystemProperties>& system_properties,
+ FileReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
std::unique_ptr<io::FileDescription>& file_description,
std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile)
- : _system_properties(std::move(system_properties)),
+ : _system_properties(system_properties),
_file_description(std::move(file_description)),
_io_ctx(io_ctx),
_profile(profile) {}
@@ -196,7 +197,7 @@ protected:
std::unique_ptr<FileScanRequest> _request;
bool _eof = true;
ReaderStatistics _reader_statistics;
- std::unique_ptr<io::FileSystemProperties> _system_properties;
+ std::shared_ptr<io::FileSystemProperties> _system_properties;
std::unique_ptr<io::FileDescription> _file_description;
std::shared_ptr<io::IOContext> _io_ctx;
RuntimeProfile* _profile = nullptr;
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index b89641c0bd2..13f093228e6 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -17,16 +17,95 @@
#include "format/reader/table_reader.h"
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+
#include <vector>
#include "common/status.h"
+#include "format/new_parquet/parquet_reader.h"
#include "format/reader/column_mapper.h"
#include "format/table/deletion_vector_reader.h"
+#include "io/io_common.h"
namespace doris::reader {
+std::shared_ptr<io::FileSystemProperties> create_system_properties(
+ const TFileScanRangeParams* scan_params) {
+ auto system_properties = std::make_shared<io::FileSystemProperties>();
+ if (scan_params == nullptr || !scan_params->__isset.file_type) {
+ system_properties->system_type = TFileType::FILE_LOCAL;
+ return system_properties;
+ }
+ system_properties->system_type = scan_params->file_type;
+ system_properties->properties = scan_params->properties;
+ system_properties->hdfs_params = scan_params->hdfs_params;
+ if (scan_params->__isset.broker_addresses) {
+
system_properties->broker_addresses.assign(scan_params->broker_addresses.begin(),
+
scan_params->broker_addresses.end());
+ }
+ return system_properties;
+}
+
+Status TableReader::init(TableReadOptions options) {
+ _scan_params = options.scan_params;
+ _format = options.format;
+ _io_ctx = options.io_ctx;
+ _runtime_state = options.runtime_state;
+ _scanner_profile = options.scanner_profile;
+ _projected_columns = std::move(options.projected_columns);
+ _system_properties = create_system_properties(_scan_params);
+ _profile = std::move(options.profile);
+ TableColumnMapperOptions mapper_options;
+ mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+ _data_reader.column_mapper = TableColumnMapper(mapper_options);
+ // TODO:
+ // _table_filters = build_table_filters_from_conjuncts(options.conjuncts);
+ return Status::OK();
+}
+
+Status TableReader::create_next_reader(bool* eos) {
+ DCHECK(_data_reader.reader == nullptr);
+ if (_current_task == nullptr) {
+ *eos = true;
+ return Status::OK();
+ }
+
+ switch (_format) {
+ case FileFormat::PARQUET: {
+ _data_reader.reader = std::make_unique<parquet::ParquetReader>(
+ _system_properties, _current_task->data_file, _io_ctx,
_scanner_profile);
+ break;
+ }
+ case FileFormat::ORC:
+ case FileFormat::CSV:
+ return Status::NotSupported("TableReader does not support file format
{}",
+ static_cast<int>(_format));
+ }
+
+ RETURN_IF_ERROR(_data_reader.reader->init(_runtime_state));
+ RETURN_IF_ERROR(open_reader());
+ *eos = false;
+ return Status::OK();
+}
+
+std::unique_ptr<io::FileDescription> create_file_description(const
TFileRangeDesc& range) {
+ auto file_description = std::make_unique<io::FileDescription>();
+ file_description->path = range.path;
+ file_description->file_size = range.__isset.file_size ? range.file_size :
-1;
+ if (range.__isset.fs_name) {
+ file_description->fs_name = range.fs_name;
+ }
+ if (range.__isset.file_cache_admission) {
+ file_description->file_cache_admission = range.file_cache_admission;
+ }
+ return file_description;
+}
+
Status TableReader::prepare_split(const SplitReadOptions& options) {
_partition_values = std::move(options.partition_values);
+ _current_task = std::make_unique<ScanTask>();
+ _current_task->data_file = create_file_description(options.current_range);
return _parse_delete_predicates(options);
}
@@ -39,7 +118,7 @@ Status TableReader::_parse_delete_predicates(const
SplitReadOptions& options) {
auto* delete_rows = new DeleteRows;
DeletionVectorReader dv_reader(_runtime_state, _scanner_profile,
*_scan_params, desc,
- _io_ctx);
+ _io_ctx.get());
create_status = dv_reader.open();
if (!create_status.ok()) [[unlikely]] {
return nullptr;
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 7572383b8ad..53791747faf 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -19,10 +19,8 @@
#include <bvar/status.h>
-#include <cstddef>
-#include <cstdint>
+#include <map>
#include <memory>
-#include <optional>
#include <string>
#include <utility>
#include <vector>
@@ -34,8 +32,8 @@
#include "exprs/vexpr_fwd.h"
#include "format/reader/column_mapper.h"
#include "format/reader/expr/delete_predicate.h"
-#include "format/reader/expr/literal.h"
#include "format/reader/file_reader.h"
+#include "runtime/descriptors.h"
namespace doris {
class Block;
@@ -91,7 +89,7 @@ struct BaseDataFile {
struct ScanTask {
virtual ~ScanTask() = default;
- std::unique_ptr<BaseDataFile> data_file;
+ std::unique_ptr<io::FileDescription> data_file;
};
struct ReadProfile {
@@ -106,11 +104,9 @@ struct TableReadOptions {
const VExprContext conjuncts;
const FileFormat format;
TFileScanRangeParams* scan_params;
- io::IOContext* io_ctx;
+ std::shared_ptr<io::IOContext> io_ctx;
RuntimeState* runtime_state;
RuntimeProfile* scanner_profile;
- // Each task denotes a descriptor of a single file to read, along with file-level metadata such as stats and delete files.
- std::vector<std::unique_ptr<ScanTask>> scan_tasks;
std::unique_ptr<ReadProfile> profile;
};
@@ -130,22 +126,7 @@ public:
// 初始化 table reader 的通用运行参数。
// 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
- virtual Status init(TableReadOptions options) {
- _scan_params = options.scan_params;
- _format = options.format;
- _io_ctx = options.io_ctx;
- _runtime_state = options.runtime_state;
- _scanner_profile = options.scanner_profile;
- _scan_tasks = std::move(_options.scan_tasks);
- _next_task_idx = 0;
- _profile = std::move(options.profile);
- TableColumnMapperOptions mapper_options;
- mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
- _data_reader.column_mapper = TableColumnMapper(mapper_options);
- // TODO:
- // _table_filters =
build_table_filters_from_conjuncts(options.conjuncts);
- return Status::OK();
- }
+ virtual Status init(TableReadOptions options);
// 读取当前 split/partition 之前初始化。
virtual Status prepare_split(const SplitReadOptions& options);
@@ -166,7 +147,13 @@ public:
// 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
// table_block 的列必须已经是 table/global schema 语义。
Status get_block(Block* block, bool* eos) {
- while (block->empty() && !*eos) {
+ DORIS_CHECK(block->columns() == _projected_columns.size());
+ block->clear_column_data(_projected_columns.size());
+
+ while (true) {
+ if (*eos) {
+ return Status::OK();
+ }
if (!_data_reader.reader) {
RETURN_IF_ERROR(create_next_reader(eos));
if (!_data_reader.reader) {
@@ -176,23 +163,25 @@ public:
}
bool current_eof = false;
- Block current_block;
- for (const auto& field : _data_reader.block_schema) {
- // TODO: reuse column's memory
- current_block.insert({field.type->create_column(), field.type,
field.name});
- }
+ _data_reader.block_template.clear_column_data();
size_t current_rows = 0;
- RETURN_IF_ERROR(
- _data_reader.reader->get_block(&current_block,
&current_rows, &current_eof));
- if (current_rows == 0 && !current_eof) {
+
RETURN_IF_ERROR(_data_reader.reader->get_block(&_data_reader.block_template,
+ &current_rows,
&current_eof));
+ if (current_rows == 0) {
+ if (current_eof) {
+ RETURN_IF_ERROR(close_current_reader());
+ }
continue;
}
+ DCHECK_EQ(_data_reader.block_template.columns(),
_data_reader.scan_schema.size());
+ DORIS_CHECK(block->columns() ==
_data_reader.column_mapper.mappings().size());
size_t idx = 0;
for (const auto& mapping : _data_reader.column_mapper.mappings()) {
- int res_id;
- RETURN_IF_ERROR(mapping.projection->execute(&current_block,
&res_id));
- block->replace_by_position(idx,
current_block.get_columns()[res_id]);
+ ColumnPtr column;
+ RETURN_IF_ERROR(_materialize_mapping_column(
+ mapping, &_data_reader.block_template, current_rows,
&column));
+ block->replace_by_position(idx, std::move(column));
idx++;
}
RETURN_IF_ERROR(finalize_chunk(block));
@@ -200,8 +189,8 @@ public:
if (current_eof) {
RETURN_IF_ERROR(close_current_reader());
}
+ return Status::OK();
}
- return Status::OK();
}
// 关闭 table reader 及当前正在读取的底层 reader。
@@ -219,20 +208,7 @@ protected:
}
// 切换到下一个 reader 的通用流程。
// 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
- Status create_next_reader(bool* eos) {
- // 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
- DCHECK(_data_reader.reader == nullptr);
- // TODO: 创建_data_reader
- // _data_reader = std::make_unique<FileReader>(...);
- if (!_data_reader.reader) {
- if (eos != nullptr) {
- *eos = true;
- }
- return Status::OK();
- }
- RETURN_IF_ERROR(open_reader());
- return Status::OK();
- }
+ Status create_next_reader(bool* eos);
// 打开当前具体 reader。
// 子类在这里基于当前 split/task 初始化底层 FileReader。
@@ -240,13 +216,29 @@ protected:
std::vector<SchemaField> file_schema;
RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
_data_reader.block_schema = file_schema;
-
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_options.projected_columns,
+
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
_partition_values, file_schema));
+ DORIS_CHECK(_data_reader.column_mapper.mappings().size() ==
_projected_columns.size());
auto file_request = std::make_unique<FileScanRequest>();
RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
- _table_filters, _options.projected_columns,
file_request.get()));
+ _table_filters, _projected_columns, file_request.get()));
+ _data_reader.scan_schema.clear();
+ _data_reader.block_template.clear();
+ _data_reader.scan_schema.resize(file_request->column_positions.size());
+ for (const auto& [file_column_id, block_position] :
file_request->column_positions) {
+ DORIS_CHECK(block_position < _data_reader.scan_schema.size());
+ const auto* field = _find_schema_field(_data_reader.block_schema,
file_column_id);
+ DORIS_CHECK(field != nullptr);
+ _data_reader.scan_schema[block_position] = *field;
+ }
+ _data_reader.block_template.reserve(_data_reader.scan_schema.size());
+ for (const auto& field : _data_reader.scan_schema) {
+ _data_reader.block_template.insert(
+ {field.type->create_column(), field.type, field.name});
+ }
RETURN_IF_ERROR(_data_reader.reader->open(file_request));
+ RETURN_IF_ERROR(_open_mapping_exprs());
return Status::OK();
}
@@ -257,6 +249,9 @@ protected:
_data_reader.reader.reset();
_data_reader.column_mapper.clear();
_data_reader.block_schema.clear();
+ _data_reader.scan_schema.clear();
+ _data_reader.block_template.clear();
+ _current_task.reset();
return Status::OK();
}
@@ -272,28 +267,73 @@ protected:
return Status::OK();
}
+ Status _materialize_mapping_column(const ColumnMapping& mapping, Block*
current_block,
+ size_t current_rows, ColumnPtr* column)
{
+ if (mapping.projection != nullptr) {
+ int res_id;
+ RETURN_IF_ERROR(mapping.projection->execute(current_block,
&res_id));
+ *column = current_block->get_columns()[res_id];
+ return Status::OK();
+ }
+ if (mapping.default_expr != nullptr) {
+ int res_id;
+ RETURN_IF_ERROR(mapping.default_expr->execute(current_block,
&res_id));
+ *column = current_block->get_columns()[res_id];
+ return Status::OK();
+ }
+ *column =
mapping.table_type->create_column_const_with_default_value(current_rows);
+ return Status::OK();
+ }
+
+ Status _open_mapping_exprs() {
+ RowDescriptor row_desc;
+ for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+ if (mapping.projection != nullptr) {
+ RETURN_IF_ERROR(mapping.projection->prepare(_runtime_state,
row_desc));
+ RETURN_IF_ERROR(mapping.projection->open(_runtime_state));
+ }
+ if (mapping.default_expr != nullptr) {
+ RETURN_IF_ERROR(mapping.default_expr->prepare(_runtime_state,
row_desc));
+ RETURN_IF_ERROR(mapping.default_expr->open(_runtime_state));
+ }
+ }
+ return Status::OK();
+ }
+
struct DataReader {
std::unique_ptr<FileReader> reader;
TableColumnMapper column_mapper;
std::vector<SchemaField> block_schema;
+ std::vector<SchemaField> scan_schema;
+ Block block_template;
};
DataReader _data_reader;
- TableReadOptions _options;
- std::vector<std::unique_ptr<ScanTask>> _scan_tasks;
+ std::vector<TableColumn> _projected_columns;
+ std::unique_ptr<ScanTask> _current_task;
+ std::shared_ptr<io::FileSystemProperties> _system_properties;
// partition key -> value
std::map<std::string, Field> _partition_values;
- size_t _next_task_idx = 0;
std::map<int32_t, TableFilter> _table_filters;
std::unique_ptr<ReadProfile> _profile;
// Parsed from DELETION_VECTOR in Iceberg and Paimon
DeleteRows* _delete_rows;
TFileScanRangeParams* _scan_params;
- io::IOContext* _io_ctx;
+ std::shared_ptr<io::IOContext> _io_ctx;
RuntimeState* _runtime_state;
RuntimeProfile* _scanner_profile;
FileFormat _format;
private:
+ static const SchemaField* _find_schema_field(const
std::vector<SchemaField>& schema,
+ ColumnId column_id) {
+ for (const auto& field : schema) {
+ if (field.id == column_id) {
+ return &field;
+ }
+ }
+ return nullptr;
+ }
+
Status _parse_delete_predicates(const SplitReadOptions& options);
};
diff --git a/be/test/format/new_parquet/parquet_reader_test.cpp
b/be/test/format/new_parquet/parquet_reader_test.cpp
index 2086cd8cfb7..e805243ca22 100644
--- a/be/test/format/new_parquet/parquet_reader_test.cpp
+++ b/be/test/format/new_parquet/parquet_reader_test.cpp
@@ -98,10 +98,10 @@ Block build_file_block(const
std::vector<reader::SchemaField>& schema) {
class TestFileReader final : public reader::FileReader {
public:
- TestFileReader(std::unique_ptr<io::FileSystemProperties>&
system_properties,
+ TestFileReader(std::shared_ptr<io::FileSystemProperties>&
system_properties,
std::unique_ptr<io::FileDescription>& file_description,
std::shared_ptr<io::IOContext> io_ctx)
- : reader::FileReader(system_properties, file_description,
std::move(io_ctx), nullptr) {}
+ : reader::FileReader(system_properties, file_description, io_ctx,
nullptr) {}
Status get_schema(std::vector<reader::SchemaField>* file_schema) const
override {
file_schema->clear();
@@ -119,7 +119,7 @@ public:
};
TEST(FileReaderTest, OpenStoresRequestAndCloseClearsState) {
- auto system_properties = std::make_unique<io::FileSystemProperties>();
+ auto system_properties = std::make_shared<io::FileSystemProperties>();
system_properties->system_type = TFileType::FILE_LOCAL;
auto file_description = std::make_unique<io::FileDescription>();
auto io_ctx = std::make_shared<io::IOContext>();
@@ -149,7 +149,7 @@ protected:
void TearDown() override { std::filesystem::remove_all(_test_dir); }
std::unique_ptr<parquet::ParquetReader> create_reader() const {
- auto system_properties = std::make_unique<io::FileSystemProperties>();
+ auto system_properties = std::make_shared<io::FileSystemProperties>();
system_properties->system_type = TFileType::FILE_LOCAL;
auto file_description = std::make_unique<io::FileDescription>();
file_description->path = _file_path;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]