This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new db1da6b787 [Chore](pipeline) add some profile log when pipeline canceled (#20825)
db1da6b787 is described below
commit db1da6b7874cf86c00246fb30f17446c2dce6de5
Author: Pxl <[email protected]>
AuthorDate: Fri Jun 16 10:54:54 2023 +0800
[Chore](pipeline) add some profile log when pipeline canceled (#20825)
Add some profile logging when a pipeline is canceled.
---
be/src/pipeline/exec/data_queue.cpp | 10 +++++----
be/src/pipeline/exec/empty_source_operator.h | 4 ++++
.../exec/multi_cast_data_stream_source.cpp | 4 ++++
.../pipeline/exec/multi_cast_data_stream_source.h | 2 ++
be/src/pipeline/exec/operator.cpp | 2 +-
be/src/pipeline/exec/operator.h | 8 ++++++++
be/src/pipeline/pipeline_task.cpp | 24 ++++++++++++++++++----
be/src/pipeline/pipeline_task.h | 1 +
8 files changed, 46 insertions(+), 9 deletions(-)
diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp
index 6ec6a5b2b1..bdcc12df95 100644
--- a/be/src/pipeline/exec/data_queue.cpp
+++ b/be/src/pipeline/exec/data_queue.cpp
@@ -83,13 +83,15 @@ bool DataQueue::has_data_or_finished(int child_idx) {
//so next loop, will check the record idx + 1 first
//maybe it's useful with many queue, others maybe always 0
bool DataQueue::remaining_has_data() {
- int count = _child_count - 1;
- while (count >= 0) {
- _flag_queue_idx = (_flag_queue_idx + 1) % _child_count;
+ int count = _child_count;
+ while (--count >= 0) {
+ _flag_queue_idx++;
+ if (_flag_queue_idx == _child_count) {
+ _flag_queue_idx = 0;
+ }
if (_cur_blocks_nums_in_queue[_flag_queue_idx] > 0) {
return true;
}
- count--;
}
return false;
}
diff --git a/be/src/pipeline/exec/empty_source_operator.h b/be/src/pipeline/exec/empty_source_operator.h
index 4d93a310df..acd2c8cdfc 100644
--- a/be/src/pipeline/exec/empty_source_operator.h
+++ b/be/src/pipeline/exec/empty_source_operator.h
@@ -78,6 +78,10 @@ public:
return Status::OK();
}
+ [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
+ return _exec_node->runtime_profile();
+ }
+
private:
ExecNode* _exec_node = nullptr;
};
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index 9854d63120..06211faf52 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -67,4 +67,8 @@ Status
MultiCastDataStreamerSourceOperator::close(doris::RuntimeState* state) {
return OperatorBase::close(state);
}
+RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile()
const {
+ return _multi_cast_data_streamer->profile();
+}
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 3198c4a408..15bd320b89 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -71,6 +71,8 @@ public:
Status close(doris::RuntimeState* state) override;
+ [[nodiscard]] RuntimeProfile* get_runtime_profile() const override;
+
private:
const int _consumer_id;
std::shared_ptr<MultiCastDataStreamer> _multi_cast_data_streamer;
diff --git a/be/src/pipeline/exec/operator.cpp b/be/src/pipeline/exec/operator.cpp
index 40da74ffb0..765f3421b8 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -49,7 +49,7 @@ const RowDescriptor& OperatorBase::row_desc() {
std::string OperatorBase::debug_string() const {
std::stringstream ss;
- ss << _operator_builder->get_name() << ", is_source: " << is_source();
+ ss << _operator_builder->get_name() << ": is_source: " << is_source();
ss << ", is_sink: " << is_sink() << ", is_closed: " << _is_closed;
ss << ", is_pending_finish: " << is_pending_finish();
return ss.str();
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 0a31435b8f..fcd22eedcb 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -242,6 +242,8 @@ public:
virtual std::string debug_string() const;
int32_t id() const { return _operator_builder->id(); }
+ [[nodiscard]] virtual RuntimeProfile* get_runtime_profile() const = 0;
+
protected:
OperatorBuilderBase* _operator_builder;
OperatorPtr _child;
@@ -293,6 +295,8 @@ public:
Status finalize(RuntimeState* state) override { return Status::OK(); }
+ [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
return _sink->profile(); }
+
protected:
NodeType* _sink;
};
@@ -356,6 +360,10 @@ public:
bool can_read() override { return _node->can_read(); }
+ [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
+ return _node->runtime_profile();
+ }
+
protected:
NodeType* _node;
bool _use_projection;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 7c3cc2abe7..9c8c23ec7e 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -32,6 +32,7 @@
#include "runtime/thread_context.h"
#include "task_queue.h"
#include "util/defer_op.h"
+#include "util/runtime_profile.h"
namespace doris {
class RuntimeState;
@@ -64,6 +65,7 @@ void PipelineTask::_init_profile() {
_prepare_timer = ADD_CHILD_TIMER(_task_profile, "PrepareTime", exec_time);
_open_timer = ADD_CHILD_TIMER(_task_profile, "OpenTime", exec_time);
_get_block_timer = ADD_CHILD_TIMER(_task_profile, "GetBlockTime",
exec_time);
+ _get_block_counter = ADD_COUNTER(_task_profile, "GetBlockCounter",
TUnit::UNIT);
_sink_timer = ADD_CHILD_TIMER(_task_profile, "SinkTime", exec_time);
_finalize_timer = ADD_CHILD_TIMER(_task_profile, "FinalizeTime",
exec_time);
_close_timer = ADD_CHILD_TIMER(_task_profile, "CloseTime", exec_time);
@@ -213,6 +215,7 @@ Status PipelineTask::execute(bool* eos) {
// Pull block from operator chain
{
SCOPED_TIMER(_get_block_timer);
+ _get_block_counter->update(1);
RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
}
*eos = _data_state == SourceState::FINISHED;
@@ -313,23 +316,36 @@ void PipelineTask::set_state(PipelineTaskState state) {
std::string PipelineTask::debug_string() {
fmt::memory_buffer debug_string_buffer;
- std::stringstream profile_ss;
- _fresh_profile_counter();
- _task_profile->pretty_print(&profile_ss, "");
fmt::format_to(debug_string_buffer, "QueryId: {}\n",
print_id(query_context()->query_id));
fmt::format_to(debug_string_buffer, "InstanceId: {}\n",
print_id(fragment_context()->get_fragment_instance_id()));
- fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
+ fmt::format_to(debug_string_buffer, "RuntimeUsage: {}\n",
+ PrettyPrinter::print(get_runtime_ns(), TUnit::TIME_NS));
+ {
+ std::stringstream profile_ss;
+ _fresh_profile_counter();
+ _task_profile->pretty_print(&profile_ss, "");
+ fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
+ }
fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state =
{}]\noperators: ", _index,
get_state_name(_cur_state));
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
_operators[i]->debug_string());
+ std::stringstream profile_ss;
+ _operators[i]->get_runtime_profile()->pretty_print(&profile_ss,
std::string(i * 2, ' '));
+ fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
}
fmt::format_to(debug_string_buffer, "\n{}{}",
std::string(_operators.size() * 2, ' '),
_sink->debug_string());
+ {
+ std::stringstream profile_ss;
+ _sink->get_runtime_profile()->pretty_print(&profile_ss,
+
std::string(_operators.size() * 2, ' '));
+ fmt::format_to(debug_string_buffer, "\n{}", profile_ss.str());
+ }
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 9317a494da..65e9ad83ed 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -257,6 +257,7 @@ private:
RuntimeProfile::Counter* _open_timer;
RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _get_block_timer;
+ RuntimeProfile::Counter* _get_block_counter;
RuntimeProfile::Counter* _sink_timer;
RuntimeProfile::Counter* _finalize_timer;
RuntimeProfile::Counter* _close_timer;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]