This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new 5ee7e733137 Spill and reserve (#46442)
5ee7e733137 is described below
commit 5ee7e73313712d444a9e24e0fe1547c10480e8de
Author: yiguolei <[email protected]>
AuthorDate: Mon Jan 6 15:22:37 2025 +0800
Spill and reserve (#46442)
### What problem does this PR solve?
1. fix log4j format %% error.
2. change wg's low water mark to 75% and high watermark to 85% to make
the spill disk more stable.
3. change exec_memlimit to a hard limit if the user sets it.
---
.../rowset/segment_v2/inverted_index_reader.cpp | 8 +++++---
be/src/pipeline/exec/file_scan_operator.cpp | 2 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 2 ++
be/src/runtime/memory/mem_tracker_limiter.h | 22 +++++++++++++---------
be/src/runtime/query_context.cpp | 5 +++--
be/src/runtime/query_context.h | 13 +++++--------
be/src/runtime/workload_group/workload_group.cpp | 10 +++++-----
.../workload_group/workload_group_manager.cpp | 10 ++++------
.../workload_group/workload_group_manager_test.cpp | 2 +-
.../java/org/apache/doris/qe/SessionVariable.java | 12 ++----------
.../resource/workloadgroup/WorkloadGroup.java | 4 ++--
.../data/workload_manager_p0/test_curd_wlg.out | 6 +++---
12 files changed, 46 insertions(+), 50 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
index 4fe45283cd2..44c038aec9c 100644
--- a/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
+++ b/be/src/olap/rowset/segment_v2/inverted_index_reader.cpp
@@ -1158,9 +1158,11 @@ Status InvertedIndexIterator::read_from_inverted_index(
RETURN_IF_ERROR(
try_read_from_inverted_index(column_name, query_value,
query_type, &hit_count));
if (hit_count > segment_num_rows * query_bkd_limit_percent / 100) {
- return Status::Error<ErrorCode::INVERTED_INDEX_BYPASS>(
- "hit count: {}, bkd inverted reached limit {}%,
segment num rows:{}",
- hit_count, query_bkd_limit_percent, segment_num_rows);
+ return Status::
+ Error<ErrorCode::INVERTED_INDEX_BYPASS>(
+ "hit count: {}, bkd inverted reached limit {}%
, segment num "
+ "rows:{}", // add blackspace after % to avoid
log4j format bug
+ hit_count, query_bkd_limit_percent,
segment_num_rows);
}
}
}
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 1571b585545..afd548f35f4 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -63,7 +63,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
auto wg_ptr = state->get_query_ctx()->workload_group();
_max_scanners =
config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
- if (wg_ptr && !state->get_query_ctx()->enable_mem_overcommit()) {
+ if (wg_ptr) {
const auto total_slots = wg_ptr->total_query_slot_count();
const auto query_slots = state->get_query_ctx()->get_slot_count();
_max_scanners = _max_scanners * query_slots / total_slots;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index cc4218d7653..e65dea61394 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -88,6 +88,8 @@ std::shared_ptr<MemTrackerLimiter>
MemTrackerLimiter::create_shared(MemTrackerLi
auto tracker = std::make_shared<MemTrackerLimiter>(type, label,
byte_limit);
// Write tracker is only used to tracker the size, so limit == -1
auto write_tracker = std::make_shared<MemTrackerLimiter>(type, "Memtable"
+ label, -1);
+    // Memtable has its own logic to deal with memory flushes, so the limit should
not be checked in its memtracker.
+ write_tracker->set_enable_reserve_memory(true);
tracker->_write_tracker.swap(write_tracker);
#ifndef BE_TEST
DCHECK(ExecEnv::tracking_memory());
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 64a8b97eb32..8c67cc6197a 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -139,7 +139,6 @@ public:
~MemTrackerLimiter();
Type type() const { return _type; }
- void set_overcommit(bool enable) { _enable_overcommit = enable; }
const std::string& label() const { return _label; }
std::shared_ptr<QueryStatistics> get_query_statistics() { return
_query_statistics; }
int64_t group_num() const { return _group_num; }
@@ -217,9 +216,7 @@ public:
if (UNLIKELY(bytes == 0)) {
return true;
}
- // If enable overcommit, then the limit is useless, use a very large
value as limit
- bool rt = _mem_counter.try_add(
- bytes, _enable_overcommit ?
std::numeric_limits<int64_t>::max() : _limit.load());
+ bool rt = _mem_counter.try_add(bytes, _limit.load());
if (rt && _query_statistics) {
_query_statistics->set_max_peak_memory_bytes(peak_consumption());
_query_statistics->set_current_used_memory_bytes(consumption());
@@ -292,6 +289,7 @@ public:
void add_address_sanitizers(void* buf, size_t size);
void remove_address_sanitizers(void* buf, size_t size);
bool is_group_commit_load {false};
+ void set_enable_reserve_memory(bool enabled) { _enable_reserve_memory =
enabled; }
private:
// only for Type::QUERY or Type::LOAD.
@@ -318,7 +316,6 @@ private:
*/
Type _type;
- bool _enable_overcommit = true;
// label used in the make snapshot, not guaranteed unique.
std::string _label;
@@ -328,6 +325,8 @@ private:
MemCounter _mem_counter;
MemCounter _reserved_counter;
+ bool _enable_reserve_memory = false;
+
// Limit on memory consumption, in bytes.
std::atomic<int64_t> _limit;
@@ -377,16 +376,21 @@ inline void MemTrackerLimiter::cache_consume(int64_t
bytes) {
}
inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
- /*
- if (bytes <= 0 || _enable_overcommit) {
+    // Do not check the limit here, because the reserve process already checks it.
+ if (bytes <= 0 || _enable_reserve_memory) {
return Status::OK();
}
- // check limit should ignore memtable size, because it is treated as a
cache
+
+ // If reserve not enabled, then should check limit here to kill the query
when limit exceed.
+    // For insert into select or pure load jobs, the memtable is accounted in a
separate memtracker limiter,
+ // and its reserve is set to true. So that it will not reach this logic.
+ // Only query and load job has exec_mem_limit and the _limit > 0, other
memtracker limiter's _limit is -1 so
+ // it will not take effect.
if (_limit > 0 && consumption() + bytes > _limit) {
return Status::MemoryLimitExceeded(fmt::format("failed alloc size {},
{}",
MemCounter::print_bytes(bytes),
tracker_limit_exceeded_str()));
- }*/
+ }
return Status::OK();
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 241cefb8942..ea950d13555 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -146,9 +146,10 @@ void QueryContext::_init_query_mem_tracker() {
if (_query_options.__isset.is_report_success &&
_query_options.is_report_success) {
query_mem_tracker->enable_print_log_usage();
}
- query_mem_tracker->set_overcommit(enable_mem_overcommit());
+
query_mem_tracker->set_enable_reserve_memory(_query_options.__isset.enable_reserve_memory
&&
+
_query_options.enable_reserve_memory);
_user_set_mem_limit = bytes_limit;
- _expected_mem_limit = _user_set_mem_limit;
+ _adjusted_mem_limit = bytes_limit;
}
QueryContext::~QueryContext() {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 96883acd8c4..a50d6041d35 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -248,12 +248,13 @@ public:
int64_t get_mem_limit() const { return query_mem_tracker->limit(); }
- void set_expected_mem_limit(int64_t new_mem_limit) {
- _expected_mem_limit = std::min<int64_t>(new_mem_limit,
_user_set_mem_limit);
+    // The new mem limit is capped at the user-set mem limit.
+ void set_adjusted_mem_limit(int64_t new_mem_limit) {
+ _adjusted_mem_limit = std::min<int64_t>(new_mem_limit,
_user_set_mem_limit);
}
// Expected mem limit is the limit when workload group reached limit.
- int64_t expected_mem_limit() { return _expected_mem_limit; }
+ int64_t adjusted_mem_limit() { return _adjusted_mem_limit; }
std::shared_ptr<MemTrackerLimiter>& get_mem_tracker() { return
query_mem_tracker; }
@@ -261,10 +262,6 @@ public:
return _query_options.__isset.query_slot_count ?
_query_options.query_slot_count : 1;
}
- bool enable_mem_overcommit() const {
- return _query_options.__isset.enable_mem_overcommit ?
_query_options.enable_mem_overcommit
- : false;
- }
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
std::string user;
@@ -395,7 +392,7 @@ private:
std::atomic<int64_t> _paused_period_secs = 0;
std::atomic<bool> _low_memory_mode = false;
int64_t _user_set_mem_limit = 0;
- std::atomic<int64_t> _expected_mem_limit = 0;
+ std::atomic<int64_t> _adjusted_mem_limit = 0;
std::mutex _profile_mutex;
timespec _query_arrival_timestamp;
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 48d1a3c15b7..f06e416b8ce 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -93,7 +93,7 @@ std::string WorkloadGroup::debug_string() const {
return fmt::format(
"WorkloadGroup[id = {}, name = {}, version = {}, cpu_share = {}, "
"total_query_slot_count = {}, "
- "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio=
{}%, "
+ "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio=
{}% , " // add a blackspace after % to avoid log4j format bugs
"enable_memory_overcommit = {}, total_mem_used = {}
(write_buffer_size={}),"
"wg_refresh_interval_memory_growth = {}, mem_used_ratio = {},
cpu_hard_limit = {}, "
"scan_thread_num = "
@@ -133,17 +133,17 @@ std::string WorkloadGroup::memory_debug_string() const {
return fmt::format(
"WorkloadGroup[id = {}, name = {}, version = {},"
"total_query_slot_count = {}, "
- "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio=
{}%, "
+ "memory_limit = {}, slot_memory_policy = {}, write_buffer_ratio=
{}% , "
"enable_memory_overcommit = {}, total_mem_used = {}
(write_buffer_size={}),"
- "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}, "
- "memory_low_watermark={}, memory_high_watermark={},
is_shutdown={}, query_num={}]",
+ "wg_refresh_interval_memory_growth = {}, mem_used_ratio = {}% , "
+ "memory_low_watermark={}% , memory_high_watermark={}% ,
is_shutdown={}, query_num={}]",
_id, _name, _version, _total_query_slot_count,
PrettyPrinter::print(_memory_limit, TUnit::BYTES),
to_string(_slot_mem_policy),
_load_buffer_ratio, _enable_memory_overcommit ? "true" : "false",
PrettyPrinter::print(_total_mem_used.load(), TUnit::BYTES),
PrettyPrinter::print(_write_buffer_size.load(), TUnit::BYTES),
PrettyPrinter::print(_wg_refresh_interval_memory_growth.load(),
TUnit::BYTES),
- mem_used_ratio, _memory_low_watermark, _memory_high_watermark,
_is_shutdown,
+ std::trunc(mem_used_ratio), _memory_low_watermark,
_memory_high_watermark, _is_shutdown,
_query_ctxs.size());
}
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index fae389d6341..7eb31b28400 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -379,15 +379,15 @@ void WorkloadGroupMgr::handle_paused_queries() {
// check if the reserve is too large, if it is too large,
// should set the query's limit only.
// Check the query's reserve with expected limit.
- if (query_ctx->expected_mem_limit() <
+ if (query_ctx->adjusted_mem_limit() <
query_ctx->get_mem_tracker()->consumption() +
query_it->reserve_size_) {
- query_ctx->set_mem_limit(query_ctx->expected_mem_limit());
+ query_ctx->set_mem_limit(query_ctx->adjusted_mem_limit());
query_ctx->set_memory_sufficient(true);
LOG(INFO) << "Workload group memory reserve failed because
"
<< query_ctx->debug_string() << " reserve size "
<<
PrettyPrinter::print_bytes(query_it->reserve_size_)
<< " is too large, set hard limit to "
- <<
PrettyPrinter::print_bytes(query_ctx->expected_mem_limit())
+ <<
PrettyPrinter::print_bytes(query_ctx->adjusted_mem_limit())
<< " and resume running.";
query_it = queries_list.erase(query_it);
continue;
@@ -865,10 +865,8 @@ void
WorkloadGroupMgr::update_queries_limit_(WorkloadGroupPtr wg, bool enable_ha
// If the query is a pure load task, then should not modify its limit.
Or it will reserve
// memory failed and we did not hanle it.
if (!query_ctx->is_pure_load_task()) {
- // If slot memory policy is enabled, then overcommit is disabled.
- query_ctx->get_mem_tracker()->set_overcommit(false);
query_ctx->set_mem_limit(query_weighted_mem_limit);
-
query_ctx->set_expected_mem_limit(expected_query_weighted_mem_limit);
+
query_ctx->set_adjusted_mem_limit(expected_query_weighted_mem_limit);
}
}
LOG_EVERY_T(INFO, 60) << debug_msg;
diff --git a/be/test/runtime/workload_group/workload_group_manager_test.cpp
b/be/test/runtime/workload_group/workload_group_manager_test.cpp
index 17f4569f39b..e49d502fd25 100644
--- a/be/test/runtime/workload_group/workload_group_manager_test.cpp
+++ b/be/test/runtime/workload_group/workload_group_manager_test.cpp
@@ -169,7 +169,7 @@ TEST_F(WorkloadGroupManagerTest, query_exceed) {
ASSERT_EQ(query_context->is_cancelled(), true) << "query should be
canceled";
}
-// if (query_ctx->expected_mem_limit() <
+// if (query_ctx->adjusted_mem_limit() <
// query_ctx->get_mem_tracker()->consumption() +
query_it->reserve_size_)
TEST_F(WorkloadGroupManagerTest, wg_exceed1) {
auto wg = _wg_manager->get_or_create_workload_group({});
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 92a01fbf9a2..a1f5ababb1c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -658,8 +658,6 @@ public class SessionVariable implements Serializable,
Writable {
public static final String QUERY_SLOT_COUNT = "query_slot_count";
- public static final String ENABLE_MEM_OVERCOMMIT = "enable_mem_overcommit";
-
public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
public static final String USE_MAX_LENGTH_OF_VARCHAR_IN_CTAS =
"use_max_length_of_varchar_in_ctas";
@@ -757,9 +755,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = INSERT_VISIBLE_TIMEOUT_MS, needForward = true)
public long insertVisibleTimeoutMs = DEFAULT_INSERT_VISIBLE_TIMEOUT_MS;
- // max memory used on every backend.
+ // max memory used on every backend. Default value to 100G.
@VariableMgr.VarAttr(name = EXEC_MEM_LIMIT, needForward = true)
- public long maxExecMemByte = 2147483648L;
+ public long maxExecMemByte = 100147483648L;
@VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT,
description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block",
@@ -866,11 +864,6 @@ public class SessionVariable implements Serializable,
Writable {
}
}
- @VariableMgr.VarAttr(name = ENABLE_MEM_OVERCOMMIT, needForward = true,
description = {
- "是否通过硬限的方式来计算每个Query的内存资源",
- "Whether to calculate the memory resources of each query by hard
limit"})
- public boolean enableMemOvercommit = true;
-
@VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
public int maxColumnReaderNum = 20000;
@@ -4055,7 +4048,6 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setHiveOrcUseColumnNames(hiveOrcUseColumnNames);
tResult.setHiveParquetUseColumnNames(hiveParquetUseColumnNames);
tResult.setQuerySlotCount(wgQuerySlotCount);
- tResult.setEnableMemOvercommit(enableMemOvercommit);
tResult.setKeepCarriageReturn(keepCarriageReturn);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
index 7852d3b6302..5fe16f6c364 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroup.java
@@ -134,8 +134,8 @@ public class WorkloadGroup implements Writable,
GsonPostProcessable {
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(SCAN_THREAD_NUM, "-1");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MAX_REMOTE_SCAN_THREAD_NUM, "-1");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MIN_REMOTE_SCAN_THREAD_NUM, "-1");
- ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "80%");
- ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "95%");
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_LOW_WATERMARK, "75%");
+ ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(MEMORY_HIGH_WATERMARK, "85%");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(TAG, "");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(READ_BYTES_PER_SECOND, "-1");
ALL_PROPERTIES_DEFAULT_VALUE_MAP.put(REMOTE_READ_BYTES_PER_SECOND,
"-1");
diff --git a/regression-test/data/workload_manager_p0/test_curd_wlg.out
b/regression-test/data/workload_manager_p0/test_curd_wlg.out
index 0e05adff526..68c7f735096 100644
--- a/regression-test/data/workload_manager_p0/test_curd_wlg.out
+++ b/regression-test/data/workload_manager_p0/test_curd_wlg.out
@@ -144,11 +144,11 @@ test_wg_priv_role1 test_wg_priv_g1 Usage_priv
NO
-- !select_wgp_12 --
-- !select_default_val_wg_1 --
-default_val_wg -1 -1 true 2147483647 0 0 -1
-1 -1 -1 80% 95% -1 -1
+default_val_wg -1 -1 true 2147483647 0 0 -1
-1 -1 -1 75% 85% -1 -1
-- !select_default_val_wg_2 --
-default_val_wg 1024 1% true 100 1 123 1% 1
12 10 80% 95% abc 123 10
+default_val_wg 1024 1% true 100 1 123 1% 1
12 10 75% 85% abc 123 10
-- !select_default_val_wg_3 --
-default_val_wg -1 -1 true 2147483647 0 0 -1
-1 -1 -1 80% 95% -1 -1
+default_val_wg -1 -1 true 2147483647 0 0 -1
-1 -1 -1 75% 85% -1 -1
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]