This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 736f1ad53d2 [profile](rpc) add RpcInstanceDetails profile to debug rpc
consumption problem (#43284)
736f1ad53d2 is described below
commit 736f1ad53d28bb75ee902c1885f255ddce7d0acf
Author: Mryange <[email protected]>
AuthorDate: Tue Nov 12 14:17:00 2024 +0800
[profile](rpc) add RpcInstanceDetails profile to debug rpc consumption
problem (#43284)
Picked from the 2.1 branch, only the RPC profile-related code was
selected.
https://github.com/apache/doris/pull/39852
https://github.com/apache/doris/pull/40117
```
DATA_STREAM_SINK_OPERATOR (id=2,dst_id=2):
- RpcCount: sum 16, avg 4, max 4, min 4
- RpcMaxTime: avg 1.15ms, max 1.163ms,
min 818.493us
- RpcAvgTime: 11.850ms
- RpcCount: 10
- RpcMaxTime: 86.891ms
- RpcMinTime: 15.200ms
- RpcSumTime: 118.503ms
- SerializeBatchTime: 13.517ms
- SplitBlockDistributeByChannelTime: 38.923ms
- SplitBlockHashComputeTime: 2.659ms
- UncompressedRowBatchSize: 135.19 KB
- WaitForDependencyTime: 0ns
- WaitForRpcBufferQueue: 0ns
RpcInstanceDetails:
- Instance 85d4f75b72a9ea61: Count: 4,
MaxTime: 36.238ms, MinTime: 12.107ms, AvgTime: 21.722ms, SumTime:
86.891ms
- Instance 85d4f75b72a9ea91: Count: 3,
MaxTime: 11.107ms, MinTime: 2.431ms, AvgTime: 5.470ms, SumTime: 16.412ms
- Instance 85d4f75b72a9eac1: Count: 3,
MaxTime: 7.554ms, MinTime: 3.160ms, AvgTime: 5.066ms, SumTime: 15.200ms
```
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 75 +++++++++++++++++-----
be/src/pipeline/exec/exchange_sink_buffer.h | 13 +++-
be/src/pipeline/exec/exchange_source_operator.cpp | 1 +
be/src/runtime/runtime_state.h | 11 ++++
.../java/org/apache/doris/qe/SessionVariable.java | 10 +++
5 files changed, 93 insertions(+), 17 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 7163299d766..0f02ffc2b9a 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -25,6 +25,7 @@
#include <gen_cpp/types.pb.h>
#include <glog/logging.h>
#include <google/protobuf/stubs/callback.h>
+#include <pdqsort.h>
#include <stddef.h>
#include <atomic>
@@ -129,7 +130,8 @@ void ExchangeSinkBuffer::register_sink(TUniqueId
fragment_instance_id) {
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
_instance_to_receiver_eof[low_id] = false;
- _instance_to_rpc_time[low_id] = 0;
+
_instance_to_rpc_stats_vec.emplace_back(std::make_shared<RpcInstanceStatistics>(low_id));
+ _instance_to_rpc_stats[low_id] = _instance_to_rpc_stats_vec.back().get();
_construct_request(low_id, finst_id);
}
@@ -261,7 +263,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
- set_rpc_time(id, start_rpc_time, result.receive_time());
+
+ auto end_rpc_time = GetCurrentTimeNanos();
+ update_rpc_time(id, start_rpc_time, end_rpc_time);
+
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
@@ -339,7 +344,10 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
// attach task for memory tracker and query id when core
SCOPED_ATTACH_TASK(_state);
- set_rpc_time(id, start_rpc_time, result.receive_time());
+
+ auto end_rpc_time = GetCurrentTimeNanos();
+ update_rpc_time(id, start_rpc_time, end_rpc_time);
+
Status s(Status::create(result.status()));
if (s.is<ErrorCode::END_OF_FILE>()) {
_set_receiver_eof(id);
@@ -466,10 +474,10 @@ void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId
id,
void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t*
min_time) {
int64_t local_max_time = 0;
int64_t local_min_time = INT64_MAX;
- for (auto& [id, time] : _instance_to_rpc_time) {
- if (time != 0) {
- local_max_time = std::max(local_max_time, time);
- local_min_time = std::min(local_min_time, time);
+ for (auto& [id, stats] : _instance_to_rpc_stats) {
+ if (stats->sum_time != 0) {
+ local_max_time = std::max(local_max_time, stats->sum_time);
+ local_min_time = std::min(local_min_time, stats->sum_time);
}
}
*max_time = local_max_time;
@@ -478,27 +486,32 @@ void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t*
max_time, int64_t* min_ti
int64_t ExchangeSinkBuffer::get_sum_rpc_time() {
int64_t sum_time = 0;
- for (auto& [id, time] : _instance_to_rpc_time) {
- sum_time += time;
+ for (auto& [id, stats] : _instance_to_rpc_stats) {
+ sum_time += stats->sum_time;
}
return sum_time;
}
-void ExchangeSinkBuffer::set_rpc_time(InstanceLoId id, int64_t start_rpc_time,
- int64_t receive_rpc_time) {
+void ExchangeSinkBuffer::update_rpc_time(InstanceLoId id, int64_t
start_rpc_time,
+ int64_t receive_rpc_time) {
_rpc_count++;
int64_t rpc_spend_time = receive_rpc_time - start_rpc_time;
- DCHECK(_instance_to_rpc_time.find(id) != _instance_to_rpc_time.end());
+ DCHECK(_instance_to_rpc_stats.find(id) != _instance_to_rpc_stats.end());
if (rpc_spend_time > 0) {
- _instance_to_rpc_time[id] += rpc_spend_time;
+ ++_instance_to_rpc_stats[id]->rpc_count;
+ _instance_to_rpc_stats[id]->sum_time += rpc_spend_time;
+ _instance_to_rpc_stats[id]->max_time =
+ std::max(_instance_to_rpc_stats[id]->max_time, rpc_spend_time);
+ _instance_to_rpc_stats[id]->min_time =
+ std::min(_instance_to_rpc_stats[id]->min_time, rpc_spend_time);
}
}
void ExchangeSinkBuffer::update_profile(RuntimeProfile* profile) {
- auto* _max_rpc_timer = ADD_TIMER(profile, "RpcMaxTime");
+ auto* _max_rpc_timer = ADD_TIMER_WITH_LEVEL(profile, "RpcMaxTime", 1);
auto* _min_rpc_timer = ADD_TIMER(profile, "RpcMinTime");
auto* _sum_rpc_timer = ADD_TIMER(profile, "RpcSumTime");
- auto* _count_rpc = ADD_COUNTER(profile, "RpcCount", TUnit::UNIT);
+ auto* _count_rpc = ADD_COUNTER_WITH_LEVEL(profile, "RpcCount",
TUnit::UNIT, 1);
auto* _avg_rpc_timer = ADD_TIMER(profile, "RpcAvgTime");
int64_t max_rpc_time = 0, min_rpc_time = 0;
@@ -510,6 +523,38 @@ void ExchangeSinkBuffer::update_profile(RuntimeProfile*
profile) {
int64_t sum_time = get_sum_rpc_time();
_sum_rpc_timer->set(sum_time);
_avg_rpc_timer->set(sum_time / std::max(static_cast<int64_t>(1),
_rpc_count.load()));
+
+ auto max_count = _state->rpc_verbose_profile_max_instance_count();
+ if (_state->enable_verbose_profile() && max_count > 0) {
+ std::vector<RpcInstanceStatistics> tmp_rpc_stats_vec;
+ for (const auto& stats : _instance_to_rpc_stats_vec) {
+ tmp_rpc_stats_vec.emplace_back(*stats);
+ }
+ pdqsort(tmp_rpc_stats_vec.begin(), tmp_rpc_stats_vec.end(),
+ [](const auto& a, const auto& b) { return a.max_time >
b.max_time; });
+ auto count = std::min((size_t)max_count, tmp_rpc_stats_vec.size());
+ int i = 0;
+ auto* detail_profile = profile->create_child("RpcInstanceDetails",
true, true);
+ for (const auto& stats : tmp_rpc_stats_vec) {
+ if (0 == stats.rpc_count) {
+ continue;
+ }
+ std::stringstream out;
+ out << "Instance " << std::hex << stats.inst_lo_id;
+ auto stats_str = fmt::format(
+ "Count: {}, MaxTime: {}, MinTime: {}, AvgTime: {},
SumTime: {}",
+ stats.rpc_count, PrettyPrinter::print(stats.max_time,
TUnit::TIME_NS),
+ PrettyPrinter::print(stats.min_time, TUnit::TIME_NS),
+ PrettyPrinter::print(
+ stats.sum_time / std::max(static_cast<int64_t>(1),
stats.rpc_count),
+ TUnit::TIME_NS),
+ PrettyPrinter::print(stats.sum_time, TUnit::TIME_NS));
+ detail_profile->add_info_string(out.str(), stats_str);
+ if (++i == count) {
+ break;
+ }
+ }
+ }
}
} // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 13692532a33..22a1452f8d5 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -180,7 +180,7 @@ public:
Status add_block(TransmitInfo&& request);
Status add_block(BroadcastTransmitInfo&& request);
void close();
- void set_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
+ void update_rpc_time(InstanceLoId id, int64_t start_rpc_time, int64_t
receive_rpc_time);
void update_profile(RuntimeProfile* profile);
void set_dependency(std::shared_ptr<Dependency> queue_dependency,
@@ -215,7 +215,16 @@ private:
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
- phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
+ struct RpcInstanceStatistics {
+ RpcInstanceStatistics(InstanceLoId id) : inst_lo_id(id) {}
+ InstanceLoId inst_lo_id;
+ int64_t rpc_count = 0;
+ int64_t max_time = 0;
+ int64_t min_time = INT64_MAX;
+ int64_t sum_time = 0;
+ };
+ std::vector<std::shared_ptr<RpcInstanceStatistics>>
_instance_to_rpc_stats_vec;
+ phmap::flat_hash_map<InstanceLoId, RpcInstanceStatistics*>
_instance_to_rpc_stats;
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp
b/be/src/pipeline/exec/exchange_source_operator.cpp
index eafefa2e4c0..dbde9abd05d 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -81,6 +81,7 @@ Status ExchangeLocalState::init(RuntimeState* state,
LocalStateInfo& info) {
get_data_from_recvr_timer = ADD_TIMER(_runtime_profile,
"GetDataFromRecvrTime");
filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
create_merger_timer = ADD_TIMER(_runtime_profile, "CreateMergerTime");
+ _runtime_profile->add_info_string("InstanceID",
print_id(state->fragment_instance_id()));
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index abc823bc25b..88deee491d1 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -459,6 +459,17 @@ public:
return _query_options.__isset.enable_profile &&
_query_options.enable_profile;
}
+ bool enable_verbose_profile() const {
+ return enable_profile() &&
_query_options.__isset.enable_verbose_profile &&
+ _query_options.enable_verbose_profile;
+ }
+
+ int rpc_verbose_profile_max_instance_count() const {
+ return _query_options.__isset.rpc_verbose_profile_max_instance_count
+ ? _query_options.rpc_verbose_profile_max_instance_count
+ : 0;
+ }
+
bool enable_share_hash_table_for_broadcast_join() const {
return
_query_options.__isset.enable_share_hash_table_for_broadcast_join &&
_query_options.enable_share_hash_table_for_broadcast_join;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index dfbe4c445a4..4b1049649b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -97,6 +97,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String MAX_EXECUTION_TIME = "max_execution_time";
public static final String INSERT_TIMEOUT = "insert_timeout";
public static final String ENABLE_PROFILE = "enable_profile";
+ public static final String ENABLE_VERBOSE_PROFILE =
"enable_verbose_profile";
+ public static final String RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT =
"rpc_verbose_profile_max_instance_count";
public static final String AUTO_PROFILE_THRESHOLD_MS =
"auto_profile_threshold_ms";
public static final String SQL_MODE = "sql_mode";
public static final String WORKLOAD_VARIABLE = "workload_group";
@@ -790,6 +792,12 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_PROFILE, needForward = true)
public boolean enableProfile = false;
+ @VariableMgr.VarAttr(name = ENABLE_VERBOSE_PROFILE, needForward = true)
+ public boolean enableVerboseProfile = false;
+
+ @VariableMgr.VarAttr(name = RPC_VERBOSE_PROFILE_MAX_INSTANCE_COUNT,
needForward = true)
+ public int rpcVerboseProfileMaxInstanceCount = 5;
+
// When enable_profile is true, profile of queries that costs more than
autoProfileThresholdMs
// will be stored to disk.
@VariableMgr.VarAttr(name = AUTO_PROFILE_THRESHOLD_MS, needForward = true)
@@ -3811,6 +3819,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setQueryTimeout(queryTimeoutS);
tResult.setEnableProfile(enableProfile);
+ tResult.setEnableVerboseProfile(enableVerboseProfile);
+
tResult.setRpcVerboseProfileMaxInstanceCount(rpcVerboseProfileMaxInstanceCount);
if (enableProfile) {
// If enable profile == true, then also set report success to true
// be need report success to start report thread. But it is very
tricky
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]