This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0a7c2fc928c [refactor](profile) Refactor of RuntimeFilter profile
(#49777)
0a7c2fc928c is described below
commit 0a7c2fc928ce41dc84e25c1af3e337fcfaf57327
Author: zhiqiang <[email protected]>
AuthorDate: Tue Apr 8 18:24:24 2025 +0800
[refactor](profile) Refactor of RuntimeFilter profile (#49777)
### What problem does this PR solve?
Refactor of runtime filter profile.
1. Counter is shared by related objects.
2. Update RuntimeProfile only when pipeline task is closed.
3. A new Counter type, which is similar to info_string but could be
added as a Counter.
```
use ssb;
SELECT c_city, s_city, d_year, SUM(lo_revenue) AS REVENUE FROM
customer, lineorder, supplier, dates WHERE lo_custkey = c_custkey AND
lo_suppkey = s_suppkey AND lo_orderdate = d_datekey AND ( c_city =
'UNITED KI1' OR c_city = 'UNITED KI5' ) AND ( s_city = 'UNITED
KI1' OR s_city = 'UNITED KI5' ) AND d_year >= 1992 AND d_year <=
1997 GROUP BY c_city, s_city, d_year ORDER BY d_year ASC, REVENUE
DESC
```
We will have a structured counter in executon profile like
```text
- RuntimeFilterInfo:
- AcquireRuntimeFilter: 6.972ms
- RF0 AlwaysTrueFilterRows: 0
- RF0 FilterRows: 41
- RF0 Info: Consumer: ([id: 0, state: [READY], type:
MINMAX_FILTER, column_type: INT], mode: LOCAL, state: APPLIED)
- RF0 InputRows: 48.757K (48757)
- RF0 WaitTime: 161.0ms
- RF1 AlwaysTrueFilterRows: 0
- RF1 FilterRows: 11.288919M (11288919)
- RF1 Info: Consumer: ([id: 1, state: [READY], type:
IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576,
build_bf_by_runtime_size: true], mode: LOCAL, state: APPLIED)
- RF1 InputRows: 11.381544M (11381544)
- RF1 WaitTime: 160.0ms
- RF2 AlwaysTrueFilterRows: 0
- RF2 FilterRows: 0
- RF2 Info: Consumer: ([id: 2, state: [READY], type:
MINMAX_FILTER, column_type: INT], mode: LOCAL, state: APPLIED)
- RF2 InputRows: 48.686K (48686)
- RF2 WaitTime: 224.0ms
- RF3 AlwaysTrueFilterRows: 0
- RF3 FilterRows: 91.841K (91841)
- RF3 Info: Consumer: ([id: 3, state: [READY], type:
IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576,
build_bf_by_runtime_size: true], mode: LOCAL, state: APPLIED)
- RF3 InputRows: 92.625K (92625)
- RF3 WaitTime: 223.0ms
- RF4 AlwaysTrueFilterRows: 0
- RF4 FilterRows: 0
- RF4 Info: Consumer: ([id: 4, state: [READY], type:
MINMAX_FILTER, column_type: INT], mode: LOCAL, state: APPLIED)
- RF4 InputRows: 0
- RF4 WaitTime: 149.0ms
- RF5 AlwaysTrueFilterRows: 0
- RF5 FilterRows: 0
- RF5 Info: Consumer: ([id: 5, state: [READY], type:
IN_OR_BLOOM_FILTER(BLOOM_FILTER), column_type: INT, bf_size: 1048576,
build_bf_by_runtime_size: true], mode: LOCAL, state: APPLIED)
- RF5 InputRows: 7
- RF5 WaitTime: 149.0ms
```
Merged profile will be like
```
- RuntimeFilterInfo: sum , avg , max , min
- RF0 FilterRows: sum 2.139K (2139), avg 44, max 59, min 30
- RF0 InputRows: sum 2.340004M (2340004), avg 48.75K (48750),
max 48.76K (48760), min 48.741K (48741)
- RF1 FilterRows: sum 542.210415M (542210415), avg 11.29605M
(11296050), max 11.307756M (11307756), min 11.281476M (11281476)
- RF1 InputRows: sum 546.667366M (546667366), avg 11.388903M
(11388903), max 11.400674M (11400674), min 11.374343M (11374343)
- RF2 FilterRows: sum 109, avg 2, max 12, min 0
- RF2 InputRows: sum 2.336525M (2336525), avg 48.677K (48677),
max 48.708K (48708), min 48.645K (48645)
- RF3 FilterRows: sum 4.421298M (4421298), avg 92.11K (92110),
max 92.878K (92878), min 91.634K (91634)
- RF3 InputRows: sum 4.456951M (4456951), avg 92.853K (92853),
max 93.618K (93618), min 92.338K (92338)
- RF4 FilterRows: sum 0, avg 0, max 0, min 0
- RF4 InputRows: sum 0, avg 0, max 0, min 0
- RF5 FilterRows: sum 0, avg 0, max 0, min 0
- RF5 InputRows: sum 340, avg 7, max 9, min 6
```
---
be/src/exec/olap_common.h | 28 +++++--
be/src/olap/bitmap_filter_predicate.h | 2 -
be/src/olap/column_predicate.h | 45 +++++------
be/src/olap/comparison_predicate.h | 6 +-
be/src/olap/filter_olap_param.h | 24 ++++--
be/src/olap/olap_common.h | 9 +--
be/src/olap/rowset/segment_v2/segment_iterator.cpp | 12 ---
be/src/olap/tablet_reader.cpp | 15 ++--
be/src/pipeline/exec/datagen_operator.cpp | 4 +-
.../exec/multi_cast_data_stream_source.cpp | 6 +-
be/src/pipeline/exec/olap_scan_operator.cpp | 32 --------
be/src/pipeline/exec/olap_scan_operator.h | 6 --
be/src/pipeline/exec/scan_operator.cpp | 13 +--
be/src/runtime/runtime_state.cpp | 4 +-
be/src/runtime/runtime_state.h | 7 +-
be/src/runtime_filter/runtime_filter_consumer.cpp | 49 ++++++++----
be/src/runtime_filter/runtime_filter_consumer.h | 52 ++++++------
.../runtime_filter_consumer_helper.cpp | 25 ++++--
.../runtime_filter_consumer_helper.h | 13 ++-
be/src/runtime_filter/runtime_filter_mgr.cpp | 9 +--
be/src/runtime_filter/runtime_filter_mgr.h | 3 +-
be/src/util/runtime_profile.cpp | 38 ++++-----
be/src/util/runtime_profile.h | 51 ++++++------
be/src/vec/exec/scan/olap_scanner.cpp | 3 -
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 6 +-
be/src/vec/exprs/vruntimefilter_wrapper.h | 56 +++++++------
.../runtime_filter_consumer_helper_test.cpp | 4 +-
.../runtime_filter_consumer_test.cpp | 41 +++++-----
be/test/runtime_filter/runtime_filter_mgr_test.cpp | 3 +-
.../runtime_filter_producer_test.cpp | 12 +--
.../runtime_profile_counter_tree_node_test.cpp | 92 +++++++++++++++-------
.../org/apache/doris/common/profile/Counter.java | 16 +++-
.../doris/common/profile/RuntimeProfile.java | 8 +-
gensrc/thrift/RuntimeProfile.thrift | 1 +
34 files changed, 368 insertions(+), 327 deletions(-)
diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index 65469b6c968..62604bd1bc9 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -17,6 +17,7 @@
#pragma once
+#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <glog/logging.h>
#include <stddef.h>
@@ -41,6 +42,7 @@
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/type_limit.h"
+#include "util/runtime_profile.h"
#include "vec/core/types.h"
#include "vec/io/io_helper.h"
#include "vec/runtime/ipv4_value.h"
@@ -301,12 +303,21 @@ public:
_contain_null = _is_nullable_col && contain_null;
}
- void set_runtime_filter_info(int runtime_filter_id,
- RuntimeProfile::Counter*
predicate_filtered_rows_counter,
- RuntimeProfile::Counter*
predicate_input_rows_counter) {
+ void attach_profile_counter(
+ int runtime_filter_id,
+ std::shared_ptr<RuntimeProfile::Counter>
predicate_filtered_rows_counter,
+ std::shared_ptr<RuntimeProfile::Counter>
predicate_input_rows_counter) {
+ DCHECK(predicate_filtered_rows_counter != nullptr);
+ DCHECK(predicate_input_rows_counter != nullptr);
+
_runtime_filter_id = runtime_filter_id;
- _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
- _predicate_input_rows_counter = predicate_input_rows_counter;
+
+ if (predicate_filtered_rows_counter != nullptr) {
+ _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
+ }
+ if (predicate_input_rows_counter != nullptr) {
+ _predicate_input_rows_counter = predicate_input_rows_counter;
+ }
}
int precision() const { return _precision; }
@@ -370,8 +381,11 @@ private:
primitive_type ==
PrimitiveType::TYPE_DATETIMEV2;
int _runtime_filter_id = -1;
- RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr;
- RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr;
+
+ std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+ std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
};
class OlapScanKeys {
diff --git a/be/src/olap/bitmap_filter_predicate.h
b/be/src/olap/bitmap_filter_predicate.h
index 12cbd94ec8d..c1488869a04 100644
--- a/be/src/olap/bitmap_filter_predicate.h
+++ b/be/src/olap/bitmap_filter_predicate.h
@@ -113,8 +113,6 @@ uint16_t
BitmapFilterColumnPredicate<T>::_evaluate_inner(const vectorized::IColu
} else {
new_size = evaluate<false>(column, nullptr, sel, size);
}
- _evaluated_rows += size;
- _passed_rows += new_size;
update_filter_info(size - new_size, size);
return new_size;
}
diff --git a/be/src/olap/column_predicate.h b/be/src/olap/column_predicate.h
index 628b49c6213..92e5dea4537 100644
--- a/be/src/olap/column_predicate.h
+++ b/be/src/olap/column_predicate.h
@@ -17,6 +17,7 @@
#pragma once
+#include <memory>
#include <roaring/roaring.hh>
#include "common/exception.h"
@@ -24,6 +25,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/rowset/segment_v2/inverted_index_reader.h"
#include "runtime/define_primitive_type.h"
+#include "util/runtime_profile.h"
#include "vec/columns/column.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
@@ -184,8 +186,6 @@ public:
}
uint16_t new_size = _evaluate_inner(column, sel, size);
- _evaluated_rows += size;
- _passed_rows += new_size;
if (_can_ignore()) {
do_judge_selectivity(size - new_size, size);
}
@@ -255,31 +255,25 @@ public:
int get_runtime_filter_id() const { return _runtime_filter_id; }
- void set_runtime_filter_info(int filter_id,
- RuntimeProfile::Counter*
predicate_filtered_rows_counter,
- RuntimeProfile::Counter*
predicate_input_rows_counter) {
- if (filter_id >= 0) {
- DCHECK(predicate_filtered_rows_counter != nullptr);
- DCHECK(predicate_input_rows_counter != nullptr);
- }
+ void attach_profile_counter(
+ int filter_id, std::shared_ptr<RuntimeProfile::Counter>
predicate_filtered_rows_counter,
+ std::shared_ptr<RuntimeProfile::Counter>
predicate_input_rows_counter) {
_runtime_filter_id = filter_id;
- _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
- _predicate_input_rows_counter = predicate_input_rows_counter;
- }
+ DCHECK(predicate_filtered_rows_counter != nullptr);
+ DCHECK(predicate_input_rows_counter != nullptr);
- /// TODO: Currently we only record statistics for runtime filters, in the
future we should record for all predicates
- void update_filter_info(int64_t filter_rows, int64_t input_rows) const {
- if (_predicate_input_rows_counter) {
- COUNTER_UPDATE(_predicate_input_rows_counter, input_rows);
+ if (predicate_filtered_rows_counter != nullptr) {
+ _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
}
- if (_predicate_filtered_rows_counter) {
- COUNTER_UPDATE(_predicate_filtered_rows_counter, filter_rows);
+ if (predicate_input_rows_counter != nullptr) {
+ _predicate_input_rows_counter = predicate_input_rows_counter;
}
}
- PredicateFilterInfo get_filtered_info() const {
- return PredicateFilterInfo {static_cast<int>(type()), _evaluated_rows
- 1,
- _evaluated_rows - 1 - _passed_rows};
+ /// TODO: Currently we only record statistics for runtime filters, in the
future we should record for all predicates
+ void update_filter_info(int64_t filter_rows, int64_t input_rows) const {
+ COUNTER_UPDATE(_predicate_input_rows_counter, input_rows);
+ COUNTER_UPDATE(_predicate_filtered_rows_counter, filter_rows);
}
static std::string pred_type_string(PredicateType type) {
@@ -353,8 +347,6 @@ protected:
// TODO: the value is only in delete condition, better be template value
bool _opposite;
int _runtime_filter_id = -1;
- mutable uint64_t _evaluated_rows = 1;
- mutable uint64_t _passed_rows = 0;
// VRuntimeFilterWrapper and ColumnPredicate share the same logic,
// but it's challenging to unify them, so the code is duplicated.
// _judge_counter, _judge_input_rows, _judge_filter_rows, and _always_true
@@ -368,8 +360,11 @@ protected:
mutable uint64_t _judge_filter_rows = 0;
mutable bool _always_true = false;
- RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr;
- RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr;
+ std::shared_ptr<RuntimeProfile::Counter> _predicate_filtered_rows_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+
+ std::shared_ptr<RuntimeProfile::Counter> _predicate_input_rows_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
};
} //namespace doris
diff --git a/be/src/olap/comparison_predicate.h
b/be/src/olap/comparison_predicate.h
index d7bf38a6c6a..1fddd4b1046 100644
--- a/be/src/olap/comparison_predicate.h
+++ b/be/src/olap/comparison_predicate.h
@@ -267,9 +267,12 @@ public:
} else {
current_evaluated_rows += size;
}
- _evaluated_rows += current_evaluated_rows;
}
+ // defer is created after its reference args are created.
+ // so defer will be destroyed BEFORE the reference args.
+ // so reference here is safe.
+ //
https://stackoverflow.com/questions/14688285/c-local-variable-destruction-order
Defer defer([&]() {
update_filter_info(current_evaluated_rows - current_passed_rows,
current_evaluated_rows);
@@ -359,7 +362,6 @@ public:
for (uint16_t i = 0; i < size; i++) {
current_passed_rows += flags[i];
}
- _passed_rows += current_passed_rows;
do_judge_selectivity(current_evaluated_rows - current_passed_rows,
current_evaluated_rows);
}
diff --git a/be/src/olap/filter_olap_param.h b/be/src/olap/filter_olap_param.h
index d9aa6386ec6..272fee63fb5 100644
--- a/be/src/olap/filter_olap_param.h
+++ b/be/src/olap/filter_olap_param.h
@@ -24,18 +24,28 @@ namespace doris {
template <typename T>
struct FilterOlapParam {
FilterOlapParam(std::string column_name, T filter, int runtime_filter_id,
- RuntimeProfile::Counter* filtered_counter,
- RuntimeProfile::Counter* input_counter)
+ std::shared_ptr<RuntimeProfile::Counter> filtered_counter,
+ std::shared_ptr<RuntimeProfile::Counter> input_counter)
: column_name(std::move(column_name)),
filter(std::move(filter)),
- runtime_filter_id(runtime_filter_id),
- filtered_rows_counter(filtered_counter),
- input_rows_counter(input_counter) {}
+ runtime_filter_id(runtime_filter_id) {
+ DCHECK(filtered_rows_counter != nullptr);
+ DCHECK(input_rows_counter != nullptr);
+ if (filtered_counter != nullptr) {
+ filtered_rows_counter = filtered_counter;
+ }
+ if (input_counter != nullptr) {
+ input_rows_counter = input_counter;
+ }
+ }
+
std::string column_name;
T filter;
int runtime_filter_id;
- RuntimeProfile::Counter* filtered_rows_counter = nullptr;
- RuntimeProfile::Counter* input_rows_counter = nullptr;
+ std::shared_ptr<RuntimeProfile::Counter> filtered_rows_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+ std::shared_ptr<RuntimeProfile::Counter> input_rows_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
};
} // namespace doris
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 616bcf980de..fe92bea5017 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -77,11 +77,7 @@ struct DataDirInfo {
DataDirType data_dir_type = DataDirType::OLAP_DATA_DIR;
std::string metric_name;
};
-struct PredicateFilterInfo {
- int type = 0;
- uint64_t input_row = 0;
- uint64_t filtered_row = 0;
-};
+
// Sort DataDirInfo by available space.
struct DataDirInfoLessAvailability {
bool operator()(const DataDirInfo& left, const DataDirInfo& right) const {
@@ -337,9 +333,6 @@ struct OlapReaderStatistics {
int64_t short_cond_ns = 0;
int64_t expr_filter_ns = 0;
int64_t output_col_ns = 0;
-
- std::map<int, PredicateFilterInfo> filter_info;
-
int64_t rows_key_range_filtered = 0;
int64_t rows_stats_filtered = 0;
int64_t rows_stats_rp_filtered = 0;
diff --git a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
index 00306091876..ce4ec01c8ba 100644
--- a/be/src/olap/rowset/segment_v2/segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -1877,17 +1877,6 @@ uint16_t
SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_ro
return selected_size;
}
-void SegmentIterator::_collect_runtime_filter_predicate() {
- // collect profile
- for (auto* p : _filter_info_id) {
- // There is a situation, such as with in or minmax filters,
- // where intermediate conversion to a key range or other types
- // prevents obtaining the filter id.
- if (p->is_runtime_filter()) {
- _opts.stats->filter_info[p->get_runtime_filter_id()] =
p->get_filtered_info();
- }
- }
-}
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>&
read_column_ids,
std::vector<rowid_t>&
rowid_vector,
uint16_t* sel_rowid_idx,
size_t select_size,
@@ -2147,7 +2136,6 @@ Status
SegmentIterator::_next_batch_internal(vectorized::Block* block) {
// In SSB test, it make no difference; So need more
scenarios to test
selected_size =
_evaluate_short_circuit_predicate(_sel_rowid_idx.data(), selected_size);
- _collect_runtime_filter_predicate();
if (selected_size > 0) {
// step 3.1: output short circuit and predicate column
// when lazy materialization enables, _predicate_column_ids =
distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 4503749e1fe..be2be8626b8 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -520,16 +520,13 @@ Status TabletReader::_init_orderby_keys_param(const
ReaderParams& read_params) {
Status TabletReader::_init_conditions_param(const ReaderParams& read_params) {
SCOPED_RAW_TIMER(&_stats.tablet_reader_init_conditions_param_timer_ns);
std::vector<ColumnPredicate*> predicates;
- auto emplace_predicate = [&predicates](auto& param, ColumnPredicate*
predicate) {
- predicate->set_runtime_filter_info(param.runtime_filter_id,
param.filtered_rows_counter,
- param.input_rows_counter);
- predicates.emplace_back(predicate);
- };
- auto parse_and_emplace_predicates = [this, &emplace_predicate](auto&
params) {
+ auto parse_and_emplace_predicates = [this, &predicates](auto& params) {
for (const auto& param : params) {
ColumnPredicate* predicate =
_parse_to_predicate({param.column_name, param.filter});
- emplace_predicate(param, predicate);
+ predicate->attach_profile_counter(param.runtime_filter_id,
param.filtered_rows_counter,
+ param.input_rows_counter);
+ predicates.emplace_back(predicate);
}
};
@@ -545,7 +542,9 @@ Status TabletReader::_init_conditions_param(const
ReaderParams& read_params) {
parse_to_predicate(mcolumn, index, tmp_cond,
_predicate_arena.get());
// record condition value into predicate_params in order to pushdown
segment_iterator,
// _gen_predicate_result_sign will build predicate result unique sign
with condition value
- emplace_predicate(param, predicate);
+ predicate->attach_profile_counter(param.runtime_filter_id,
param.filtered_rows_counter,
+ param.input_rows_counter);
+ predicates.emplace_back(predicate);
}
parse_and_emplace_predicates(read_params.bloom_filters);
parse_and_emplace_predicates(read_params.bitmap_filters);
diff --git a/be/src/pipeline/exec/datagen_operator.cpp
b/be/src/pipeline/exec/datagen_operator.cpp
index 5d2c80874bd..c494edd397b 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -96,8 +96,8 @@ Status DataGenLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
// TODO: use runtime filter to filte result block, maybe this node need
derive from vscan_node.
for (const auto& filter_desc : p._runtime_filter_descs) {
std::shared_ptr<RuntimeFilterConsumer> filter;
- RETURN_IF_ERROR(state->register_consumer_runtime_filter(
- filter_desc, p.is_serial_operator(), p.node_id(), &filter,
_runtime_profile.get()));
+ RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc,
p.is_serial_operator(),
+ p.node_id(),
&filter));
}
return Status::OK();
}
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index fd9788e326a..37aa8fa3b77 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -43,8 +43,8 @@ Status
MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
_filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
_get_data_timer = ADD_TIMER(_runtime_profile, "GetDataTime");
_materialize_data_timer = ADD_TIMER(_runtime_profile,
"MaterializeDataTime");
- RETURN_IF_ERROR(_helper.init(state, profile(), false,
_filter_dependencies, p.operator_id(),
- p.node_id(), p.get_name() +
"_FILTER_DEPENDENCY"));
+ RETURN_IF_ERROR(_helper.init(state, false, _filter_dependencies,
p.operator_id(), p.node_id(),
+ p.get_name() + "_FILTER_DEPENDENCY"));
return Status::OK();
}
@@ -81,7 +81,7 @@ Status
MultiCastDataStreamSourceLocalState::close(RuntimeState* state) {
rf_time += dep->watcher_elapse_time();
}
COUNTER_SET(_wait_for_rf_timer, rf_time);
-
+ _helper.collect_realtime_profile(profile());
return Base::close(state);
}
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index b93c22274d9..97297dd4b6c 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -157,8 +157,6 @@ Status OlapScanLocalState::_init_profile() {
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal",
TUnit::UNIT);
_tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
_key_range_counter = ADD_COUNTER(_runtime_profile, "KeyRangesNum",
TUnit::UNIT);
- _runtime_filter_info = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile,
"RuntimeFilterInfo", 1);
-
_tablet_reader_init_timer = ADD_TIMER(_scanner_profile,
"TabletReaderInitTimer");
_tablet_reader_capture_rs_readers_timer =
ADD_TIMER(_scanner_profile, "TabletReaderCaptureRsReadersTimer");
@@ -643,36 +641,6 @@ Status OlapScanLocalState::_build_key_ranges_and_filters()
{
return Status::OK();
}
-void OlapScanLocalState::add_filter_info(int id, const PredicateFilterInfo&
update_info) {
- std::unique_lock lock(_profile_mtx);
- // update
- _filter_info[id].filtered_row += update_info.filtered_row;
- _filter_info[id].input_row += update_info.input_row;
- _filter_info[id].type = update_info.type;
- // to string
- auto& info = _filter_info[id];
- std::string filter_name = "RuntimeFilterInfo id ";
- filter_name += std::to_string(id);
- std::string info_str;
- info_str += "type = " +
type_to_string(static_cast<PredicateType>(info.type)) + ", ";
- info_str += "input = " + std::to_string(info.input_row) + ", ";
- info_str += "filtered = " + std::to_string(info.filtered_row);
- info_str = "[" + info_str + "]";
-
- // add info
- _segment_profile->add_info_string(filter_name, info_str);
-
- const std::string rf_name = "filter id = " + std::to_string(id) + " ";
-
- // add counter
- auto* input_count = ADD_CHILD_COUNTER_WITH_LEVEL(_runtime_profile, rf_name
+ "input",
- TUnit::UNIT,
"RuntimeFilterInfo", 1);
- auto* filtered_count = ADD_CHILD_COUNTER_WITH_LEVEL(_runtime_profile,
rf_name + "filtered",
- TUnit::UNIT,
"RuntimeFilterInfo", 1);
- COUNTER_SET(input_count, (int64_t)info.input_row);
- COUNTER_SET(filtered_count, (int64_t)info.filtered_row);
-}
-
OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode,
int operator_id,
const DescriptorTbl& descs, int
parallel_tasks,
const TQueryCacheParam& param)
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index f6275df046e..ee3d995958a 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -87,8 +87,6 @@ private:
Status _init_scanners(std::list<vectorized::ScannerSPtr>* scanners)
override;
- void add_filter_info(int id, const PredicateFilterInfo& info);
-
Status _build_key_ranges_and_filters();
std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
@@ -122,7 +120,6 @@ private:
RuntimeProfile::Counter* _short_cond_timer = nullptr;
RuntimeProfile::Counter* _expr_filter_timer = nullptr;
RuntimeProfile::Counter* _output_col_timer = nullptr;
- std::map<int, PredicateFilterInfo> _filter_info;
RuntimeProfile::Counter* _stats_filtered_counter = nullptr;
RuntimeProfile::Counter* _stats_rp_filtered_counter = nullptr;
@@ -188,8 +185,6 @@ private:
// total number of segment related to this scan node
RuntimeProfile::Counter* _total_segment_counter = nullptr;
- RuntimeProfile::Counter* _runtime_filter_info = nullptr;
-
// timer about tablet reader
RuntimeProfile::Counter* _tablet_reader_init_timer = nullptr;
RuntimeProfile::Counter* _tablet_reader_capture_rs_readers_timer = nullptr;
@@ -217,7 +212,6 @@ private:
RuntimeProfile::Counter* _segment_create_column_readers_timer = nullptr;
RuntimeProfile::Counter* _segment_load_index_timer = nullptr;
- std::mutex _profile_mtx;
std::vector<TabletWithVersion> _tablets;
std::vector<TabletReader::ReadSource> _read_sources;
};
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index fdc5678862e..3db2d5ced00 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -18,6 +18,7 @@
#include "scan_operator.h"
#include <fmt/format.h>
+#include <gen_cpp/Metrics_types.h>
#include <cstdint>
#include <memory>
@@ -73,7 +74,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<typename Derived::Parent>();
- RETURN_IF_ERROR(_helper.init(state, profile(), p.is_serial_operator(),
_filter_dependencies,
+ RETURN_IF_ERROR(_helper.init(state, p.is_serial_operator(),
_filter_dependencies,
p.operator_id(), p.node_id(),
p.get_name() + "_FILTER_DEPENDENCY"));
RETURN_IF_ERROR(_init_profile());
@@ -287,7 +288,9 @@ Status ScanLocalState<Derived>::_normalize_predicate(
if (need_set_runtime_filter_id) {
auto* rf_expr =
assert_cast<vectorized::VRuntimeFilterWrapper*>(
conjunct_expr_root.get());
- value_range.set_runtime_filter_info(
+
DCHECK(rf_expr->predicate_filtered_rows_counter() != nullptr);
+
DCHECK(rf_expr->predicate_input_rows_counter() != nullptr);
+ value_range.attach_profile_counter(
rf_expr->filter_id(),
rf_expr->predicate_filtered_rows_counter(),
rf_expr->predicate_input_rows_counter());
@@ -591,8 +594,8 @@ Status
ScanLocalState<Derived>::_normalize_in_and_eq_predicate(vectorized::VExpr
iter = hybrid_set->begin();
} else {
int runtime_filter_id = -1;
- RuntimeProfile::Counter* predicate_filtered_rows_counter =
nullptr;
- RuntimeProfile::Counter* predicate_input_rows_counter =
nullptr;
+ std::shared_ptr<RuntimeProfile::Counter>
predicate_filtered_rows_counter = nullptr;
+ std::shared_ptr<RuntimeProfile::Counter>
predicate_input_rows_counter = nullptr;
if (expr_ctx->root()->is_rf_wrapper()) {
auto* rf_expr =
assert_cast<vectorized::VRuntimeFilterWrapper*>(expr_ctx->root().get());
@@ -1275,7 +1278,7 @@ Status ScanLocalState<Derived>::close(RuntimeState*
state) {
std::list<std::shared_ptr<vectorized::ScannerDelegate>> {}.swap(_scanners);
COUNTER_SET(_wait_for_dependency_timer,
_scan_dependency->watcher_elapse_time());
COUNTER_SET(_wait_for_rf_timer, rf_time);
-
+ _helper.collect_realtime_profile(profile());
return PipelineXLocalState<>::close(state);
}
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4a8ccccb9ec..b0600320f0d 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -508,10 +508,10 @@ Status RuntimeState::register_producer_runtime_filter(
Status RuntimeState::register_consumer_runtime_filter(
const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int
node_id,
- std::shared_ptr<RuntimeFilterConsumer>* consumer_filter,
RuntimeProfile* parent_profile) {
+ std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) {
bool need_merge = desc.has_remote_targets || need_local_merge;
RuntimeFilterMgr* mgr = need_merge ? global_runtime_filter_mgr() :
local_runtime_filter_mgr();
- return mgr->register_consumer_filter(desc, node_id, consumer_filter,
parent_profile);
+ return mgr->register_consumer_filter(desc, node_id, consumer_filter);
}
bool RuntimeState::is_nereids() const {
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index a1b3f15c0aa..6db14254a11 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -565,10 +565,9 @@ public:
std::shared_ptr<RuntimeFilterProducer>* producer_filter,
RuntimeProfile* parent_profile);
- Status register_consumer_runtime_filter(const doris::TRuntimeFilterDesc&
desc,
- bool need_local_merge, int node_id,
-
std::shared_ptr<RuntimeFilterConsumer>* consumer_filter,
- RuntimeProfile* parent_profile);
+ Status register_consumer_runtime_filter(
+ const doris::TRuntimeFilterDesc& desc, bool need_local_merge, int
node_id,
+ std::shared_ptr<RuntimeFilterConsumer>* consumer_filter);
bool is_nereids() const;
diff --git a/be/src/runtime_filter/runtime_filter_consumer.cpp
b/be/src/runtime_filter/runtime_filter_consumer.cpp
index 33daafca322..5a35c7ca5ad 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer.cpp
@@ -17,11 +17,11 @@
#include "runtime_filter/runtime_filter_consumer.h"
-#include "exprs/create_predicate_function.h"
+#include "exprs/minmax_predicate.h"
+#include "util/runtime_profile.h"
#include "vec/exprs/vbitmap_predicate.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vdirect_in_predicate.h"
-#include "vec/exprs/vexpr_context.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -38,23 +38,9 @@ Status RuntimeFilterConsumer::_apply_ready_expr(
auto origin_size = push_exprs.size();
RETURN_IF_ERROR(_get_push_exprs(push_exprs, _probe_expr));
- // The runtime filter is pushed down, adding filtering information.
- auto* expr_filtered_rows_counter =
_execution_profile->add_collaboration_counter(
- "ExprFilteredRows", TUnit::UNIT, _rf_filter);
- auto* expr_input_rows_counter =
- _execution_profile->add_collaboration_counter("ExprInputRows",
TUnit::UNIT, _rf_input);
- auto* expr_always_true_counter =
- ADD_COUNTER(_execution_profile, "AlwaysTruePassRows", TUnit::UNIT);
-
- auto* predicate_filtered_rows_counter =
_storage_profile->add_collaboration_counter(
- "PredicateFilteredRows", TUnit::UNIT, _rf_filter);
- auto* predicate_input_rows_counter =
_storage_profile->add_collaboration_counter(
- "PredicateInputRows", TUnit::UNIT, _rf_input);
for (auto i = origin_size; i < push_exprs.size(); i++) {
- push_exprs[i]->attach_profile_counter(
- expr_filtered_rows_counter, expr_input_rows_counter,
expr_always_true_counter,
- predicate_filtered_rows_counter, predicate_input_rows_counter);
+ push_exprs[i]->attach_profile_counter(_rf_input, _rf_filter,
_always_true_counter);
}
return Status::OK();
}
@@ -225,4 +211,33 @@ Status
RuntimeFilterConsumer::_get_push_exprs(std::vector<vectorized::VRuntimeFi
return Status::OK();
}
+void RuntimeFilterConsumer::collect_realtime_profile(RuntimeProfile*
parent_operator_profile) {
+ DCHECK(parent_operator_profile != nullptr);
+
+ // Counter* is owned by RuntimeProfile, so no need to free.
+ RuntimeProfile::Counter* c = parent_operator_profile->add_counter(
+ fmt::format("RF{} InputRows", _filter_id), TUnit::UNIT,
"RuntimeFilterInfo", 1);
+ c->update(_rf_input->value());
+
+ c = parent_operator_profile->add_counter(fmt::format("RF{} FilterRows",
_filter_id),
+ TUnit::UNIT, "RuntimeFilterInfo",
1);
+ c->update(_rf_filter->value());
+ c = parent_operator_profile->add_counter(fmt::format("RF{} WaitTime",
_filter_id),
+ TUnit::TIME_NS,
"RuntimeFilterInfo", 2);
+ c->update(_wait_timer->value());
+
+ c = parent_operator_profile->add_counter(fmt::format("RF{}
AlwaysTrueFilterRows", _filter_id),
+ TUnit::UNIT, "RuntimeFilterInfo",
2);
+ c->update(_always_true_counter->value());
+
+ {
+ // since debug_string will read from RuntimeFilter::_wrapper
+ // and it is a shared_ptr, instead of a atomic_shared_ptr
+ // so it is not thread safe
+ std::unique_lock<std::mutex> l(_mtx);
+ parent_operator_profile->add_description(fmt::format("RF{} Info",
_filter_id),
+ debug_string(),
"RuntimeFilterInfo");
+ }
+}
+
} // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_consumer.h
b/be/src/runtime_filter/runtime_filter_consumer.h
index cc6581fa9fa..eba46b98406 100644
--- a/be/src/runtime_filter/runtime_filter_consumer.h
+++ b/be/src/runtime_filter/runtime_filter_consumer.h
@@ -17,6 +17,9 @@
#pragma once
+#include <gen_cpp/Metrics_types.h>
+
+#include <memory>
#include <string>
#include "pipeline/dependency.h"
@@ -39,12 +42,10 @@ public:
};
static Status create(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
- int node_id, std::shared_ptr<RuntimeFilterConsumer>*
res,
- RuntimeProfile* parent_profile) {
+ int node_id, std::shared_ptr<RuntimeFilterConsumer>*
res) {
*res = std::shared_ptr<RuntimeFilterConsumer>(
- new RuntimeFilterConsumer(state, desc, node_id,
parent_profile));
+ new RuntimeFilterConsumer(state, desc, node_id));
RETURN_IF_ERROR((*res)->_init_with_desc(desc,
&state->get_query_ctx()->query_options()));
- (*res)->_profile->add_info_string("Info", ((*res)->debug_string()));
return Status::OK();
}
@@ -65,6 +66,9 @@ public:
bool is_applied() { return _rf_state == State::APPLIED; }
+ // Called by RuntimeFilterConsumerHelper
+ void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+
static std::string to_string(const State& state) {
switch (state) {
case State::NOT_READY:
@@ -81,31 +85,20 @@ public:
}
private:
+ friend class RuntimeFilterProducer;
+
RuntimeFilterConsumer(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
- int node_id, RuntimeProfile* parent_profile)
+ int node_id)
: RuntimeFilter(state, desc),
_probe_expr(desc->planId_to_target_expr.find(node_id)->second),
- _profile(new RuntimeProfile(fmt::format("RF{}",
desc->filter_id))),
- _storage_profile(new RuntimeProfile(fmt::format("Storage",
desc->filter_id))),
- _execution_profile(new RuntimeProfile(fmt::format("Execution",
desc->filter_id))),
_registration_time(MonotonicMillis()),
- _rf_state(State::NOT_READY) {
+ _rf_state(State::NOT_READY),
+ _filter_id(desc->filter_id) {
// If bitmap filter is not applied, it will cause the query result to
be incorrect
bool wait_infinitely =
_state->get_query_ctx()->runtime_filter_wait_infinitely() ||
_runtime_filter_type ==
RuntimeFilterType::BITMAP_FILTER;
_rf_wait_time_ms = wait_infinitely ?
_state->get_query_ctx()->execution_timeout() * 1000
:
_state->get_query_ctx()->runtime_filter_wait_time_ms();
- _profile->add_info_string("TimeoutLimit",
std::to_string(_rf_wait_time_ms) + "ms");
-
- parent_profile->add_child(_profile.get(), true, nullptr);
- _profile->add_child(_storage_profile.get(), true, nullptr);
- _profile->add_child(_execution_profile.get(), true, nullptr);
- _wait_timer = ADD_TIMER(_profile, "WaitTime");
-
- _rf_filter = ADD_COUNTER_WITH_LEVEL(
- parent_profile, fmt::format("RF{} FilterRows",
desc->filter_id), TUnit::UNIT, 1);
- _rf_input = ADD_COUNTER_WITH_LEVEL(
- parent_profile, fmt::format("RF{} InputRows",
desc->filter_id), TUnit::UNIT, 1);
DorisMetrics::instance()->runtime_filter_consumer_num->increment(1);
}
@@ -142,21 +135,25 @@ private:
_check_state({State::NOT_READY, State::TIMEOUT});
}
_rf_state = rf_state;
- _profile->add_info_string("Info", debug_string());
}
TExpr _probe_expr;
std::vector<std::shared_ptr<pipeline::RuntimeFilterTimer>> _filter_timer;
- std::unique_ptr<RuntimeProfile> _profile;
- std::unique_ptr<RuntimeProfile> _storage_profile; // for storage layer
stats
- std::unique_ptr<RuntimeProfile> _execution_profile; // for execution layer
stats
- RuntimeProfile::Counter* _wait_timer = nullptr;
+ std::shared_ptr<RuntimeProfile::Counter> _wait_timer =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
//_rf_filter is used to record the number of rows filtered by the runtime
filter.
//It aggregates the filtering statistics from both the Storage and
Execution.
- RuntimeProfile::Counter* _rf_filter = nullptr;
- RuntimeProfile::Counter* _rf_input = nullptr;
+ // Counter will be shared by RuntimeFilterConsumer & VRuntimeFilterWrapper
+ // OperatorLocalState's close method will collect the statistics from
RuntimeFilterConsumer
+ // VRuntimeFilterWrapper will update the statistics.
+ std::shared_ptr<RuntimeProfile::Counter> _rf_filter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
+ std::shared_ptr<RuntimeProfile::Counter> _rf_input =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
+ std::shared_ptr<RuntimeProfile::Counter> _always_true_counter =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0, 1);
int32_t _rf_wait_time_ms;
const int64_t _registration_time;
@@ -169,6 +166,7 @@ private:
bool _reached_timeout = false;
friend class RuntimeFilterProducer;
+ int _filter_id = -1;
};
#include "common/compile_check_end.h"
} // namespace doris
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
index 52ceeb59c77..0ec0b2fb38a 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.cpp
@@ -19,6 +19,7 @@
#include "pipeline/pipeline_task.h"
#include "runtime_filter/runtime_filter_consumer.h"
+#include "util/runtime_profile.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -27,19 +28,16 @@ RuntimeFilterConsumerHelper::RuntimeFilterConsumerHelper(
const RowDescriptor& row_descriptor)
: _node_id(_node_id),
_runtime_filter_descs(runtime_filters),
- _row_descriptor_ref(row_descriptor),
- _profile(new RuntimeProfile("RuntimeFilterConsumerHelper")) {
+ _row_descriptor_ref(row_descriptor) {
_blocked_by_rf = std::make_shared<std::atomic_bool>(false);
}
Status RuntimeFilterConsumerHelper::init(
- RuntimeState* state, RuntimeProfile* profile, bool need_local_merge,
+ RuntimeState* state, bool need_local_merge,
std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
dependencies, const int id,
const int node_id, const std::string& name) {
_state = state;
- profile->add_child(_profile.get(), true, nullptr);
RETURN_IF_ERROR(_register_runtime_filter(need_local_merge));
- _acquire_runtime_filter_timer = ADD_TIMER(_profile,
"AcquireRuntimeFilterTime");
_init_dependency(dependencies, id, node_id, name);
return Status::OK();
}
@@ -49,7 +47,7 @@ Status
RuntimeFilterConsumerHelper::_register_runtime_filter(bool need_local_mer
for (size_t i = 0; i < filter_size; ++i) {
std::shared_ptr<RuntimeFilterConsumer> filter;
RETURN_IF_ERROR(_state->register_consumer_runtime_filter(
- _runtime_filter_descs[i], need_local_merge, _node_id, &filter,
_profile.get()));
+ _runtime_filter_descs[i], need_local_merge, _node_id,
&filter));
_consumers.emplace_back(filter);
}
return Status::OK();
@@ -84,7 +82,7 @@ void RuntimeFilterConsumerHelper::_init_dependency(
Status RuntimeFilterConsumerHelper::acquire_runtime_filter(
vectorized::VExprContextSPtrs& conjuncts) {
- SCOPED_TIMER(_acquire_runtime_filter_timer);
+ SCOPED_TIMER(_acquire_runtime_filter_timer.get());
std::vector<vectorized::VRuntimeFilterPtr> vexprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); ++i) {
RETURN_IF_ERROR(_consumers[i]->acquire_expr(vexprs));
@@ -148,4 +146,17 @@ Status
RuntimeFilterConsumerHelper::try_append_late_arrival_runtime_filter(
return Status::OK();
}
+void RuntimeFilterConsumerHelper::collect_realtime_profile(
+ RuntimeProfile* parent_operator_profile) {
+ std::ignore = parent_operator_profile->add_counter("RuntimeFilterInfo",
TUnit::NONE,
+
RuntimeProfile::ROOT_COUNTER, 1);
+ RuntimeProfile::Counter* c = parent_operator_profile->add_counter(
+ "AcquireRuntimeFilter", TUnit::TIME_NS, "RuntimeFilterInfo", 2);
+ c->update(_acquire_runtime_filter_timer->value());
+
+ for (auto& consumer : _consumers) {
+ consumer->collect_realtime_profile(parent_operator_profile);
+ }
+}
+
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime_filter/runtime_filter_consumer_helper.h
b/be/src/runtime_filter/runtime_filter_consumer_helper.h
index 55005b6da33..644343c431b 100644
--- a/be/src/runtime_filter/runtime_filter_consumer_helper.h
+++ b/be/src/runtime_filter/runtime_filter_consumer_helper.h
@@ -17,7 +17,10 @@
#pragma once
+#include <mutex>
+
#include "pipeline/dependency.h"
+#include "util/runtime_profile.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
namespace doris {
@@ -33,7 +36,7 @@ public:
const RowDescriptor& row_descriptor);
~RuntimeFilterConsumerHelper() = default;
- Status init(RuntimeState* state, RuntimeProfile* profile, bool
need_local_merge,
+ Status init(RuntimeState* state, bool need_local_merge,
std::vector<std::shared_ptr<pipeline::RuntimeFilterDependency>>&
runtime_filter_dependencies,
const int id, const int node_id, const std::string& name);
@@ -46,6 +49,10 @@ public:
Status try_append_late_arrival_runtime_filter(int* arrived_rf_num,
vectorized::VExprContextSPtrs& conjuncts);
+ // Called by XXXLocalState::close()
+ // parent_operator_profile is owned by LocalState so update it is safe at
here.
+ void collect_realtime_profile(RuntimeProfile* parent_operator_profile);
+
private:
// Register and get all runtime filters at Init phase.
Status _register_runtime_filter(bool need_local_merge);
@@ -70,8 +77,8 @@ private:
bool _is_all_rf_applied = true;
std::shared_ptr<std::atomic_bool> _blocked_by_rf;
- RuntimeProfile::Counter* _acquire_runtime_filter_timer = nullptr;
- std::unique_ptr<RuntimeProfile> _profile;
+ std::unique_ptr<RuntimeProfile::Counter> _acquire_runtime_filter_timer =
+ std::make_unique<RuntimeProfile::Counter>(TUnit::TIME_NS, 0);
};
#include "common/compile_check_end.h"
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime_filter/runtime_filter_mgr.cpp
b/be/src/runtime_filter/runtime_filter_mgr.cpp
index 4b718620997..a5c7ab71b19 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.cpp
+++ b/be/src/runtime_filter/runtime_filter_mgr.cpp
@@ -70,15 +70,14 @@ std::vector<std::shared_ptr<RuntimeFilterConsumer>>
RuntimeFilterMgr::get_consum
return iter->second;
}
-Status RuntimeFilterMgr::register_consumer_filter(const TRuntimeFilterDesc&
desc, int node_id,
-
std::shared_ptr<RuntimeFilterConsumer>* consumer,
- RuntimeProfile*
parent_profile) {
+Status RuntimeFilterMgr::register_consumer_filter(
+ const TRuntimeFilterDesc& desc, int node_id,
+ std::shared_ptr<RuntimeFilterConsumer>* consumer) {
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
int32_t key = desc.filter_id;
std::lock_guard<std::mutex> l(_lock);
- RETURN_IF_ERROR(
- RuntimeFilterConsumer::create(_state, &desc, node_id, consumer,
parent_profile));
+ RETURN_IF_ERROR(RuntimeFilterConsumer::create(_state, &desc, node_id,
consumer));
_consumer_map[key].push_back(*consumer);
return Status::OK();
}
diff --git a/be/src/runtime_filter/runtime_filter_mgr.h
b/be/src/runtime_filter/runtime_filter_mgr.h
index b6e2c89e820..edaf100a7f2 100644
--- a/be/src/runtime_filter/runtime_filter_mgr.h
+++ b/be/src/runtime_filter/runtime_filter_mgr.h
@@ -89,8 +89,7 @@ public:
// get/set consumer
std::vector<std::shared_ptr<RuntimeFilterConsumer>>
get_consume_filters(int filter_id);
Status register_consumer_filter(const TRuntimeFilterDesc& desc, int
node_id,
- std::shared_ptr<RuntimeFilterConsumer>*
consumer_filter,
- RuntimeProfile* parent_profile);
+ std::shared_ptr<RuntimeFilterConsumer>*
consumer_filter);
Status register_local_merger_producer_filter(
const TRuntimeFilterDesc& desc,
std::shared_ptr<RuntimeFilterProducer> producer_filter,
diff --git a/be/src/util/runtime_profile.cpp b/be/src/util/runtime_profile.cpp
index cf684304697..d9fb7183321 100644
--- a/be/src/util/runtime_profile.cpp
+++ b/be/src/util/runtime_profile.cpp
@@ -397,9 +397,6 @@ RuntimeProfile::Counter* RuntimeProfile::add_counter(const
std::string& name, TU
std::lock_guard<std::mutex> l(_counter_map_lock);
if (_counter_map.find(name) != _counter_map.end()) {
- // TODO: FIX DUPLICATE COUNTERS
- // In production, we will return the existing counter.
- // This will not make be crash, but the result may be wrong.
return _counter_map[name];
}
@@ -434,19 +431,16 @@ RuntimeProfile::NonZeroCounter*
RuntimeProfile::add_nonzero_counter(
return counter;
}
-RuntimeProfile::CollaborationCounter*
RuntimeProfile::add_collaboration_counter(
- const std::string& name, TUnit::type type, Counter* other_counter,
- const std::string& parent_counter_name, int64_t level) {
+RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter(
+ const std::string& name, TUnit::type type, const
DerivedCounterFunction& counter_fn,
+ const std::string& parent_counter_name) {
std::lock_guard<std::mutex> l(_counter_map_lock);
+
if (_counter_map.find(name) != _counter_map.end()) {
- DCHECK(dynamic_cast<CollaborationCounter*>(_counter_map[name]));
- return static_cast<CollaborationCounter*>(_counter_map[name]);
+ return nullptr;
}
- DCHECK(parent_counter_name == ROOT_COUNTER ||
- _counter_map.find(parent_counter_name) != _counter_map.end());
- CollaborationCounter* counter =
- _pool->add(new CollaborationCounter(type, level, other_counter));
+ DerivedCounter* counter = _pool->add(new DerivedCounter(type, counter_fn));
_counter_map[name] = counter;
std::set<std::string>* child_counters =
find_or_insert(&_child_counter_map, parent_counter_name,
std::set<std::string>());
@@ -454,21 +448,29 @@ RuntimeProfile::CollaborationCounter*
RuntimeProfile::add_collaboration_counter(
return counter;
}
-RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter(
- const std::string& name, TUnit::type type, const
DerivedCounterFunction& counter_fn,
- const std::string& parent_counter_name) {
+void RuntimeProfile::add_description(const std::string& name, const
std::string& description,
+ std::string parent_counter_name) {
std::lock_guard<std::mutex> l(_counter_map_lock);
if (_counter_map.find(name) != _counter_map.end()) {
- return nullptr;
+ Counter* counter = _counter_map[name];
+ if (dynamic_cast<DescriptionEntry*>(counter) != nullptr) {
+ // Do replace instead of update to avoid data race.
+ _counter_map.erase(name);
+ } else {
+ DCHECK(false) << "Counter type mismatch, name: " << name
+ << ", type: " << counter->type() << ", description:
" << description;
+ }
}
- DerivedCounter* counter = _pool->add(new DerivedCounter(type, counter_fn));
+ // Parent counter must already exist.
+ DCHECK(parent_counter_name == ROOT_COUNTER ||
+ _counter_map.find(parent_counter_name) != _counter_map.end());
+ DescriptionEntry* counter = _pool->add(new DescriptionEntry(name,
description));
_counter_map[name] = counter;
std::set<std::string>* child_counters =
find_or_insert(&_child_counter_map, parent_counter_name,
std::set<std::string>());
child_counters->insert(name);
- return counter;
}
RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) {
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index cae1a7cefe7..ee9a71ee900 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -295,41 +295,36 @@ public:
const std::string _parent_name;
};
- // When the collaboration Counter modifies itself, it also modifies the
other counter.
-
- class CollaborationCounter : public Counter {
+ class DescriptionEntry : public Counter {
public:
- CollaborationCounter(TUnit::type type, int64_t level, Counter*
other_counter,
- int64_t value = 0)
- : Counter(type, value, level), _other_counter(other_counter) {}
+ DescriptionEntry(const std::string& name, const std::string&
description)
+ : Counter(TUnit::NONE, 0, 2), _description(description),
_name(name) {}
virtual Counter* clone() const override {
- return new CollaborationCounter(type(), level(), _other_counter,
value());
- }
-
- void update(int64_t delta) override {
- if (_other_counter != nullptr) {
- _other_counter->update(delta);
- }
- Counter::update(delta);
+ return new DescriptionEntry(_name, _description);
}
void set(int64_t value) override {
- if (_other_counter != nullptr) {
- _other_counter->set(value);
- }
- Counter::set(value);
+ // Do nothing
}
-
void set(double value) override {
- if (_other_counter != nullptr) {
- _other_counter->set(value);
- }
- Counter::set(value);
+ // Do nothing
+ }
+ void update(int64_t delta) override {
+ // Do nothing
+ }
+
+ TCounter to_thrift(const std::string& name) const override {
+ TCounter counter;
+ counter.name = name;
+ counter.__set_level(2);
+ counter.__set_description(_description);
+ return counter;
}
private:
- Counter* _other_counter = nullptr; // Pointer to the other counter to
be modified
+ const std::string _description;
+ const std::string _name;
};
// Create a runtime profile object with 'name'.
@@ -385,11 +380,9 @@ public:
const std::string& parent_counter_name =
RuntimeProfile::ROOT_COUNTER,
int64_t level = 2);
- CollaborationCounter* add_collaboration_counter(
- const std::string& name, TUnit::type type, Counter* other_counter,
- const std::string& parent_counter_name =
RuntimeProfile::ROOT_COUNTER,
- int64_t level = 2);
-
+ // Add a description entry under target counter.
+ void add_description(const std::string& name, const std::string&
description,
+ std::string parent_counter_name);
// Add a derived counter with 'name'/'type'. The counter is owned by the
// RuntimeProfile object.
// If parent_counter_name is a non-empty string, the counter is added as a
child of
diff --git a/be/src/vec/exec/scan/olap_scanner.cpp
b/be/src/vec/exec/scan/olap_scanner.cpp
index d623a54f4f9..605cfe92c7b 100644
--- a/be/src/vec/exec/scan/olap_scanner.cpp
+++ b/be/src/vec/exec/scan/olap_scanner.cpp
@@ -592,9 +592,6 @@ void OlapScanner::_collect_profile_before_close() {
COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter,
stats.short_circuit_cond_input_rows);
COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter,
stats.expr_cond_input_rows);
- for (const auto& [id, info] : stats.filter_info) {
- local_state->add_filter_info(id, info);
- }
COUNTER_UPDATE(local_state->_stats_filtered_counter,
stats.rows_stats_filtered);
COUNTER_UPDATE(local_state->_stats_rp_filtered_counter,
stats.rows_stats_rp_filtered);
COUNTER_UPDATE(local_state->_dict_filtered_counter,
stats.rows_dict_filtered);
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index d864dbd833a..41d2a6d76f4 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -89,8 +89,6 @@ void VRuntimeFilterWrapper::close(VExprContext* context,
Status VRuntimeFilterWrapper::execute(VExprContext* context, Block* block,
int* result_column_id) {
DCHECK(_open_finished || _getting_const_col);
- DCHECK(_expr_filtered_rows_counter && _expr_input_rows_counter &&
_always_true_counter)
- << "rf counter must be initialized";
if (_judge_counter.fetch_sub(1) == 0) {
reset_judge_selectivity();
}
@@ -99,9 +97,7 @@ Status VRuntimeFilterWrapper::execute(VExprContext* context,
Block* block, int*
block->insert({create_always_true_column(size,
_data_type->is_nullable()), _data_type,
expr_name()});
*result_column_id = block->columns() - 1;
- if (_always_true_counter) {
- COUNTER_UPDATE(_always_true_counter, size);
- }
+ COUNTER_UPDATE(_always_true_filter_rows, size);
return Status::OK();
} else {
if (_getting_const_col) {
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.h
b/be/src/vec/exprs/vruntimefilter_wrapper.h
index 72627bb6cd9..84b5538e130 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.h
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.h
@@ -17,8 +17,11 @@
#pragma once
+#include <gen_cpp/Metrics_types.h>
+
#include <atomic>
#include <cstdint>
+#include <memory>
#include <string>
#include "common/config.h"
@@ -61,21 +64,27 @@ public:
VExprSPtr get_impl() const override { return _impl; }
- void attach_profile_counter(RuntimeProfile::Counter*
expr_filtered_rows_counter,
- RuntimeProfile::Counter*
expr_input_rows_counter,
- RuntimeProfile::Counter* always_true_counter,
- RuntimeProfile::Counter*
predicate_filtered_rows_counter,
- RuntimeProfile::Counter*
predicate_input_rows_counter) {
- _expr_filtered_rows_counter = expr_filtered_rows_counter;
- _expr_input_rows_counter = expr_input_rows_counter;
- _always_true_counter = always_true_counter;
- _predicate_filtered_rows_counter = predicate_filtered_rows_counter;
- _predicate_input_rows_counter = predicate_input_rows_counter;
+ void attach_profile_counter(std::shared_ptr<RuntimeProfile::Counter>
rf_input_rows,
+ std::shared_ptr<RuntimeProfile::Counter>
rf_filter_rows,
+ std::shared_ptr<RuntimeProfile::Counter>
always_true_filter_rows) {
+ DCHECK(rf_input_rows != nullptr);
+ DCHECK(rf_filter_rows != nullptr);
+ DCHECK(always_true_filter_rows != nullptr);
+
+ if (rf_input_rows != nullptr) {
+ _rf_input_rows = rf_input_rows;
+ }
+ if (rf_filter_rows != nullptr) {
+ _rf_filter_rows = rf_filter_rows;
+ }
+ if (always_true_filter_rows != nullptr) {
+ _always_true_filter_rows = always_true_filter_rows;
+ }
}
void update_counters(int64_t filter_rows, int64_t input_rows) {
- COUNTER_UPDATE(_expr_filtered_rows_counter, filter_rows);
- COUNTER_UPDATE(_expr_input_rows_counter, input_rows);
+ COUNTER_UPDATE(_rf_filter_rows, filter_rows);
+ COUNTER_UPDATE(_rf_input_rows, input_rows);
}
template <typename T>
@@ -100,8 +109,12 @@ public:
}
}
- auto* predicate_filtered_rows_counter() const { return
_predicate_filtered_rows_counter; }
- auto* predicate_input_rows_counter() const { return
_predicate_input_rows_counter; }
+ std::shared_ptr<RuntimeProfile::Counter> predicate_filtered_rows_counter()
const {
+ return _rf_filter_rows;
+ }
+ std::shared_ptr<RuntimeProfile::Counter> predicate_input_rows_counter()
const {
+ return _rf_input_rows;
+ }
private:
void reset_judge_selectivity() {
@@ -125,15 +138,12 @@ private:
std::atomic_uint64_t _judge_filter_rows = 0;
std::atomic_int _always_true = false;
- RuntimeProfile::Counter* _expr_filtered_rows_counter = nullptr;
- RuntimeProfile::Counter* _expr_input_rows_counter = nullptr;
- RuntimeProfile::Counter* _always_true_counter = nullptr;
-
- // Used to record filtering information on predicates.
- // The transfer relationship of these counters is:
- // RuntimeFilterConsumer(create) ==> VRuntimeFilterWrapper(pass) ==>
FilterOlapParam(pass) ==> ColumnPredicate(record)
- RuntimeProfile::Counter* _predicate_filtered_rows_counter = nullptr;
- RuntimeProfile::Counter* _predicate_input_rows_counter = nullptr;
+ std::shared_ptr<RuntimeProfile::Counter> _rf_input_rows =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+ std::shared_ptr<RuntimeProfile::Counter> _rf_filter_rows =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
+ std::shared_ptr<RuntimeProfile::Counter> _always_true_filter_rows =
+ std::make_shared<RuntimeProfile::Counter>(TUnit::UNIT, 0);
std::string _expr_name;
double _ignore_thredhold;
diff --git a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
index 46a50c3f363..215de7d8358 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_helper_test.cpp
@@ -78,8 +78,8 @@ TEST_F(RuntimeFilterConsumerHelperTest, basic) {
const_cast<std::vector<TupleDescriptor*>&>(row_desc._tuple_desc_map).push_back(&tuple_desc);
auto helper = RuntimeFilterConsumerHelper(0, runtime_filter_descs,
row_desc);
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.init(_runtime_states[0].get(),
&_profile, true,
- runtime_filter_dependencies,
0, 0, ""));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+ helper.init(_runtime_states[0].get(), true,
runtime_filter_dependencies, 0, 0, ""));
vectorized::VExprContextSPtrs conjuncts;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(helper.acquire_runtime_filter(conjuncts));
diff --git a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
index de6882b5aca..215edc78a30 100644
--- a/be/test/runtime_filter/runtime_filter_consumer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_consumer_test.cpp
@@ -30,9 +30,8 @@ class RuntimeFilterConsumerTest : public RuntimeFilterTest {
public:
void test_signal_aquire(TRuntimeFilterDesc desc) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
- &desc, 0, &consumer, &_profile));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc,
0, &consumer));
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -60,11 +59,11 @@ TEST_F(RuntimeFilterConsumerTest, basic) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
auto desc =
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
- RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer, &_profile));
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer));
std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
- desc, true, 0, ®isted_consumer, &_profile));
+ desc, true, 0, ®isted_consumer));
}
TEST_F(RuntimeFilterConsumerTest, signal_aquire_in_or_bloom) {
@@ -117,7 +116,7 @@ TEST_F(RuntimeFilterConsumerTest, timeout_aquire) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
auto desc =
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
- RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer, &_profile));
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer));
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -142,18 +141,18 @@ TEST_F(RuntimeFilterConsumerTest, wait_infinity) {
const_cast<TQueryOptions&>(_query_ctx->_query_options)
.__set_runtime_filter_wait_infinitely(true);
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
- RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer, &_profile));
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer));
std::shared_ptr<RuntimeFilterConsumer> registed_consumer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
- desc, true, 0, ®isted_consumer, &_profile));
+ desc, true, 0, ®isted_consumer));
}
TEST_F(RuntimeFilterConsumerTest, aquire_disabled) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
auto desc =
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
- RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer, &_profile));
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer));
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -171,7 +170,7 @@ TEST_F(RuntimeFilterConsumerTest, aquire_ignored) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
auto desc =
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
- RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer, &_profile));
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer));
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
@@ -193,9 +192,8 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
{
- auto st =
-
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
- &desc, 0, &consumer, &_profile);
+ auto st = RuntimeFilterConsumer::create(
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc,
0, &consumer);
ASSERT_FALSE(st.ok());
}
desc.__set_src_expr(
@@ -216,17 +214,15 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
.build());
{
- auto st =
-
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
- &desc, 0, &consumer, &_profile);
+ auto st = RuntimeFilterConsumer::create(
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc,
0, &consumer);
ASSERT_FALSE(st.ok());
}
{
desc.__set_has_local_targets(false);
desc.__set_has_remote_targets(true);
- auto st =
-
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
- &desc, 0, &consumer, &_profile);
+ auto st = RuntimeFilterConsumer::create(
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc,
0, &consumer);
ASSERT_FALSE(st.ok());
desc.__set_has_local_targets(true);
desc.__set_has_remote_targets(false);
@@ -234,16 +230,15 @@ TEST_F(RuntimeFilterConsumerTest, bitmap_filter) {
desc.__set_bitmap_target_expr(TRuntimeFilterDescBuilder::get_default_expr());
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
- RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer, &_profile));
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc, 0,
&consumer));
}
TEST_F(RuntimeFilterConsumerTest, aquire_signal_at_same_time) {
for (int i = 0; i < 100; i++) {
std::shared_ptr<RuntimeFilterConsumer> consumer;
auto desc =
TRuntimeFilterDescBuilder().add_planId_to_target_expr(0).build();
- FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
-
RuntimeFilterConsumer::create(RuntimeFilterParamsContext::create(_query_ctx.get()),
- &desc, 0, &consumer, &_profile));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterConsumer::create(
+ RuntimeFilterParamsContext::create(_query_ctx.get()), &desc,
0, &consumer));
std::shared_ptr<RuntimeFilterProducer> producer;
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(RuntimeFilterProducer::create(
diff --git a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
index 253631dd45a..0e1bdea1469 100644
--- a/be/test/runtime_filter/runtime_filter_mgr_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_mgr_test.cpp
@@ -65,8 +65,7 @@ TEST_F(RuntimeFilterMgrTest, TestRuntimeFilterMgr) {
// Get / Register consumer
EXPECT_TRUE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
std::shared_ptr<RuntimeFilterConsumer> consumer_filter;
- EXPECT_TRUE(global_runtime_filter_mgr
- ->register_consumer_filter(desc, 0,
&consumer_filter, profile.get())
+ EXPECT_TRUE(global_runtime_filter_mgr->register_consumer_filter(desc,
0, &consumer_filter)
.ok());
EXPECT_FALSE(global_runtime_filter_mgr->get_consume_filters(filter_id).empty());
}
diff --git a/be/test/runtime_filter/runtime_filter_producer_test.cpp
b/be/test/runtime_filter/runtime_filter_producer_test.cpp
index 8122004ed58..4a367677bf4 100644
--- a/be/test/runtime_filter/runtime_filter_producer_test.cpp
+++ b/be/test/runtime_filter/runtime_filter_producer_test.cpp
@@ -106,8 +106,8 @@ TEST_F(RuntimeFilterProducerTest,
sync_filter_size_local_merge) {
_runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
std::shared_ptr<RuntimeFilterConsumer> consumer;
-
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
- desc, true, 0, &consumer, &_profile));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+ _runtime_states[1]->register_consumer_runtime_filter(desc, true,
0, &consumer));
ASSERT_EQ(producer->_need_sync_filter_size, true);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
@@ -141,8 +141,8 @@ TEST_F(RuntimeFilterProducerTest, set_ignore_or_disable) {
_runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
std::shared_ptr<RuntimeFilterConsumer> consumer;
-
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
- desc, true, 0, &consumer, &_profile));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+ _runtime_states[1]->register_consumer_runtime_filter(desc, true,
0, &consumer));
ASSERT_EQ(producer->_need_sync_filter_size, true);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
@@ -183,8 +183,8 @@ TEST_F(RuntimeFilterProducerTest,
sync_filter_size_local_merge_with_ignored) {
_runtime_states[1]->register_producer_runtime_filter(desc,
&producer2, &_profile));
std::shared_ptr<RuntimeFilterConsumer> consumer;
-
FAIL_IF_ERROR_OR_CATCH_EXCEPTION(_runtime_states[1]->register_consumer_runtime_filter(
- desc, true, 0, &consumer, &_profile));
+ FAIL_IF_ERROR_OR_CATCH_EXCEPTION(
+ _runtime_states[1]->register_consumer_runtime_filter(desc, true,
0, &consumer));
ASSERT_EQ(producer->_need_sync_filter_size, true);
ASSERT_EQ(producer->_rf_state,
RuntimeFilterProducer::State::WAITING_FOR_SEND_SIZE);
diff --git a/be/test/util/runtime_profile_counter_tree_node_test.cpp
b/be/test/util/runtime_profile_counter_tree_node_test.cpp
index 0dcb34768aa..5226841b4dd 100644
--- a/be/test/util/runtime_profile_counter_tree_node_test.cpp
+++ b/be/test/util/runtime_profile_counter_tree_node_test.cpp
@@ -279,34 +279,70 @@ TEST_F(RuntimeProfileCounterTreeNodeTest,
NonZeroCounterToThrfit) {
ASSERT_EQ(child_counter_map[RuntimeProfile::ROOT_COUNTER].size(), 0);
}
-TEST_F(RuntimeProfileCounterTreeNodeTest, CollaborationCounterTest) {
- auto root_counter = std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT);
- auto child_counter1 =
std::make_unique<RuntimeProfile::CollaborationCounter>(
- TUnit::UNIT, 2, root_counter.get());
- auto child_counter2 =
std::make_unique<RuntimeProfile::CollaborationCounter>(
- TUnit::UNIT, 2, root_counter.get());
-
- auto c1 =
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-
child_counter1.get());
- auto c2 =
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-
child_counter1.get());
- auto c3 =
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-
child_counter2.get());
- auto c4 =
std::make_unique<RuntimeProfile::CollaborationCounter>(TUnit::UNIT, 2,
-
child_counter2.get());
-
- c1->update(1);
- c2->update(10);
- c3->update(100);
- c4->update(1000);
-
- ASSERT_EQ(root_counter->value(), 1111);
- ASSERT_EQ(child_counter1->value(), 11);
- ASSERT_EQ(child_counter2->value(), 1100);
- ASSERT_EQ(c1->value(), 1);
- ASSERT_EQ(c2->value(), 10);
- ASSERT_EQ(c3->value(), 100);
- ASSERT_EQ(c4->value(), 1000);
+TEST_F(RuntimeProfileCounterTreeNodeTest, DescriptionCounter) {
+ RuntimeProfile::CounterMap counterMap;
+ RuntimeProfile::ChildCounterMap childCounterMap;
+ /*
+ ""
+ "root"
+ "description_entry"
+ */
+
+ auto rootCounter = std::make_unique<RuntimeProfile::Counter>(TUnit::UNIT);
+ auto descriptionEntry = std::make_unique<RuntimeProfile::DescriptionEntry>(
+ "description_entry", "Updated description");
+
+ counterMap["root"] = rootCounter.get();
+ counterMap["description_entry"] = descriptionEntry.get();
+
+ childCounterMap[RuntimeProfile::ROOT_COUNTER].insert("root");
+ childCounterMap["root"].insert("description_entry");
+
+ RuntimeProfileCounterTreeNode rootNode =
RuntimeProfileCounterTreeNode::from_map(
+ counterMap, childCounterMap, RuntimeProfile::ROOT_COUNTER);
+
+ std::vector<TCounter> tcounter;
+ std::map<std::string, std::set<std::string>> child_counter_map;
+
+ rootNode.to_thrift(tcounter, child_counter_map);
+
+ /*
+ ROOT_COUNTER
+ root
+ description_entry
+ */
+
+ /*
+ tcounter: root, description_entry
+ child_counter_map:
+ ROOT_COUNTER -> {root}
+ root -> {description_entry}
+ */
+
+ for (const auto& counter : tcounter) {
+ std::cout << "Counter: " << counter.name;
+ if (counter.name == "description_entry") {
+ EXPECT_TRUE(counter.__isset.description);
+ EXPECT_EQ(counter.description, "Updated description");
+ }
+ if (counter.__isset.description) {
+ std::cout << ", Description: " << counter.description;
+ }
+ std::cout << std::endl;
+ }
+
+ ASSERT_EQ(tcounter.size(), 2);
+ EXPECT_EQ(tcounter[0].name, "root");
+ EXPECT_EQ(tcounter[1].name, "description_entry");
+
+ ASSERT_TRUE(tcounter[1].__isset.description);
+ EXPECT_EQ(tcounter[1].description, "Updated description");
+ EXPECT_EQ(tcounter[1].level, 2);
+
+ ASSERT_EQ(child_counter_map.size(), 2);
+ ASSERT_EQ(child_counter_map[RuntimeProfile::ROOT_COUNTER].size(), 1);
+ ASSERT_EQ(child_counter_map["root"].size(), 1);
+ ASSERT_EQ(*child_counter_map["root"].begin(), "description_entry");
}
} // namespace doris
\ No newline at end of file
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
index f306d7c73fb..d4a5e6e88dd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Counter.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TUnit;
+import com.google.common.base.Strings;
import com.google.gson.annotations.SerializedName;
import java.io.DataInput;
@@ -35,6 +36,8 @@ public class Counter {
private volatile int type;
@SerializedName(value = "level")
private volatile long level;
+ @SerializedName(value = "description")
+ private volatile String description;
public static Counter read(DataInput input) throws IOException {
return GsonUtils.GSON.fromJson(Text.readString(input), Counter.class);
@@ -77,6 +80,13 @@ public class Counter {
this.level = level;
}
+ public Counter(String description) {
+ this.description = description;
+ this.value = 0;
+ // Make sure not merge.
+ this.level = 2;
+ }
+
public void addValue(Counter other) {
if (other == null) {
return;
@@ -115,7 +125,11 @@ public class Counter {
}
public String print() {
- return RuntimeProfile.printCounter(value, getType());
+ if (Strings.isNullOrEmpty(description)) {
+ return RuntimeProfile.printCounter(value, getType());
+ } else {
+ return description;
+ }
}
public String toString() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
index c607dc570d0..cfd253f2faa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/RuntimeProfile.java
@@ -267,12 +267,18 @@ public class RuntimeProfile {
// If different node has counter with the same name, it will
lead to chaos.
Counter counter = this.counterMap.get(tcounter.name);
if (counter == null) {
- counterMap.put(tcounter.name, new Counter(tcounter.type,
tcounter.value, tcounter.level));
+ if (tcounter.isSetDescription()) {
+ counterMap.put(tcounter.name, new
Counter(tcounter.description));
+ } else {
+ counterMap.put(tcounter.name, new
Counter(tcounter.type, tcounter.value, tcounter.level));
+ }
} else {
counter.setLevel(tcounter.level);
if (counter.getType() != tcounter.type) {
LOG.error("Cannot update counters with the same name
but different types"
+ " type=" + tcounter.type);
+ } else if (tcounter.isSetDescription()) {
+ continue;
} else {
counter.setValue(tcounter.type, tcounter.value);
}
diff --git a/gensrc/thrift/RuntimeProfile.thrift
b/gensrc/thrift/RuntimeProfile.thrift
index 764db39f7d2..28be3fab060 100644
--- a/gensrc/thrift/RuntimeProfile.thrift
+++ b/gensrc/thrift/RuntimeProfile.thrift
@@ -26,6 +26,7 @@ struct TCounter {
2: required Metrics.TUnit type
3: required i64 value
4: optional i64 level
+ 5: optional string description
}
// A single runtime profile
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]