This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c9d90b636e8 [feature](pipelineX) add time unit when slow_dependency need to log (#26466)
c9d90b636e8 is described below
commit c9d90b636e82860694eb9a07ab9d69fa82fdbee5
Author: Mryange <[email protected]>
AuthorDate: Mon Nov 6 19:58:07 2023 +0800
[feature](pipelineX) add time unit when slow_dependency need to log (#26466)
---
be/src/pipeline/exec/exchange_sink_operator.h | 2 +-
be/src/pipeline/exec/scan_operator.h | 2 +-
be/src/pipeline/pipeline_x/dependency.h | 28 ++++++++++++++++++++-------
3 files changed, 23 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index a0badc7561c..815d3930577 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -81,7 +81,7 @@ public:
[[nodiscard]] WriteDependency* write_blocked_by() override {
if (config::enable_fuzzy_mode && _available_block == 0 &&
- _write_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_write_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h
index c71811c831d..68d006006f6 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -103,7 +103,7 @@ public:
_scanner_ctx->reschedule_scanner_ctx();
}
if (config::enable_fuzzy_mode && !_ready_for_read &&
- _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index d5349307e5b..1d575690f8e 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -46,7 +46,8 @@ class Dependency;
using DependencySPtr = std::shared_ptr<Dependency>;
static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 10 * 1000L * 1000L * 1000L;
-
+static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 5 * 1000L * 1000L * 1000L;
+static_assert(TIME_UNIT_DEPENDENCY_LOG < SLOW_DEPENDENCY_THRESHOLD);
class Dependency : public std::enable_shared_from_this<Dependency> {
public:
Dependency(int id, std::string name) : _id(id), _name(name), _ready_for_read(false) {}
@@ -73,7 +74,7 @@ public:
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
[[nodiscard]] virtual Dependency* read_blocked_by() {
if (config::enable_fuzzy_mode && !_ready_for_read &&
- _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
@@ -102,6 +103,17 @@ public:
void remove_first_child() { _children.erase(_children.begin()); }
protected:
+ bool _should_log(uint64_t cur_time) {
+ if (cur_time < SLOW_DEPENDENCY_THRESHOLD) {
+ return false;
+ }
+ if ((cur_time - _last_log_time) < TIME_UNIT_DEPENDENCY_LOG) {
+ return false;
+ }
+ _last_log_time = cur_time;
+ return true;
+ }
+
int _id;
std::string _name;
std::atomic<bool> _ready_for_read;
@@ -110,6 +122,8 @@ protected:
std::weak_ptr<Dependency> _parent;
std::list<std::shared_ptr<Dependency>> _children;
+
+ uint64_t _last_log_time = 0;
};
class WriteDependency : public Dependency {
@@ -133,7 +147,7 @@ public:
[[nodiscard]] virtual WriteDependency* write_blocked_by() {
if (config::enable_fuzzy_mode && !_ready_for_write &&
- _write_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_write_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
@@ -174,7 +188,7 @@ public:
[[nodiscard]] FinishDependency* finish_blocked_by() {
if (config::enable_fuzzy_mode && !_ready_to_finish &&
- _finish_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_finish_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< _node_id;
}
@@ -726,7 +740,7 @@ public:
[[nodiscard]] Dependency* read_blocked_by() override {
if (config::enable_fuzzy_mode && !(_ready_for_read || _eos) &&
- _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
@@ -845,7 +859,7 @@ public:
// Which dependency current pipeline task is blocked by. `nullptr` if this dependency is ready.
[[nodiscard]] Dependency* read_blocked_by() override {
if (config::enable_fuzzy_mode && !_set_state->ready_for_read &&
- _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
@@ -905,7 +919,7 @@ public:
Dependency* read_blocked_by() override {
if (config::enable_fuzzy_mode && !_should_run() &&
- _read_dependency_watcher.elapsed_time() > SLOW_DEPENDENCY_THRESHOLD) {
+ _should_log(_read_dependency_watcher.elapsed_time())) {
LOG(WARNING) << "========Dependency may be blocked by some reasons: " << name() << " "
<< id();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]