This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b7c973eff67 [fix](scheduler) Fix invalid access after freed (#48168)
b7c973eff67 is described below
commit b7c973eff6766fd43c14d38f43c579a2e603183a
Author: Gabriel <[email protected]>
AuthorDate: Fri Feb 21 14:02:34 2025 +0800
[fix](scheduler) Fix invalid access after freed (#48168)
WRITE of size 1 at 0x6160007e86f0 thread T1983 (Pipe_normal [wo)
#0 0x55fc8065b975 in std::__atomic_base<bool>::store(bool,
std::memory_order)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:457:2
#1 0x55fc8065b975 in std::__atomic_base<bool>::operator=(bool)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:349:2
#2 0x55fc8065b975 in std::atomic<bool>::operator=(bool)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/atomic:80:22
#3 0x55fc8065b975 in doris::pipeline::PipelineTask::set_running(bool)
/root/doris/be/src/pipeline/pipeline_task.h:192:47
#4 0x55fc8065b975 in
doris::pipeline::TaskScheduler::_do_work(int)::$_0::operator()() const
/root/doris/be/src/pipeline/task_scheduler.cpp:121:23
#5 0x55fc8065b975 in
doris::Defer<doris::pipeline::TaskScheduler::_do_work(int)::$_0>::~Defer()
/root/doris/be/src/util/defer_op.h:37:16
#6 0x55fc8065b975 in doris::pipeline::TaskScheduler::_do_work(int)
/root/doris/be/src/pipeline/task_scheduler.cpp:162:5
#7 0x55fc4c57cd19 in doris::ThreadPool::dispatch_thread()
/root/doris/be/src/util/threadpool.cpp:608:24
#8 0x55fc4c55395e in std::function<void ()>::operator()() const
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9
#9 0x55fc4c55395e in doris::Thread::supervise_thread(void*)
/root/doris/be/src/util/thread.cpp:498:5
#10 0x7f9ee3d25608 in start_thread
/build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8
#11 0x7f9ee3fd2132 in __clone
/build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95
0x6160007e86f0 is located 624 bytes inside of 632-byte region
[0x6160007e8480,0x6160007e86f8)
freed by thread T1981 (Pipe_normal [wo) here:
#0 0x55fc47aa680d in operator delete(void*)
(/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x3376e80d)
(BuildId: 865149e62959581e)
#1 0x55fc8059db84 in
std::default_delete<doris::pipeline::PipelineTask>::operator()(doris::pipeline::PipelineTask*)
const
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:85:2
#2 0x55fc8059db84 in std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >::~unique_ptr()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:361:4
#3 0x55fc8059db84 in void
std::destroy_at<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >
>(std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15
#4 0x55fc8059db84 in void
std::_Destroy<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >
>(std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7
#5 0x55fc8059db84 in void
std::_Destroy_aux<false>::__destroy<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask>
>*>(std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*,
std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6
#6 0x55fc8059db84 in void
std::_Destroy<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask>
>*>(std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*,
std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7
#7 0x55fc8059db84 in void
std::_Destroy<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*,
std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >
>(std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*,
std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >*,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > >&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7
#8 0x55fc8059db84 in
std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >::~vector()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:680:2
#9 0x55fc8052571c in void
std::destroy_at<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >
>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15
#10 0x55fc8052571c in void
std::_Destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >
>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7
#11 0x55fc8052571c in void
std::_Destroy_aux<false>::__destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > >
>*>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*,
std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6
#12 0x55fc8052571c in void
std::_Destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > >
>*>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*,
std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7
#13 0x55fc8052571c in void
std::_Destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*,
std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >
>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*,
std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*,
std::allocator<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > > >&)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7
#14 0x55fc8052571c in
std::vector<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >,
std::allocator<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > > >
>::_M_erase_at_end(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >*)
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:1796:6
#15 0x55fc8052571c in
std::vector<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > >,
std::allocator<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> >,
std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
std::default_delete<doris::pipeline::PipelineTask> > > > > >::clear()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:1499:9
#16 0x55fc8052571c in
doris::pipeline::PipelineFragmentContext::~PipelineFragmentContext()
/root/doris/be/src/pipeline/pipeline_fragment_context.cpp:142:12
#17 0x55fc47ad30cc in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:168:6
#18 0x55fc80658d57 in
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:702:11
#19 0x55fc80658d57 in std::__shared_ptr<doris::TaskExecutionContext,
(__gnu_cxx::_Lock_policy)2>::~__shared_ptr()
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1149:31
#20 0x55fc80658d57 in
doris::pipeline::close_task(doris::pipeline::PipelineTask*,
doris::Status) /root/doris/be/src/pipeline/task_scheduler.cpp:100:1
#21 0x55fc8065aa17 in doris::pipeline::TaskScheduler::_do_work(int)
/root/doris/be/src/pipeline/task_scheduler.cpp:160:36
#22 0x55fc4c57cd19 in doris::ThreadPool::dispatch_thread()
/root/doris/be/src/util/threadpool.cpp:608:24
#23 0x55fc4c55395e in std::function<void ()>::operator()() const
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9
#24 0x55fc4c55395e in doris::Thread::supervise_thread(void*)
/root/doris/be/src/util/thread.cpp:498:5
#25 0x7f9ee3d25608 in start_thread
/build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8
---
be/src/pipeline/pipeline_fragment_context.cpp | 4 +---
be/src/pipeline/pipeline_fragment_context.h | 2 +-
be/src/pipeline/task_scheduler.cpp | 29 ++++++++++++++-------------
3 files changed, 17 insertions(+), 18 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 00af07f7bdd..adb302de9a6 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1753,7 +1753,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
}
-bool PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
+void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
// If all tasks of this pipeline has been closed, upstream tasks is never
needed, and we just make those runnable here
DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
@@ -1767,9 +1767,7 @@ bool
PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
_close_fragment_instance();
- return true;
}
- return false;
}
Status PipelineFragmentContext::send_report(bool done) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 6fa4925e302..dea8f73d09e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ public:
[[nodiscard]] int get_fragment_id() const { return _fragment_id; }
- bool decrement_running_task(PipelineId pipeline_id);
+ void decrement_running_task(PipelineId pipeline_id);
Status send_report(bool);
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 7948a853799..1436bea565e 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -88,15 +88,12 @@ bool close_task(PipelineTask* task, Status exec_status) {
print_id(task->query_context()->query_id()),
exec_status.to_string());
}
- // decrement_running_task may delete fragment context and will core in
some defer
- // code, because the defer code will access fragment context itself.
- auto lock_for_context = task->fragment_context()->shared_from_this();
Status status = task->close(exec_status);
if (!status.ok()) {
task->fragment_context()->cancel(status);
}
task->finalize();
- return
task->fragment_context()->decrement_running_task(task->pipeline_id());
+ return true;
}
void TaskScheduler::_do_work(int index) {
@@ -114,10 +111,20 @@ void TaskScheduler::_do_work(int index) {
}
task->log_detail_if_need();
task->set_running(true);
- bool fragment_is_finished = false;
+ bool eos = false;
+ auto status = Status::OK();
Defer task_running_defer {[&]() {
// If fragment is finished, fragment context will be
de-constructed with all tasks in it.
- if (!fragment_is_finished) {
+ if (eos || !status.ok()) {
+ // decrement_running_task may delete fragment context and will
core in some defer
+ // code, because the defer code will access fragment context
itself.
+ auto lock_for_context =
task->fragment_context()->shared_from_this();
+ bool close = close_task(task, status);
+ task->set_running(false);
+ if (close) {
+
task->fragment_context()->decrement_running_task(task->pipeline_id());
+ }
+ } else {
task->set_running(false);
}
}};
@@ -127,12 +134,10 @@ void TaskScheduler::_do_work(int index) {
// Close task if canceled
if (canceled) {
- fragment_is_finished = close_task(task,
fragment_ctx->get_query_ctx()->exec_status());
+ status = fragment_ctx->get_query_ctx()->exec_status();
+ DCHECK(!status.ok());
continue;
}
-
- bool eos = false;
- auto status = Status::OK();
task->set_core_id(index);
// Main logics of execution
@@ -155,10 +160,6 @@ void TaskScheduler::_do_work(int index) {
} else { status = task->execute(&eos); },
status);
fragment_ctx->trigger_report_if_necessary();
-
- if (eos || !status.ok()) {
- fragment_is_finished = close_task(task, status);
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]