This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 73d8f5901d fix mem tracker limiter (#11376)
73d8f5901d is described below
commit 73d8f5901d09eece433dbd3d366fd2719d9d3ee7
Author: Xinyi Zou <[email protected]>
AuthorDate: Mon Aug 1 09:44:04 2022 +0800
fix mem tracker limiter (#11376)
---
be/src/olap/base_compaction.cpp | 2 +-
be/src/olap/compaction.cpp | 4 +-
be/src/olap/compaction.h | 2 +-
be/src/olap/cumulative_compaction.cpp | 2 +-
be/src/olap/delta_writer.cpp | 20 ++++-----
be/src/olap/delta_writer.h | 12 +++---
be/src/olap/memtable_flush_executor.cpp | 7 ++--
be/src/olap/memtable_flush_executor.h | 3 +-
be/src/olap/olap_server.cpp | 4 +-
be/src/olap/storage_engine.cpp | 16 ++++----
be/src/olap/storage_engine.h | 26 +++++++-----
be/src/olap/task/engine_alter_tablet_task.cpp | 4 +-
be/src/olap/task/engine_alter_tablet_task.h | 2 +-
be/src/olap/task/engine_batch_load_task.cpp | 4 +-
be/src/olap/task/engine_batch_load_task.h | 2 +-
be/src/olap/task/engine_checksum_task.cpp | 4 +-
be/src/olap/task/engine_checksum_task.h | 2 +-
be/src/olap/task/engine_clone_task.cpp | 4 +-
be/src/olap/task/engine_clone_task.h | 2 +-
be/src/runtime/data_stream_mgr.cpp | 6 +--
be/src/runtime/data_stream_recvr.cc | 5 +--
be/src/runtime/data_stream_recvr.h | 5 +--
be/src/runtime/exec_env.h | 16 ++++----
be/src/runtime/exec_env_init.cpp | 12 +++---
be/src/runtime/load_channel.cpp | 6 +--
be/src/runtime/load_channel.h | 4 +-
be/src/runtime/load_channel_mgr.cpp | 10 ++---
be/src/runtime/load_channel_mgr.h | 2 +-
be/src/runtime/memory/mem_tracker_limiter.cpp | 52 ++++++++++++------------
be/src/runtime/memory/mem_tracker_limiter.h | 18 ++++----
be/src/runtime/memory/mem_tracker_task_pool.cpp | 46 +++++++++------------
be/src/runtime/memory/mem_tracker_task_pool.h | 26 ++++++------
be/src/runtime/memory/thread_mem_tracker_mgr.cpp | 11 +++--
be/src/runtime/memory/thread_mem_tracker_mgr.h | 6 +--
be/src/runtime/runtime_state.cpp | 5 ++-
be/src/runtime/runtime_state.h | 8 ++--
be/src/runtime/sorted_run_merger.cc | 2 +-
be/src/runtime/tablets_channel.cpp | 7 ++--
be/src/runtime/tablets_channel.h | 7 ++--
be/src/runtime/thread_context.cpp | 5 ++-
be/src/runtime/thread_context.h | 5 ++-
be/src/service/internal_service.cpp | 16 ++++----
be/src/vec/runtime/vdata_stream_mgr.cpp | 4 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +--
be/src/vec/runtime/vdata_stream_recvr.h | 6 +--
be/test/runtime/mem_limit_test.cpp | 12 +++---
be/test/testutil/run_all_tests.cpp | 3 +-
47 files changed, 220 insertions(+), 212 deletions(-)
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index 307fa2fee0..2e8d1f82aa 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -63,7 +63,7 @@ Status BaseCompaction::execute_compact_impl() {
return Status::OLAPInternalError(OLAP_ERR_BE_CLONE_OCCURRED);
}
- SCOPED_ATTACH_TASK(_mem_tracker.get(),
ThreadContext::TaskType::COMPACTION);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
// 2. do base compaction, merge rowsets
int64_t permits = get_compaction_permits();
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 9a0e50d591..7ac55ab15e 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -34,10 +34,10 @@ Compaction::Compaction(TabletSharedPtr tablet, const
std::string& label)
_input_row_num(0),
_state(CompactionState::INITED) {
#ifndef BE_TEST
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, label, StorageEngine::instance()->compaction_mem_tracker());
#else
- _mem_tracker = std::make_unique<MemTrackerLimiter>(-1, label);
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(-1, label);
#endif
}
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 7d7a43bfd3..d73bfa9dba 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -76,7 +76,7 @@ protected:
protected:
// the root tracker for this compaction
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
TabletSharedPtr _tablet;
diff --git a/be/src/olap/cumulative_compaction.cpp
b/be/src/olap/cumulative_compaction.cpp
index 9fc9fd8b62..ba52fbfdcb 100644
--- a/be/src/olap/cumulative_compaction.cpp
+++ b/be/src/olap/cumulative_compaction.cpp
@@ -70,7 +70,7 @@ Status CumulativeCompaction::execute_compact_impl() {
return Status::OLAPInternalError(OLAP_ERR_CUMULATIVE_CLONE_OCCURRED);
}
- SCOPED_ATTACH_TASK(_mem_tracker.get(),
ThreadContext::TaskType::COMPACTION);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::COMPACTION);
// 3. do cumulative compaction, merge rowsets
int64_t permits = get_compaction_permits();
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index e50a8567b4..60a096af28 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -30,14 +30,14 @@
namespace doris {
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
MemTrackerLimiter* parent_tracker,
- bool is_vec) {
+Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer,
+ const std::shared_ptr<MemTrackerLimiter>&
parent_tracker, bool is_vec) {
*writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker,
is_vec);
return Status::OK();
}
DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
- MemTrackerLimiter* parent_tracker, bool is_vec)
+ const std::shared_ptr<MemTrackerLimiter>&
parent_tracker, bool is_vec)
: _req(*req),
_tablet(nullptr),
_cur_rowset(nullptr),
@@ -98,9 +98,9 @@ Status DeltaWriter::init() {
<< ", schema_hash=" << _req.schema_hash;
return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
}
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()),
_parent_tracker);
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
// check tablet version number
if (_tablet->version_count() > config::max_tablet_version_num) {
//trigger quick compaction
@@ -208,7 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block,
const std::vector<int>
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
_mem_table->insert(block, row_idxs);
if (_mem_table->need_to_agg()) {
@@ -226,7 +226,7 @@ Status DeltaWriter::_flush_memtable_async() {
if (++_segment_counter > config::max_segment_num_per_rowset) {
return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
}
- return _flush_token->submit(std::move(_mem_table), _mem_tracker.get());
+ return _flush_token->submit(std::move(_mem_table), _mem_tracker);
}
Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
@@ -243,7 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait)
{
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
if (mem_consumption() == _mem_table->memory_usage()) {
// equal means there is no memtable in flush queue, just flush this
memtable
VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable
size: "
@@ -297,7 +297,7 @@ Status DeltaWriter::close() {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
RETURN_NOT_OK(_flush_memtable_async());
_mem_table.reset();
return Status::OK();
@@ -312,7 +312,7 @@ Status DeltaWriter::close_wait() {
return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
}
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::LOAD);
// return error if previous flush failed
RETURN_NOT_OK(_flush_token->wait());
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 1ce62de338..573b786402 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -56,7 +56,9 @@ struct WriteRequest {
class DeltaWriter {
public:
static Status open(WriteRequest* req, DeltaWriter** writer,
- MemTrackerLimiter* parent_tracker = nullptr, bool
is_vec = false);
+ const std::shared_ptr<MemTrackerLimiter>&
parent_tracker =
+ std::shared_ptr<MemTrackerLimiter>(),
+ bool is_vec = false);
~DeltaWriter();
@@ -101,8 +103,8 @@ public:
int64_t get_mem_consumption_snapshot() const;
private:
- DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
MemTrackerLimiter* parent_tracker,
- bool is_vec);
+ DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
+ const std::shared_ptr<MemTrackerLimiter>& parent_tracker, bool
is_vec);
// push a full memtable to flush executor
Status _flush_memtable_async();
@@ -133,8 +135,8 @@ private:
StorageEngine* _storage_engine;
std::unique_ptr<FlushToken> _flush_token;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
- MemTrackerLimiter* _parent_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _parent_tracker;
// The counter of number of segment flushed already.
int64_t _segment_counter = 0;
diff --git a/be/src/olap/memtable_flush_executor.cpp
b/be/src/olap/memtable_flush_executor.cpp
index b9e622a9a7..53fc5ac2f7 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -29,7 +29,7 @@ namespace doris {
class MemtableFlushTask final : public Runnable {
public:
MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable>
memtable,
- int64_t submit_task_time, MemTrackerLimiter* tracker)
+ int64_t submit_task_time, const
std::shared_ptr<MemTrackerLimiter>& tracker)
: _flush_token(flush_token),
_memtable(std::move(memtable)),
_submit_task_time(submit_task_time),
@@ -47,7 +47,7 @@ private:
FlushToken* _flush_token;
std::unique_ptr<MemTable> _memtable;
int64_t _submit_task_time;
- MemTrackerLimiter* _tracker;
+ std::shared_ptr<MemTrackerLimiter> _tracker;
};
std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
@@ -58,7 +58,8 @@ std::ostream& operator<<(std::ostream& os, const
FlushStatistic& stat) {
return os;
}
-Status FlushToken::submit(std::unique_ptr<MemTable> mem_table,
MemTrackerLimiter* tracker) {
+Status FlushToken::submit(std::unique_ptr<MemTable> mem_table,
+ const std::shared_ptr<MemTrackerLimiter>& tracker) {
ErrorCode s = _flush_status.load();
if (s != OLAP_SUCCESS) {
return Status::OLAPInternalError(s);
diff --git a/be/src/olap/memtable_flush_executor.h
b/be/src/olap/memtable_flush_executor.h
index 6f986af3f5..4003126d58 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -57,7 +57,8 @@ public:
explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
: _flush_token(std::move(flush_pool_token)),
_flush_status(OLAP_SUCCESS) {}
- Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter*
tracker);
+ Status submit(std::unique_ptr<MemTable> mem_table,
+ const std::shared_ptr<MemTrackerLimiter>& tracker);
// error has happpens, so we cancel this token
// And remove all tasks in the queue.
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index ef53eeaee4..cb080a0e70 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -112,7 +112,7 @@ Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_scan_thread",
[this, data_dir]() {
- SCOPED_ATTACH_TASK(_mem_tracker.get(),
ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker,
ThreadContext::TaskType::STORAGE);
this->_path_scan_thread_callback(data_dir);
},
&path_scan_thread));
@@ -122,7 +122,7 @@ Status StorageEngine::start_bg_threads() {
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "path_gc_thread",
[this, data_dir]() {
- SCOPED_ATTACH_TASK(_mem_tracker.get(),
ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker,
ThreadContext::TaskType::STORAGE);
this->_path_gc_thread_callback(data_dir);
},
&path_gc_thread));
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 4e28ee7d8b..f239f2a585 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -113,16 +113,16 @@ StorageEngine::StorageEngine(const EngineOptions& options)
_index_stream_lru_cache(nullptr),
_file_cache(nullptr),
_compaction_mem_tracker(
- std::make_unique<MemTrackerLimiter>(-1,
"StorageEngine::AutoCompaction")),
+ std::make_shared<MemTrackerLimiter>(-1,
"StorageEngine::AutoCompaction")),
_segment_meta_mem_tracker(std::make_unique<MemTracker>("StorageEngine::SegmentMeta")),
_schema_change_mem_tracker(
- std::make_unique<MemTrackerLimiter>(-1,
"StorageEngine::SchemaChange")),
- _clone_mem_tracker(std::make_unique<MemTrackerLimiter>(-1,
"StorageEngine::Clone")),
+ std::make_shared<MemTrackerLimiter>(-1,
"StorageEngine::SchemaChange")),
+ _clone_mem_tracker(std::make_shared<MemTrackerLimiter>(-1,
"StorageEngine::Clone")),
_batch_load_mem_tracker(
- std::make_unique<MemTrackerLimiter>(-1,
"StorageEngine::BatchLoad")),
+ std::make_shared<MemTrackerLimiter>(-1,
"StorageEngine::BatchLoad")),
_consistency_mem_tracker(
- std::make_unique<MemTrackerLimiter>(-1,
"StorageEngine::Consistency")),
- _mem_tracker(std::make_unique<MemTrackerLimiter>(-1,
"StorageEngine::Self")),
+ std::make_shared<MemTrackerLimiter>(-1,
"StorageEngine::Consistency")),
+ _mem_tracker(std::make_shared<MemTrackerLimiter>(-1,
"StorageEngine::Self")),
_stop_background_threads_latch(1),
_tablet_manager(new TabletManager(config::tablet_map_shard_size)),
_txn_manager(new TxnManager(config::txn_map_shard_size,
config::txn_shard_size)),
@@ -168,7 +168,7 @@ void StorageEngine::load_data_dirs(const
std::vector<DataDir*>& data_dirs) {
std::vector<std::thread> threads;
for (auto data_dir : data_dirs) {
threads.emplace_back([this, data_dir] {
- SCOPED_ATTACH_TASK(_mem_tracker.get(),
ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
auto res = data_dir->load();
if (!res.ok()) {
LOG(WARNING) << "io error when init load tables. res=" << res
@@ -220,7 +220,7 @@ Status StorageEngine::_init_store_map() {
_tablet_manager.get(),
_txn_manager.get());
tmp_stores.emplace_back(store);
threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
- SCOPED_ATTACH_TASK(_mem_tracker.get(),
ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
auto st = store->init();
if (!st.ok()) {
{
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index d508a617c5..a46218e442 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -179,12 +179,16 @@ public:
Status get_compaction_status_json(std::string* result);
- MemTrackerLimiter* compaction_mem_tracker() { return
_compaction_mem_tracker.get(); }
+ std::shared_ptr<MemTrackerLimiter> compaction_mem_tracker() { return
_compaction_mem_tracker; }
MemTracker* segment_meta_mem_tracker() { return
_segment_meta_mem_tracker.get(); }
- MemTrackerLimiter* schema_change_mem_tracker() { return
_schema_change_mem_tracker.get(); }
- MemTrackerLimiter* clone_mem_tracker() { return _clone_mem_tracker.get(); }
- MemTrackerLimiter* batch_load_mem_tracker() { return
_batch_load_mem_tracker.get(); }
- MemTrackerLimiter* consistency_mem_tracker() { return
_consistency_mem_tracker.get(); }
+ std::shared_ptr<MemTrackerLimiter> schema_change_mem_tracker() {
+ return _schema_change_mem_tracker;
+ }
+ std::shared_ptr<MemTrackerLimiter> clone_mem_tracker() { return
_clone_mem_tracker; }
+ std::shared_ptr<MemTrackerLimiter> batch_load_mem_tracker() { return
_batch_load_mem_tracker; }
+ std::shared_ptr<MemTrackerLimiter> consistency_mem_tracker() {
+ return _consistency_mem_tracker;
+ }
// check cumulative compaction config
void check_cumulative_compaction_config();
@@ -333,21 +337,21 @@ private:
std::unordered_map<std::string, RowsetSharedPtr> _unused_rowsets;
// Count the memory consumption of all Base and Cumulative tasks.
- std::unique_ptr<MemTrackerLimiter> _compaction_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _compaction_mem_tracker;
// This mem tracker is only for tracking memory use by segment meta data
such as footer or index page.
// The memory consumed by querying is tracked in segment iterator.
std::unique_ptr<MemTracker> _segment_meta_mem_tracker;
// Count the memory consumption of all SchemaChange tasks.
- std::unique_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _schema_change_mem_tracker;
// Count the memory consumption of all EngineCloneTask.
// Note: Memory that does not contain make/release snapshots.
- std::unique_ptr<MemTrackerLimiter> _clone_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _clone_mem_tracker;
// Count the memory consumption of all EngineBatchLoadTask.
- std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
// Count the memory consumption of all EngineChecksumTask.
- std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _consistency_mem_tracker;
// StorageEngine oneself
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _unused_rowset_monitor_thread;
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp
b/be/src/olap/task/engine_alter_tablet_task.cpp
index ce34cac4d3..7689df96b3 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -25,7 +25,7 @@ namespace doris {
EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
: _alter_tablet_req(request) {
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
config::memory_limitation_per_thread_for_schema_change_bytes,
fmt::format("EngineAlterTabletTask#baseTabletId={}:newTabletId={}",
std::to_string(_alter_tablet_req.base_tablet_id),
@@ -34,7 +34,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const
TAlterTabletReqV2& request)
}
Status EngineAlterTabletTask::execute() {
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
DorisMetrics::instance()->create_rollup_requests_total->increment(1);
Status res =
SchemaChangeHandler::process_alter_tablet_v2(_alter_tablet_req);
diff --git a/be/src/olap/task/engine_alter_tablet_task.h
b/be/src/olap/task/engine_alter_tablet_task.h
index b6a736b357..1054c55eec 100644
--- a/be/src/olap/task/engine_alter_tablet_task.h
+++ b/be/src/olap/task/engine_alter_tablet_task.h
@@ -36,7 +36,7 @@ public:
private:
const TAlterTabletReqV2& _alter_tablet_req;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask
} // namespace doris
diff --git a/be/src/olap/task/engine_batch_load_task.cpp
b/be/src/olap/task/engine_batch_load_task.cpp
index e98ad02471..c478adcd8c 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -53,7 +53,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req,
std::vector<TTablet
_signature(signature),
_res_status(res_status) {
_download_status = Status::OK();
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
-1,
fmt::format("EngineBatchLoadTask#pushType={}:tabletId={}",
_push_req.push_type,
std::to_string(_push_req.tablet_id)),
@@ -63,7 +63,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req,
std::vector<TTablet
EngineBatchLoadTask::~EngineBatchLoadTask() {}
Status EngineBatchLoadTask::execute() {
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
Status status = Status::OK();
if (_push_req.push_type == TPushType::LOAD || _push_req.push_type ==
TPushType::LOAD_V2) {
status = _init();
diff --git a/be/src/olap/task/engine_batch_load_task.h
b/be/src/olap/task/engine_batch_load_task.h
index f0bf0dba0d..e2aba71f00 100644
--- a/be/src/olap/task/engine_batch_load_task.h
+++ b/be/src/olap/task/engine_batch_load_task.h
@@ -76,7 +76,7 @@ private:
Status* _res_status;
std::string _remote_file_path;
std::string _local_file_path;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // class EngineBatchLoadTask
} // namespace doris
#endif // DORIS_BE_SRC_OLAP_TASK_ENGINE_BATCH_LOAD_TASK_H
diff --git a/be/src/olap/task/engine_checksum_task.cpp
b/be/src/olap/task/engine_checksum_task.cpp
index f5d5f317f1..fc17c2fad3 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -26,13 +26,13 @@ namespace doris {
EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash
schema_hash,
TVersion version, uint32_t* checksum)
: _tablet_id(tablet_id), _schema_hash(schema_hash), _version(version),
_checksum(checksum) {
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, "EngineChecksumTask#tabletId=" + std::to_string(tablet_id),
StorageEngine::instance()->consistency_mem_tracker());
}
Status EngineChecksumTask::execute() {
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
return _compute_checksum();
} // execute
diff --git a/be/src/olap/task/engine_checksum_task.h
b/be/src/olap/task/engine_checksum_task.h
index 233c6e84f2..04afa1a5cd 100644
--- a/be/src/olap/task/engine_checksum_task.h
+++ b/be/src/olap/task/engine_checksum_task.h
@@ -44,7 +44,7 @@ private:
TSchemaHash _schema_hash;
TVersion _version;
uint32_t* _checksum;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask
} // namespace doris
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 73466cb72e..429a3d34be 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -57,14 +57,14 @@ EngineCloneTask::EngineCloneTask(const TCloneReq&
clone_req, const TMasterInfo&
_res_status(res_status),
_signature(signature),
_master_info(master_info) {
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, "EngineCloneTask#tabletId=" +
std::to_string(_clone_req.tablet_id),
StorageEngine::instance()->clone_mem_tracker());
}
Status EngineCloneTask::execute() {
// register the tablet to avoid it is deleted by gc thread during clone
process
- SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+ SCOPED_ATTACH_TASK(_mem_tracker, ThreadContext::TaskType::STORAGE);
StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
Status st = _do_clone();
StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
diff --git a/be/src/olap/task/engine_clone_task.h
b/be/src/olap/task/engine_clone_task.h
index ccb8a19581..90c4b43596 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -79,7 +79,7 @@ private:
const TMasterInfo& _master_info;
int64_t _copy_size;
int64_t _copy_time_ms;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
}; // EngineTask
} // namespace doris
diff --git a/be/src/runtime/data_stream_mgr.cpp
b/be/src/runtime/data_stream_mgr.cpp
index 3e519f8987..b0d1dbd8f2 100644
--- a/be/src/runtime/data_stream_mgr.cpp
+++ b/be/src/runtime/data_stream_mgr.cpp
@@ -72,9 +72,9 @@ shared_ptr<DataStreamRecvr> DataStreamMgr::create_recvr(
DCHECK(profile != nullptr);
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
- shared_ptr<DataStreamRecvr> recvr(new DataStreamRecvr(
- this, row_desc, state->query_mem_tracker(), fragment_instance_id,
dest_node_id,
- num_senders, is_merging, buffer_size, profile,
sub_plan_query_statistics_recvr));
+ shared_ptr<DataStreamRecvr> recvr(
+ new DataStreamRecvr(this, row_desc, fragment_instance_id,
dest_node_id, num_senders,
+ is_merging, buffer_size, profile,
sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
lock_guard<mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id,
dest_node_id));
diff --git a/be/src/runtime/data_stream_recvr.cc
b/be/src/runtime/data_stream_recvr.cc
index 0d456ca74a..b7988178fb 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -447,9 +447,8 @@ void DataStreamRecvr::transfer_all_resources(RowBatch*
transfer_batch) {
DataStreamRecvr::DataStreamRecvr(
DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- MemTrackerLimiter* query_mem_tracker, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, bool is_merging, int
total_buffer_limit,
- RuntimeProfile* profile,
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int
num_senders,
+ bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
diff --git a/be/src/runtime/data_stream_recvr.h
b/be/src/runtime/data_stream_recvr.h
index 31af9f2e37..efb036b5dd 100644
--- a/be/src/runtime/data_stream_recvr.h
+++ b/be/src/runtime/data_stream_recvr.h
@@ -116,9 +116,8 @@ private:
class SenderQueue;
DataStreamRecvr(DataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- MemTrackerLimiter* query_mem_tracker, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, bool is_merging,
- int total_buffer_limit, RuntimeProfile* profile,
+ const TUniqueId& fragment_instance_id, PlanNodeId
dest_node_id, int num_senders,
+ bool is_merging, int total_buffer_limit, RuntimeProfile*
profile,
std::shared_ptr<QueryStatisticsRecvr>
sub_plan_query_statistics_recvr);
// If receive queue is full, done is enqueue pending, and return with
*done is nullptr
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index d879b417e0..f708f4b410 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -113,10 +113,12 @@ public:
return nullptr;
}
- MemTrackerLimiter* process_mem_tracker() { return _process_mem_tracker; }
- void set_process_mem_tracker(MemTrackerLimiter* tracker) {
_process_mem_tracker = tracker; }
- MemTrackerLimiter* query_pool_mem_tracker() { return
_query_pool_mem_tracker; }
- MemTrackerLimiter* load_pool_mem_tracker() { return
_load_pool_mem_tracker; }
+ std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return
_process_mem_tracker; }
+ void set_process_mem_tracker(const std::shared_ptr<MemTrackerLimiter>&
tracker) {
+ _process_mem_tracker = tracker;
+ }
+ std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return
_query_pool_mem_tracker; }
+ std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return
_load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return
_task_pool_mem_tracker_registry; }
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
@@ -184,11 +186,11 @@ private:
// The ancestor for all trackers. Every tracker is visible from the
process down.
// Not limit total memory by process tracker, and it's just used to track
virtual memory of process.
- MemTrackerLimiter* _process_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _process_mem_tracker;
// The ancestor for all querys tracker.
- MemTrackerLimiter* _query_pool_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor for all load tracker.
- MemTrackerLimiter* _load_pool_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _load_pool_mem_tracker;
MemTrackerTaskPool* _task_pool_mem_tracker_registry;
// The following two thread pools are used in different scenarios.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index afecca4159..e9074af172 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -193,7 +193,8 @@ Status ExecEnv::_init_mem_tracker() {
<< ". Using physical memory instead";
global_memory_limit_bytes = MemInfo::physical_mem();
}
- _process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes,
"Process");
+ _process_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes,
"Process");
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) &&
!defined(ADDRESS_SANITIZER) && \
@@ -203,10 +204,12 @@ Status ExecEnv::_init_mem_tracker() {
}
#endif
- _query_pool_mem_tracker = new MemTrackerLimiter(-1, "QueryPool",
_process_mem_tracker);
+ _query_pool_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "QueryPool",
_process_mem_tracker);
REGISTER_HOOK_METRIC(query_mem_consumption,
[this]() { return
_query_pool_mem_tracker->consumption(); });
- _load_pool_mem_tracker = new MemTrackerLimiter(-1, "LoadPool",
_process_mem_tracker);
+ _load_pool_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "LoadPool",
_process_mem_tracker);
REGISTER_HOOK_METRIC(load_mem_consumption,
[this]() { return
_load_pool_mem_tracker->consumption(); });
LOG(INFO) << "Using global memory limit: "
@@ -363,9 +366,6 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_routine_load_task_executor);
SAFE_DELETE(_external_scan_context_mgr);
SAFE_DELETE(_heartbeat_flags);
- SAFE_DELETE(_process_mem_tracker);
- SAFE_DELETE(_query_pool_mem_tracker);
- SAFE_DELETE(_load_pool_mem_tracker);
SAFE_DELETE(_task_pool_mem_tracker_registry);
SAFE_DELETE(_buffer_reservation);
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index e1fc0a3463..56b21b7cef 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,11 +25,11 @@
namespace doris {
-LoadChannel::LoadChannel(const UniqueId& load_id,
std::unique_ptr<MemTrackerLimiter> mem_tracker,
+LoadChannel::LoadChannel(const UniqueId& load_id,
std::shared_ptr<MemTrackerLimiter>& mem_tracker,
int64_t timeout_s, bool is_high_priority, const
std::string& sender_ip,
bool is_vec)
: _load_id(load_id),
- _mem_tracker(std::move(mem_tracker)),
+ _mem_tracker(mem_tracker),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
@@ -60,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest&
params) {
} else {
// create a new tablets channel
TabletsChannelKey key(params.id(), index_id);
- channel.reset(new TabletsChannel(key, _mem_tracker.get(),
_is_high_priority, _is_vec));
+ channel.reset(new TabletsChannel(key, _mem_tracker,
_is_high_priority, _is_vec));
_tablets_channels.insert({index_id, channel});
}
}
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index ad8a476fcf..9647528304 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,7 +39,7 @@ class Cache;
// corresponding to a certain load job
class LoadChannel {
public:
- LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTrackerLimiter>
mem_tracker,
+ LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTrackerLimiter>&
mem_tracker,
int64_t timeout_s, bool is_high_priority, const std::string&
sender_ip,
bool is_vec);
~LoadChannel();
@@ -99,7 +99,7 @@ private:
UniqueId _load_id;
// Tracks the total memory consumed by current load job on this BE
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
// lock protect the tablets channel map
std::mutex _lock;
diff --git a/be/src/runtime/load_channel_mgr.cpp
b/be/src/runtime/load_channel_mgr.cpp
index 7429c09c6e..c252fe2aff 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -84,7 +84,7 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
int64_t load_mgr_mem_limit =
calc_process_max_load_memory(process_mem_limit);
- _mem_tracker = std::make_unique<MemTrackerLimiter>(load_mgr_mem_limit,
"LoadChannelMgr");
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(load_mgr_mem_limit,
"LoadChannelMgr");
REGISTER_HOOK_METRIC(load_channel_mem_consumption,
[this]() { return _mem_tracker->consumption(); });
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -110,13 +110,13 @@ Status LoadChannelMgr::open(const
PTabletWriterOpenRequest& params) {
int64_t load_mem_limit = params.has_load_mem_limit() ?
params.load_mem_limit() : -1;
int64_t channel_mem_limit =
calc_channel_max_load_memory(load_mem_limit,
_mem_tracker->limit());
- auto channel_mem_tracker = std::make_unique<MemTrackerLimiter>(
+ auto channel_mem_tracker = std::make_shared<MemTrackerLimiter>(
channel_mem_limit,
fmt::format("LoadChannel#senderIp={}#loadID={}",
params.sender_ip(),
load_id.to_string()),
- _mem_tracker.get());
- channel.reset(new LoadChannel(load_id,
std::move(channel_mem_tracker),
- channel_timeout_s, is_high_priority,
params.sender_ip(),
+ _mem_tracker);
+ channel.reset(new LoadChannel(load_id, channel_mem_tracker,
channel_timeout_s,
+ is_high_priority, params.sender_ip(),
params.is_vectorized()));
_load_channels.insert({load_id, channel});
}
diff --git a/be/src/runtime/load_channel_mgr.h
b/be/src/runtime/load_channel_mgr.h
index 0e46d8bf52..af9d3d6240 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -78,7 +78,7 @@ protected:
Cache* _last_success_channel = nullptr;
// check the total load channel mem consumption of this Backend
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
CountDownLatch _stop_background_threads_latch;
// thread to clean timeout load channels
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp
b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 2223fce5a5..6a37b1df86 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -19,6 +19,8 @@
#include <fmt/format.h>
+#include <boost/stacktrace.hpp>
+
#include "gutil/once.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
@@ -29,7 +31,8 @@
namespace doris {
MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string&
label,
- MemTrackerLimiter* parent,
RuntimeProfile* profile)
+ const std::shared_ptr<MemTrackerLimiter>&
parent,
+ RuntimeProfile* profile)
: MemTracker(label, profile, true) {
DCHECK_GE(byte_limit, -1);
_limit = byte_limit;
@@ -42,32 +45,28 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit,
const std::string& labe
while (tracker != nullptr) {
_all_ancestors.push_back(tracker);
if (tracker->has_limit()) _limited_ancestors.push_back(tracker);
- tracker = tracker->_parent;
+ tracker = tracker->_parent.get();
}
DCHECK_GT(_all_ancestors.size(), 0);
DCHECK_EQ(_all_ancestors[0], this);
- if (_parent) _parent->add_child(this);
+ if (_parent) {
+ std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
+ _child_tracker_it = _parent->_child_tracker_limiters.insert(
+ _parent->_child_tracker_limiters.end(), this);
+ _had_child_count++;
+ }
}
MemTrackerLimiter::~MemTrackerLimiter() {
// TCMalloc hook will be triggered during destructor memtracker, may cause
crash.
if (_label == "Process") doris::thread_context_ptr._init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
- if (_parent) _parent->remove_child(this);
-}
-
-void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) {
- std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
- tracker->_child_tracker_it =
- _child_tracker_limiters.insert(_child_tracker_limiters.end(),
tracker);
- _had_child_count++;
-}
-
-void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
- std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
- if (tracker->_child_tracker_it != _child_tracker_limiters.end()) {
- _child_tracker_limiters.erase(tracker->_child_tracker_it);
- tracker->_child_tracker_it = _child_tracker_limiters.end();
+ if (_parent) {
+ std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
+ if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
+ _parent->_child_tracker_limiters.erase(_child_tracker_it);
+ _child_tracker_it = _parent->_child_tracker_limiters.end();
+ }
}
}
@@ -221,13 +220,14 @@ std::string MemTrackerLimiter::log_usage(int
max_recursive_depth,
Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const
std::string& details,
int64_t failed_allocation_size,
Status failed_alloc) {
STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
- MemTrackerLimiter* process_tracker =
ExecEnv::GetInstance()->process_mem_tracker();
std::string detail =
"Memory exceed limit. fragment={}, details={}, on backend={}.
Memory left in process "
"limit={}.";
- detail = fmt::format(detail, state != nullptr ?
print_id(state->fragment_instance_id()) : "",
- details, BackendOptions::get_localhost(),
-
PrettyPrinter::print(process_tracker->spare_capacity(), TUnit::BYTES));
+ detail = fmt::format(
+ detail, state != nullptr ? print_id(state->fragment_instance_id())
: "", details,
+ BackendOptions::get_localhost(),
+
PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity(),
+ TUnit::BYTES));
if (!failed_alloc) {
detail += " failed alloc=<{}>. current tracker={}.";
detail = fmt::format(detail, failed_alloc.to_string(), _label);
@@ -240,13 +240,15 @@ Status
MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::str
Status status = Status::MemoryLimitExceeded(detail);
if (state != nullptr) state->log_error(detail);
+ detail += "\n" +
boost::stacktrace::to_string(boost::stacktrace::stacktrace());
// only print the tracker log_usage in be log.
- if (process_tracker->spare_capacity() < failed_allocation_size) {
+ if (ExecEnv::GetInstance()->process_mem_tracker()->spare_capacity() <
failed_allocation_size) {
// Dumping the process MemTracker is expensive. Limiting the recursive
depth to two
// levels limits the level of detail to a one-line summary for each
query MemTracker.
- detail += "\n" + process_tracker->log_usage(2);
+ detail += "\n" +
ExecEnv::GetInstance()->process_mem_tracker()->log_usage(2);
+ } else {
+ detail += "\n" + log_usage();
}
- detail += "\n" + log_usage();
LOG(WARNING) << detail;
return status;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h
b/be/src/runtime/memory/mem_tracker_limiter.h
index 5c41ce7cda..c85205b2c0 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -42,8 +42,10 @@ class RuntimeState;
class MemTrackerLimiter final : public MemTracker {
public:
// Creates and adds the tracker limiter to the tree
- MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label =
std::string(),
- MemTrackerLimiter* parent = nullptr, RuntimeProfile*
profile = nullptr);
+ MemTrackerLimiter(
+ int64_t byte_limit = -1, const std::string& label = std::string(),
+ const std::shared_ptr<MemTrackerLimiter>& parent =
std::shared_ptr<MemTrackerLimiter>(),
+ RuntimeProfile* profile = nullptr);
// If the final consumption is not as expected, this usually means that
the same memory is calling
// consume and release on different trackers. If the two trackers have a
parent-child relationship,
@@ -51,10 +53,7 @@ public:
// no parent-child relationship, the two tracker consumptions are wrong.
~MemTrackerLimiter();
- MemTrackerLimiter* parent() const { return _parent; }
-
- void add_child(MemTrackerLimiter* tracker);
- void remove_child(MemTrackerLimiter* tracker);
+ std::shared_ptr<MemTrackerLimiter> parent() const { return _parent; }
size_t remain_child_count() const { return _child_tracker_limiters.size();
}
size_t had_child_count() const { return _had_child_count; }
@@ -187,7 +186,7 @@ private:
// Group number in MemTracker::mem_tracker_pool, generated by the
timestamp.
int64_t _group_num;
- MemTrackerLimiter* _parent; // The parent of this tracker.
+ std::shared_ptr<MemTrackerLimiter> _parent; // The parent of this tracker.
// this tracker limiter plus all of its ancestors
std::vector<MemTrackerLimiter*> _all_ancestors;
@@ -199,13 +198,12 @@ private:
// update that of its children).
mutable std::mutex _child_tracker_limiter_lock;
std::list<MemTrackerLimiter*> _child_tracker_limiters;
+ // Iterator into parent_->_child_tracker_limiters for this object. Stored
to have O(1) remove.
+ std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
// The number of child trackers that have been added.
std::atomic_size_t _had_child_count = 0;
- // Iterator into parent_->_child_tracker_limiters for this object. Stored
to have O(1) remove.
- std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
-
// Lock to protect gc_memory(). This prevents many GCs from occurring at
once.
std::mutex _gc_lock;
// Functions to call after the limit is reached to free memory.
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp
b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 24a8c95180..3c775db5ec 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -23,47 +23,49 @@
namespace doris {
-MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const
std::string& task_id,
- int64_t
mem_limit,
- const
std::string& label,
-
MemTrackerLimiter* parent) {
+std::shared_ptr<MemTrackerLimiter>
MemTrackerTaskPool::register_task_mem_tracker_impl(
+ const std::string& task_id, int64_t mem_limit, const std::string&
label,
+ const std::shared_ptr<MemTrackerLimiter>& parent) {
DCHECK(!task_id.empty());
// First time this task_id registered, make a new object, otherwise do
nothing.
// Combine new tracker and emplace into one operation to avoid the use of
locks
// Name for task MemTrackers. '$0' is replaced with the task id.
+ std::shared_ptr<MemTrackerLimiter> tracker;
bool new_emplace = _task_mem_trackers.lazy_emplace_l(
- task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
+ task_id, [&](const std::shared_ptr<MemTrackerLimiter>& v) {
tracker = v; },
[&](const auto& ctor) {
- ctor(task_id, std::make_shared<MemTrackerLimiter>(mem_limit,
label, parent));
+ tracker = std::make_shared<MemTrackerLimiter>(mem_limit,
label, parent);
+ ctor(task_id, tracker);
});
if (new_emplace) {
LOG(INFO) << "Register query/load memory tracker, query/load id: " <<
task_id
<< " limit: " << PrettyPrinter::print(mem_limit,
TUnit::BYTES);
}
- return _task_mem_trackers[task_id].get();
+ return tracker;
}
-MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const
std::string& query_id,
- int64_t
mem_limit) {
+std::shared_ptr<MemTrackerLimiter>
MemTrackerTaskPool::register_query_mem_tracker(
+ const std::string& query_id, int64_t mem_limit) {
return register_task_mem_tracker_impl(query_id, mem_limit,
fmt::format("Query#queryId={}",
query_id),
ExecEnv::GetInstance()->query_pool_mem_tracker());
}
-MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const
std::string& load_id,
- int64_t
mem_limit) {
+std::shared_ptr<MemTrackerLimiter>
MemTrackerTaskPool::register_load_mem_tracker(
+ const std::string& load_id, int64_t mem_limit) {
// In a load, the query id of the executed fragment is the same as the load id of the load channel.
return register_task_mem_tracker_impl(load_id, mem_limit,
fmt::format("Load#queryId={}",
load_id),
ExecEnv::GetInstance()->load_pool_mem_tracker());
}
-MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string&
task_id) {
+std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker(
+ const std::string& task_id) {
DCHECK(!task_id.empty());
- MemTrackerLimiter* tracker = nullptr;
+ std::shared_ptr<MemTrackerLimiter> tracker = nullptr;
// Avoid using locks to resolve erase conflicts
_task_mem_trackers.if_contains(
- task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) {
tracker = v.get(); });
+ task_id, [&tracker](const std::shared_ptr<MemTrackerLimiter>& v) {
tracker = v; });
return tracker;
}
@@ -74,8 +76,8 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// Unknown exception case with high concurrency, after
_task_mem_trackers.erase,
// the key still exists in _task_mem_trackers.
https://github.com/apache/incubator-doris/issues/10006
expired_task_ids.emplace_back(it->first);
- } else if (it->second->remain_child_count() == 0 &&
it->second->had_child_count() != 0) {
- // No RuntimeState uses this task MemTracker, it is only
referenced by this map,
+ } else if (it->second.use_count() == 1 &&
it->second->had_child_count() != 0) {
+ // No RuntimeState uses this task MemTrackerLimiter, it is only
referenced by this map,
// and tracker was not created soon, delete it.
//
// If consumption is not equal to 0 before query mem tracker is
destructed,
@@ -92,20 +94,12 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
it->second->parent()->consumption_revise(-it->second->consumption());
LOG(INFO) << "Deregister query/load memory tracker,
queryId/loadId: " << it->first;
expired_task_ids.emplace_back(it->first);
- } else {
- // Log limit exceeded query tracker.
- if (it->second->limit_exceeded()) {
- it->second->mem_limit_exceeded(
- nullptr,
- fmt::format("Task mem limit exceeded but no cancel,
queryId:{}", it->first),
- 0, Status::OK());
- }
}
}
for (auto tid : expired_task_ids) {
// Verify the condition again to make sure the tracker is not being
used again.
- _task_mem_trackers.erase_if(tid,
[&](std::shared_ptr<MemTrackerLimiter> v) {
- return !v || v->remain_child_count() == 0;
+ _task_mem_trackers.erase_if(tid, [&](const
std::shared_ptr<MemTrackerLimiter>& v) {
+ return !v || v.use_count() == 1;
});
}
}
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h
b/be/src/runtime/memory/mem_tracker_task_pool.h
index 4890d72713..1958542fe9 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -32,26 +32,28 @@ using TaskTrackersMap = phmap::parallel_flat_hash_map<
// Global task pool for query MemTrackers. Owned by ExecEnv.
class MemTrackerTaskPool {
public:
- // Construct a MemTracker object for 'task_id' with 'mem_limit' as the
memory limit.
- // The MemTracker is a child of the pool MemTracker, Calling this with the
same
- // 'task_id' will return the same MemTracker object. This is used to track
the local
+ // Construct a MemTrackerLimiter object for 'task_id' with 'mem_limit' as
the memory limit.
+ // The MemTrackerLimiter is a child of the pool MemTrackerLimiter; calling this with the same
+ // 'task_id' will return the same MemTrackerLimiter object. This is used
to track the local
// memory usage of all tasks executing. The first time this is called for
a task,
- // a new MemTracker object is created with the pool tracker as its parent.
+ // a new MemTrackerLimiter object is created with the pool tracker as its
parent.
// Newly created trackers will always have a limit of -1.
- MemTrackerLimiter* register_task_mem_tracker_impl(const std::string&
task_id, int64_t mem_limit,
- const std::string& label,
- MemTrackerLimiter*
parent);
- MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id,
int64_t mem_limit);
- MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id,
int64_t mem_limit);
+ std::shared_ptr<MemTrackerLimiter> register_task_mem_tracker_impl(
+ const std::string& task_id, int64_t mem_limit, const std::string&
label,
+ const std::shared_ptr<MemTrackerLimiter>& parent);
+ std::shared_ptr<MemTrackerLimiter> register_query_mem_tracker(const
std::string& query_id,
+ int64_t
mem_limit);
+ std::shared_ptr<MemTrackerLimiter> register_load_mem_tracker(const
std::string& load_id,
+ int64_t
mem_limit);
- MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);
+ std::shared_ptr<MemTrackerLimiter> get_task_mem_tracker(const std::string&
task_id);
// Remove the mem tracker that has ended the query.
void logout_task_mem_tracker();
private:
- // All per-task MemTracker objects.
- // The life cycle of task memtracker in the process is the same as task
runtime state,
+ // All per-task MemTrackerLimiter objects.
+ // The life cycle of task MemTrackerLimiter in the process is the same as
task runtime state,
// MemTrackers will be removed from this map after query finish or cancel.
TaskTrackersMap _task_mem_trackers;
};
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index 7841a7cb6a..b6f1ebde33 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -24,10 +24,10 @@
namespace doris {
-void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
- const std::string& task_id,
- const TUniqueId&
fragment_instance_id,
- MemTrackerLimiter*
mem_tracker) {
+void ThreadMemTrackerMgr::attach_limiter_tracker(
+ const std::string& cancel_msg, const std::string& task_id,
+ const TUniqueId& fragment_instance_id,
+ const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
flush_untracked_mem<false>();
_task_id = task_id;
@@ -37,8 +37,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const
std::string& cancel_msg,
}
void ThreadMemTrackerMgr::detach_limiter_tracker() {
- // Do not flush untracked mem, instance executor thread may exit after
instance fragment executor thread,
- // `instance_mem_tracker` will be null pointer, which is not a graceful
exit.
+ flush_untracked_mem<false>();
_task_id = "";
_fragment_instance_id = TUniqueId();
_exceed_cb.cancel_msg = "";
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h
b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 2d23388a81..d3940d1e50 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -77,7 +77,7 @@ public:
// After attach, the current thread TCMalloc Hook starts to
consume/release task mem_tracker
void attach_limiter_tracker(const std::string& cancel_msg, const
std::string& task_id,
const TUniqueId& fragment_instance_id,
- MemTrackerLimiter* mem_tracker);
+ const std::shared_ptr<MemTrackerLimiter>&
mem_tracker);
void detach_limiter_tracker();
@@ -116,7 +116,7 @@ public:
bool is_attach_task() { return _task_id != ""; }
- MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; }
+ std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() { return
_limiter_tracker; }
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -145,7 +145,7 @@ private:
// Frequent calls to unordered_map _untracked_mems[] in consume will
degrade performance.
int64_t _untracked_mem = 0;
- MemTrackerLimiter* _limiter_tracker;
+ std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
std::vector<MemTracker*> _consumer_tracker_stack;
// If true, call memtracker try_consume, otherwise call consume.
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 4c3d114447..2d128c13e4 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -241,9 +241,10 @@ Status RuntimeState::init_mem_trackers(const TUniqueId&
query_id) {
print_id(query_id), bytes_limit);
} else {
DCHECK(false);
+ _query_mem_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
}
- _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _instance_mem_tracker = std::make_shared<MemTrackerLimiter>(
bytes_limit, "RuntimeState:instance:" +
print_id(_fragment_instance_id),
_query_mem_tracker, &_profile);
@@ -263,7 +264,7 @@ Status RuntimeState::init_mem_trackers(const TUniqueId&
query_id) {
Status RuntimeState::init_instance_mem_tracker() {
_query_mem_tracker = nullptr;
- _instance_mem_tracker = std::make_unique<MemTrackerLimiter>(-1,
"RuntimeState:instance");
+ _instance_mem_tracker = std::make_shared<MemTrackerLimiter>(-1,
"RuntimeState:instance");
return Status::OK();
}
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index f561adecf8..9ab470bf64 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -127,8 +127,8 @@ public:
const TUniqueId& query_id() const { return _query_id; }
const TUniqueId& fragment_instance_id() const { return
_fragment_instance_id; }
ExecEnv* exec_env() { return _exec_env; }
- MemTrackerLimiter* query_mem_tracker() { return _query_mem_tracker; }
- MemTrackerLimiter* instance_mem_tracker() { return
_instance_mem_tracker.get(); }
+ std::shared_ptr<MemTrackerLimiter> query_mem_tracker() { return
_query_mem_tracker; }
+ std::shared_ptr<MemTrackerLimiter> instance_mem_tracker() { return
_instance_mem_tracker; }
ThreadResourceMgr::ResourcePool* resource_pool() { return _resource_pool; }
void set_fragment_root_id(PlanNodeId id) {
@@ -390,10 +390,10 @@ private:
// MemTracker that is shared by all fragment instances running on this
host.
// The query mem tracker must be released after the _instance_mem_tracker.
- MemTrackerLimiter* _query_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _query_mem_tracker;
// Memory usage of this fragment instance
- std::unique_ptr<MemTrackerLimiter> _instance_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _instance_mem_tracker;
// put runtime state before _obj_pool, so that it will be deconstructed
after
// _obj_pool. Because some of object in _obj_pool will use profile when
deconstructing.
diff --git a/be/src/runtime/sorted_run_merger.cc
b/be/src/runtime/sorted_run_merger.cc
index de44701293..28d347462f 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -182,7 +182,7 @@ private:
// signal of new batch or the eos/cancelled condition
std::condition_variable _batch_prepared_cv;
- void process_sorted_run_task(MemTrackerLimiter* mem_tracker) {
+ void process_sorted_run_task(const std::shared_ptr<MemTrackerLimiter>&
mem_tracker) {
SCOPED_ATTACH_TASK(mem_tracker, ThreadContext::TaskType::QUERY);
std::unique_lock<std::mutex> lock(_mutex);
while (true) {
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index 043f4c7eab..0934214f89 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -30,14 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count,
MetricUnit::NOUNIT);
std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
MemTrackerLimiter* parent_tracker,
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key,
+ const std::shared_ptr<MemTrackerLimiter>&
parent_tracker,
bool is_high_priority, bool is_vec)
: _key(key),
_state(kInitialized),
_closed_senders(64),
_is_high_priority(is_high_priority),
_is_vec(is_vec) {
- _mem_tracker = std::make_unique<MemTrackerLimiter>(
+ _mem_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("TabletsChannel#indexID={}", key.index_id),
parent_tracker);
static std::once_flag once_flag;
std::call_once(once_flag, [] {
@@ -240,7 +241,7 @@ Status TabletsChannel::_open_all_writers(const
PTabletWriterOpenRequest& request
wrequest.ptable_schema_param = request.schema();
DeltaWriter* writer = nullptr;
- auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(),
_is_vec);
+ auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker, _is_vec);
if (!st.ok()) {
std::stringstream ss;
ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 318dd879a4..78ca9ae076 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -60,8 +60,9 @@ class OlapTableSchemaParam;
// Write channel for a particular (load, index).
class TabletsChannel {
public:
- TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter*
parent_tracker,
- bool is_high_priority, bool is_vec);
+ TabletsChannel(const TabletsChannelKey& key,
+ const std::shared_ptr<MemTrackerLimiter>& parent_tracker,
bool is_high_priority,
+ bool is_vec);
~TabletsChannel();
@@ -144,7 +145,7 @@ private:
static std::atomic<uint64_t> _s_tablet_writer_count;
- std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _mem_tracker;
bool _is_high_priority = false;
diff --git a/be/src/runtime/thread_context.cpp
b/be/src/runtime/thread_context.cpp
index 6071defe64..7a6968b30f 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -29,8 +29,9 @@ ThreadContextPtr::ThreadContextPtr() {
_init = true;
}
-AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const
ThreadContext::TaskType& type,
- const std::string& task_id, const TUniqueId&
fragment_instance_id) {
+AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
+ const ThreadContext::TaskType& type, const std::string&
task_id,
+ const TUniqueId& fragment_instance_id) {
DCHECK(mem_tracker);
#ifdef USE_MEM_TRACKER
thread_context()->attach_task(type, task_id, fragment_instance_id,
mem_tracker);
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 2c14929432..0acc7c6556 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -130,7 +130,8 @@ public:
}
void attach_task(const TaskType& type, const std::string& task_id,
- const TUniqueId& fragment_instance_id, MemTrackerLimiter*
mem_tracker) {
+ const TUniqueId& fragment_instance_id,
+ const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
DCHECK((_type == TaskType::UNKNOWN || _type == TaskType::BRPC) &&
_task_id == "")
<< ",new tracker label: " << mem_tracker->label() << ",old
tracker label: "
<< _thread_mem_tracker_mgr->limiter_mem_tracker()->label();
@@ -195,7 +196,7 @@ static ThreadContext* thread_context() {
class AttachTask {
public:
- explicit AttachTask(MemTrackerLimiter* mem_tracker,
+ explicit AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
const ThreadContext::TaskType& type =
ThreadContext::TaskType::UNKNOWN,
const std::string& task_id = "",
const TUniqueId& fragment_instance_id = TUniqueId());
diff --git a/be/src/service/internal_service.cpp
b/be/src/service/internal_service.cpp
index 7603beacff..15eb95b4b3 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -118,20 +118,20 @@ void
PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
const Status& extract_st) {
std::string query_id;
TUniqueId finst_id;
- std::unique_ptr<MemTrackerLimiter> transmit_tracker;
+ std::shared_ptr<MemTrackerLimiter> transmit_tracker;
if (request->has_query_id()) {
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, the query mem tracker does not exist in BE when transmitting the block, so we will get a null pointer.
- transmit_tracker = std::make_unique<MemTrackerLimiter>(
+ transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
} else {
query_id = "unkown_transmit_data";
- transmit_tracker = std::make_unique<MemTrackerLimiter>(-1,
"unkown_transmit_data");
+ transmit_tracker = std::make_shared<MemTrackerLimiter>(-1,
"unkown_transmit_data");
}
- SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY,
query_id, finst_id);
+ SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY,
query_id, finst_id);
VLOG_ROW << "transmit data: fragment_instance_id=" <<
print_id(request->finst_id())
<< " query_id=" << query_id << " node=" << request->node_id();
// The response is accessed when done->Run is called in transmit_data(),
@@ -649,20 +649,20 @@ void
PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
const Status& extract_st) {
std::string query_id;
TUniqueId finst_id;
- std::unique_ptr<MemTrackerLimiter> transmit_tracker;
+ std::shared_ptr<MemTrackerLimiter> transmit_tracker;
if (request->has_query_id()) {
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, the query mem tracker does not exist in BE when transmitting the block, so we will get a null pointer.
- transmit_tracker = std::make_unique<MemTrackerLimiter>(
+ transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
} else {
query_id = "unkown_transmit_block";
- transmit_tracker = std::make_unique<MemTrackerLimiter>(-1,
"unkown_transmit_block");
+ transmit_tracker = std::make_shared<MemTrackerLimiter>(-1,
"unkown_transmit_block");
}
- SCOPED_ATTACH_TASK(transmit_tracker.get(), ThreadContext::TaskType::QUERY,
query_id, finst_id);
+ SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY,
query_id, finst_id);
VLOG_ROW << "transmit block: fragment_instance_id=" <<
print_id(request->finst_id())
<< " query_id=" << query_id << " node=" << request->node_id();
// The response is accessed when done->Run is called in transmit_block(),
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index bee1fcf10c..511fbbe19d 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -53,8 +53,8 @@ std::shared_ptr<VDataStreamRecvr>
VDataStreamMgr::create_recvr(
VLOG_FILE << "creating receiver for fragment=" << fragment_instance_id
<< ", node=" << dest_node_id;
std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
- this, row_desc, state->query_mem_tracker(), fragment_instance_id,
dest_node_id,
- num_senders, is_merging, buffer_size, profile,
sub_plan_query_statistics_recvr));
+ this, row_desc, fragment_instance_id, dest_node_id, num_senders,
is_merging,
+ buffer_size, profile, sub_plan_query_statistics_recvr));
uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
std::lock_guard<std::mutex> l(_lock);
_fragment_stream_set.insert(std::make_pair(fragment_instance_id,
dest_node_id));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 020f188cac..2fb7be3223 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -249,9 +249,8 @@ void VDataStreamRecvr::SenderQueue::close() {
VDataStreamRecvr::VDataStreamRecvr(
VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
- MemTrackerLimiter* query_mem_tracker, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id, int num_senders, bool is_merging, int
total_buffer_limit,
- RuntimeProfile* profile,
+ const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int
num_senders,
+ bool is_merging, int total_buffer_limit, RuntimeProfile* profile,
std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr)
: _mgr(stream_mgr),
_fragment_instance_id(fragment_instance_id),
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 87024a917a..bedd18bbce 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -51,9 +51,9 @@ class VExprContext;
class VDataStreamRecvr {
public:
     VDataStreamRecvr(VDataStreamMgr* stream_mgr, const RowDescriptor& row_desc,
-                     MemTrackerLimiter* query_mem_tracker, const TUniqueId& fragment_instance_id,
-                     PlanNodeId dest_node_id, int num_senders, bool is_merging,
-                     int total_buffer_limit, RuntimeProfile* profile,
+                     const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id,
+                     int num_senders, bool is_merging, int total_buffer_limit,
+                     RuntimeProfile* profile,
                      std::shared_ptr<QueryStatisticsRecvr> sub_plan_query_statistics_recvr);
~VDataStreamRecvr();
diff --git a/be/test/runtime/mem_limit_test.cpp b/be/test/runtime/mem_limit_test.cpp
index b0a298ff8a..eeadb41b28 100644
--- a/be/test/runtime/mem_limit_test.cpp
+++ b/be/test/runtime/mem_limit_test.cpp
@@ -53,9 +53,9 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
}
TEST(MemTestTest, TrackerHierarchy) {
- auto p = std::make_unique<MemTrackerLimiter>(100);
- auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get());
- auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get());
+ auto p = std::make_shared<MemTrackerLimiter>(100);
+ auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p);
+ auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p);
// everything below limits
c1->consume(60);
@@ -96,9 +96,9 @@ TEST(MemTestTest, TrackerHierarchy) {
}
TEST(MemTestTest, TrackerHierarchyTryConsume) {
- auto p = std::make_unique<MemTrackerLimiter>(100);
- auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p.get());
- auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p.get());
+ auto p = std::make_shared<MemTrackerLimiter>(100);
+ auto c1 = std::make_unique<MemTrackerLimiter>(80, "c1", p);
+ auto c2 = std::make_unique<MemTrackerLimiter>(50, "c2", p);
// everything below limits
bool consumption = c1->try_consume(60).ok();
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index b84cc6c375..0f6579ec9e 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -29,7 +29,8 @@
#include "util/mem_info.h"
int main(int argc, char** argv) {
-    doris::MemTrackerLimiter* process_mem_tracker = new doris::MemTrackerLimiter(-1, "Process");
+    std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
+            std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
doris::ExecEnv::GetInstance()->set_process_mem_tracker(process_mem_tracker);
doris::thread_context()->_thread_mem_tracker_mgr->init();
doris::StoragePageCache::create_global_cache(1 << 30, 10);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]