This is an automated email from the ASF dual-hosted git repository.

zclll pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f5df730d6c0 [fix](load) Use atomic operations for _try_close flag and 
remove unused _close_wait (#61593)
f5df730d6c0 is described below

commit f5df730d6c0268e5fc81729f0c6d7c9ce1f4eb3f
Author: zclllyybb <[email protected]>
AuthorDate: Mon Mar 23 15:37:47 2026 +0800

    [fix](load) Use atomic operations for _try_close flag and remove unused 
_close_wait (#61593)
    
    - Change `_try_close` from `bool` to `std::atomic<bool>` with
    acquire/release memory ordering
    - Use `memory_order_acquire` when reading in `_send_batch_process()`
    (bthread)
    - Use `memory_order_release` when writing in `_do_try_close()` (pthread)
    - Remove unused `_close_wait` field
    
    This fixes a potential race condition where `_try_close` is written by
    pthread and read by bthread without proper synchronization.
    
    It's ok on x86 so no test added.
---
 be/src/exec/sink/writer/vtablet_writer.cpp | 5 ++---
 be/src/exec/sink/writer/vtablet_writer.h   | 5 ++---
 2 files changed, 4 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp 
b/be/src/exec/sink/writer/vtablet_writer.cpp
index 52097d90357..39b23500e06 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -1395,7 +1395,7 @@ void VTabletWriter::_send_batch_process() {
         // we must RECHECK opened_nodes below, after got closed signal, 
because it may changed. Think of this:
         //      checked opened_nodes = 0 ---> new block arrived ---> task 
finished, close() was called ---> we got _try_close here
         // if we don't check again, we may lose the last package.
-        if (_try_close) {
+        if (_try_close.load(std::memory_order_acquire)) {
             opened_nodes = 0;
             std::ranges::for_each(_channels,
                                   [&opened_nodes](const 
std::shared_ptr<IndexChannel>& ich) {
@@ -1785,7 +1785,7 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
         status = _send_new_partition_batch();
     }
 
-    _try_close = true; // will stop periodic thread
+    _try_close.store(true, std::memory_order_release); // will stop periodic 
thread
     if (status.ok()) {
         // BE id -> add_batch method counter
         std::unordered_map<int64_t, AddBatchCounter> 
node_add_batch_counter_map;
@@ -1866,7 +1866,6 @@ void VTabletWriter::_do_try_close(RuntimeState* state, 
const Status& exec_status
     if (!status.ok()) {
         _cancel_all_channel(status);
         _close_status = status;
-        _close_wait = true;
     }
 }
 
diff --git a/be/src/exec/sink/writer/vtablet_writer.h 
b/be/src/exec/sink/writer/vtablet_writer.h
index d9e3869e68e..d3e6e8da0f1 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -752,9 +752,8 @@ private:
     // Save the status of try_close() and close() method
     Status _close_status;
     // if we called try_close(), for auto partition the periodic send thread 
should stop if it's still waiting for node channels first-time open.
-    bool _try_close = false;
-    // for non-pipeline, if close() did something, close_wait() should wait it.
-    bool _close_wait = false;
+    // atomic: written by pthread (_do_try_close), read by bthread 
(_send_batch_process)
+    std::atomic<bool> _try_close {false};
     bool _inited = false;
     bool _write_file_cache = false;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to