This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c1d8aab4050 [enhance](memory) consider memtable memory when sink 
operator back pressure (#60291)
c1d8aab4050 is described below

commit c1d8aab405050e99143635ba66d90602e33fe9fa
Author: hui lai <[email protected]>
AuthorDate: Wed Feb 4 20:30:58 2026 +0800

    [enhance](memory) consider memtable memory when sink operator back pressure 
(#60291)
    
    Because memtable memory is not included in the load memory statistics
    (for performance and memory-control complexity reasons), it needs to be
    added explicitly when the sink operator decides whether to apply back
    pressure.
---
 be/src/vec/sink/writer/vtablet_writer.cpp | 29 +++++++++++++++++++++--------
 1 file changed, 21 insertions(+), 8 deletions(-)

diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp 
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 868607cb181..6b4f24d5878 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -748,15 +748,28 @@ Status VNodeChannel::add_block(vectorized::Block* block, 
const Payload* payload)
     // But there is still some unfinished things, we do mem limit here 
temporarily.
     // _cancelled may be set by rpc callback, and it's possible that 
_cancelled might be set in any of the steps below.
     // It's fine to do a fake add_block() and return OK, because we will check 
_cancelled in next add_block() or mark_close().
-    bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
-    auto current_load_mem_value = MemoryProfile::load_current_usage();
-    bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
-                              current_load_mem_value > _load_mem_limit ||
-                              _pending_batches_bytes > 
_max_pending_batches_bytes;
-    while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0 
&&
-           mem_limit_exceeded) {
+    constexpr int64_t kBackPressureSleepMs = 10;
+    auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
+    while (true) {
+        bool is_exceed_soft_mem_limit = 
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+        int64_t memtable_mem =
+                (memtable_limiter != nullptr && 
memtable_limiter->mem_tracker() != nullptr)
+                        ? memtable_limiter->mem_tracker()->consumption()
+                        : 0;
+        // Note: Memtable memory is not included in load memory statistics 
(MemoryProfile::load_current_usage())
+        // for performance and memory control complexity reasons. Therefore, 
we explicitly add memtable memory
+        // consumption here to ensure accurate back pressure decisions and 
prevent OOM during heavy loads.
+        auto current_load_mem_value = MemoryProfile::load_current_usage() + 
memtable_mem;
+        bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
+                                  current_load_mem_value > _load_mem_limit ||
+                                  _pending_batches_bytes > 
_max_pending_batches_bytes;
+        bool need_back_pressure = !_cancelled && !_state->is_cancelled() &&
+                                  _pending_batches_num > 0 && 
mem_limit_exceeded;
+        if (!need_back_pressure) {
+            break;
+        }
         SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        
std::this_thread::sleep_for(std::chrono::milliseconds(kBackPressureSleepMs));
     }
 
     if (UNLIKELY(!_cur_mutable_block)) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to