This is an automated email from the ASF dual-hosted git repository.

sollhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2e72b96abfd [fix](load) replace tablet writer close polling with event 
wakeup (#64221)
2e72b96abfd is described below

commit 2e72b96abfdb4dd6dc8bb4e6eb3060ec825f06f8
Author: hui lai <[email protected]>
AuthorDate: Tue Jun 23 19:38:04 2026 +0800

    [fix](load) replace tablet writer close polling with event wakeup (#64221)
    
    ### What problem does this PR solve?
    
    `vtablet_writer` and `vtablet_writer_v2` used fixed 10ms polling loops
    while waiting for downstream node channels / load streams to finish
    closing or reach quorum success. When downstream recovery is slow, the
    upstream close wait repeatedly scans the unfinished channels and
    consumes CPU unnecessarily.
    
    This PR changes close wait to an event-driven wakeup model:
    
    - `vtablet_writer`:
      - Adds a close wait condition variable and version counter in
        `IndexChannel`.
      - `VNodeChannel` notifies close wait when the last add-block RPC
        finishes or when the channel is cancelled.
      - `IndexChannel::close_wait()` waits on the notification instead of
        polling every 10ms.
    
    - `vtablet_writer_v2`:
      - Adds close wait notification helpers in `LoadStreamStub`.
      - Stream close and cancel paths notify close wait.
      - `VTabletWriterV2::_close_wait()` waits on stream close events instead
        of polling every 10ms.
    
    The existing quorum success logic and max wait timeout behavior are
    preserved. A bounded fallback wait (at most 1000 ms per iteration,
    `CLOSE_WAIT_EVENT_FALLBACK_MS`) is kept so that timeout and cancellation
    state are still re-checked even if no downstream event arrives.
---
 be/src/exec/sink/load_stream_map_pool.cpp     | 17 ++++++++++---
 be/src/exec/sink/load_stream_map_pool.h       |  5 ++++
 be/src/exec/sink/load_stream_stub.cpp         | 32 ++++++++++++++++++++++--
 be/src/exec/sink/load_stream_stub.h           | 36 ++++++++++++++++++++++-----
 be/src/exec/sink/writer/vtablet_writer.cpp    | 26 +++++++++++++++++--
 be/src/exec/sink/writer/vtablet_writer.h      | 13 ++++++++++
 be/src/exec/sink/writer/vtablet_writer_v2.cpp | 11 ++++++--
 be/test/exec/sink/vtablet_writer_v2_test.cpp  | 21 ++++++++++++++++
 8 files changed, 145 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/sink/load_stream_map_pool.cpp 
b/be/src/exec/sink/load_stream_map_pool.cpp
index 047881d156d..6d87fc19840 100644
--- a/be/src/exec/sink/load_stream_map_pool.cpp
+++ b/be/src/exec/sink/load_stream_map_pool.cpp
@@ -31,7 +31,8 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t 
src_id, int num_streams,
           _num_incremental_streams(0),
           _pool(pool),
           _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
-          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
+          _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()),
+          _close_wait_notifier(std::make_shared<CloseWaitNotifier>()) {
     DCHECK(num_streams > 0) << "stream num should be greater than 0";
     DCHECK(num_use > 0) << "use num should be greater than 0";
 }
@@ -45,9 +46,9 @@ std::shared_ptr<LoadStreamStubs> 
LoadStreamMap::get_or_create(int64_t dst_id, bo
     if (incremental) {
         _num_incremental_streams.fetch_add(1);
     }
-    streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id, 
_src_id,
-                                                _tablet_schema_for_index,
-                                                _enable_unique_mow_for_index, 
incremental);
+    streams = std::make_shared<LoadStreamStubs>(
+            _num_streams, _load_id, _src_id, _tablet_schema_for_index, 
_enable_unique_mow_for_index,
+            incremental, _close_wait_notifier);
     _streams_for_node[dst_id] = streams;
     return streams;
 }
@@ -129,6 +130,14 @@ void LoadStreamMap::close_load(bool incremental) {
     }
 }
 
+int64_t LoadStreamMap::close_wait_version() const {
+    return _close_wait_notifier->close_wait_version();
+}
+
+void LoadStreamMap::wait_for_close_event(int64_t observed_version, int64_t 
timeout_ms) {
+    _close_wait_notifier->wait_for_close_event(observed_version, timeout_ms);
+}
+
 LoadStreamMapPool::LoadStreamMapPool() = default;
 
 LoadStreamMapPool::~LoadStreamMapPool() = default;
diff --git a/be/src/exec/sink/load_stream_map_pool.h 
b/be/src/exec/sink/load_stream_map_pool.h
index 3b6b15ff3b6..131d1d1d88b 100644
--- a/be/src/exec/sink/load_stream_map_pool.h
+++ b/be/src/exec/sink/load_stream_map_pool.h
@@ -96,6 +96,10 @@ public:
     // only call this method after release() returns true.
     void close_load(bool incremental);
 
+    int64_t close_wait_version() const;
+
+    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
     std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>> 
get_streams_for_node() {
         decltype(_streams_for_node) snapshot;
         {
@@ -116,6 +120,7 @@ private:
     LoadStreamMapPool* _pool = nullptr;
     std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
     std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+    std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
 
     std::mutex _tablets_to_commit_mutex;
     std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> 
_tablets_to_commit;
diff --git a/be/src/exec/sink/load_stream_stub.cpp 
b/be/src/exec/sink/load_stream_stub.cpp
index 7c74cd20676..e46eec24333 100644
--- a/be/src/exec/sink/load_stream_stub.cpp
+++ b/be/src/exec/sink/load_stream_stub.cpp
@@ -30,6 +30,24 @@
 
 namespace doris {
 
+int64_t CloseWaitNotifier::close_wait_version() const {
+    return _close_wait_version.load(std::memory_order_acquire);
+}
+
+void CloseWaitNotifier::wait_for_close_event(int64_t observed_version, int64_t 
timeout_ms) {
+    std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
+    if (observed_version != close_wait_version()) {
+        return;
+    }
+    static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000));
+}
+
+void CloseWaitNotifier::notify_close_wait() {
+    _close_wait_version.fetch_add(1, std::memory_order_acq_rel);
+    std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
+    _close_wait_cv.notify_all();
+}
+
 int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id, 
butil::IOBuf* const messages[],
                                                  size_t size) {
     auto stub = _stub.lock();
@@ -118,6 +136,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
         return;
     }
     stub->_is_closed.store(true);
+    stub->notify_close_wait();
 }
 
 inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler& handler) {
@@ -128,12 +147,16 @@ inline std::ostream& operator<<(std::ostream& ostr, const 
LoadStreamReplyHandler
 
 LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
                                std::shared_ptr<IndexToTabletSchema> schema_map,
-                               std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental)
+                               std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental,
+                               std::shared_ptr<CloseWaitNotifier> 
close_wait_notifier)
         : _load_id(load_id),
           _src_id(src_id),
           _tablet_schema_for_index(schema_map),
           _enable_unique_mow_for_index(mow_map),
-          _is_incremental(incremental) {};
+          _is_incremental(incremental),
+          _close_wait_notifier(std::move(close_wait_notifier)) {
+    DCHECK(_close_wait_notifier != nullptr);
+};
 
 LoadStreamStub::~LoadStreamStub() {
     if (_is_open.load() && !_is_closed.load()) {
@@ -364,6 +387,10 @@ Status LoadStreamStub::close_finish_check(RuntimeState* 
state, bool* is_closed)
     return Status::OK();
 }
 
+void LoadStreamStub::notify_close_wait() {
+    _close_wait_notifier->notify_close_wait();
+}
+
 void LoadStreamStub::cancel(Status reason) {
     LOG(WARNING) << *this << " is cancelled because of " << reason;
     if (_is_open.load()) {
@@ -375,6 +402,7 @@ void LoadStreamStub::cancel(Status reason) {
         _is_cancelled.store(true);
     }
     _is_closed.store(true);
+    notify_close_wait();
 }
 
 Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const 
Slice> data) {
diff --git a/be/src/exec/sink/load_stream_stub.h 
b/be/src/exec/sink/load_stream_stub.h
index 622ee49aeaa..a6699ba639e 100644
--- a/be/src/exec/sink/load_stream_stub.h
+++ b/be/src/exec/sink/load_stream_stub.h
@@ -81,6 +81,20 @@ using IndexToEnableMoW =
                                       std::allocator<phmap::Pair<const 
int64_t, bool>>, 4,
                                       std::mutex>;
 
+class CloseWaitNotifier {
+public:
+    int64_t close_wait_version() const;
+
+    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
+    void notify_close_wait();
+
+private:
+    std::atomic<int64_t> _close_wait_version {0};
+    bthread::Mutex _close_wait_mutex;
+    bthread::ConditionVariable _close_wait_cv;
+};
+
 class LoadStreamReplyHandler : public brpc::StreamInputHandler {
 public:
     LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id, 
std::weak_ptr<LoadStreamStub> stub)
@@ -108,12 +122,17 @@ public:
     // construct new stub
     LoadStreamStub(PUniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false);
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false,
+                   std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+                           std::make_shared<CloseWaitNotifier>());
 
     LoadStreamStub(UniqueId load_id, int64_t src_id,
                    std::shared_ptr<IndexToTabletSchema> schema_map,
-                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false)
-            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, 
incremental) {};
+                   std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental 
= false,
+                   std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+                           std::make_shared<CloseWaitNotifier>())
+            : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, 
incremental,
+                             std::move(close_wait_notifier)) {};
 
 // for mock this class in UT
 #ifdef BE_TEST
@@ -243,6 +262,8 @@ public:
                     tablet_load_infos);
 
 private:
+    void notify_close_wait();
+
     Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data 
= {});
     Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
     Status _send_with_retry(butil::IOBuf& buf);
@@ -281,6 +302,7 @@ protected:
     std::unordered_map<int64_t, Status> _failed_tablets;
 
     bool _is_incremental = false;
+    std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
 
     bthread::Mutex _write_mutex;
     size_t _bytes_written = 0;
@@ -293,12 +315,14 @@ class LoadStreamStubs {
 public:
     LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
                     std::shared_ptr<IndexToTabletSchema> schema_map,
-                    std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental = false)
+                    std::shared_ptr<IndexToEnableMoW> mow_map, bool 
incremental = false,
+                    std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+                            std::make_shared<CloseWaitNotifier>())
             : _is_incremental(incremental) {
         _streams.reserve(num_streams);
         for (size_t i = 0; i < num_streams; i++) {
-            _streams.emplace_back(
-                    new LoadStreamStub(load_id, src_id, schema_map, mow_map, 
incremental));
+            _streams.emplace_back(new LoadStreamStub(load_id, src_id, 
schema_map, mow_map,
+                                                     incremental, 
close_wait_notifier));
         }
     }
 
diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp 
b/be/src/exec/sink/writer/vtablet_writer.cpp
index 47b67cbfc9e..ddc2923392e 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -101,6 +101,8 @@ bvar::PerSecond<bvar::Adder<int64_t>> 
g_sink_write_rows_per_second("sink_through
 bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms(
         "load_back_pressure_version_time_ms");
 
+static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000;
+
 Status IndexChannel::init(RuntimeState* state, const 
std::vector<TTabletWithPartition>& tablets,
                           bool incremental) {
     SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
@@ -308,6 +310,20 @@ static Status 
cancel_channel_and_check_intolerable_failure(Status status,
     return status;
 }
 
+void IndexChannel::wait_for_close_event(int64_t observed_version, int64_t 
timeout_ms) {
+    std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
+    if (observed_version != close_wait_version()) {
+        return;
+    }
+    static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000));
+}
+
+void IndexChannel::notify_close_wait() {
+    _close_wait_version.fetch_add(1, std::memory_order_acq_rel);
+    std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
+    _close_wait_cv.notify_all();
+}
+
 Status IndexChannel::close_wait(
         RuntimeState* state, WriterStats* writer_stats,
         std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
@@ -329,6 +345,7 @@ Status IndexChannel::close_wait(
         }
     }
     while (true) {
+        int64_t close_wait_version = this->close_wait_version();
         RETURN_IF_ERROR(check_each_node_channel_close(
                 &unfinished_node_channel_ids, node_add_batch_counter_map, 
writer_stats, status));
         bool quorum_success = _quorum_success(unfinished_node_channel_ids, 
need_finish_tablets);
@@ -339,7 +356,7 @@ Status IndexChannel::close_wait(
                       << ", load_id: " << print_id(_parent->_load_id);
             break;
         }
-        bthread_usleep(1000 * 10);
+        wait_for_close_event(close_wait_version, CLOSE_WAIT_EVENT_FALLBACK_MS);
     }
 
     // 2. wait for all node channel to complete as much as possible
@@ -347,6 +364,7 @@ Status IndexChannel::close_wait(
         int64_t arrival_quorum_success_time = UnixMillis();
         int64_t max_wait_time_ms = 
_calc_max_wait_time_ms(unfinished_node_channel_ids);
         while (true) {
+            int64_t close_wait_version = this->close_wait_version();
             
RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
                                                           
node_add_batch_counter_map, writer_stats,
                                                           status));
@@ -370,7 +388,8 @@ Status IndexChannel::close_wait(
                              << unfinished_node_channel_host_str.str();
                 break;
             }
-            bthread_usleep(1000 * 10);
+            wait_for_close_event(close_wait_version, 
std::min(CLOSE_WAIT_EVENT_FALLBACK_MS,
+                                                              max_wait_time_ms 
- elapsed_ms));
         }
     }
     return status;
@@ -868,6 +887,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg) 
{
         }
     }
     _cancelled = true;
+    _index_channel->notify_close_wait();
 }
 
 void VNodeChannel::_refresh_back_pressure_version_wait_time(
@@ -1140,6 +1160,7 @@ void VNodeChannel::_add_block_success_callback(const 
PTabletWriterAddBlockResult
                 }
             }
             _add_batches_finished = true;
+            _index_channel->notify_close_wait();
         }
     } else {
         _cancel_with_msg(fmt::format("{}, add batch req success but status 
isn't ok, err: {}",
@@ -1190,6 +1211,7 @@ void VNodeChannel::_add_block_failed_callback(const 
WriteBlockCallbackContext& c
         // if this is last rpc, will must set _add_batches_finished. 
otherwise, node channel's close_wait
         // will be blocked.
         _add_batches_finished = true;
+        _index_channel->notify_close_wait();
     }
 }
 
diff --git a/be/src/exec/sink/writer/vtablet_writer.h 
b/be/src/exec/sink/writer/vtablet_writer.h
index d3e6e8da0f1..69b78fa280c 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -31,6 +31,7 @@
 #include <google/protobuf/stubs/callback.h>
 
 // IWYU pragma: no_include <bits/chrono.h>
+#include <bthread/condition_variable.h>
 #include <bthread/mutex.h>
 
 #include <atomic>
@@ -524,6 +525,14 @@ public:
                       std::unordered_set<int64_t> unfinished_node_channel_ids,
                       bool need_wait_after_quorum_success);
 
+    int64_t close_wait_version() const {
+        return _close_wait_version.load(std::memory_order_acquire);
+    }
+
+    void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
+    void notify_close_wait();
+
     Status check_each_node_channel_close(
             std::unordered_set<int64_t>* unfinished_node_channel_ids,
             std::unordered_map<int64_t, AddBatchCounter>* 
node_add_batch_counter_map,
@@ -614,6 +623,10 @@ private:
     std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>> 
_tablets_filtered_rows;
 
     int64_t _start_time = 0;
+
+    std::atomic<int64_t> _close_wait_version {0};
+    bthread::Mutex _close_wait_mutex;
+    bthread::ConditionVariable _close_wait_cv;
 };
 } // namespace doris
 
diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp 
b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
index d15d445abda..817d30dbf59 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
@@ -24,6 +24,7 @@
 #include <gen_cpp/Types_types.h>
 #include <gen_cpp/internal_service.pb.h>
 
+#include <algorithm>
 #include <cstdint>
 #include <mutex>
 #include <ranges>
@@ -58,6 +59,8 @@ namespace doris {
 
 extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
 
+static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000;
+
 VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const 
VExprContextSPtrs& output_exprs,
                                  std::shared_ptr<Dependency> dep,
                                  std::shared_ptr<Dependency> fin_dep)
@@ -821,6 +824,7 @@ Status VTabletWriterV2::_close_wait(
         }
     }
     while (true) {
+        int64_t close_wait_version = _load_stream_map->close_wait_version();
         RETURN_IF_ERROR(_check_timeout());
         RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, 
streams_for_node));
         bool quorum_success = _quorum_success(unfinished_streams, 
need_finish_tablets);
@@ -830,7 +834,7 @@ Status VTabletWriterV2::_close_wait(
                       << ", txn_id: " << _txn_id << ", load_id: " << 
print_id(_load_id);
             break;
         }
-        bthread_usleep(1000 * 10);
+        _load_stream_map->wait_for_close_event(close_wait_version, 
CLOSE_WAIT_EVENT_FALLBACK_MS);
     }
 
     // 2. then wait for remaining streams as much as possible
@@ -838,6 +842,7 @@ Status VTabletWriterV2::_close_wait(
         int64_t arrival_quorum_success_time = UnixMillis();
         int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node, 
unfinished_streams);
         while (true) {
+            int64_t close_wait_version = 
_load_stream_map->close_wait_version();
             RETURN_IF_ERROR(_check_timeout());
             RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status, 
streams_for_node));
             if (unfinished_streams.empty()) {
@@ -856,7 +861,9 @@ Status VTabletWriterV2::_close_wait(
                              << ", unfinished streams: " << 
unfinished_streams_str.str();
                 break;
             }
-            bthread_usleep(1000 * 10);
+            _load_stream_map->wait_for_close_event(
+                    close_wait_version,
+                    std::min(CLOSE_WAIT_EVENT_FALLBACK_MS, max_wait_time_ms - 
elapsed_ms));
         }
     }
 
diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp 
b/be/test/exec/sink/vtablet_writer_v2_test.cpp
index 8f09a3cb08f..7579d0e0ed7 100644
--- a/be/test/exec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp
@@ -281,6 +281,27 @@ TEST_F(TestVTabletWriterV2, 
shared_delta_writer_should_not_access_destroyed_crea
     current_writer->_cancel(Status::Cancelled("test cleanup"));
 }
 
+TEST_F(TestVTabletWriterV2, 
close_wait_notifier_should_be_scoped_to_load_stream_map) {
+    UniqueId load_id1;
+    UniqueId load_id2;
+    load_id2.lo = 1;
+    std::shared_ptr<LoadStreamMap> load_stream_map1 =
+            std::make_shared<LoadStreamMap>(load_id1, src_id, 1, 1, nullptr);
+    std::shared_ptr<LoadStreamMap> load_stream_map2 =
+            std::make_shared<LoadStreamMap>(load_id2, src_id, 1, 1, nullptr);
+    auto streams1 = load_stream_map1->get_or_create(1001);
+    auto streams2 = load_stream_map2->get_or_create(1002);
+    streams1->mark_open();
+    streams2->mark_open();
+
+    int64_t version1 = load_stream_map1->close_wait_version();
+    int64_t version2 = load_stream_map2->close_wait_version();
+    streams1->select_one_stream()->cancel(Status::Cancelled("test"));
+
+    ASSERT_GT(load_stream_map1->close_wait_version(), version1);
+    ASSERT_EQ(load_stream_map2->close_wait_version(), version2);
+}
+
 TEST_F(TestVTabletWriterV2, one_replica) {
     UniqueId load_id;
     std::vector<TTabletCommitInfo> tablet_commit_infos;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to