This is an automated email from the ASF dual-hosted git repository.
jacktengg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fa771514d51 [spill](logs) add logs to debug spill bugs (#37144)
fa771514d51 is described below
commit fa771514d51840d4a3da5b7ce700bf1309637875
Author: TengJianPing <[email protected]>
AuthorDate: Thu Jul 4 11:18:53 2024 +0800
[spill](logs) add logs to debug spill bugs (#37144)
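Add logs to help debug spill hash join bugs, such as the following crash (SIGSEGV in SpillReader::read while recovering build blocks from disk):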
```
*** Query id: d7f1126be4e948c6-87f1a80ed3cbd69e ***
*** is nereids: 0 ***
*** tablet id: 0 ***
*** Aborted at 1719291313 (unix time) try "date -d @1719291313" if you are
using GNU date ***
*** Current BE git commitID: 5f5262a885 ***
*** SIGSEGV address not mapped to object (@0x8) received by PID 1419021
(TID 1421288 OR 0x7f0212b43640) from PID 8; stack trace: ***
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int,
siginfo_t*, void*) at
/home/zcp/repo_center/doris_master/doris/be/src/common/signal_handler.h:421
1# PosixSignals::chained_handler(int, siginfo*, void*) [clone .part.0] in
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
2# JVM_handle_linux_signal in
/usr/lib/jvm/java-17-openjdk-amd64/lib/server/libjvm.so
3# 0x00007F06BD506520 in /lib/x86_64-linux-gnu/libc.so.6
4# doris::vectorized::SpillReader::read(doris::vectorized::Block*, bool*)
at /home/zcp/repo_center/doris_master/doris/be/src/vec/spill/spill_reader.cpp:96
5#
doris::vectorized::SpillStream::read_next_block_sync(doris::vectorized::Block*,
bool*) in /mnt/disk1/STRESS_ENV/be/lib/doris_be
6# std::_Function_handler<void (),
doris::pipeline::PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(doris::RuntimeState*,
unsigned int, bool&)::$_1>::_M_invoke(std::_Any_data const&) at
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:291
7# doris::ThreadPool::dispatch_thread() in
/mnt/disk1/STRESS_ENV/be/lib/doris_be
8# doris::Thread::supervise_thread(void*) at
/home/zcp/repo_center/doris_master/doris/be/src/util/thread.cpp:499
9# start_thread at ./nptl/pthread_create.c:442
10# 0x00007F06BD5EA850 at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:83
```
---
be/src/pipeline/dependency.cpp | 7 ++--
.../exec/partitioned_hash_join_probe_operator.cpp | 42 +++++++++++++++++++---
be/src/pipeline/pipeline_fragment_context.cpp | 1 +
be/src/pipeline/pipeline_task.cpp | 19 ++++++----
be/src/runtime/runtime_state.h | 5 +++
5 files changed, 59 insertions(+), 15 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 4938883062a..5e1ce79a1eb 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -82,9 +82,10 @@ Dependency* Dependency::is_blocked_by(PipelineTask* task) {
std::string Dependency::debug_string(int indentation_level) {
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}{}: id={}, block task = {},
ready={}, _always_ready={}",
- std::string(indentation_level * 2, ' '), _name, _node_id,
_blocked_task.size(),
- _ready, _always_ready);
+ fmt::format_to(debug_string_buffer,
+ "{}this={}, {}: id={}, block task = {}, ready={},
_always_ready={}",
+ std::string(indentation_level * 2, ' '), (void*)this,
_name, _node_id,
+ _blocked_task.size(), _ready, _always_ready);
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 1ff927bcc6d..09976b3060e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -258,6 +258,9 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
+ << ", task id: " << state->task_id() << ", partition: " <<
partition_index
+ << " recovery_build_blocks_from_disk";
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
@@ -292,6 +295,9 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
SCOPED_TIMER(_recovery_build_timer);
bool eos = false;
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: "
<< _parent->node_id()
+ << ", task id: " << state->task_id() << ", partition: " <<
partition_index
+ << ", recoverying build data";
while (!eos) {
vectorized::Block block;
Status st;
@@ -332,12 +338,12 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
}
- VLOG_DEBUG << "query: " << print_id(state->query_id())
- << ", recovery data done for partition: " <<
spilled_stream->get_spill_dir()
- << ", task id: " << state->task_id();
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
shared_state_sptr->spilled_streams[partition_index].reset();
_dependency->set_ready();
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: "
<< _parent->node_id()
+ << ", task id: " << state->task_id() << ", partition: " <<
partition_index
+ << ", recovery build data done";
};
auto exception_catch_func = [read_func, query_id, this]() {
@@ -362,6 +368,16 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
has_data = true;
_dependency->block();
+ {
+ auto* pipeline_task = state->get_task();
+ if (pipeline_task) {
+ auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node:
" << p.node_id()
+ << ", task id: " << state->task_id() << ", partition: "
<< partition_index
+ << ", dependency: " << _dependency
+ << ", task debug_string: " <<
pipeline_task->debug_string();
+ }
+ }
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recovery_build_blocks_submit_func",
{
@@ -371,15 +387,31 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
});
auto spill_runnable = std::make_shared<SpillRunnable>(state,
_shared_state->shared_from_this(),
exception_catch_func);
+ VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
+ << ", task id: " << state->task_id() << ", partition: " <<
partition_index
+ << " recovery_build_blocks_from_disk submit func";
return spill_io_pool->submit(std::move(spill_runnable));
}
std::string PartitionedHashJoinProbeLocalState::debug_string(int
indentation_level) const {
+ auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+ bool need_more_input_data;
+ if (_shared_state->need_to_spill) {
+ need_more_input_data = !_child_eos;
+ } else if (_runtime_state) {
+ need_more_input_data =
p._inner_probe_operator->need_more_input_data(_runtime_state.get());
+ } else {
+ need_more_input_data = true;
+ }
fmt::memory_buffer debug_string_buffer;
- fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+ fmt::format_to(debug_string_buffer,
+ "{}, short_circuit_for_probe: {}, need_to_spill: {},
child_eos: {}, "
+ "_runtime_state: {}, need_more_input_data: {}",
PipelineXSpillLocalState<PartitionedHashJoinSharedState>::debug_string(
indentation_level),
- _shared_state ?
std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+ _shared_state ?
std::to_string(_shared_state->short_circuit_for_probe) : "NULL",
+ _shared_state->need_to_spill, _child_eos, _runtime_state !=
nullptr,
+ need_more_input_data);
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 94837ff55a0..0968de7951e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -455,6 +455,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
task_runtime_state.get(), this,
pipeline_id_to_profile[pip_idx].get(),
get_local_exchange_state(pipeline), i);
+ task_runtime_state->set_task(task.get());
pipeline_id_to_task.insert({pipeline->id(), task.get()});
_tasks[i].emplace_back(std::move(task));
}
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 09f32d9d23e..52951e1c9c0 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -254,6 +254,11 @@ bool PipelineTask::_is_blocked() {
}
// If all dependencies are ready for this operator, we can execute
this task if no datum is needed from upstream operators.
if (!_operators[i]->need_more_input_data(_state)) {
+ if (VLOG_DEBUG_IS_ON) {
+ VLOG_DEBUG << "query: " << print_id(_state->query_id())
+ << ", task id: " << _index << ", operator " << i
+ << " not need_more_input_data";
+ }
break;
}
}
@@ -471,13 +476,13 @@ std::string PipelineTask::debug_string() {
auto* cur_blocked_dep = _blocked_dep;
auto elapsed = _fragment_context->elapsed_time() / 1000000000.0;
- fmt::format_to(
- debug_string_buffer,
- "PipelineTask[this = {}, open = {}, eos = {}, finish = {}, dry run
= {}, elapse time "
- "= {}s], block dependency = {}, is running = {}\noperators: ",
- (void*)this, _opened, _eos, _finalized, _dry_run, elapsed,
- cur_blocked_dep && !_finalized ? cur_blocked_dep->debug_string() :
"NULL",
- is_running());
+ fmt::format_to(debug_string_buffer,
+ "PipelineTask[this = {}, id = {}, open = {}, eos = {},
finish = {}, dry run = "
+ "{}, elapse time "
+ "= {}s], block dependency = {}, is running = {}\noperators:
",
+ (void*)this, _index, _opened, _eos, _finalized, _dry_run,
elapsed,
+ cur_blocked_dep && !_finalized ?
cur_blocked_dep->debug_string() : "NULL",
+ is_running());
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}",
_opened && !_finalized ?
_operators[i]->debug_string(_state, i)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 49c051de44d..e89e7be66f5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -617,6 +617,10 @@ public:
void set_task_id(int id) { _task_id = id; }
+ void set_task(pipeline::PipelineTask* task) { _task = task; }
+
+ pipeline::PipelineTask* get_task() const { return _task; }
+
int task_id() const { return _task_id; }
void set_task_num(int task_num) { _task_num = task_num; }
@@ -721,6 +725,7 @@ private:
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
+ pipeline::PipelineTask* _task = nullptr;
int _task_id = -1;
int _task_num = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]