This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fee853559ef [fix](be) Use shared IOContext in file scanner readers
(#64033)
fee853559ef is described below
commit fee853559ef1b68c785f7aff320a43f88b861a41
Author: Gabriel <[email protected]>
AuthorDate: Wed Jun 3 10:22:02 2026 +0800
[fix](be) Use shared IOContext in file scanner readers (#64033)
Problem Summary: FileScanner kept passing raw IOContext pointers to
several file readers, so DelegateReader could still create a
shallow-copied IOContext on the hot scan path. That left different
IOContext instances inside the same reader stack, and readers could also
dereference a missing (null) file_reader_stats pointer when an IOContext
existed without file reader stats. This change keeps FileScanner's IOContext in
a shared holder, passes it through CSV, text, JSON, native, Parquet,
ORC, and table-format reader variants, and makes Native/Parquet/ORC use
the shared DelegateReader API when a holder is available. Tracing/stat
updates now check the nested stats pointer before use.
---
be/src/exec/scan/file_scanner.cpp | 61 +++++++++++------------
be/src/exec/scan/file_scanner.h | 4 +-
be/src/format/arrow/arrow_stream_reader.cpp | 3 +-
be/src/format/csv/csv_reader.cpp | 3 +-
be/src/format/json/new_json_reader.cpp | 3 +-
be/src/format/native/native_reader.cpp | 25 ++++++++--
be/src/format/native/native_reader.h | 6 +++
be/src/format/orc/vorc_reader.cpp | 7 +--
be/src/format/orc/vorc_reader.h | 7 +--
be/src/format/parquet/vparquet_reader.cpp | 21 +++++---
be/src/format/table/hive_reader.h | 19 +++++++
be/src/format/table/hudi_reader.h | 13 +++++
be/src/format/table/iceberg_reader.h | 15 ++++++
be/src/format/table/paimon_reader.h | 21 ++++++++
be/src/format/table/transactional_hive_reader.cpp | 17 +++++++
be/src/format/table/transactional_hive_reader.h | 8 +++
be/src/format/text/text_reader.cpp | 6 ++-
be/src/format/text/text_reader.h | 3 +-
18 files changed, 186 insertions(+), 56 deletions(-)
diff --git a/be/src/exec/scan/file_scanner.cpp
b/be/src/exec/scan/file_scanner.cpp
index c0a79f9f38d..5a0955fb55f 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -1197,9 +1197,9 @@ Status FileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_CSV_DEFLATE:
case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
case TFileFormatType::FORMAT_PROTO: {
- auto reader =
- CsvReader::create_unique(_state, _profile, &_counter,
*_params, range,
- _file_slot_descs,
_state->batch_size(), _io_ctx.get());
+ auto reader = CsvReader::create_unique(_state, _profile,
&_counter, *_params, range,
+ _file_slot_descs,
_state->batch_size(), nullptr,
+ _io_ctx);
CsvInitContext csv_ctx;
_fill_base_init_context(&csv_ctx);
csv_ctx.is_load = _is_load;
@@ -1209,8 +1209,8 @@ Status FileScanner::_get_next_reader() {
}
case TFileFormatType::FORMAT_TEXT: {
auto reader = TextReader::create_unique(_state, _profile,
&_counter, *_params, range,
- _file_slot_descs,
_state->batch_size(),
- _io_ctx.get());
+ _file_slot_descs,
_state->batch_size(), nullptr,
+ _io_ctx);
CsvInitContext text_ctx;
_fill_base_init_context(&text_ctx);
text_ctx.is_load = _is_load;
@@ -1221,7 +1221,7 @@ Status FileScanner::_get_next_reader() {
case TFileFormatType::FORMAT_JSON: {
_cur_reader = NewJsonReader::create_unique(_state, _profile,
&_counter, *_params, range,
_file_slot_descs,
&_scanner_eof,
- _state->batch_size(),
_io_ctx.get());
+ _state->batch_size(),
nullptr, _io_ctx);
JsonInitContext json_ctx;
_fill_base_init_context(&json_ctx);
json_ctx.col_default_value_ctx = &_col_default_value_ctx;
@@ -1239,8 +1239,7 @@ Status FileScanner::_get_next_reader() {
break;
}
case TFileFormatType::FORMAT_NATIVE: {
- auto reader =
- NativeReader::create_unique(_profile, *_params, range,
_io_ctx.get(), _state);
+ auto reader = NativeReader::create_unique(_profile, *_params,
range, _io_ctx, _state);
ReaderInitContext native_ctx;
_fill_base_init_context(&native_ctx);
init_status =
static_cast<GenericReader*>(reader.get())->init_reader(&native_ctx);
@@ -1382,7 +1381,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
// IcebergParquetReader IS-A ParquetReader (CRTP mixin), no wrapping
needed
std::unique_ptr<IcebergParquetReader> iceberg_reader =
IcebergParquetReader::create_unique(
_kv_cache, _profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(),
- _io_ctx.get(), _state, file_meta_cache_ptr);
+ _io_ctx, _state, file_meta_cache_ptr);
iceberg_reader->set_create_row_id_column_iterator_func(
[this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
return _create_row_id_column_iterator();
@@ -1394,21 +1393,21 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
// PaimonParquetReader IS-A ParquetReader, no wrapping needed
auto paimon_reader = PaimonParquetReader::create_unique(
_profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(), _kv_cache,
- _io_ctx.get(), _state, file_meta_cache_ptr);
+ _io_ctx, _state, file_meta_cache_ptr);
init_status =
static_cast<GenericReader*>(paimon_reader.get())->init_reader(&pctx);
_cur_reader = std::move(paimon_reader);
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "hudi") {
// HudiParquetReader IS-A ParquetReader, no wrapping needed
auto hudi_reader = HudiParquetReader::create_unique(
- _profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(),
- _io_ctx.get(), _state, file_meta_cache_ptr);
+ _profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(), _io_ctx,
+ _state, file_meta_cache_ptr);
init_status =
static_cast<GenericReader*>(hudi_reader.get())->init_reader(&pctx);
_cur_reader = std::move(hudi_reader);
} else if (range.table_format_params.table_format_type == "hive") {
auto hive_reader = HiveParquetReader::create_unique(
- _profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(),
- _io_ctx.get(), _state, &_is_file_slot, file_meta_cache_ptr,
+ _profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(), _io_ctx,
+ _state, &_is_file_slot, file_meta_cache_ptr,
_state->query_options().enable_parquet_lazy_mat);
hive_reader->set_create_row_id_column_iterator_func(
[this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
@@ -1420,7 +1419,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
if (!parquet_reader) {
parquet_reader = ParquetReader::create_unique(
_profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(),
- _io_ctx.get(), _state, file_meta_cache_ptr,
+ _io_ctx, _state, file_meta_cache_ptr,
_state->query_options().enable_parquet_lazy_mat);
}
parquet_reader->set_create_row_id_column_iterator_func(
@@ -1433,7 +1432,7 @@ Status FileScanner::_init_parquet_reader(FileMetaCache*
file_meta_cache_ptr,
if (!parquet_reader) {
parquet_reader = ParquetReader::create_unique(
_profile, *_params, range, _state->batch_size(),
&_state->timezone_obj(),
- _io_ctx.get(), _state, file_meta_cache_ptr,
+ _io_ctx, _state, file_meta_cache_ptr,
_state->query_options().enable_parquet_lazy_mat);
}
init_status =
static_cast<GenericReader*>(parquet_reader.get())->init_reader(&pctx);
@@ -1460,7 +1459,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
// TransactionalHiveReader IS-A OrcReader, no wrapping needed
auto tran_orc_reader = TransactionalHiveReader::create_unique(
_profile, _state, *_params, range, _state->batch_size(),
_state->timezone(),
- _io_ctx.get(), file_meta_cache_ptr);
+ _io_ctx, file_meta_cache_ptr);
tran_orc_reader->set_create_row_id_column_iterator_func(
[this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
return _create_row_id_column_iterator();
@@ -1473,7 +1472,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
// IcebergOrcReader IS-A OrcReader (CRTP mixin), no wrapping needed
std::unique_ptr<IcebergOrcReader> iceberg_reader =
IcebergOrcReader::create_unique(
_kv_cache, _profile, _state, *_params, range,
_state->batch_size(),
- _state->timezone(), _io_ctx.get(), file_meta_cache_ptr);
+ _state->timezone(), _io_ctx, file_meta_cache_ptr);
iceberg_reader->set_create_row_id_column_iterator_func(
[this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
return _create_row_id_column_iterator();
@@ -1486,7 +1485,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
// PaimonOrcReader IS-A OrcReader, no wrapping needed
auto paimon_reader = PaimonOrcReader::create_unique(
_profile, _state, *_params, range, _state->batch_size(),
_state->timezone(),
- _kv_cache, _io_ctx.get(), file_meta_cache_ptr);
+ _kv_cache, _io_ctx, file_meta_cache_ptr);
init_status =
static_cast<GenericReader*>(paimon_reader.get())->init_reader(&octx);
_cur_reader = std::move(paimon_reader);
@@ -1495,7 +1494,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
// HudiOrcReader IS-A OrcReader, no wrapping needed
auto hudi_reader = HudiOrcReader::create_unique(_profile, _state,
*_params, range,
_state->batch_size(),
_state->timezone(),
- _io_ctx.get(),
file_meta_cache_ptr);
+ _io_ctx,
file_meta_cache_ptr);
init_status =
static_cast<GenericReader*>(hudi_reader.get())->init_reader(&octx);
_cur_reader = std::move(hudi_reader);
@@ -1503,7 +1502,7 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
range.table_format_params.table_format_type == "hive") {
auto hive_reader = HiveOrcReader::create_unique(
_profile, _state, *_params, range, _state->batch_size(),
_state->timezone(),
- _io_ctx.get(), &_is_file_slot, file_meta_cache_ptr,
+ _io_ctx, &_is_file_slot, file_meta_cache_ptr,
_state->query_options().enable_orc_lazy_mat);
hive_reader->set_create_row_id_column_iterator_func(
[this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
@@ -1515,10 +1514,9 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
} else if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "tvf") {
if (!orc_reader) {
- orc_reader = OrcReader::create_unique(_profile, _state, *_params,
range,
- _state->batch_size(),
_state->timezone(),
- _io_ctx.get(),
file_meta_cache_ptr,
-
_state->query_options().enable_orc_lazy_mat);
+ orc_reader = OrcReader::create_unique(
+ _profile, _state, *_params, range, _state->batch_size(),
_state->timezone(),
+ _io_ctx, file_meta_cache_ptr,
_state->query_options().enable_orc_lazy_mat);
}
orc_reader->set_create_row_id_column_iterator_func(
[this]() -> std::shared_ptr<segment_v2::RowIdColumnIteratorV2>
{
@@ -1528,10 +1526,9 @@ Status FileScanner::_init_orc_reader(FileMetaCache*
file_meta_cache_ptr,
_cur_reader = std::move(orc_reader);
} else if (_is_load) {
if (!orc_reader) {
- orc_reader = OrcReader::create_unique(_profile, _state, *_params,
range,
- _state->batch_size(),
_state->timezone(),
- _io_ctx.get(),
file_meta_cache_ptr,
-
_state->query_options().enable_orc_lazy_mat);
+ orc_reader = OrcReader::create_unique(
+ _profile, _state, *_params, range, _state->batch_size(),
_state->timezone(),
+ _io_ctx, file_meta_cache_ptr,
_state->query_options().enable_orc_lazy_mat);
}
init_status =
static_cast<GenericReader*>(orc_reader.get())->init_reader(&octx);
_cur_reader = std::move(orc_reader);
@@ -1631,8 +1628,8 @@ Status FileScanner::read_lines_from_range(const
TFileRangeDesc& range,
switch (format_type) {
case TFileFormatType::FORMAT_PARQUET: {
std::unique_ptr<ParquetReader> parquet_reader =
ParquetReader::create_unique(
- _profile, *_params, range, 1,
&_state->timezone_obj(), _io_ctx.get(),
- _state, file_meta_cache_ptr, false);
+ _profile, *_params, range, 1,
&_state->timezone_obj(), _io_ctx, _state,
+ file_meta_cache_ptr, false);
RETURN_IF_ERROR(
_init_parquet_reader(file_meta_cache_ptr,
std::move(parquet_reader)));
// _init_parquet_reader may create a new table-format
specific reader
@@ -1643,7 +1640,7 @@ Status FileScanner::read_lines_from_range(const
TFileRangeDesc& range,
}
case TFileFormatType::FORMAT_ORC: {
std::unique_ptr<OrcReader> orc_reader =
OrcReader::create_unique(
- _profile, _state, *_params, range, 1,
_state->timezone(), _io_ctx.get(),
+ _profile, _state, *_params, range, 1,
_state->timezone(), _io_ctx,
file_meta_cache_ptr, false);
RETURN_IF_ERROR(_init_orc_reader(file_meta_cache_ptr,
std::move(orc_reader)));
// Same as above: re-apply read_by_rows to the actual
_cur_reader.
diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index cd4066ec987..fb3c291e5ce 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -190,7 +190,7 @@ protected:
std::unique_ptr<io::FileCacheStatistics> _file_cache_statistics;
std::unique_ptr<io::FileReaderStats> _file_reader_stats;
- std::unique_ptr<io::IOContext> _io_ctx;
+ std::shared_ptr<io::IOContext> _io_ctx;
// Whether to fill partition columns from path, default is true.
bool _fill_partition_from_path = true;
@@ -294,7 +294,7 @@ private:
};
Status _init_io_ctx() {
- _io_ctx.reset(new io::IOContext());
+ _io_ctx = std::make_shared<io::IOContext>();
_io_ctx->query_id = &_state->query_id();
return Status::OK();
};
diff --git a/be/src/format/arrow/arrow_stream_reader.cpp
b/be/src/format/arrow/arrow_stream_reader.cpp
index 7d496d803a6..7000c55507b 100644
--- a/be/src/format/arrow/arrow_stream_reader.cpp
+++ b/be/src/format/arrow/arrow_stream_reader.cpp
@@ -57,7 +57,8 @@ ArrowStreamReader::~ArrowStreamReader() = default;
Status ArrowStreamReader::init_reader() {
io::FileReaderSPtr file_reader;
RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id,
&file_reader, _state, false));
- _file_reader = _io_ctx ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
+ _file_reader = _io_ctx && _io_ctx->file_reader_stats
+ ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
_pip_stream = ArrowPipInputStream::create_unique(_file_reader);
diff --git a/be/src/format/csv/csv_reader.cpp b/be/src/format/csv/csv_reader.cpp
index 266f569acbe..731f9e61049 100644
--- a/be/src/format/csv/csv_reader.cpp
+++ b/be/src/format/csv/csv_reader.cpp
@@ -666,7 +666,8 @@ Status CsvReader::_create_file_reader(bool need_schema) {
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.start_offset
+ _range.size)));
}
- _file_reader = _io_ctx ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
+ _file_reader = _io_ctx && _io_ctx->file_reader_stats
+ ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
}
diff --git a/be/src/format/json/new_json_reader.cpp
b/be/src/format/json/new_json_reader.cpp
index cc5208b7c30..81ceab2f6b8 100644
--- a/be/src/format/json/new_json_reader.cpp
+++ b/be/src/format/json/new_json_reader.cpp
@@ -513,7 +513,8 @@ Status NewJsonReader::_open_file_reader(bool need_schema) {
io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
io::PrefetchRange(_range.start_offset, _range.size)));
}
- _file_reader = _io_ctx ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
+ _file_reader = _io_ctx && _io_ctx->file_reader_stats
+ ?
std::make_shared<io::TracingFileReader>(std::move(file_reader),
_io_ctx->file_reader_stats)
: file_reader;
}
diff --git a/be/src/format/native/native_reader.cpp
b/be/src/format/native/native_reader.cpp
index cdf742c6925..029d7ff2024 100644
--- a/be/src/format/native/native_reader.cpp
+++ b/be/src/format/native/native_reader.cpp
@@ -19,6 +19,8 @@
#include <gen_cpp/data.pb.h>
+#include <utility>
+
#include "core/block/block.h"
#include "format/native/native_format.h"
#include "io/file_factory.h"
@@ -38,6 +40,16 @@ NativeReader::NativeReader(RuntimeProfile* profile, const
TFileScanRangeParams&
_io_ctx(io_ctx),
_state(state) {}
+NativeReader::NativeReader(RuntimeProfile* profile, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
RuntimeState* state)
+ : _profile(profile),
+ _scan_params(params),
+ _scan_range(range),
+ _io_ctx(io_ctx_holder ? io_ctx_holder.get() : nullptr),
+ _io_ctx_holder(std::move(io_ctx_holder)),
+ _state(state) {}
+
NativeReader::~NativeReader() {
(void)close();
}
@@ -127,15 +139,20 @@ Status NativeReader::init_reader() {
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, file_description);
- auto reader_res = io::DelegateReader::create_file_reader(
- _profile, system_properties, file_description, reader_options,
- io::DelegateReader::AccessMode::RANDOM, _io_ctx);
+ auto reader_res =
+ _io_ctx_holder ? io::DelegateReader::create_file_reader(
+ _profile, system_properties,
file_description, reader_options,
+ io::DelegateReader::AccessMode::RANDOM,
+ std::static_pointer_cast<const
io::IOContext>(_io_ctx_holder))
+ : io::DelegateReader::create_file_reader(
+ _profile, system_properties,
file_description, reader_options,
+ io::DelegateReader::AccessMode::RANDOM,
_io_ctx);
if (!reader_res.has_value()) {
return reader_res.error();
}
_file_reader = reader_res.value();
- if (_io_ctx) {
+ if (_io_ctx && _io_ctx->file_reader_stats) {
_file_reader =
std::make_shared<io::TracingFileReader>(_file_reader,
_io_ctx->file_reader_stats);
}
diff --git a/be/src/format/native/native_reader.h
b/be/src/format/native/native_reader.h
index 796340c4d43..83d15493310 100644
--- a/be/src/format/native/native_reader.h
+++ b/be/src/format/native/native_reader.h
@@ -20,6 +20,7 @@
#include <gen_cpp/PlanNodes_types.h>
#include <cstddef>
+#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
@@ -51,6 +52,10 @@ public:
NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx,
RuntimeState* state);
+ NativeReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
+ const TFileRangeDesc& range, std::shared_ptr<io::IOContext>
io_ctx_holder,
+ RuntimeState* state);
+
~NativeReader() override;
// Initialize underlying file reader and any format specific state.
@@ -80,6 +85,7 @@ private:
io::FileReaderSPtr _file_reader;
io::IOContext* _io_ctx = nullptr;
+ std::shared_ptr<io::IOContext> _io_ctx_holder;
RuntimeState* _state = nullptr;
bool _eof = false;
diff --git a/be/src/format/orc/vorc_reader.cpp
b/be/src/format/orc/vorc_reader.cpp
index f41b4d0a7e5..2b79363e983 100644
--- a/be/src/format/orc/vorc_reader.cpp
+++ b/be/src/format/orc/vorc_reader.cpp
@@ -2288,7 +2288,7 @@ Status OrcReader::_do_get_next_block(Block* block,
size_t* read_rows, bool* eof)
_reader_metrics.SelectedRowGroupCount);
COUNTER_UPDATE(_orc_profile.evaluated_row_group_count,
_reader_metrics.EvaluatedRowGroupCount);
- if (_io_ctx) {
+ if (_io_ctx && _io_ctx->file_reader_stats) {
_io_ctx->file_reader_stats->read_rows +=
_reader_metrics.ReadRowCount;
}
}
@@ -3442,7 +3442,7 @@ void
ORCFileInputStream::_build_small_ranges_input_stripe_streams(
std::make_shared<OrcMergeRangeFileReader>(_profile,
_file_reader, merged_range);
std::shared_ptr<io::FileReader> tracing_file_reader;
- if (_io_ctx) {
+ if (_io_ctx && _io_ctx->file_reader_stats) {
tracing_file_reader = std::make_shared<io::TracingFileReader>(
std::move(merge_range_file_reader),
_io_ctx->file_reader_stats);
} else {
@@ -3475,7 +3475,8 @@ void
ORCFileInputStream::_build_large_ranges_input_stripe_streams(
for (const auto& range : ranges) {
auto stripe_stream_input_stream =
std::make_shared<StripeStreamInputStream>(
getName(),
- _io_ctx ? std::make_shared<io::TracingFileReader>(_file_reader,
+ _io_ctx && _io_ctx->file_reader_stats
+ ? std::make_shared<io::TracingFileReader>(_file_reader,
_io_ctx->file_reader_stats)
: _file_reader,
_io_ctx, _profile);
diff --git a/be/src/format/orc/vorc_reader.h b/be/src/format/orc/vorc_reader.h
index 6d9f74ae4a0..9f13726fd51 100644
--- a/be/src/format/orc/vorc_reader.h
+++ b/be/src/format/orc/vorc_reader.h
@@ -904,9 +904,10 @@ public:
: _file_name(file_name),
_inner_reader(inner_reader),
_file_reader(inner_reader),
- _tracing_file_reader(io_ctx ?
std::make_shared<io::TracingFileReader>(
- _file_reader,
io_ctx->file_reader_stats)
- : _file_reader),
+ _tracing_file_reader(io_ctx && io_ctx->file_reader_stats
+ ?
std::make_shared<io::TracingFileReader>(
+ _file_reader,
io_ctx->file_reader_stats)
+ : _file_reader),
_orc_once_max_read_bytes(orc_once_max_read_bytes),
_orc_max_merge_distance_bytes(orc_max_merge_distance_bytes),
_io_ctx(io_ctx),
diff --git a/be/src/format/parquet/vparquet_reader.cpp
b/be/src/format/parquet/vparquet_reader.cpp
index 8485d9e9d2a..060e077cf5a 100644
--- a/be/src/format/parquet/vparquet_reader.cpp
+++ b/be/src/format/parquet/vparquet_reader.cpp
@@ -315,10 +315,18 @@ Status ParquetReader::_open_file() {
_scan_range.__isset.modification_time ?
_scan_range.modification_time : 0;
io::FileReaderOptions reader_options =
FileFactory::get_reader_options(_state, _file_description);
- _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
- _profile, _system_properties, _file_description,
reader_options,
- io::DelegateReader::AccessMode::RANDOM, _io_ctx));
- _tracing_file_reader = _io_ctx ?
std::make_shared<io::TracingFileReader>(
+ if (_io_ctx_holder) {
+ _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::RANDOM,
+ std::static_pointer_cast<const
io::IOContext>(_io_ctx_holder)));
+ } else {
+ _file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
+ _profile, _system_properties, _file_description,
reader_options,
+ io::DelegateReader::AccessMode::RANDOM, _io_ctx));
+ }
+ _tracing_file_reader = _io_ctx && _io_ctx->file_reader_stats
+ ?
std::make_shared<io::TracingFileReader>(
_file_reader,
_io_ctx->file_reader_stats)
: _file_reader;
}
@@ -898,7 +906,7 @@ Status ParquetReader::_next_row_group_reader() {
}
_reader_statistics.read_rows += candidate_row_ranges.count();
- if (_io_ctx) {
+ if (_io_ctx && _io_ctx->file_reader_stats) {
_io_ctx->file_reader_stats->read_rows +=
candidate_row_ranges.count();
}
@@ -948,7 +956,8 @@ Status ParquetReader::_next_row_group_reader() {
: _file_reader;
}
_current_group_reader.reset(new RowGroupReader(
- _io_ctx ?
std::make_shared<io::TracingFileReader>(group_file_reader,
+ _io_ctx && _io_ctx->file_reader_stats
+ ?
std::make_shared<io::TracingFileReader>(group_file_reader,
_io_ctx->file_reader_stats)
: group_file_reader,
_read_table_columns, _current_row_group_index.row_group_id,
row_group, _ctz, _io_ctx,
diff --git a/be/src/format/table/hive_reader.h
b/be/src/format/table/hive_reader.h
index 9bcaa0536e7..31a577f2dd9 100644
--- a/be/src/format/table/hive_reader.h
+++ b/be/src/format/table/hive_reader.h
@@ -17,6 +17,7 @@
#pragma once
#include <memory>
+#include <utility>
#include <vector>
#include "format/orc/vorc_reader.h"
@@ -35,6 +36,15 @@ public:
enable_lazy_mat),
_is_file_slot(is_file_slot) {}
+ HiveOrcReader(RuntimeProfile* profile, RuntimeState* state, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t batch_size, const
std::string& ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
+ const std::set<TSlotId>* is_file_slot, FileMetaCache*
meta_cache = nullptr,
+ bool enable_lazy_mat = true)
+ : OrcReader(profile, state, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ meta_cache, enable_lazy_mat),
+ _is_file_slot(is_file_slot) {}
+
~HiveOrcReader() final = default;
protected:
@@ -62,6 +72,15 @@ public:
enable_lazy_mat),
_is_file_slot(is_file_slot) {}
+ HiveParquetReader(RuntimeProfile* profile, const TFileScanRangeParams&
params,
+ const TFileRangeDesc& range, size_t batch_size, const
cctz::time_zone* ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
RuntimeState* state,
+ const std::set<TSlotId>* is_file_slot, FileMetaCache*
meta_cache = nullptr,
+ bool enable_lazy_mat = true)
+ : ParquetReader(profile, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ state, meta_cache, enable_lazy_mat),
+ _is_file_slot(is_file_slot) {}
+
~HiveParquetReader() final = default;
protected:
diff --git a/be/src/format/table/hudi_reader.h
b/be/src/format/table/hudi_reader.h
index 878f874b3ed..4b032bc190a 100644
--- a/be/src/format/table/hudi_reader.h
+++ b/be/src/format/table/hudi_reader.h
@@ -16,6 +16,7 @@
// under the License.
#pragma once
#include <memory>
+#include <utility>
#include <vector>
#include "format/orc/vorc_reader.h"
@@ -34,6 +35,12 @@ public:
FileMetaCache* meta_cache = nullptr, bool
enable_lazy_mat = true)
: ParquetReader(profile, params, range, batch_size, ctz, io_ctx,
state, meta_cache,
enable_lazy_mat) {}
+ HudiParquetReader(RuntimeProfile* profile, const TFileScanRangeParams&
params,
+ const TFileRangeDesc& range, size_t batch_size, const
cctz::time_zone* ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
RuntimeState* state,
+ FileMetaCache* meta_cache = nullptr, bool
enable_lazy_mat = true)
+ : ParquetReader(profile, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ state, meta_cache, enable_lazy_mat) {}
~HudiParquetReader() final = default;
protected:
@@ -50,6 +57,12 @@ public:
bool enable_lazy_mat = true)
: OrcReader(profile, state, params, range, batch_size, ctz,
io_ctx, meta_cache,
enable_lazy_mat) {}
+ HudiOrcReader(RuntimeProfile* profile, RuntimeState* state, const
TFileScanRangeParams& params,
+ const TFileRangeDesc& range, size_t batch_size, const
std::string& ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder, FileMetaCache*
meta_cache = nullptr,
+ bool enable_lazy_mat = true)
+ : OrcReader(profile, state, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ meta_cache, enable_lazy_mat) {}
~HudiOrcReader() final = default;
protected:
diff --git a/be/src/format/table/iceberg_reader.h
b/be/src/format/table/iceberg_reader.h
index ab28b57dfa5..d4156964efe 100644
--- a/be/src/format/table/iceberg_reader.h
+++ b/be/src/format/table/iceberg_reader.h
@@ -76,6 +76,14 @@ public:
: IcebergReaderMixin<ParquetReader>(kv_cache, profile, params,
range, batch_size, ctz,
io_ctx, state, meta_cache) {}
+ IcebergParquetReader(ShardedKVCache* kv_cache, RuntimeProfile* profile,
+ const TFileScanRangeParams& params, const
TFileRangeDesc& range,
+ size_t batch_size, const cctz::time_zone* ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
RuntimeState* state,
+ FileMetaCache* meta_cache)
+ : IcebergReaderMixin<ParquetReader>(kv_cache, profile, params,
range, batch_size, ctz,
+ std::move(io_ctx_holder),
state, meta_cache) {}
+
void set_delete_rows() final {
// Call ParquetReader's set_delete_rows(const vector<int64_t>*)
ParquetReader::set_delete_rows(_iceberg_delete_rows);
@@ -113,6 +121,13 @@ public:
: IcebergReaderMixin<OrcReader>(kv_cache, profile, state, params,
range, batch_size,
ctz, io_ctx, meta_cache) {}
+ IcebergOrcReader(ShardedKVCache* kv_cache, RuntimeProfile* profile,
RuntimeState* state,
+ const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ size_t batch_size, const std::string& ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
FileMetaCache* meta_cache)
+ : IcebergReaderMixin<OrcReader>(kv_cache, profile, state, params,
range, batch_size,
+ ctz, std::move(io_ctx_holder),
meta_cache) {}
+
void set_delete_rows() final {
// Call OrcReader's set_position_delete_rowids
this->set_position_delete_rowids(_iceberg_delete_rows);
diff --git a/be/src/format/table/paimon_reader.h
b/be/src/format/table/paimon_reader.h
index 0a916dfc094..42a5d211a9b 100644
--- a/be/src/format/table/paimon_reader.h
+++ b/be/src/format/table/paimon_reader.h
@@ -18,6 +18,7 @@
#pragma once
#include <memory>
+#include <utility>
#include <vector>
#include "format/orc/vorc_reader.h"
@@ -42,6 +43,16 @@ public:
_kv_cache(kv_cache) {
_init_paimon_profile();
}
+ PaimonOrcReader(RuntimeProfile* profile, RuntimeState* state,
+ const TFileScanRangeParams& params, const TFileRangeDesc&
range,
+ size_t batch_size, const std::string& ctz, ShardedKVCache*
kv_cache,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
+ FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat
= true)
+ : OrcReader(profile, state, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ meta_cache, enable_lazy_mat),
+ _kv_cache(kv_cache) {
+ _init_paimon_profile();
+ }
~PaimonOrcReader() final = default;
protected:
@@ -77,6 +88,16 @@ public:
_kv_cache(kv_cache) {
_init_paimon_profile();
}
+ PaimonParquetReader(RuntimeProfile* profile, const TFileScanRangeParams&
params,
+ const TFileRangeDesc& range, size_t batch_size, const
cctz::time_zone* ctz,
+ ShardedKVCache* kv_cache,
std::shared_ptr<io::IOContext> io_ctx_holder,
+ RuntimeState* state, FileMetaCache* meta_cache =
nullptr,
+ bool enable_lazy_mat = true)
+ : ParquetReader(profile, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ state, meta_cache, enable_lazy_mat),
+ _kv_cache(kv_cache) {
+ _init_paimon_profile();
+ }
~PaimonParquetReader() final = default;
protected:
diff --git a/be/src/format/table/transactional_hive_reader.cpp
b/be/src/format/table/transactional_hive_reader.cpp
index 2308d746b14..16b86bb6eca 100644
--- a/be/src/format/table/transactional_hive_reader.cpp
+++ b/be/src/format/table/transactional_hive_reader.cpp
@@ -19,6 +19,8 @@
#include <re2/re2.h>
+#include <utility>
+
#include "core/data_type/data_type_factory.hpp"
#include "format/orc/vorc_reader.h"
#include "format/table/table_schema_change_helper.h"
@@ -40,6 +42,21 @@
TransactionalHiveReader::TransactionalHiveReader(RuntimeProfile* profile, Runtim
const std::string& ctz,
io::IOContext* io_ctx,
FileMetaCache* meta_cache)
: OrcReader(profile, state, params, range, batch_size, ctz, io_ctx,
meta_cache, false) {
+ _init_transactional_hive_profile();
+}
+
+TransactionalHiveReader::TransactionalHiveReader(RuntimeProfile* profile,
RuntimeState* state,
+ const TFileScanRangeParams&
params,
+ const TFileRangeDesc& range,
size_t batch_size,
+ const std::string& ctz,
+
std::shared_ptr<io::IOContext> io_ctx_holder,
+ FileMetaCache* meta_cache)
+ : OrcReader(profile, state, params, range, batch_size, ctz,
std::move(io_ctx_holder),
+ meta_cache, false) {
+ _init_transactional_hive_profile();
+}
+
+void TransactionalHiveReader::_init_transactional_hive_profile() {
static const char* transactional_hive_profile = "TransactionalHiveProfile";
ADD_TIMER(get_profile(), transactional_hive_profile);
_transactional_orc_profile.num_delete_files = ADD_CHILD_COUNTER(
diff --git a/be/src/format/table/transactional_hive_reader.h
b/be/src/format/table/transactional_hive_reader.h
index 77adf88aad0..1f8c26a7ff2 100644
--- a/be/src/format/table/transactional_hive_reader.h
+++ b/be/src/format/table/transactional_hive_reader.h
@@ -19,6 +19,7 @@
#include <cstddef>
#include <cstdint>
+#include <memory>
#include <string>
#include <unordered_map>
#include <vector>
@@ -53,6 +54,11 @@ public:
const TFileScanRangeParams& params, const
TFileRangeDesc& range,
size_t batch_size, const std::string& ctz,
io::IOContext* io_ctx,
FileMetaCache* meta_cache = nullptr);
+ TransactionalHiveReader(RuntimeProfile* profile, RuntimeState* state,
+ const TFileScanRangeParams& params, const
TFileRangeDesc& range,
+ size_t batch_size, const std::string& ctz,
+ std::shared_ptr<io::IOContext> io_ctx_holder,
+ FileMetaCache* meta_cache = nullptr);
~TransactionalHiveReader() final = default;
protected:
@@ -69,6 +75,8 @@ protected:
Status on_after_read_block(Block* block, size_t* read_rows) override;
private:
+ void _init_transactional_hive_profile();
+
struct TransactionalHiveProfile {
RuntimeProfile::Counter* num_delete_files = nullptr;
RuntimeProfile::Counter* num_delete_rows = nullptr;
diff --git a/be/src/format/text/text_reader.cpp
b/be/src/format/text/text_reader.cpp
index 2fd9b749fbc..0e6a4f89d27 100644
--- a/be/src/format/text/text_reader.cpp
+++ b/be/src/format/text/text_reader.cpp
@@ -22,6 +22,7 @@
#include <glog/logging.h>
#include <cstddef>
+#include <utility>
#include <vector>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -113,8 +114,9 @@ void HiveTextFieldSplitter::_split_field_multi_char(const
Slice& line,
TextReader::TextReader(RuntimeState* state, RuntimeProfile* profile,
ScannerCounter* counter,
const TFileScanRangeParams& params, const
TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs,
size_t batch_size,
- io::IOContext* io_ctx)
- : CsvReader(state, profile, counter, params, range, file_slot_descs,
batch_size, io_ctx) {}
+ io::IOContext* io_ctx, std::shared_ptr<io::IOContext>
io_ctx_holder)
+ : CsvReader(state, profile, counter, params, range, file_slot_descs,
batch_size, io_ctx,
+ std::move(io_ctx_holder)) {}
Status TextReader::_init_options() {
// get column_separator and line_delimiter
diff --git a/be/src/format/text/text_reader.h b/be/src/format/text/text_reader.h
index 677ede3f8bc..c0cebf5da77 100644
--- a/be/src/format/text/text_reader.h
+++ b/be/src/format/text/text_reader.h
@@ -21,6 +21,7 @@
#include <gen_cpp/internal_service.pb.h>
#include <cstddef>
+#include <memory>
#include <string>
#include <vector>
@@ -56,7 +57,7 @@ public:
TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter*
counter,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
const std::vector<SlotDescriptor*>& file_slot_descs, size_t
batch_size,
- io::IOContext* io_ctx);
+ io::IOContext* io_ctx, std::shared_ptr<io::IOContext>
io_ctx_holder = nullptr);
~TextReader() override = default;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]