This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fa7a38b5874 [fix](runtime filter) append late arrival runtime filters
in VFileScanner (#25996)
fa7a38b5874 is described below
commit fa7a38b5874357297bee53f00918f85404b9617e
Author: Ashin Gau <[email protected]>
AuthorDate: Tue Nov 7 09:50:35 2023 +0800
[fix](runtime filter) append late arrival runtime filters in VFileScanner
(#25996)
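`VFileScanner` tries to append late-arriving runtime filters on every iteration
of `ScannerScheduler::_scanner_scan`. However, `VFileScanner::_get_next_reader`
generated `_push_down_conjuncts` only on the first iteration, so runtime filters
that arrived later were not pushed down to subsequently opened file readers.
This change rebuilds `_push_down_conjuncts` (and the dict-filter conjunct maps)
whenever more conjuncts are available than were last pushed down, and adds a
`HasFullyRfFileNumber` profile counter.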
---
be/src/vec/exec/scan/vfile_scanner.cpp | 44 +++++++++++++++++++++-------------
be/src/vec/exec/scan/vfile_scanner.h | 2 ++
2 files changed, 29 insertions(+), 17 deletions(-)
diff --git a/be/src/vec/exec/scan/vfile_scanner.cpp
b/be/src/vec/exec/scan/vfile_scanner.cpp
index 35e1d3dff53..8eda9c1714b 100644
--- a/be/src/vec/exec/scan/vfile_scanner.cpp
+++ b/be/src/vec/exec/scan/vfile_scanner.cpp
@@ -166,6 +166,8 @@ Status VFileScanner::prepare(
ADD_TIMER(_parent->_scanner_profile,
"FileScannerConvertOuputBlockTime");
_empty_file_counter = ADD_COUNTER(_parent->_scanner_profile,
"EmptyFileNum", TUnit::UNIT);
_file_counter = ADD_COUNTER(_parent->_scanner_profile, "FileNumber",
TUnit::UNIT);
+ _has_fully_rf_file_counter =
+ ADD_COUNTER(_parent->_scanner_profile, "HasFullyRfFileNumber",
TUnit::UNIT);
} else {
_get_block_timer = ADD_TIMER(_local_state->scanner_profile(),
"FileScannerGetBlockTime");
_open_reader_timer =
@@ -182,6 +184,8 @@ Status VFileScanner::prepare(
_empty_file_counter =
ADD_COUNTER(_local_state->scanner_profile(), "EmptyFileNum",
TUnit::UNIT);
_file_counter = ADD_COUNTER(_local_state->scanner_profile(),
"FileNumber", TUnit::UNIT);
+ _has_fully_rf_file_counter =
+ ADD_COUNTER(_local_state->scanner_profile(),
"HasFullyRfFileNumber", TUnit::UNIT);
}
_file_cache_statistics.reset(new io::FileCacheStatistics());
@@ -222,7 +226,9 @@ Status VFileScanner::prepare(
}
Status VFileScanner::_process_conjuncts_for_dict_filter() {
- for (auto& conjunct : _conjuncts) {
+ _slot_id_to_filter_conjuncts.clear();
+ _not_single_slot_filter_conjuncts.clear();
+ for (auto& conjunct : _push_down_conjuncts) {
auto impl = conjunct->root()->get_impl();
// If impl is not null, which means this a conjuncts from runtime
filter.
auto cur_expr = impl ? impl : conjunct->root();
@@ -250,6 +256,22 @@ Status VFileScanner::_process_conjuncts_for_dict_filter() {
return Status::OK();
}
+// Refreshes the predicates pushed down to the file reader. In this patch it
+// is called from _get_next_reader() after a Parquet or ORC reader is created,
+// but only when predicate push-down is enabled.
+// When _conjuncts holds more entries than were last pushed down (runtime
+// filters that arrived after an earlier reader was opened), the method
+// rebuilds _push_down_conjuncts from fresh clones of every conjunct. It then
+// regenerates the dict-filter conjunct maps from that new set.
+// It also bumps the "HasFullyRfFileNumber" profile counter.
+// Returns the first error from cloning or dict-filter processing, else OK.
+Status VFileScanner::_process_late_arrival_conjuncts() {
+ if (_push_down_conjuncts.size() < _conjuncts.size()) {
+ // Drop the previous clones; resize() then leaves one default-constructed
+ // slot per conjunct for clone() to fill.
+ _push_down_conjuncts.clear();
+ _push_down_conjuncts.resize(_conjuncts.size());
+ for (size_t i = 0; i != _conjuncts.size(); ++i) {
+ RETURN_IF_ERROR(_conjuncts[i]->clone(_state,
_push_down_conjuncts[i]));
+ }
+ // Must run after the clones above: it clears and rebuilds
+ // _slot_id_to_filter_conjuncts / _not_single_slot_filter_conjuncts
+ // from _push_down_conjuncts.
+ RETURN_IF_ERROR(_process_conjuncts_for_dict_filter());
+ // NOTE(review): _discard_conjuncts() is defined elsewhere. Presumably it
+ // empties _conjuncts, so a later runtime-filter arrival makes _conjuncts
+ // larger than _push_down_conjuncts again and re-triggers this branch.
+ // Confirm against its definition.
+ _discard_conjuncts();
+ }
+ // Incremented once per call, i.e. once per Parquet/ORC file opened with
+ // push-down while _applied_rf_num has reached _total_rf_num (presumably:
+ // every runtime filter has arrived and been applied).
+ if (_applied_rf_num == _total_rf_num) {
+ COUNTER_UPDATE(_has_fully_rf_file_counter, 1);
+ }
+ return Status::OK();
+}
+
void VFileScanner::_get_slot_ids(VExpr* expr, std::vector<int>* slot_ids) {
for (auto& child_expr : expr->children()) {
if (child_expr->is_slot_ref()) {
@@ -766,12 +788,8 @@ Status VFileScanner::_get_next_reader() {
SCOPED_TIMER(_open_reader_timer);
RETURN_IF_ERROR(parquet_reader->open());
}
- if (push_down_predicates && _push_down_conjuncts.empty() &&
!_conjuncts.empty()) {
- _push_down_conjuncts.resize(_conjuncts.size());
- for (size_t i = 0; i != _conjuncts.size(); ++i) {
- RETURN_IF_ERROR(_conjuncts[i]->clone(_state,
_push_down_conjuncts[i]));
- }
- _discard_conjuncts();
+ if (push_down_predicates) {
+ RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type == "iceberg") {
@@ -802,12 +820,8 @@ Status VFileScanner::_get_next_reader() {
std::unique_ptr<OrcReader> orc_reader = OrcReader::create_unique(
_profile, _state, *_params, range,
_state->query_options().batch_size,
_state->timezone(), _io_ctx.get(),
_state->query_options().enable_orc_lazy_mat);
- if (push_down_predicates && _push_down_conjuncts.empty() &&
!_conjuncts.empty()) {
- _push_down_conjuncts.resize(_conjuncts.size());
- for (size_t i = 0; i != _conjuncts.size(); ++i) {
- RETURN_IF_ERROR(_conjuncts[i]->clone(_state,
_push_down_conjuncts[i]));
- }
- _discard_conjuncts();
+ if (push_down_predicates) {
+ RETURN_IF_ERROR(_process_late_arrival_conjuncts());
}
if (range.__isset.table_format_params &&
range.table_format_params.table_format_type ==
"transactional_hive") {
@@ -1080,10 +1094,6 @@ Status VFileScanner::_init_expr_ctxes() {
}
}
}
- // TODO: It should can move to scan node to process.
- if (!_conjuncts.empty()) {
- static_cast<void>(_process_conjuncts_for_dict_filter());
- }
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/vfile_scanner.h
b/be/src/vec/exec/scan/vfile_scanner.h
index b7ceefe775c..4524abb1fdb 100644
--- a/be/src/vec/exec/scan/vfile_scanner.h
+++ b/be/src/vec/exec/scan/vfile_scanner.h
@@ -176,6 +176,7 @@ private:
RuntimeProfile::Counter* _convert_to_output_block_timer = nullptr;
RuntimeProfile::Counter* _empty_file_counter = nullptr;
RuntimeProfile::Counter* _file_counter = nullptr;
+ RuntimeProfile::Counter* _has_fully_rf_file_counter = nullptr;
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
// single slot filter conjuncts
@@ -206,6 +207,7 @@ private:
Status _generate_fill_columns();
Status _handle_dynamic_block(Block* block);
Status _process_conjuncts_for_dict_filter();
+ Status _process_late_arrival_conjuncts();
void _get_slot_ids(VExpr* expr, std::vector<int>* slot_ids);
void _reset_counter() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]