This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a1c0054b4c [fix](memory) fix memory GC details and join probe catch
bad_alloc (#16989)
a1c0054b4c is described below
commit a1c0054b4cbc93a77e38c7245a4b3eee8261e430
Author: Xinyi Zou <[email protected]>
AuthorDate: Thu Feb 23 08:33:30 2023 +0800
[fix](memory) fix memory GC details and join probe catch bad_alloc (#16989)
Fix: on Red Hat 4.x, /proc/meminfo has no MemAvailable field; in that case,
disable MemAvailable-based memory control.
Record vm_rss_str and mem_available_str at the moment GC is triggered, so that
memory changes during GC do not make the logged values inaccurate.
Catch bad_alloc in the hash join probe, which may allocate up to 64 GB of memory at once, to avoid OOM.
Rename the metrics doris_be_all_rowset_nums and doris_be_all_segment_nums to
doris_be_all_rowsets_num and doris_be_all_segments_num, and update the documentation accordingly.
---
be/src/common/daemon.cpp | 4 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 17 +++---
be/src/runtime/memory/mem_tracker_limiter.h | 19 ++++--
be/src/util/doris_metrics.cpp | 8 +--
be/src/util/doris_metrics.h | 4 +-
be/src/util/mem_info.cpp | 67 +++++++++++++---------
.../vec/exec/join/process_hash_table_probe_impl.h | 10 ++--
be/src/vec/exec/join/vhash_join_node.cpp | 3 +
docs/en/docs/releasenotes/release-1.2.2.md | 2 +-
.../maint-monitor/monitor-metrics/metrics.md | 4 +-
docs/zh-CN/docs/releasenotes/release-1.2.2.md | 2 +-
11 files changed, 84 insertions(+), 56 deletions(-)
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 7fd0368ced..34ad11f337 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -305,9 +305,9 @@ void Daemon::calculate_metrics_thread() {
&lst_net_send_bytes, &lst_net_receive_bytes);
}
- DorisMetrics::instance()->all_rowset_nums->set_value(
+ DorisMetrics::instance()->all_rowsets_num->set_value(
StorageEngine::instance()->tablet_manager()->get_rowset_nums());
- DorisMetrics::instance()->all_segment_nums->set_value(
+ DorisMetrics::instance()->all_segments_num->set_value(
StorageEngine::instance()->tablet_manager()->get_segment_nums());
}
} while
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(15)));
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 938f72d2a0..9ce82ca585 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -249,7 +249,9 @@ Status
MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
return Status::MemoryLimitExceeded(failed_msg);
}
-int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem, Type
type) {
+int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem,
+ const std::string& vm_rss_str,
+ const std::string&
mem_available_str, Type type) {
std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t, std::string>>,
std::greater<std::pair<int64_t, std::string>>>
@@ -275,8 +277,7 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem, Type type
"details see be.INFO.",
TypeString[type], TypeString[type],
min_pq.top().second,
print_bytes(min_pq.top().first),
BackendOptions::get_localhost(),
- PerfCounters::get_vm_rss_str(),
MemInfo::mem_limit_str(),
- MemInfo::sys_mem_available_str(),
+ vm_rss_str, MemInfo::mem_limit_str(),
mem_available_str,
print_bytes(MemInfo::sys_mem_available_low_water_mark())));
freed_mem += min_pq.top().first;
@@ -320,7 +321,10 @@ int64_t MemTrackerLimiter::free_top_memory_query(int64_t
min_free_mem, Type type
return cancel_top_query(min_pq);
}
-int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
Type type) {
+int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem,
+ const std::string&
vm_rss_str,
+ const std::string&
mem_available_str,
+ Type type) {
std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t, std::string>>,
std::greater<std::pair<int64_t, std::string>>>
@@ -371,9 +375,8 @@ int64_t
MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem, Type
"less than warning water mark {}. Execute again
after enough memory, "
"details see be.INFO.",
TypeString[type], TypeString[type],
max_pq.top().second,
- print_bytes(query_mem),
BackendOptions::get_localhost(),
- PerfCounters::get_vm_rss_str(),
MemInfo::soft_mem_limit_str(),
- MemInfo::sys_mem_available_str(),
+ print_bytes(query_mem),
BackendOptions::get_localhost(), vm_rss_str,
+ MemInfo::soft_mem_limit_str(), mem_available_str,
print_bytes(MemInfo::sys_mem_available_warning_water_mark())));
usage_strings.push_back(fmt::format("{} memory usage {} Bytes,
overcommit ratio: {}",
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 3a383cf573..e1713dc568 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -146,15 +146,22 @@ public:
int64_t failed_allocation_size = 0);
// Start canceling from the query with the largest memory usage until the
memory of min_free_mem size is freed.
- static int64_t free_top_memory_query(int64_t min_free_mem, Type type =
Type::QUERY);
- static int64_t free_top_memory_load(int64_t min_free_mem) {
- return free_top_memory_query(min_free_mem, Type::LOAD);
+ // vm_rss_str and mem_available_str recorded when gc is triggered, for log
printing.
+ static int64_t free_top_memory_query(int64_t min_free_mem, const
std::string& vm_rss_str,
+ const std::string& mem_available_str,
+ Type type = Type::QUERY);
+ static int64_t free_top_memory_load(int64_t min_free_mem, const
std::string& vm_rss_str,
+ const std::string& mem_available_str) {
+ return free_top_memory_query(min_free_mem, vm_rss_str,
mem_available_str, Type::LOAD);
}
// Start canceling from the query with the largest memory overcommit ratio
until the memory
// of min_free_mem size is freed.
- static int64_t free_top_overcommit_query(int64_t min_free_mem, Type type =
Type::QUERY);
- static int64_t free_top_overcommit_load(int64_t min_free_mem) {
- return free_top_overcommit_query(min_free_mem, Type::LOAD);
+ static int64_t free_top_overcommit_query(int64_t min_free_mem, const
std::string& vm_rss_str,
+ const std::string&
mem_available_str,
+ Type type = Type::QUERY);
+ static int64_t free_top_overcommit_load(int64_t min_free_mem, const
std::string& vm_rss_str,
+ const std::string&
mem_available_str) {
+ return free_top_overcommit_query(min_free_mem, vm_rss_str,
mem_available_str, Type::LOAD);
}
// only for Type::QUERY or Type::LOAD.
static TUniqueId label_to_queryid(const std::string& label) {
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 5734e6a26a..2e36eed124 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -138,8 +138,8 @@
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(process_fd_num_limit_hard, MetricUnit::NOUNIT
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_cumulative_max_compaction_score,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_base_max_compaction_score,
MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_rowset_nums, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_segment_nums, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_rowsets_num, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(all_segments_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(compaction_used_permits,
MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(compaction_waitting_permits,
MetricUnit::NOUNIT);
@@ -273,8 +273,8 @@ DorisMetrics::DorisMetrics() :
_metric_registry(_s_registry_name) {
INT_GAUGE_METRIC_REGISTER(_server_metric_entity,
tablet_cumulative_max_compaction_score);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity,
tablet_base_max_compaction_score);
- INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_rowset_nums);
- INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_segment_nums);
+ INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_rowsets_num);
+ INT_GAUGE_METRIC_REGISTER(_server_metric_entity, all_segments_num);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity, compaction_used_permits);
INT_GAUGE_METRIC_REGISTER(_server_metric_entity,
compaction_waitting_permits);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 4982862035..516e662860 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -139,8 +139,8 @@ public:
IntGauge* tablet_cumulative_max_compaction_score;
IntGauge* tablet_base_max_compaction_score;
- IntGauge* all_rowset_nums;
- IntGauge* all_segment_nums;
+ IntGauge* all_rowsets_num;
+ IntGauge* all_segments_num;
// permits have been used for all compaction tasks
IntGauge* compaction_used_permits;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 8c995b3b31..99afc5b28f 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -57,10 +57,10 @@ int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
static std::unordered_map<std::string, int64_t> _mem_info_bytes;
-int64_t MemInfo::_s_sys_mem_available = 0;
+int64_t MemInfo::_s_sys_mem_available = -1;
std::string MemInfo::_s_sys_mem_available_str = "";
-int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0;
-int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 0;
+int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1;
+int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1;
int64_t MemInfo::_s_process_minor_gc_size = -1;
int64_t MemInfo::_s_process_full_gc_size = -1;
@@ -106,6 +106,9 @@ void MemInfo::process_cache_gc(int64_t& freed_mem) {
// step2: free top overcommit query, if enable query memroy overcommit
bool MemInfo::process_minor_gc() {
int64_t freed_mem = 0;
+ std::string vm_rss_str = PerfCounters::get_vm_rss_str();
+ std::string mem_available_str = MemInfo::sys_mem_available_str();
+
Defer defer {[&]() {
LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes",
freed_mem);
}};
@@ -115,8 +118,8 @@ bool MemInfo::process_minor_gc() {
return true;
}
if (config::enable_query_memroy_overcommit) {
- freed_mem +=
-
MemTrackerLimiter::free_top_overcommit_query(_s_process_minor_gc_size -
freed_mem);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_query(
+ _s_process_minor_gc_size - freed_mem, vm_rss_str,
mem_available_str);
}
if (freed_mem > _s_process_minor_gc_size) {
return true;
@@ -130,6 +133,9 @@ bool MemInfo::process_minor_gc() {
// step4: free top memory load
bool MemInfo::process_full_gc() {
int64_t freed_mem = 0;
+ std::string vm_rss_str = PerfCounters::get_vm_rss_str();
+ std::string mem_available_str = MemInfo::sys_mem_available_str();
+
Defer defer {
[&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {}
Bytes", freed_mem); }};
@@ -142,18 +148,20 @@ bool MemInfo::process_full_gc() {
if (freed_mem > _s_process_full_gc_size) {
return true;
}
- freed_mem +=
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
+ freed_mem +=
MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem,
+ vm_rss_str,
mem_available_str);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
if (config::enable_query_memroy_overcommit) {
- freed_mem +=
-
MemTrackerLimiter::free_top_overcommit_load(_s_process_full_gc_size -
freed_mem);
+ freed_mem += MemTrackerLimiter::free_top_overcommit_load(
+ _s_process_full_gc_size - freed_mem, vm_rss_str,
mem_available_str);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
}
- freed_mem +=
MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem);
+ freed_mem +=
MemTrackerLimiter::free_top_memory_load(_s_process_full_gc_size - freed_mem,
+ vm_rss_str,
mem_available_str);
if (freed_mem > _s_process_full_gc_size) {
return true;
}
@@ -185,8 +193,10 @@ void MemInfo::refresh_proc_meminfo() {
}
if (meminfo.is_open()) meminfo.close();
- _s_sys_mem_available = _mem_info_bytes["MemAvailable"];
- _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available,
TUnit::BYTES);
+ if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
+ _s_sys_mem_available = _mem_info_bytes["MemAvailable"];
+ _s_sys_mem_available_str = PrettyPrinter::print(_s_sys_mem_available,
TUnit::BYTES);
+ }
}
void MemInfo::init() {
@@ -239,22 +249,25 @@ void MemInfo::init() {
}
if (vminfo.is_open()) vminfo.close();
- // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache / 2,
LowWaterMark))
- // LowWaterMark = /proc/sys/vm/min_free_kbytes
- // Ref:
- //
https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
- //
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
- //
- // available_low_water_mark = p1 - p2
- // p1: upper sys_mem_available_low_water_mark, avoid wasting too much
memory.
- // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory, some
cloud machines vm/min_free_kbytes is 5%,
- // in order to avoid wasting too much memory, available_low_water_mark
minus 1% at most.
- int64_t p1 = std::min<int64_t>(
- std::min<int64_t>(_s_physical_mem - _s_mem_limit, _s_physical_mem
* 0.1),
- config::max_sys_mem_available_low_water_mark_bytes);
- int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem *
0.01, 0);
- _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0);
- _s_sys_mem_available_warning_water_mark =
_s_sys_mem_available_low_water_mark + p1;
+ // Redhat 4.x OS, `/proc/meminfo` has no `MemAvailable`.
+ if (_mem_info_bytes.find("MemAvailable") != _mem_info_bytes.end()) {
+ // MemAvailable = MemFree - LowWaterMark + (PageCache - min(PageCache
/ 2, LowWaterMark))
+ // LowWaterMark = /proc/sys/vm/min_free_kbytes
+ // Ref:
+ //
https://serverfault.com/questions/940196/why-is-memavailable-a-lot-less-than-memfreebufferscached
+ //
https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/commit/?id=34e431b0ae398fc54ea69ff85ec700722c9da773
+ //
+ // available_low_water_mark = p1 - p2
+ // p1: upper sys_mem_available_low_water_mark, avoid wasting too much
memory.
+ // p2: vm/min_free_kbytes is usually 0.4% - 5% of the total memory,
some cloud machines vm/min_free_kbytes is 5%,
+ // in order to avoid wasting too much memory,
available_low_water_mark minus 1% at most.
+ int64_t p1 = std::min<int64_t>(
+ std::min<int64_t>(_s_physical_mem - _s_mem_limit,
_s_physical_mem * 0.1),
+ config::max_sys_mem_available_low_water_mark_bytes);
+ int64_t p2 = std::max<int64_t>(_s_vm_min_free_kbytes - _s_physical_mem
* 0.01, 0);
+ _s_sys_mem_available_low_water_mark = std::max<int64_t>(p1 - p2, 0);
+ _s_sys_mem_available_warning_water_mark =
_s_sys_mem_available_low_water_mark + p1;
+ }
LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem,
TUnit::BYTES)
<< ", Mem Limit: " << _s_mem_limit_str
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 4e4a34e43d..fa23105c1e 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -392,8 +392,9 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN &&
JoinOpType != TJoinOp::RIGHT_ANTI_JOIN) {
SCOPED_TIMER(_probe_side_output_timer);
- probe_side_output_column(mcol, _join_node->_left_output_slot_flags,
current_offset,
- last_probe_index, probe_size, all_match_one,
false);
+ RETURN_IF_CATCH_BAD_ALLOC(
+ probe_side_output_column(mcol,
_join_node->_left_output_slot_flags, current_offset,
+ last_probe_index, probe_size,
all_match_one, false));
}
output_block->swap(mutable_block.to_block());
@@ -664,8 +665,9 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
}
{
SCOPED_TIMER(_probe_side_output_timer);
- probe_side_output_column(mcol,
_join_node->_left_output_slot_flags, current_offset,
- last_probe_index, probe_size,
all_match_one, true);
+ RETURN_IF_CATCH_BAD_ALLOC(probe_side_output_column(
+ mcol, _join_node->_left_output_slot_flags, current_offset,
last_probe_index,
+ probe_size, all_match_one, true));
}
auto num_cols = mutable_block.columns();
output_block->swap(mutable_block.to_block());
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 75951be756..face23f898 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -534,6 +534,9 @@ Status HashJoinNode::pull(doris::RuntimeState* state,
vectorized::Block* output_
} else {
return Status::OK();
}
+ if (!st) {
+ return st;
+ }
if (_is_outer_join) {
_add_tuple_is_null_column(&temp_block);
}
diff --git a/docs/en/docs/releasenotes/release-1.2.2.md
b/docs/en/docs/releasenotes/release-1.2.2.md
index d1a5e290dc..53dae650d4 100644
--- a/docs/en/docs/releasenotes/release-1.2.2.md
+++ b/docs/en/docs/releasenotes/release-1.2.2.md
@@ -132,7 +132,7 @@ Reference:
[https://doris.apache.org/docs/dev/advanced/variables](https://doris.
Add metrics to view the total rowset and segment numbers on BE
-- doris_be_all_rowset_nums and doris_be_all_segment_nums
+- doris_be_all_rowsets_num and doris_be_all_segments_num
# Big Thanks
diff --git
a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 0d542431f3..39b1a4c421 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -296,8 +296,8 @@ curl http://be_host:webserver_port/metrics?type=json
|`doris_be_load_bytes`| | 字节|通过 tablet sink 发送的数量累计 | 可观测导入数据量 | P0 |
|`doris_be_load_rows`| | Num | 通过 tablet sink 发送的行数累计| 可观测导入数据量 | P0 |
|`fragment_thread_pool_queue_size`| | Num | 当前查询执行线程池等待队列的长度 |
如果大于零,则说明查询线程已耗尽,查询会出现堆积 | P0 |
-|`doris_be_all_rowset_nums`| | Num | 当前所有 rowset 的个数 | | P0 |
-|`doris_be_all_segment_nums`| | Num | 当前所有 segment 的个数 | | P0 |
+|`doris_be_all_rowsets_num`| | Num | 当前所有 rowset 的个数 | | P0 |
+|`doris_be_all_segments_num`| | Num | 当前所有 segment 的个数 | | P0 |
|`doris_be_heavy_work_max_threads`| | Num | brpc heavy线程池线程个数| | p0 |
|`doris_be_light_work_max_threads`| | Num | brpc light线程池线程个数| | p0 |
|`doris_be_heavy_work_pool_queue_size`| | Num | brpc
heavy线程池队列最大长度,超过则阻塞提交work| | p0 |
diff --git a/docs/zh-CN/docs/releasenotes/release-1.2.2.md
b/docs/zh-CN/docs/releasenotes/release-1.2.2.md
index e9a6e06d1e..0e0f67a3f4 100644
--- a/docs/zh-CN/docs/releasenotes/release-1.2.2.md
+++ b/docs/zh-CN/docs/releasenotes/release-1.2.2.md
@@ -145,7 +145,7 @@ under the License.
# 其他
-添加指标以查看 BE 上的 Rowset 和 Segment 总数字 `doris_be_all_rowset_nums` 和
`doris_be_all_segment_nums`
+添加指标以查看 BE 上的 Rowset 和 Segment 总数字 `doris_be_all_rowsets_num` 和
`doris_be_all_segments_num`
# 致谢
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]