This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit b31115f51ab599a3504d41a66ef7be3162bdeef5 Author: Xinyi Zou <[email protected]> AuthorDate: Thu Feb 23 08:33:30 2023 +0800 [fix](memory) fix memory GC details and join probe catch bad_alloc (#16989) Fix Redhat 4.x OS /proc/meminfo has no MemAvailable, disable MemAvailable to control memory. vm_rss_str and mem_available_str recorded when gc is triggered, to avoid memory changes during gc and cause inaccurate logs. join probe catch bad_alloc, this may alloc 64G memory at a time, avoid OOM. Modify document doris_be_all_segments_num and doris_be_all_rowsets_num names. --- be/src/common/daemon.cpp | 4 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 17 +++--- be/src/runtime/memory/mem_tracker_limiter.h | 19 ++++-- be/src/util/doris_metrics.cpp | 8 +-- be/src/util/doris_metrics.h | 4 +- be/src/util/mem_info.cpp | 67 +++++++++++++--------- .../vec/exec/join/process_hash_table_probe_impl.h | 10 ++-- be/src/vec/exec/join/vhash_join_node.cpp | 4 +- .../maint-monitor/monitor-metrics/metrics.md | 4 +- 9 files changed, 82 insertions(+), 55 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index a1a98f11b2..4472c8fa91 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -341,9 +341,9 @@ void Daemon::calculate_metrics_thread() { &lst_net_send_bytes, &lst_net_receive_bytes); } - DorisMetrics::instance()->all_rowset_nums->set_value( + DorisMetrics::instance()->all_rowsets_num->set_value( StorageEngine::instance()->tablet_manager()->get_rowset_nums()); - DorisMetrics::instance()->all_segment_nums->set_value( + DorisMetrics::instance()->all_segments_num->set_value( StorageEngine::instance()->tablet_manager()->get_segment_nums()); } } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(15))); diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 1478a5704f..795aeccc94 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -248,7 +248,9 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const return Status::MemoryLimitExceeded(failed_msg); } -int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type type) { +int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, + const std::string& vm_rss_str, + const std::string& mem_available_str, Type type) { std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, std::string>>, std::greater<std::pair<int64_t, std::string>>> @@ -274,8 +276,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type type "details see be.INFO.", TypeString[type], TypeString[type], min_pq.top().second, print_bytes(min_pq.top().first), BackendOptions::get_localhost(), - PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(), - MemInfo::sys_mem_available_str(), + vm_rss_str, MemInfo::mem_limit_str(), mem_available_str, print_bytes(MemInfo::sys_mem_available_low_water_mark()))); freed_mem += min_pq.top().first; @@ -319,7 +320,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type type return cancel_top_query(min_pq); } -int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type type) { +int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, + const std::string& vm_rss_str, + const std::string& mem_available_str, + Type type) { std::priority_queue<std::pair<int64_t, std::string>, std::vector<std::pair<int64_t, std::string>>, std::greater<std::pair<int64_t, std::string>>> @@ -370,9 +374,8 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type "less than warning water mark {}. Execute again after enough memory, " "details see be.INFO.", TypeString[type], TypeString[type], max_pq.top().second, - print_bytes(query_mem), BackendOptions::get_localhost(), - PerfCounters::get_vm_rss_str(), MemInfo::soft_mem_limit_str(), - MemInfo::sys_mem_available_str(), + print_bytes(query_mem), BackendOptions::get_localhost(), vm_rss_str, + MemInfo::soft_mem_limit_str(), mem_available_str, print_bytes(MemInfo::sys_mem_available_warning_water_mark()))); usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}", diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 0b66a5c7fb..5e705d3516 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -145,15 +145,22 @@ public: int64_t failed_allocation_size = 0); // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed. - static int64_t free_top_memory_query(int64_t min_free_mem, Type type = Type::QUERY); - static int64_t free_top_memory_load(int64_t min_free_mem) { - return free_top_memory_query(min_free_mem, Type::LOAD); + // vm_rss_str and mem_available_str recorded when gc is triggered, for log printing. + static int64_t free_top_memory_query(int64_t min_free_mem, const std::string& vm_rss_str, + const std::string& mem_available_str, + Type type = Type::QUERY); + static int64_t free_top_memory_load(int64_t min_free_mem, const std::string& vm_rss_str, + const std::string& mem_available_str) { + return free_top_memory_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD); } // Start canceling from the query with the largest memory overcommit ratio until the memory // of min_free_mem size is freed. - static int64_t free_top_overcommit_query(int64_t min_free_mem, Type type = Type::QUERY); - static int64_t free_top_overcommit_load(int64_t min_free_mem) { - return free_top_overcommit_query(min_free_mem, Type::LOAD); + static int64_t free_top_overcommit_query(int64_t min_free_mem, const std::string& vm_rss_str, + const std::string& mem_available_str, + Type type = Type::QUERY); + static int64_t free_top_overcommit_load(int64_t min_free_mem, const std::string& vm_rss_str, + const std::string& mem_available_str) { + return free_top_overcommit_query(min_free_mem, vm_rss_str, mem_available_str, Type::LOAD); } // only for Type::QUERY or Type::LOAD. static TUniqueId label_to_queryid(const std::string& label) { diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp index 586a66fbb9..f26d2aeb1c 100644 --- a/be/src/util/doris_metrics.cpp +++ b/be/src/util/doris_metrics.cpp @@ -136,8 +136,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_limit_hard, MetricUnit::NOUNIT DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_cumulative_max_compaction_score, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_base_max_compaction_score, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_rowset_nums, MetricUnit::NOUNIT); -DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_segment_nums, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_rowsets_num, MetricUnit::NOUNIT); +DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_segments_num, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(compaction_used_permits, MetricUnit::NOUNIT); DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(compaction_waitting_permits, MetricUnit::NOUNIT); @@ -262,8 +262,8 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) { INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_cumulative_max_compaction_score); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, tablet_base_max_compaction_score); - INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_rowset_nums); - INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_segment_nums); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_rowsets_num); + INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_segments_num); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, compaction_used_permits); INT_GAUGE_METRIC_REGISTER(_server_metric_entity, compaction_waitting_permits); diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h index d5bd97fbe2..648414f3e1 100644 --- a/be/src/util/doris_metrics.h +++ b/be/src/util/doris_metrics.h @@ -137,8 +137,8 @@ public: IntGauge* tablet_cumulative_max_compaction_score; IntGauge* tablet_base_max_compaction_score; - IntGauge* all_rowset_nums; - IntGauge* all_segment_nums; + IntGauge* all_rowsets_num; + IntGauge* all_segments_num; // permits have been used for all compaction tasks IntGauge* compaction_used_permits; diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 3aa6854fa4..5fb42611ed 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -57,10 +57,10 @@ int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1; std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0; static std::unordered_map<std::string, int64_t> _mem_info_bytes; -int64_t MemInfo::_s_sys_mem_available = 0; +int64_t MemInfo::_s_sys_mem_available = -1; std::string MemInfo::_s_sys_mem_available_str = ""; -int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0; -int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 0; +int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1; +int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1; int64_t MemInfo::_s_process_minor_gc_size = -1; int64_t MemInfo::_s_process_full_gc_size = -1; @@ -106,6 +106,9 @@ void MemInfo::process_cache_gc(int64_t& freed_mem) { // step2: free top overcommit query, if enable query memroy overcommit bool MemInfo::process_minor_gc() { int64_t freed_mem = 0; + std::string vm_rss_str = PerfCounters::get_vm_rss_str(); + std::string mem_available_str = MemInfo::sys_mem_available_str(); + Defer defer {[&]() { LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem); }}; @@ -115,8 +118,8 @@ bool MemInfo::process_minor_gc() { return true; } if (config::enable_query_memroy_overcommit) { - freed_mem += - MemTrackerLimiter::free_top_overcommit_query(_s_process_minor_gc_size - freed_mem); + freed_mem += MemTrackerLimiter::free_top_overcommit_query( + _s_process_minor_gc_size - freed_mem, vm_rss_str, mem_available_str); } if (freed_mem > _s_process_minor_gc_size) { return true; @@ -130,6 +133,9 @@ bool MemInfo::process_minor_gc() { // step4: free top memory load bool MemInfo::process_full_gc() { int64_t freed_mem = 0; + std::string vm_rss_str = PerfCounters::get_vm_rss_str(); + std::string mem_available_str = MemInfo::sys_mem_available_str(); + Defer defer { [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }}; @@ -142,18 +148,20 @@ bool MemInfo::process_full_gc() { if (freed_mem > _s_process_full_gc_size) { return true; } - freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem); + freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem, + vm_rss_str, mem_available_str); if (freed_mem > _s_process_full_gc_size) { return true; } if (config::enable_query_memroy_overcommit) { - freed_mem += - MemTrackerLimiter::free_top_overcommit_load(_s_process_full_gc_size - freed_mem); + freed_mem += MemTrackerLimiter::free_top_overcommit_load( + _s_process_full_gc_size - freed_mem, vm_rss_str, mem_available_str); if (freed_mem > _s_process_full_gc_size) { return true; } } - freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem); + freed_mem += MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem, + vm_rss_str, mem_available_str); if (freed_mem > _s_process_full_gc_size) { return true; } @@ -185,8 +193,10 @@ void MemInfo::refresh_proc_meminfo() { } if (meminfo.is_open()) meminfo.close(); - _s_sys_mem_available = _mem_info_bytes["MemAvailable"]; - _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, TUnit::BYTES); + if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) { + _s_sys_mem_available = _mem_info_bytes["MemAvailable"]; + _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available, TUnit::BYTES); + } } void MemInfo::init() { @@ -239,22 +249,25 @@ void MemInfo::init() { } if (vminfo.is_open()) vminfo.close(); - // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark)) - // LowWaterMark = /proc/sys/vm/min_free_kbytes - // Ref: - // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached - // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773 - // - // available_low_water_mark = p1 - p2 - // p1: upper sys_mem_available_low_water_mark, avoid wasting too much memory. - // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%, - // in order to avoid wasting too much memory, available_low_water_mark minus 1% at most. - int64_t p1 = std::min<int64_t>( - std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1), - config::max_sys_mem_available_low_water_mark_bytes); - int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); - _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0); - _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1; + // Redhat 4.x OS, `/proc/meminfo` has no `MemAvailable`. + if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) { + // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2, LowWaterMark)) + // LowWaterMark = /proc/sys/vm/min_free_kbytes + // Ref: + // https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached + // https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773 + // + // available_low_water_mark = p1 - p2 + // p1: upper sys_mem_available_low_water_mark, avoid wasting too much memory. + // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some cloud machines vm/min_free_kbytes is 5%, + // in order to avoid wasting too much memory, available_low_water_mark minus 1% at most. + int64_t p1 = std::min<int64_t>( + std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem * 0.1), + config::max_sys_mem_available_low_water_mark_bytes); + int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem * 0.01, 0); + _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0); + _s_sys_mem_available_warning_water_mark = _s_sys_mem_available_low_water_mark + p1; + } LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", Mem Limit: " << _s_mem_limit_str diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h b/be/src/vec/exec/join/process_hash_table_probe_impl.h index 8b8c5f6ba2..93ef553b0d 100644 --- a/be/src/vec/exec/join/process_hash_table_probe_impl.h +++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h @@ -386,8 +386,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN && JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) { SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, - last_probe_index, probe_size, all_match_one, false); + RETURN_IF_CATCH_BAD_ALLOC( + probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, + last_probe_index, probe_size, all_match_one, false)); } output_block->swap(mutable_block.to_block()); @@ -652,8 +653,9 @@ Status ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts( } { SCOPED_TIMER(_probe_side_output_timer); - probe_side_output_column(mcol, _join_node->_left_output_slot_flags, current_offset, - last_probe_index, probe_size, all_match_one, true); + RETURN_IF_CATCH_BAD_ALLOC(probe_side_output_column( + mcol, _join_node->_left_output_slot_flags, current_offset, last_probe_index, + probe_size, all_match_one, true)); } auto num_cols = mutable_block.columns(); output_block->swap(mutable_block.to_block()); diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index a5d1430c25..e7ff1f474e 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -559,7 +559,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo } else { return Status::OK(); } - + if (!st) { + return st; + } if (_is_outer_join) { _add_tuple_is_null_column(&temp_block); } diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md index 791a66e8c7..464c59c309 100644 --- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md +++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md @@ -293,8 +293,8 @@ curl http://be_host:webserver_port/metrics?type=json |`doris_be_load_bytes`| | 字节|通过 tablet sink 发送的数量累计 | 可观测导入数据量 | P0 | |`doris_be_load_rows`| | Num | 通过 tablet sink 发送的行数累计| 可观测导入数据量 | P0 | |`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 | 如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 | -|`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 | -|`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 | +|`doris_be_all_rowsets_num`| | Num | 当前所有 rowset 的个数 | | P0 | +|`doris_be_all_segments_num`| | Num | 当前所有 segment 的个数 | | P0 | |`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 | |`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 | |`doris_be_heavy_work_pool_queue_size`| | Num | brpc heavy线程池队列最大长度,超过则阻塞提交work| | p0 | --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
