This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3ba048569e5 [pipelineX](debug) Refactor code and complete debug string
(#31733)
3ba048569e5 is described below
commit 3ba048569e5536eaf5b4b7d55be293c28d9c9497
Author: Gabriel <[email protected]>
AuthorDate: Tue Mar 5 18:17:22 2024 +0800
[pipelineX](debug) Refactor code and complete debug string (#31733)
---
be/src/pipeline/pipeline_x/dependency.cpp | 5 +++--
be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 7 ++++++-
be/src/vec/runtime/vdata_stream_recvr.h | 14 +++++++++-----
3 files changed, 18 insertions(+), 8 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp
b/be/src/pipeline/pipeline_x/dependency.cpp
index adbadcfb835..3305341a51c 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -104,9 +104,10 @@ Dependency*
RuntimeFilterDependency::is_blocked_by(PipelineXTask* task) {
std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {},
ready={}",
+ fmt::format_to(debug_string_buffer,
+ "{}{}: id={}, block task = {}, ready={}, _always_ready={},
is cancelled={}",
std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
- _ready);
+ _ready, _always_ready, _is_cancelled());
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 0459f1e3a0e..91a27b8e301 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -271,7 +271,12 @@ Status PipelineXTask::execute(bool* eos) {
if (!_dry_run) {
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
- RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
eos));
+ try {
+ RETURN_IF_ERROR(_root->get_block_after_projects(_state, block,
eos));
+ } catch (const Exception& e) {
+ return Status::InternalError(e.to_string() +
+ " task debug string: " +
debug_string());
+ }
} else {
*eos = true;
}
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 9ce8d72d73a..fa13666533a 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -296,11 +296,15 @@ public:
Status get_batch(Block* block, bool* eos) override {
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
- DCHECK(_is_cancelled || !_block_queue.empty() ||
_num_remaining_senders == 0)
- << " _is_cancelled: " << _is_cancelled
- << ", _block_queue_empty: " << _block_queue.empty()
- << ", _num_remaining_senders: " << _num_remaining_senders <<
"\n"
- << _debug_string_info();
+#ifndef NDEBUG
+ if (!_is_cancelled && _block_queue.empty() && _num_remaining_senders >
0) {
+ throw doris::Exception(ErrorCode::INTERNAL_ERROR,
+ "_is_cancelled: {}, _block_queue_empty: {},
"
+ "_num_remaining_senders: {},
_debug_string_info: {}",
+ _is_cancelled, _block_queue.empty(),
_num_remaining_senders,
+ _debug_string_info());
+ }
+#endif
return _inner_get_batch_without_lock(block, eos);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]