This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ad39b9e2afb [Bug](pipeline) fix nullptr access when broadcast join
meet wakeup early and build sh… (#48247)
ad39b9e2afb is described below
commit ad39b9e2afbfaa2e2ebfea456d00e35d89fd98ea
Author: Pxl <[email protected]>
AuthorDate: Tue Feb 25 14:24:06 2025 +0800
[Bug](pipeline) fix nullptr access when broadcast join meets wakeup early
and the instance that builds the shared hash table is not opened (#48247)
### What problem does this PR solve?
Consider the following scenario:
A broadcast join task is woken up early, and then the following events
happen in order:
1. The instance that builds the shared hash table is never opened; it goes
directly to close and signals the other instances.
2. The other instances are opened and enter sink with eos. At this point, the
shared hash table is still a nullptr.
3. The process core dumps because of the nullptr access.
Related to: https://github.com/apache/doris/pull/47380
https://github.com/apache/doris/pull/48008
```cpp
0# doris::signal::(anonymous namespace)::FailureSignalHandler(int,
siginfo_t*, void*) at
/mnt/disk3/xiaolei/doris_master/be/src/common/signal_handler.h:421
1# 0x00007F7F7EA625B0 in /lib64/libc.so.6
2# std::__detail::__variant::_Variant_storage<false, std::monostate,
doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef,
DefaultHash<doris::StringRef, void> > >,
doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned
char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned
short, doris::JoinHashTable<unsigned short, HashCRC32<unsigned short> > >,
doris::vectorized::MethodOneNumber<unsigned int, doris::JoinHashTabl [...]
3# std::variant<std::monostate,
doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef,
DefaultHash<doris::StringRef, void> > >,
doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned
char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned
short, doris::JoinHashTable<unsigned short, HashCRC32<unsigned short> > >,
doris::vectorized::MethodOneNumber<unsigned int, doris::JoinHashTable<unsigned
int, HashCRC32<unsigned in [...]
4#
std::invoke_result<doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*,
doris::vectorized::Block*, bool)::$_0,
std::__conditional<is_lvalue_reference_v<std::variant<std::monostate,
doris::vectorized::MethodSerialized<doris::JoinHashTable<doris::StringRef,
DefaultHash<doris::StringRef, void> > >,
doris::vectorized::MethodOneNumber<unsigned char, doris::JoinHashTable<unsigned
char, HashCRC32<unsigned char> > >, doris::vectorized::MethodOneNumber<unsigned
short, do [...]
5# doris::pipeline::HashJoinBuildSinkOperatorX::sink(doris::RuntimeState*,
doris::vectorized::Block*, bool) at
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/exec/hashjoin_build_sink.cpp:591
6# doris::pipeline::PipelineTask::execute(bool*) at
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/pipeline_task.cpp:398
7# doris::pipeline::TaskScheduler::_do_work(int) at
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/task_scheduler.cpp:144
8# doris::pipeline::TaskScheduler::start()::$_0::operator()() const at
/mnt/disk3/xiaolei/doris_master/be/src/pipeline/task_scheduler.cpp:63
9# void std::__invoke_impl<void,
doris::pipeline::TaskScheduler::start()::$_0&>(std::__invoke_other,
doris::pipeline::TaskScheduler::start()::$_0&) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61
10# std::enable_if<is_invocable_r_v<void,
doris::pipeline::TaskScheduler::start()::$_0&>, void>::type
std::__invoke_r<void,
doris::pipeline::TaskScheduler::start()::$_0&>(doris::pipeline::TaskScheduler::start()::$_0&)
at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117
11# std::_Function_handler<void (),
doris::pipeline::TaskScheduler::start()::$_0>::_M_invoke(std::_Any_data const&)
at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
12# std::function<void ()>::operator()() const at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591
13# doris::FunctionRunnable::run() at
/mnt/disk3/xiaolei/doris_master/be/src/util/threadpool.cpp:60
14# doris::ThreadPool::dispatch_thread() at
/mnt/disk3/xiaolei/doris_master/be/src/util/threadpool.cpp:608
15# void std::__invoke_impl<void, void (doris::ThreadPool::*&)(),
doris::ThreadPool*&>(std::__invoke_memfun_deref, void
(doris::ThreadPool::*&)(), doris::ThreadPool*&) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:74
16# std::__invoke_result<void (doris::ThreadPool::*&)(),
doris::ThreadPool*&>::type std::__invoke<void (doris::ThreadPool::*&)(),
doris::ThreadPool*&>(void (doris::ThreadPool::*&)(), doris::ThreadPool*&) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:96
17# void std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()>::__call<void, ,
0ul>(std::tuple<>&&, std::_Index_tuple<0ul>) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:506
18# void std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()>::operator()<, void>() at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/functional:591
19# void std::__invoke_impl<void, std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::__invoke_other,
std::_Bind<void (doris::ThreadPool::*(doris::ThreadPool*))()>&) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:61
20# std::enable_if<is_invocable_r_v<void, std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()>&>, void>::type
std::__invoke_r<void, std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()>&>(std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()>&) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/invoke.h:117
21# std::_Function_handler<void (), std::_Bind<void
(doris::ThreadPool::*(doris::ThreadPool*))()> >::_M_invoke(std::_Any_data
const&) at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:290
22# std::function<void ()>::operator()() const at
/mnt/disk6/common/ldb_toolchain_robin/bin/../lib/gcc/x86_64-linux-gnu/13/../../../../include/c++/13/bits/std_function.h:591
23# doris::Thread::supervise_thread(void*) at
/mnt/disk3/xiaolei/doris_master/be/src/util/thread.cpp:498
24# asan_thread_start(void*) in
/mnt/disk3/xiaolei/doris_master/output/be/lib/doris_be
25# start_thread in /lib64/libpthread.so.0
26# __GI___clone in /lib64/libc.so.6
```
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 3 ++-
be/src/pipeline/pipeline.cpp | 3 ++-
be/src/pipeline/pipeline_task.cpp | 25 +++++++++++++++++++++
.../join/test_slow_close/test_slow_close.out | Bin 114 -> 133 bytes
.../join/test_slow_close/test_slow_close.groovy | 10 +++++++++
5 files changed, 39 insertions(+), 2 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 084b51510e9..e271dbacba0 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -674,7 +674,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
// the instance which is not build hash table, it's should wait the
signal of hash table build finished.
// but if it's running and signaled == false, maybe the source
operator have closed caused by some short circuit
// return eof will make task marked as wake_up_early
- if (!_shared_hash_table_context->signaled) {
+ // todo: remove signaled after we can guarantee that wake up eraly is
always set accurately
+ if (!_shared_hash_table_context->signaled ||
state->get_task()->wake_up_early()) {
return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
}
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 2dd0394d2ae..27980976d81 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -114,8 +114,9 @@ Status Pipeline::set_sink(DataSinkOperatorPtr& sink) {
void Pipeline::make_all_runnable() {
DBUG_EXECUTE_IF("Pipeline::make_all_runnable.sleep", {
auto pipeline_id =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
- "Pipeline::make_all_runnable", "pipeline_id", 0);
+ "Pipeline::make_all_runnable.sleep", "pipeline_id", -1);
if (pipeline_id == id()) {
+ LOG(WARNING) << "Pipeline::make_all_runnable.sleep sleep 10s";
sleep(10);
}
});
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 27d1242cd42..d394535de07 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -331,6 +331,18 @@ Status PipelineTask::execute(bool* eos) {
// The status must be runnable
if (!_opened && !_fragment_context->is_canceled()) {
+ DBUG_EXECUTE_IF("PipelineTask::execute.open_sleep", {
+ auto required_pipeline_id =
+
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.open_sleep", "pipeline_id",
-1);
+ auto required_task_id =
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.open_sleep", "task_id", -1);
+ if (required_pipeline_id == pipeline_id() && required_task_id ==
task_id()) {
+ LOG(WARNING) << "PipelineTask::execute.open_sleep sleep 5s";
+ sleep(5);
+ }
+ });
+
if (_wake_up_early) {
*eos = true;
_eos = true;
@@ -507,6 +519,19 @@ Status PipelineTask::execute(bool* eos) {
RETURN_IF_ERROR(close(Status::OK(), false));
}
+ DBUG_EXECUTE_IF("PipelineTask::execute.sink_eos_sleep", {
+ auto required_pipeline_id =
+
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.sink_eos_sleep",
"pipeline_id", -1);
+ auto required_task_id =
+
DebugPoints::instance()->get_debug_param_or_default<int32_t>(
+ "PipelineTask::execute.sink_eos_sleep",
"task_id", -1);
+ if (required_pipeline_id == pipeline_id() && required_task_id
== task_id()) {
+ LOG(WARNING) << "PipelineTask::execute.sink_eos_sleep
sleep 10s";
+ sleep(10);
+ }
+ });
+
status = _sink->sink(_state, block, *eos);
if (status.is<ErrorCode::END_OF_FILE>()) {
diff --git
a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out
index cb92be84e47..5e4d8ec9448 100644
Binary files
a/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out and
b/regression-test/data/query_p0/join/test_slow_close/test_slow_close.out differ
diff --git
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
index 8d1c33ff923..0b36d2da5ab 100644
---
a/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
+++
b/regression-test/suites/query_p0/join/test_slow_close/test_slow_close.groovy
@@ -75,4 +75,14 @@ suite("test_slow_close") {
} finally {
GetDebugPoint().disableDebugPointForAllBEs("Pipeline::make_all_runnable.sleep")
}
+
+ sql "set ignore_runtime_filter_ids='0';"
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.open_sleep",[pipeline_id:
4, task_id: 7])
+
GetDebugPoint().enableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep",[pipeline_id:
4, task_id: 15])
+ qt_sql "select count(*),sleep(2) from (select t1.k1 from t5 join
[broadcast] t1 on t1.k1=t5.k1) tmp join [broadcast] t3 join t3 t3s [broadcast]
on tmp.k1=t3.k1 and t3s.k1=t3.k1 where t3.k2=5;"
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.open_sleep")
+
GetDebugPoint().disableDebugPointForAllBEs("PipelineTask::execute.sink_eos_sleep")
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]