This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b7c973eff67 [fix](scheduler) Fix invalid access after freed (#48168)
b7c973eff67 is described below

commit b7c973eff6766fd43c14d38f43c579a2e603183a
Author: Gabriel <[email protected]>
AuthorDate: Fri Feb 21 14:02:34 2025 +0800

    [fix](scheduler) Fix invalid access after freed (#48168)
    
    WRITE of size 1 at 0x6160007e86f0 thread T1983 (Pipe_normal [wo)
    #0 0x55fc8065b975 in std::__atomic_base<bool>::store(bool,
    std::memory_order)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:457:2
    #1 0x55fc8065b975 in std::__atomic_base<bool>::operator=(bool)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/atomic_base.h:349:2
    #2 0x55fc8065b975 in std::atomic<bool>::operator=(bool)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/atomic:80:22
    #3 0x55fc8065b975 in doris::pipeline::PipelineTask::set_running(bool)
    /root/doris/be/src/pipeline/pipeline_task.h:192:47
    #4 0x55fc8065b975 in
    doris::pipeline::TaskScheduler::_do_work(int)::$_0::operator()() const
    /root/doris/be/src/pipeline/task_scheduler.cpp:121:23
    #5 0x55fc8065b975 in
    doris::Defer<doris::pipeline::TaskScheduler::_do_work(int)::$_0>::~Defer()
    /root/doris/be/src/util/defer_op.h:37:16
    #6 0x55fc8065b975 in doris::pipeline::TaskScheduler::_do_work(int)
    /root/doris/be/src/pipeline/task_scheduler.cpp:162:5
    #7 0x55fc4c57cd19 in doris::ThreadPool::dispatch_thread()
    /root/doris/be/src/util/threadpool.cpp:608:24
    #8 0x55fc4c55395e in std::function<void ()>::operator()() const
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9
    #9 0x55fc4c55395e in doris::Thread::supervise_thread(void*)
    /root/doris/be/src/util/thread.cpp:498:5
    #10 0x7f9ee3d25608 in start_thread
    /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8
    #11 0x7f9ee3fd2132 in __clone
    
/build/glibc-SzIz7B/glibc-2.31/misc/../sysdeps/unix/sysv/linux/x86_64/clone.S:95
    
    0x6160007e86f0 is located 624 bytes inside of 632-byte region
    [0x6160007e8480,0x6160007e86f8)
    freed by thread T1981 (Pipe_normal [wo) here:
    #0 0x55fc47aa680d in operator delete(void*)
    
(/mnt/ssd01/pipline/OpenSourceDoris/clusterEnv/P0/Cluster0/be/lib/doris_be+0x3376e80d)
    (BuildId: 865149e62959581e)
    #1 0x55fc8059db84 in
    
std::default_delete<doris::pipeline::PipelineTask>::operator()(doris::pipeline::PipelineTask*)
    const
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:85:2
    #2 0x55fc8059db84 in std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >::~unique_ptr()
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/unique_ptr.h:361:4
    #3 0x55fc8059db84 in void
    std::destroy_at<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >
    >(std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15
    #4 0x55fc8059db84 in void
    std::_Destroy<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >
    >(std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7
    #5 0x55fc8059db84 in void
    
std::_Destroy_aux<false>::__destroy<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask>
    >*>(std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*,
    std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6
    #6 0x55fc8059db84 in void
    std::_Destroy<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask>
    >*>(std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*,
    std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7
    #7 0x55fc8059db84 in void
    std::_Destroy<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*,
    std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >
    >(std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*,
    std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >*,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > >&)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7
    #8 0x55fc8059db84 in
    std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >::~vector()
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:680:2
    #9 0x55fc8052571c in void
    std::destroy_at<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >
    >(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:88:15
    #10 0x55fc8052571c in void
    std::_Destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >
    >(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:138:7
    #11 0x55fc8052571c in void
    
std::_Destroy_aux<false>::__destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > >
    >*>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*,
    std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:152:6
    #12 0x55fc8052571c in void
    std::_Destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > >
    >*>(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*,
    std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_construct.h:184:7
    #13 0x55fc8052571c in void
    std::_Destroy<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*,
    std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >
    >(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*,
    std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*,
    std::allocator<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > > >&)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/alloc_traits.h:746:7
    #14 0x55fc8052571c in
    std::vector<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >,
    std::allocator<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > > >
    
>::_M_erase_at_end(std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >*)
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:1796:6
    #15 0x55fc8052571c in
    std::vector<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > >,
    std::allocator<std::vector<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> >,
    std::allocator<std::unique_ptr<doris::pipeline::PipelineTask,
    std::default_delete<doris::pipeline::PipelineTask> > > > > >::clear()
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/stl_vector.h:1499:9
    #16 0x55fc8052571c in
    doris::pipeline::PipelineFragmentContext::~PipelineFragmentContext()
    /root/doris/be/src/pipeline/pipeline_fragment_context.cpp:142:12
    #17 0x55fc47ad30cc in
    std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release()
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:168:6
    #18 0x55fc80658d57 in
    std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count()
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:702:11
    #19 0x55fc80658d57 in std::__shared_ptr<doris::TaskExecutionContext,
    (__gnu_cxx::_Lock_policy)2>::~__shared_ptr()
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/shared_ptr_base.h:1149:31
    #20 0x55fc80658d57 in
    doris::pipeline::close_task(doris::pipeline::PipelineTask*,
    doris::Status) /root/doris/be/src/pipeline/task_scheduler.cpp:100:1
    #21 0x55fc8065aa17 in doris::pipeline::TaskScheduler::_do_work(int)
    /root/doris/be/src/pipeline/task_scheduler.cpp:160:36
    #22 0x55fc4c57cd19 in doris::ThreadPool::dispatch_thread()
    /root/doris/be/src/util/threadpool.cpp:608:24
    #23 0x55fc4c55395e in std::function<void ()>::operator()() const
    
/var/local/ldb-toolchain/bin/../lib/gcc/x86_64-linux-gnu/11/../../../../include/c++/11/bits/std_function.h:560:9
    #24 0x55fc4c55395e in doris::Thread::supervise_thread(void*)
    /root/doris/be/src/util/thread.cpp:498:5
    #25 0x7f9ee3d25608 in start_thread
    /build/glibc-SzIz7B/glibc-2.31/nptl/pthread_create.c:477:8
---
 be/src/pipeline/pipeline_fragment_context.cpp |  4 +---
 be/src/pipeline/pipeline_fragment_context.h   |  2 +-
 be/src/pipeline/task_scheduler.cpp            | 29 ++++++++++++++-------------
 3 files changed, 17 insertions(+), 18 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 00af07f7bdd..adb302de9a6 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1753,7 +1753,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
             std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
 }
 
-bool PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
+void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
     // If all tasks of this pipeline has been closed, upstream tasks is never needed, and we just make those runnable here
     DCHECK(_pip_id_to_pipeline.contains(pipeline_id));
     if (_pip_id_to_pipeline[pipeline_id]->close_task()) {
@@ -1767,9 +1767,7 @@ bool PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) {
     ++_closed_tasks;
     if (_closed_tasks == _total_tasks) {
         _close_fragment_instance();
-        return true;
     }
-    return false;
 }
 
 Status PipelineFragmentContext::send_report(bool done) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 6fa4925e302..dea8f73d09e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -100,7 +100,7 @@ public:
 
     [[nodiscard]] int get_fragment_id() const { return _fragment_id; }
 
-    bool decrement_running_task(PipelineId pipeline_id);
+    void decrement_running_task(PipelineId pipeline_id);
 
     Status send_report(bool);
 
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index 7948a853799..1436bea565e 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -88,15 +88,12 @@ bool close_task(PipelineTask* task, Status exec_status) {
                                     print_id(task->query_context()->query_id()),
                                     exec_status.to_string());
     }
-    // decrement_running_task may delete fragment context and will core in some defer
-    // code, because the defer code will access fragment context itself.
-    auto lock_for_context = task->fragment_context()->shared_from_this();
     Status status = task->close(exec_status);
     if (!status.ok()) {
         task->fragment_context()->cancel(status);
     }
     task->finalize();
-    return task->fragment_context()->decrement_running_task(task->pipeline_id());
+    return true;
 }
 
 void TaskScheduler::_do_work(int index) {
@@ -114,10 +111,20 @@ void TaskScheduler::_do_work(int index) {
         }
         task->log_detail_if_need();
         task->set_running(true);
-        bool fragment_is_finished = false;
+        bool eos = false;
+        auto status = Status::OK();
         Defer task_running_defer {[&]() {
             // If fragment is finished, fragment context will be de-constructed with all tasks in it.
-            if (!fragment_is_finished) {
+            if (eos || !status.ok()) {
+                // decrement_running_task may delete fragment context and will core in some defer
+                // code, because the defer code will access fragment context itself.
+                auto lock_for_context = task->fragment_context()->shared_from_this();
+                bool close = close_task(task, status);
+                task->set_running(false);
+                if (close) {
+                    task->fragment_context()->decrement_running_task(task->pipeline_id());
+                }
+            } else {
                 task->set_running(false);
             }
         }};
@@ -127,12 +134,10 @@ void TaskScheduler::_do_work(int index) {
 
         // Close task if canceled
         if (canceled) {
-            fragment_is_finished = close_task(task, fragment_ctx->get_query_ctx()->exec_status());
+            status = fragment_ctx->get_query_ctx()->exec_status();
+            DCHECK(!status.ok());
             continue;
         }
-
-        bool eos = false;
-        auto status = Status::OK();
         task->set_core_id(index);
 
         // Main logics of execution
@@ -155,10 +160,6 @@ void TaskScheduler::_do_work(int index) {
                 } else { status = task->execute(&eos); },
                 status);
         fragment_ctx->trigger_report_if_necessary();
-
-        if (eos || !status.ok()) {
-            fragment_is_finished = close_task(task, status);
-        }
     }
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to