This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5d86ae6ccc9 branch-4.0: [enhance](memory) consider memtable memory
when sink operator back pressure #60291 (#60502)
5d86ae6ccc9 is described below
commit 5d86ae6ccc96cd0b88394bca0514ee3fb1687276
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Feb 5 09:29:41 2026 +0800
branch-4.0: [enhance](memory) consider memtable memory when sink operator
back pressure #60291 (#60502)
Cherry-picked from #60291
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/sink/writer/vtablet_writer.cpp | 29 +++++++++++++++++++++--------
1 file changed, 21 insertions(+), 8 deletions(-)
diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp
b/be/src/vec/sink/writer/vtablet_writer.cpp
index 4864ad79261..270b710634c 100644
--- a/be/src/vec/sink/writer/vtablet_writer.cpp
+++ b/be/src/vec/sink/writer/vtablet_writer.cpp
@@ -748,15 +748,28 @@ Status VNodeChannel::add_block(vectorized::Block* block,
const Payload* payload)
// But there is still some unfinished things, we do mem limit here
temporarily.
// _cancelled may be set by rpc callback, and it's possible that
_cancelled might be set in any of the steps below.
// It's fine to do a fake add_block() and return OK, because we will check
_cancelled in next add_block() or mark_close().
- bool is_exceed_soft_mem_limit =
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
- auto current_load_mem_value = MemoryProfile::load_current_usage();
- bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
- current_load_mem_value > _load_mem_limit ||
- _pending_batches_bytes >
_max_pending_batches_bytes;
- while (!_cancelled && !_state->is_cancelled() && _pending_batches_num > 0
&&
- mem_limit_exceeded) {
+ constexpr int64_t kBackPressureSleepMs = 10;
+ auto* memtable_limiter = ExecEnv::GetInstance()->memtable_memory_limiter();
+ while (true) {
+ bool is_exceed_soft_mem_limit =
GlobalMemoryArbitrator::is_exceed_soft_mem_limit();
+ int64_t memtable_mem =
+ (memtable_limiter != nullptr &&
memtable_limiter->mem_tracker() != nullptr)
+ ? memtable_limiter->mem_tracker()->consumption()
+ : 0;
+ // Note: Memtable memory is not included in load memory statistics
(MemoryProfile::load_current_usage())
+ // for performance and memory control complexity reasons. Therefore,
we explicitly add memtable memory
+ // consumption here to ensure accurate back pressure decisions and
prevent OOM during heavy loads.
+ auto current_load_mem_value = MemoryProfile::load_current_usage() +
memtable_mem;
+ bool mem_limit_exceeded = is_exceed_soft_mem_limit ||
+ current_load_mem_value > _load_mem_limit ||
+ _pending_batches_bytes >
_max_pending_batches_bytes;
+ bool need_back_pressure = !_cancelled && !_state->is_cancelled() &&
+ _pending_batches_num > 0 &&
mem_limit_exceeded;
+ if (!need_back_pressure) {
+ break;
+ }
SCOPED_RAW_TIMER(&_stat.mem_exceeded_block_ns);
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
+
std::this_thread::sleep_for(std::chrono::milliseconds(kBackPressureSleepMs));
}
if (UNLIKELY(!_cur_mutable_block)) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]